从0开始部署一个Flink集群:实践篇(Native k8s部署)

上一篇博文介绍了如何独立部署一个高可用的Flink集群,本篇介绍如何用Native k8s去部署高可用的Flink 集群。本篇介绍的集群构建在AWS上,和构建在自己的服务器相比,主要区别在文件系统的选择和使用上。我选用的S3服务。

  • EC2操作系统:centos7
  • 本机操作系统:Mac
  • flink version: 1.14
  • jdk version: java8
  • HA service: k8s
  • File System: S3

启动EC2

在AWS上启动3个EC2,操作系统选centos7,注意每个EC2要关联弹性ip地址,允许外网访问。
可参考https://wiki.centos.org/Cloud/AWS

部署k8s集群

安装docker

1
2
3
4
5
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo yum install docker -y
sudo systemctl start docker.service
sudo systemctl enable docker.service
sudo systemctl status docker

安装k8s

1
2
3
4
5
6
7
8
9
10
11
$ sudo vi /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=1
repo_gpgcheck=0
gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
$ sudo yum install -y kubelet
$ sudo yum install -y kubeadm
$ sudo systemctl enable kubelet

Linux环境准备

  1. set hostnames
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// On your **Master** node, update your hostname using the following command:

//Master:192.168.100.90 (3.101.77.138)

$ sudo hostnamectl set-hostname master-node

//worker-node1:192.168.100.29 (3.101.77.139)

$ sudo hostnamectl set-hostname worker-node1

// worker-node2: 192.168.100.21 (3.101.77.140)

$ sudo hostnamectl set-hostname worker-node2

// Make a host entry or DNS record to resolve the hostname for all nodes:

$ sudo vi /etc/hosts

// With the entry:

192.168.100.63 master-node

192.168.100.36 node1 worker-node1

192.168.100.125 node2 worker-node2
  1. 关闭SElinux
    1
    2
    3
    4
    5
    $ sudo setenforce 0
    $ sudo sed -i --follow-symlinks 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux
    $ sudo reboot
    $ sestatus
    SELinux status: disabled
  2. update iptables config
1
2
3
4
$ sudo vi /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
$ sudo sysctl --system
  1. disable swap(3)
1
2
$ sudo sed -i '/swap/d' /etc/fstab
$ sudo swapoff -a

部署k8s集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// only master node
$ sudo kubeadm init
kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
--discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960
// Create required directories and start managing Kubernetes cluster
$ mkdir -p $HOME/.kube
$ sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
$ sudo chown $(id -u):$(id -g) $HOME/.kube/config
[ec2-user@master-node kubernetes]$ kubectl get nodes
NAME STATUS ROLES AGE VERSION
master-node NotReady control-plane 84m v1.24.2
//Set up Pod network for the Cluster
$ export kubever=$(kubectl version | base64 | tr -d '\n')
$ kubectl apply -f https://cloud.weave.works/k8s/net?k8s-version=$kubever
// add nodes to your cluster
//在两个工作节点操作
$ kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
--discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960
//在master节点
$ kubectl label node worker-node1 node-role.kubernetes.io/worker=worker
$ kubectl label node worker-node2 node-role.kubernetes.io/worker=worker

Session Mode 部署Flink集群

安装Java8

1
2
3
4
$ sudo yum search java|grep jdk
$ sudo yum install -y java-1.8.0-openjdk
$ sudo yum install java-1.8.0-openjdk-devel -y
$ java -version

下载解压Flink安装包

1
2
3
$ cd /opt
$ wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz --no-check-certificate
$ tar -xzf flink-*.tgz

启动集群(无高可用服务版本)

Native k8s是把k8s植入到了Flink安装包中,直接使用Flink的命令就可以在k8s集群中启动flink组件相关的pod。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//service account with RBAC permissions to create, delete pods
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
--clusterrole=edit \
--serviceaccount=flink-cluster:flink
//启动session集群1(此时读取的配置为master节点的配置文件,与taskmanager节点的配置文件无关)
$ ./bin/kubernetes-session.sh -Dkubernetes.namespace=flink-cluster -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=my-session -Dtaskmanager.numberOfTaskSlots=6 -Dkubernetes.rest-service.exposed.type=NodePort
2022-07-01 08:46:49,621 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster my-session successfully, JobManager Web Interface: http://192.168.100.63:32172
// Dashboard: http://3.101.77.141:32172/,此时没有任何资源\

//只部署了jobmanager,部署在worker-node1,两个服务,一个对内,一个对外
[root@master-node ec2-user]# kubectl get pods,svc,ep -n flink-cluster -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
service/my-session ClusterIP None <none> 6123/TCP,6124/TCP 124m app=my-session,component=jobmanager,type=flink-native-kubernetes
service/my-session-rest NodePort 10.109.225.42 <none> 8081:31595/TCP 124m app=my-session,component=jobmanager,type=flink-native-kubernetes

NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
pod/my-session-556f44f44b-94gfk 1/1 Running 0 124m 10.44.0.2 worker-node1 <none> <none>

NAME ENDPOINTS AGE
endpoints/my-session 10.44.0.2:6124,10.44.0.2:6123 124m
endpoints/my-session-rest 10.44.0.2:8081 124m
[root@master-node ec2-user]# kubectl get deployment -o wide -n flink-cluster
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
my-session 1/1 1 1 125m flink-main-container apache/flink:1.14.5-scala_2.12 app=my-session,component=jobmanager,type=flink-native-kubernetes

可以通过命令行提交,也可以通过dashboard提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// run之后才会显示资源
$ ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.cluster-id=my-session \
./examples/streaming/TopSpeedWindowing.jar
$ ./bin/flink run --target kubernetes-session -Dkubernetes.namespace=flink-cluster -Dkubernetes.cluster-id=my-session -Dparallelism.default=2 ./examples/streaming/TopSpeedWindowing.jar
//自动扩容功能测试
//一个taskmanager的任务槽使用完之前,cluster只有一个task manager
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
my-session-556f44f44b-zdvrf 1/1 Running 0 21m 10.36.0.1 worker-node2 <none> <none>
my-session-taskmanager-1-1 1/1 Running 0 20m 10.44.0.1 worker-node1 <none> <none>
//继续提交作业,cluster会自动扩容
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
my-session-556f44f44b-zdvrf 1/1 Running 0 21m 10.36.0.1 worker-node2 <none> <none>
my-session-taskmanager-1-1 1/1 Running 0 20m 10.44.0.1 worker-node1 <none> <none>
my-session-taskmanager-1-2 1/1 Running 0 34s 10.36.0.2 worker-node2 <none> <none>
//当两个taskmanager的资源都用完了
[root@master-node flink-1.14.5]kubectl get pods -n flink-cluster -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
my-session-58bb97cdc-85ssq 1/1 Running 0 17m 10.36.0.1 worker-node2 <none> <none>
my-session-taskmanager-1-1 1/1 Running 0 8m26s 10.44.0.2 worker-node1 <none> <none>
my-session-taskmanager-1-2 1/1 Running 0 3m11s 10.36.0.3 worker-node2 <none> <none>
my-session-taskmanager-1-3 0/1 Pending 0 33s <none> <none> <none> <none>
//清理资源
$ kubectl delete deployment/my-session -n flink-cluster
$ kubectl delete clusterrolebinding flink-role-binding-flink
$ kubectl delete serviceaccount flink -n flink-cluster
$ kubectl delete namespace flink-cluster

和独立部署相比,采用Native k8s部署有以下几个值得注意的点:

  1. JM 和TM 只能部署在K8s集群中的工作节点上,因此对于三个节点组成的k8s集群而言,Flink集群事实上只部署在其中两个节点上。因此对于小规模集群,我认为采用Native k8s部署有些浪费资源。
  2. 尽管机器为8核,但是如果直接将-Dtaskmanager.numberOfTaskSlots参数设为8,集群会启动失败,因为默认每个taskmanager会按照slots的数量来分配CPU。当前两个工作节点要部署一个JM,两个TM,导致JM必须和TM挤在同一个节点上,JM默认占1个CPU核,出了JM和TM外,还有一些系统进程也需要CPU,大概占0.1个CPU,因此我这里将-Dtaskmanager.numberOfTaskSlots设为6,经验证6.9也是ok的。 如果希望设置更大的slots数,可以修改kubernetes.taskmanager.cpu,让CPU数和slots数不再关联。
  3. 在独立部署中,不同的TM可以配置不同的内存,不同的slots数量。但在Native k8s部署中,这些参数是所有TM共享的,不可以分开配置。
  4. 从上面的操作过程可以看到,采用native k8s启动flink Session集群事实上仅仅启动了jobmanager。而taskmanager是在任务提交之后根据任务需要的资源数来启动的。在K8s集群资源的限制内,需要多少TM就启动多少TM对应的pod。任务结束对应的资源也会释放。资源的分配和调度都交给了k8s,相比手工设置,更加自动化。但是,如果我们不想要flink组件把k8s所有的节点都用完了,想要限定节点的范围改怎么办?下面是一个通过node-selector指定jobmanager的节点示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //启动Session cluster2, 修改配置
    $ kubectl label nodes worker-node2 node=master
    [root@master-node flink-1.14.5]# kubectl get nodes -l "node=master"
    NAME STATUS ROLES AGE VERSION
    worker-node2 Ready worker 10d v1.24.2
    $ ./bin/kubernetes-session.sh -Dkubernetes.namespace=flink-cluster -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=my-session -Dtaskmanager.numberOfTaskSlots=8 -D
    kubernetes.rest-service.exposed.type=NodePort -Dkubernetes.jobmanager.node-selector=node:master
    [root@master-node flink-1.14.5]# kubectl get pods,svc,ep -n flink-cluster -o wide
    NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
    pod/my-session-58bb97cdc-85ssq 1/1 Running 0 57s 10.36.0.1 worker-node2 <none> <none>

    NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
    service/my-session ClusterIP None <none> 6123/TCP,6124/TCP 57s app=my-session,component=jobmanager,type=flink-native-kubernetes
    service/my-session-rest NodePort 10.108.58.5 <none> 8081:31181/TCP 57s app=my-session,component=jobmanager,type=flink-native-kubernetes

    NAME ENDPOINTS AGE
    endpoints/my-session 10.36.0.1:6124,10.36.0.1:6123 57s
    endpoints/my-session-rest 10.36.0.1:8081 57s

