Skip to content

Deploy High Available Flink Cluster

Ansible Flink

What is Apache Flink ?

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

Flink Flink Documentation

Prerequisites

Minimum requirements

  • 2 Servers or more with Docker installed
  • ZK cluster available
  • ZooKeeper Cluster
  • AWX/Ansible (Client)

General Flow

We will use gitlab ci to build,tag,release and call Ansible(AWX job)

We will deploy the Flink Cluster (JobManager and TaskManager) in HA mode.

The deploy stage uses a "tower-cli" docker image.

You can use your own or just run ansible instead of using AWX job.

Here is the GitLab Pipeline:

.gitlab-ci.yml

image: docker:latest
stages:
- build
- release
- build-release
- deploy


variables:
FLINK_VERSION: 1.9.1
IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
LATEST_DEV: $CI_REGISTRY_IMAGE:${FLINK_VERSION}-dev
LATEST: $CI_REGISTRY_IMAGE:${FLINK_VERSION}-$CI_COMMIT_TAG


build-dev:
stage: build
script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker build --build-arg FLINK_VERSION=${FLINK_VERSION} -t $LATEST_DEV .
    - docker push $LATEST_DEV
except:
    - tags
tags:
    - general


release:
stage: release
image: registry.gitlab.com/juhani/go-semrel-gitlab:v0.21.1
script:
    - release next-version --allow-current
    - release changelog
    - release commit-and-tag --create-tag-pipeline CHANGELOG.md
only:
    - master
except:
    - tags
when: manual
tags:
    - generic
environment:
    name: production
dependencies: []


build-release:
stage: build-release
script:
    - docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
    - docker pull ${LATEST_DEV}
    - docker tag ${LATEST_DEV} ${LATEST}
    - docker tag ${LATEST_DEV} ${CI_REGISTRY_IMAGE}
    - docker push ${LATEST}
    - docker push ${CI_REGISTRY_IMAGE}
environment:
    name: master
only:
    - tags
tags:
    - generic
dependencies: []


deploy:
image: registry.yourdomain.com/devops/tower-cli
stage: deploy
script:
    - tower-cli config host ${awx_host}
    - tower-cli config username ${awx_user}
    - tower-cli config password ${awx_password}
    - tower-cli job launch --job-template ${awx_job_id} --extra-vars="IMAGE=${LATEST}" --monitor
environment:
    name: master
only:
    - tags
tags:
    - generic
when: manual
except:
    - schedules
dependencies: []

Dockerfile

The Dockerfile uses the Flink Community Docker image

We do not need to modify it, since we will create the config file during deployment.

Docker Hub

ARG FLINK_VERSION

FROM flink:$FLINK_VERSION

# Don't copy the conf.
# Let ansible generate it and mount the resulting file to the container

# COPY conf/ ${FLINK_HOME}/conf/

# ENTRYPOINT ["/opt/flink/bin/jobmanager.sh", "start-foreground"]

Here is the flink conf:

(jinja2 template)

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink-cluster1
high-availability.storageDir: /flink/recovery
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: "{{ CPUS }}"
parallelism.default: 1
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
availability.jobmanager.port: 50010
rest.address: "{{ ADDRESS }}"
rest.bind-port: 8000

Custom host variables

As you can see above the "ADDRESS" & the "CPUS" variables are set according to the facts, that ansible gathers from each host.

Ansible Playbook

Here is the deployment playbook

deploy_cluster.yml

- hosts: flink-hosts
gather_facts: True
roles:
    - role: ha-cluster
serial: 5

Ansible Role

Here is the hacluster role:

roles/ha-cluster/tasks/hacluster.yaml

- name: Download the flink source code from the GitRepo
git:
    repo: 'https://github.com/apache/flink.git'
    dest: "/flink-source"
    force: true


- name: Remove conf directory
file:
    state: absent
    path: "{{ DATA_VOLUME }}/conf"


- name: Move conf directory
command: mv /flink-source/flink-dist/src/main/flink-bin/conf {{ DATA_VOLUME }}/


- name: generate flink config on the docker Host machine
vars:
    ADDRESS: "i-{{ inventory_hostname }}"
    CPUS: "{{ ansible_processor_vcpus }}"
template:
    src: conf/flink-conf.jinja2
    dest: "{{ DATA_VOLUME }}/conf/flink-conf.yaml"
    mode: '0644'


- name: Run Flink JobManager container
docker_container:
    recreate: yes
    pull: true
    name: flink-job-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
    max-size: 100m
    max-file: 1
    ulimits:
    - nofile:250000:250000
    volumes:
    - "{{ DATA_VOLUME }}/:/flink/:rw"
    - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/jobmanager.sh start-foreground i-{{ inventory_hostname }} 8081"


- name: Run Task Manager
docker_container:
    recreate: yes
    pull: true
    name: flink-task-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
    max-size: 100m
    max-file: 1
    ulimits:
    - nofile:250000:250000
    volumes:
    - "{{ DATA_VOLUME }}/:/flink/:rw"
    - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/taskmanager.sh start-foreground"

So what do we have so far ?

Each push to the repository will build a docker image with "-dev" tag. To update the production image, manually run the "release" stage.

It will tag the "-dev" as the release version tag and deploy it using AWX The "conf" directory is pulled (at deployment stage) from the Flink official source code at Github (public) It stays on the host os.

The flink-conf.yaml file is generated via the flink-conf.jinja2 file.

Ansible will update the "{{ ADDRESS }}" variable with the internal hostname of the node.

It will also update the taskmanager.numberOfTaskSlots according to the available CPUS on the host.

(gathered from ansible facts)

- name: generate flink config on the docker Host machine
vars:
    ADDRESS: "i-{{ inventory_hostname }}"
template:
    src: conf/flink-conf.jinja2
    dest: "{{ DATA_VOLUME }}/conf/flink-conf.yaml"
    mode: '0644'

Then the "conf" directory is mounted to the container as volume with Read Only permissions 🔐 The deploy stage also calls AWX job and will run the Flink cluster playbook on all nodes that are in and Ansible Inventory.

- name: Run Flink JobManager container
docker_container:
    recreate: yes
    pull: true
    name: flink-job-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
    max-size: 100m
    max-file: 1
    ulimits:
    - nofile:250000:250000
    volumes:
    - "{{ DATA_VOLUME }}/:/etl/flink/:rw"
    - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/jobmanager.sh start-foreground i-{{ inventory_hostname }} 8081"

The job Manager is configured to register at the Zookeeper Cluster

The list of the zookeeper hosts can be pulled at runtime from the ansible group_vars of the inventory hosts.

Comments