Deploy a Highly Available Flink Cluster
Apache Flink
What is Apache Flink?
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Prerequisites
Minimum requirements
- Two or more servers with Docker installed
- An available ZooKeeper (ZK) cluster
- AWX/Ansible (client)
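The prerequisites above can be checked before deploying. The following is a minimal sketch, not part of the original setup: the playbook name `preflight.yml` and the `zookeeper_hosts` variable are assumptions, and the ZooKeeper endpoints are the ones used later in the Flink configuration.

```yaml
# preflight.yml (hypothetical playbook name)
- hosts: flink-hosts
  gather_facts: false
  vars:
    # Assumed variable; replace with your real ZooKeeper endpoints.
    zookeeper_hosts:
      - zk1:2181
      - zk2:2181
  tasks:
    - name: Check that the Docker CLI is available on the host
      command: docker --version
      changed_when: false

    - name: Check that every ZooKeeper node accepts TCP connections
      wait_for:
        host: "{{ item.split(':')[0] }}"
        port: "{{ item.split(':')[1] }}"
        timeout: 10
      loop: "{{ zookeeper_hosts }}"
```

A failed task here points at a missing prerequisite before any container is started.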
General Flow
We will use GitLab CI to build, tag, and release the image, and then call Ansible (through an AWX job).
We will deploy the Flink cluster (JobManager and TaskManager) in HA mode.
The deploy stage uses a "tower-cli" Docker image.
You can use your own image, or run Ansible directly instead of using an AWX job.
Here is the GitLab Pipeline:
.gitlab-ci.yml
image: docker:latest

stages:
  - build
  - release
  - build-release
  - deploy

variables:
  FLINK_VERSION: 1.9.1
  IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
  LATEST_DEV: $CI_REGISTRY_IMAGE:${FLINK_VERSION}-dev
  LATEST: $CI_REGISTRY_IMAGE:${FLINK_VERSION}-$CI_COMMIT_TAG

# Build and push the "-dev" image on every push (tag pipelines are excluded).
build-dev:
  stage: build
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker build --build-arg FLINK_VERSION=${FLINK_VERSION} -t $LATEST_DEV .
    - docker push $LATEST_DEV
  except:
    - tags
  tags:
    - general

# Manually create the next version tag and changelog from master.
release:
  stage: release
  image: registry.gitlab.com/juhani/go-semrel-gitlab:v0.21.1
  script:
    - release next-version --allow-current
    - release changelog
    - release commit-and-tag --create-tag-pipeline CHANGELOG.md
  only:
    - master
  except:
    - tags
  when: manual
  tags:
    - generic
  environment:
    name: production
  dependencies: []

# In the tag pipeline, retag the "-dev" image as the release image and push it.
build-release:
  stage: build-release
  script:
    # CI_JOB_TOKEN replaces the deprecated CI_BUILD_TOKEN.
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
    - docker pull ${LATEST_DEV}
    - docker tag ${LATEST_DEV} ${LATEST}
    - docker tag ${LATEST_DEV} ${CI_REGISTRY_IMAGE}
    - docker push ${LATEST}
    - docker push ${CI_REGISTRY_IMAGE}
  environment:
    name: master
  only:
    - tags
  tags:
    - generic
  dependencies: []

# Manually launch the AWX job template, passing the release image as IMAGE.
deploy:
  image: registry.yourdomain.com/devops/tower-cli
  stage: deploy
  script:
    - tower-cli config host ${awx_host}
    - tower-cli config username ${awx_user}
    - tower-cli config password ${awx_password}
    - tower-cli job launch --job-template ${awx_job_id} --extra-vars="IMAGE=${LATEST}" --monitor
  environment:
    name: master
  only:
    - tags
  tags:
    - generic
  when: manual
  except:
    - schedules
  dependencies: []
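If you prefer to run Ansible directly instead of going through AWX, the deploy job could look roughly like the sketch below. The image name and the inventory path are assumptions, and the job also assumes that the runner can reach the Flink hosts over SSH with suitable credentials.

```yaml
# Alternative to the AWX-based deploy job (sketch, not part of the original pipeline).
deploy-ansible:
  # Hypothetical image: any image that ships ansible-playbook.
  image: registry.yourdomain.com/devops/ansible
  stage: deploy
  script:
    # inventory/hosts.yml is an assumed path inside this repository.
    - ansible-playbook -i inventory/hosts.yml deploy_cluster.yml --extra-vars "IMAGE=${LATEST}"
  only:
    - tags
  when: manual
  tags:
    - generic
```

The `IMAGE` extra variable is the same one that the AWX job receives, so the playbook itself does not need to change.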
Dockerfile
The Dockerfile uses the Flink community Docker image.
We do not need to modify it, since we will create the config file during deployment.
ARG FLINK_VERSION
FROM flink:$FLINK_VERSION
# Don't copy the conf.
# Let ansible generate it and mount the resulting file to the container
# COPY conf/ ${FLINK_HOME}/conf/
# ENTRYPOINT ["/opt/flink/bin/jobmanager.sh", "start-foreground"]
Flink Configuration
Here is the Flink configuration (Jinja2 template):
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink-cluster1
high-availability.storageDir: /flink/recovery
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: "{{ CPUS }}"
parallelism.default: 1
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
high-availability.jobmanager.port: 50010
rest.address: "{{ ADDRESS }}"
rest.bind-port: 8000
Custom host variables
The "ADDRESS" and "CPUS" variables used in the template above are set per host: "CPUS" comes from the facts that Ansible gathers from each host, and "ADDRESS" is derived from the host's inventory name.
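To see which values a given host would receive, a debug task can print them before the template is rendered. This is only an illustrative sketch: it assumes fact gathering is enabled and uses the same expressions as the role shown later on this page.

```yaml
# Sketch: print the per-host values before rendering the template.
- name: Show the values that will be substituted into flink-conf.yaml
  debug:
    msg: "ADDRESS=i-{{ inventory_hostname }} CPUS={{ ansible_processor_vcpus }}"
```

For a host listed in the inventory as `node1` (a hypothetical name) with 4 vCPUs, the message would read `ADDRESS=i-node1 CPUS=4`.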
Ansible Playbook
Here is the deployment playbook:
deploy_cluster.yml
- hosts: flink-hosts
  gather_facts: true
  roles:
    - role: ha-cluster
  serial: 5
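The playbook expects a `flink-hosts` group, and the role relies on a `DATA_VOLUME` variable that is not defined anywhere on this page. One possible inventory is sketched below; the file name, host names, and path are placeholders rather than values from the original setup.

```yaml
# inventory/hosts.yml (hypothetical file; host names and paths are placeholders)
all:
  children:
    flink-hosts:
      hosts:
        node1.example.internal:
        node2.example.internal:
      vars:
        # Host directory that the role mounts into the containers.
        DATA_VOLUME: /data/flink
```

With this inventory, the role would set `ADDRESS` to `i-node1.example.internal` on the first host, so the inventory names should be chosen so that the `i-` prefixed name resolves inside your network.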
Ansible Role
Here is the ha-cluster role:
roles/ha-cluster/tasks/hacluster.yaml
# Fetch the upstream sources; only the default "conf" directory is used.
- name: Download the flink source code from the GitRepo
  git:
    repo: 'https://github.com/apache/flink.git'
    dest: "/flink-source"
    force: true

- name: Remove conf directory
  file:
    state: absent
    path: "{{ DATA_VOLUME }}/conf"

- name: Move conf directory
  command: mv /flink-source/flink-dist/src/main/flink-bin/conf {{ DATA_VOLUME }}/

# Render flink-conf.yaml with the per-host address and CPU count.
- name: generate flink config on the docker Host machine
  vars:
    ADDRESS: "i-{{ inventory_hostname }}"
    CPUS: "{{ ansible_processor_vcpus }}"
  template:
    src: conf/flink-conf.jinja2
    dest: "{{ DATA_VOLUME }}/conf/flink-conf.yaml"
    mode: '0644'

- name: Run Flink JobManager container
  docker_container:
    recreate: yes
    pull: true
    name: flink-job-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
      max-size: 100m
      # Quoted so that the log option is passed as a string.
      max-file: "1"
    ulimits:
      - nofile:250000:250000
    volumes:
      - "{{ DATA_VOLUME }}/:/flink/:rw"
      # The generated configuration is mounted read-only.
      - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/jobmanager.sh start-foreground i-{{ inventory_hostname }} 8081"

- name: Run Task Manager
  docker_container:
    recreate: yes
    pull: true
    name: flink-task-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
      max-size: 100m
      max-file: "1"
    ulimits:
      - nofile:250000:250000
    volumes:
      - "{{ DATA_VOLUME }}/:/flink/:rw"
      - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/taskmanager.sh start-foreground"
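After the role has run, it can be useful to confirm that both containers are actually up. The tasks below are a sketch that could be appended to the role; they assume the `docker_container_info` module is available (it ships with Ansible 2.8 and later and with the community.docker collection), and the variable name `flink_containers` is made up for this example.

```yaml
# Sketch: verify that both containers are running after the role has finished.
- name: Inspect the Flink containers
  docker_container_info:
    name: "{{ item }}"
  loop:
    - flink-job-manager
    - flink-task-manager
  register: flink_containers

- name: Fail if a Flink container is missing or stopped
  assert:
    that:
      - item.exists
      - item.container.State.Running
    fail_msg: "Container {{ item.item }} is not running"
  loop: "{{ flink_containers.results }}"
  loop_control:
    label: "{{ item.item }}"
```

This only checks the container state as reported by Docker; it does not verify that the JobManager has joined the ZooKeeper quorum.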
So what do we have so far?
Each push to the repository builds a Docker image with the "-dev" tag. To update the production image, manually run the "release" stage.
It tags the "-dev" image with the release version and deploys it using AWX.
The "conf" directory is pulled at deployment time from the official Flink source code on GitHub (public) and stays on the host OS.
The flink-conf.yaml file is generated from the flink-conf.jinja2 template.
Ansible sets the "{{ ADDRESS }}" variable to the internal hostname of the node.
It also sets taskmanager.numberOfTaskSlots according to the number of CPUs available on the host (gathered from Ansible facts).
- name: generate flink config on the docker Host machine
  vars:
    ADDRESS: "i-{{ inventory_hostname }}"
    CPUS: "{{ ansible_processor_vcpus }}"
  template:
    src: conf/flink-conf.jinja2
    dest: "{{ DATA_VOLUME }}/conf/flink-conf.yaml"
    mode: '0644'
Then the "conf" directory is mounted into the container as a read-only volume. 🔐
The deploy stage also calls the AWX job, which runs the Flink cluster playbook on all nodes in the Ansible inventory.
- name: Run Flink JobManager container
  docker_container:
    recreate: yes
    pull: true
    name: flink-job-manager
    image: "{{ IMAGE }}"
    network_mode: host
    state: started
    restart: yes
    privileged: yes
    log_driver: json-file
    log_options:
      max-size: 100m
      max-file: "1"
    ulimits:
      - nofile:250000:250000
    volumes:
      # Same mount point as in the role above, matching high-availability.storageDir.
      - "{{ DATA_VOLUME }}/:/flink/:rw"
      - "{{ DATA_VOLUME }}/conf:/opt/flink/conf:ro"
    entrypoint: "/opt/flink/bin/jobmanager.sh start-foreground i-{{ inventory_hostname }} 8081"
The JobManager is configured to register with the ZooKeeper cluster.
The list of ZooKeeper hosts can be pulled at runtime from the Ansible group_vars of the inventory hosts.
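One way to do this is sketched below, under the assumption that the ZooKeeper endpoints are kept in a group variable. The file name and the variable names `zookeeper_hosts` and `ZK_QUORUM` are hypothetical and do not appear in the original role.

```yaml
# group_vars/flink-hosts.yml (hypothetical file and variable names)
zookeeper_hosts:
  - zk1:2181
  - zk2:2181

# Comma-separated form expected by high-availability.zookeeper.quorum.
ZK_QUORUM: "{{ zookeeper_hosts | join(',') }}"

# In conf/flink-conf.jinja2 the hard-coded line would then become:
#   high-availability.zookeeper.quorum: {{ ZK_QUORUM }}
```

Keeping the list in group_vars means that adding a ZooKeeper node only requires an inventory change followed by a redeploy, not an edit of the template.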