启动集群(含高可用服务版本)

高可用服务的部署需要一个可共享的持久化存储目录,因为部署在AWS上,这里我选择S3。
因此首先要解决的问题是如何让集群可以使用S3

配置s3

  1. 首先要赋予EC2 访问S3 bucket的权限,这可以通过添加IAM实现。
  2. Copy the respective JAR file(flink-s3-fs-hadoop-1.14.4.jar) from the opt directory to the plugins directory of your Flink distribution before starting Flink. 直接cp即可
  3. 启动集群的时候enable内置的插件就可以使用s3了
    1
    2
    3
    ./bin/kubernetes-session.sh
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.4.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.4.jar
    Native k8s部署下使用s3的坑较多,各种报错。主要有两种:
  4. 没有s3访问的权限。检查IAM的配置,检查s3的访问控制。
  5. 找不到对应的类。这主要是插件没有使用对导致的。

启动高可用集群

1
./bin/kubernetes-session.sh -Dkubernetes.namespace=flink-cluster -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=my-session -Dtaskmanager.numberOfTaskSlots=6  -Dkubernetes.rest-service.exposed.type=NodePort -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=s3a://yunzpeng-bucket/flink-ha 

总结:Native k8s部署下session模式集群启动和作业提交过程

  • 第一个阶段:启动 Session Cluster。Flink Client 内置了 K8s Client,告诉 K8s Master 创建 Flink Master Deployment,ConfigMap,SVC。创建完成后,Master 就拉起来了。这时,Session 就部署完成了,并没有维护任何 TaskManager。
  • 第二个阶段:当用户提交 Job 时,可以通过 Flink Client 或者 Dashboard 的方式,然后通过 Service 到 Dispatcher,Dispatcher 会产生一个 JobMaster。JobMaster 会向 K8sResourceManager 申请资源。ResourceManager 会发现现在没有任何可用的资源,它就会继续向 K8s 的 Master 去请求资源,请求资源之后将其发送回去,起新的 Taskmanager。Taskmanager 起来之后,再注册回来,此时的 ResourceManager 再向它去申请 slot 提供给 JobMaster,最后由 JobMaster 将相应的 Task 部署到 TaskManager 上。这样整个从 Session 的拉起到用户提交都完成了。

Applicatiob Mode部署Flink 集群

无高可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 在root下操作
// 1. build a docker image with the flink job
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar
$ docker build -t pandafish1996/flink-demo .
// 2. Push image to image warehouse
$ docker login --username=pandafish1996
// 规范: docker push 注册用户名/镜像名
//$ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-demo

//3. start a flink application cluster
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
--clusterrole=edit \
--serviceaccount=flink-cluster:flink
// without HA
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=pandafish1996/flink-demo \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.service-account=flink \
-Dparallelism.default=2 \
-Dtaskmanager.numberOfTaskSlots=6 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/my-flink-job.jar

$ kubectl delete deployment/my-first-application-cluster -n flink-cluster

高可用版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 1. 构造image并上传到远程仓库
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar
RUN mkdir -p $FLINK_HOME/plugins/flink-s3-fs-hadoop
COPY ./opt/flink-s3-fs-hadoop-1.14.5.jar $FLINK_HOME/plugins/flink-s3-fs-hadoop/
$ docker build -t pandafish1996/flink-hademo .
$ docker login --username=pandafish1996
// 规范: docker push 注册用户名/镜像名
//$ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-hademo
// 2. 启动集群
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=ha-cluster1 \
-Dkubernetes.container.image=pandafish1996/flink-hademo \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.service-account=flink \
-Dparallelism.default=2 \
-Dtaskmanager.numberOfTaskSlots=6 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=s3://yunzpeng-bucket/flink-ha \
-Drestart-strategy=fixed-delay \
-Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
local:///opt/flink/usrlib/my-flink-job.jar

参考


从0开始部署一个Flink集群:实践篇(Native k8s部署)
https://yunzhen.github.io/2022/12/19/Flink部署k8s篇/
作者
云祯
发布于
2022年12月19日
许可协议