The previous post covered how to deploy a highly available standalone Flink cluster; this post covers deploying a highly available Flink cluster with Native Kubernetes. The cluster here is built on AWS. Compared with building on your own servers, the main difference lies in the choice and use of the file system; I use the S3 service.
EC2 OS: CentOS 7
Local OS: macOS
Flink version: 1.14
JDK version: Java 8
HA service: Kubernetes
File system: S3
Launch EC2 instances

Launch three EC2 instances on AWS with CentOS 7 as the operating system. Make sure each instance is associated with an Elastic IP so it can be reached from the public internet. See https://wiki.centos.org/Cloud/AWS for reference.
Deploy the Kubernetes cluster

Install Docker

```
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo yum install docker -y
sudo systemctl start docker.service
sudo systemctl enable docker.service
sudo systemctl status docker
```
Install Kubernetes

```
$ sudo vi /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=1
repo_gpgcheck=0
gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg

$ sudo yum install -y kubelet
$ sudo yum install -y kubeadm
$ sudo systemctl enable kubelet
```
Prepare the Linux environment
Set hostnames
```
$ sudo hostnamectl set-hostname master-node    # on the master
$ sudo hostnamectl set-hostname worker-node1   # on worker 1
$ sudo hostnamectl set-hostname worker-node2   # on worker 2

$ sudo vi /etc/hosts
192.168.100.63  master-node
192.168.100.36  node1 worker-node1
192.168.100.125 node2 worker-node2
```
Disable SELinux

```
$ sudo setenforce 0
$ sudo sed -i --follow-symlinks 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux
$ sudo reboot
$ sestatus
SELinux status:                 disabled
```
Update iptables config
```
$ sudo vi /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
$ sudo sysctl --system
```
Disable swap
```
$ sudo sed -i '/swap/d' /etc/fstab
$ sudo swapoff -a
```
Initialize the cluster with kubeadm

```
# on the master node
$ sudo kubeadm init
...
kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
    --discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960

$ mkdir -p $HOME/.kube
$ sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
$ sudo chown $(id -u):$(id -g) $HOME/.kube/config

[ec2-user@master-node kubernetes]$ kubectl get nodes
NAME          STATUS     ROLES           AGE   VERSION
master-node   NotReady   control-plane   84m   v1.24.2

# install the pod network add-on (the manifest URL is truncated in the original)
$ export kubever=$(kubectl version | base64 | tr -d '\n')
$ kubectl apply -f https:

# on each worker node
$ kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
    --discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960

# back on the master node
$ kubectl label node worker-node1 node-role.kubernetes.io/worker=worker
$ kubectl label node worker-node2 node-role.kubernetes.io/worker=worker
```
Deploy a Flink cluster in Session Mode

Install Java 8

```
$ sudo yum search java | grep jdk
$ sudo yum install -y java-1.8.0-openjdk
$ sudo yum install -y java-1.8.0-openjdk-devel
$ java -version
```
Download and extract the Flink distribution

```
$ cd /opt
$ wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz --no-check-certificate
$ tar -xzf flink-*.tgz
```
Start the cluster (without the HA service)

With Native Kubernetes, a Kubernetes client is built into the Flink distribution, so Flink's own commands are enough to bring up the pods for the Flink components directly in the Kubernetes cluster.
```
# service account with RBAC permissions to create and delete pods
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=flink-cluster:flink

# start session cluster 1 (the configuration read here is the one on the client/master node;
# the taskmanager nodes' own config files are irrelevant)
$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort
2022-07-01 08:46:49,621 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster my-session successfully, JobManager Web Interface: http://192.168.100.63:32172

# Dashboard: http://3.101.77.141:32172/ -- no task manager resources yet
# only the jobmanager is deployed (on worker-node1), with two services: one internal, one external
[root@master-node ec2-user]# kubectl get pods,svc,ep -n flink-cluster -o wide
NAME                              READY   STATUS    RESTARTS   AGE    IP          NODE           NOMINATED NODE   READINESS GATES
pod/my-session-556f44f44b-94gfk   1/1     Running   0          124m   10.44.0.2   worker-node1   <none>           <none>

NAME                      TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE    SELECTOR
service/my-session        ClusterIP   None            <none>        6123/TCP,6124/TCP   124m   app=my-session,component=jobmanager,type=flink-native-kubernetes
service/my-session-rest   NodePort    10.109.225.42   <none>        8081:31595/TCP      124m   app=my-session,component=jobmanager,type=flink-native-kubernetes

NAME                        ENDPOINTS                       AGE
endpoints/my-session        10.44.0.2:6124,10.44.0.2:6123   124m
endpoints/my-session-rest   10.44.0.2:8081                  124m

[root@master-node ec2-user]# kubectl get deployment -o wide -n flink-cluster
NAME         READY   UP-TO-DATE   AVAILABLE   AGE    CONTAINERS             IMAGES                           SELECTOR
my-session   1/1     1            1           125m   flink-main-container   apache/flink:1.14.5-scala_2.12   app=my-session,component=jobmanager,type=flink-native-kubernetes
```
Jobs can be submitted either from the command line or through the dashboard.
```
# task manager resources only appear after a job is run
$ ./bin/flink run \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-session \
    ./examples/streaming/TopSpeedWindowing.jar

# autoscaling test
# before the first taskmanager's slots are used up, the cluster has only one task manager
NAME                          READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
my-session-556f44f44b-zdvrf   1/1     Running   0          21m   10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1    1/1     Running   0          20m   10.44.0.1   worker-node1   <none>           <none>

# keep submitting jobs and the cluster scales up automatically
NAME                          READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
my-session-556f44f44b-zdvrf   1/1     Running   0          21m   10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1    1/1     Running   0          20m   10.44.0.1   worker-node1   <none>           <none>
my-session-taskmanager-1-2    1/1     Running   0          34s   10.36.0.2   worker-node2   <none>           <none>

# when both taskmanagers' resources are used up, additional pods stay Pending
[root@master-node flink-1.14.5]# kubectl get pods -n flink-cluster -o wide
NAME                          READY   STATUS    RESTARTS   AGE     IP          NODE           NOMINATED NODE   READINESS GATES
my-session-58bb97cdc-85ssq    1/1     Running   0          17m     10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1    1/1     Running   0          8m26s   10.44.0.2   worker-node1   <none>           <none>
my-session-taskmanager-1-2    1/1     Running   0          3m11s   10.36.0.3   worker-node2   <none>           <none>
my-session-taskmanager-1-3    0/1     Pending   0          33s     <none>      <none>         <none>           <none>

# clean up
$ kubectl delete deployment/my-session -n flink-cluster
$ kubectl delete clusterrolebinding flink-role-binding-flink
$ kubectl delete serviceaccount flink -n flink-cluster
$ kubectl delete namespace flink-cluster
```
Compared with a standalone deployment, a few points about Native Kubernetes deployment are worth noting:
The JobManager and TaskManagers can only be scheduled onto the worker nodes of the Kubernetes cluster, so on a three-node cluster the Flink cluster effectively runs on just two nodes. For small clusters I therefore find Native Kubernetes deployment somewhat wasteful of resources.
Although each machine has 8 cores, setting -Dtaskmanager.numberOfTaskSlots directly to 8 makes the cluster fail to start, because by default each TaskManager requests CPU in proportion to its slot count. With only two worker nodes hosting one JobManager and two TaskManagers, the JobManager has to share a node with a TaskManager; the JobManager requests 1 CPU core by default, and system processes need roughly another 0.1 core. I therefore set -Dtaskmanager.numberOfTaskSlots to 6; by the same arithmetic, a request of up to about 6.9 CPUs also worked in my test. If you want a larger number of slots, set kubernetes.taskmanager.cpu so that the CPU request is no longer tied to the slot count (see the sketch after this list).
In a standalone deployment, different TaskManagers can be given different memory sizes and slot counts. In a Native Kubernetes deployment these settings are shared by all TaskManagers and cannot be configured per TaskManager.
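Following up on the CPU point above, here is a minimal sketch of decoupling the TaskManager CPU request from the slot count; the value of 4.0 CPUs is only an illustration, not a setting from the original setup:

```
# request 8 slots per TaskManager but only 4 CPUs from Kubernetes,
# so the pod still fits next to the JobManager on an 8-core worker
$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=8 \
    -Dkubernetes.taskmanager.cpu=4.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort
```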
As the steps above show, starting a Flink session cluster with Native Kubernetes really only starts the JobManager. TaskManagers are started after jobs are submitted, according to the resources the jobs require: within the limits of the Kubernetes cluster, as many TaskManager pods are launched as are needed, and the corresponding resources are released when the jobs finish. Resource allocation and scheduling are handed over to Kubernetes, which is more automated than configuring everything by hand. But what if we do not want the Flink components to spread across every Kubernetes node and instead want to restrict them to certain nodes? Below is an example that pins the JobManager to a node via a node selector:

```
# start session cluster 2 with a modified configuration
$ kubectl label nodes worker-node2 node=master
[root@master-node flink-1.14.5]# kubectl get nodes -l "node=master"
NAME           STATUS   ROLES    AGE   VERSION
worker-node2   Ready    worker   10d   v1.24.2

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=8 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.jobmanager.node-selector=node:master

[root@master-node flink-1.14.5]# kubectl get pods,svc,ep -n flink-cluster -o wide
NAME                             READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
pod/my-session-58bb97cdc-85ssq   1/1     Running   0          57s   10.36.0.1   worker-node2   <none>           <none>

NAME                      TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE   SELECTOR
service/my-session        ClusterIP   None          <none>        6123/TCP,6124/TCP   57s   app=my-session,component=jobmanager,type=flink-native-kubernetes
service/my-session-rest   NodePort    10.108.58.5   <none>        8081:31181/TCP      57s   app=my-session,component=jobmanager,type=flink-native-kubernetes

NAME                        ENDPOINTS                       AGE
endpoints/my-session        10.36.0.1:6124,10.36.0.1:6123   57s
endpoints/my-session-rest   10.36.0.1:8081                  57s
```
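TaskManagers can be pinned the same way via kubernetes.taskmanager.node-selector. A minimal sketch, assuming a hypothetical label node=compute on worker-node1 (the label and its value are not from the original setup):

```
# label the node that should run TaskManagers, then pass the selector at startup
$ kubectl label nodes worker-node1 node=compute

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.taskmanager.node-selector=node:compute
```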
Start the cluster (with the HA service)

Deploying the HA service requires a shared, persistent storage directory. Since this deployment runs on AWS, I chose S3, so the first problem to solve is letting the cluster use S3.
Configure S3
First, grant the EC2 instances permission to access the S3 bucket; this can be done by attaching an IAM role to the instances.
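As a hedged sketch of what that looks like with the AWS CLI: the role name flink-ec2-role, the instance profile flink-ec2-profile, and the instance id are placeholders, and the broad AmazonS3FullAccess managed policy is used only for illustration (a bucket-scoped policy is better in practice):

```
# attach an S3 access policy to the IAM role used by the EC2 instances
$ aws iam attach-role-policy \
    --role-name flink-ec2-role \
    --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

# associate that role's instance profile with each EC2 instance
$ aws ec2 associate-iam-instance-profile \
    --instance-id i-0123456789abcdef0 \
    --iam-instance-profile Name=flink-ec2-profile
```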
Copy the respective JAR (flink-s3-fs-hadoop-1.14.4.jar) from the opt directory to the plugins directory of your Flink distribution before starting Flink; a plain cp is enough.
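A minimal sketch of that copy, assuming the distribution lives under /opt/flink-1.14.5 (use the plugin JAR version that matches your distribution); Flink expects each plugin in its own subdirectory under plugins/:

```
$ cd /opt/flink-1.14.5
$ mkdir -p ./plugins/s3-fs-hadoop
$ cp ./opt/flink-s3-fs-hadoop-1.14.5.jar ./plugins/s3-fs-hadoop/
```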
Alternatively, enable the built-in plugin when starting the cluster and S3 can be used right away:

```
./bin/kubernetes-session.sh \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.4.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.4.jar
```
Using S3 under a Native Kubernetes deployment has quite a few pitfalls and produces all sorts of errors. They mostly fall into two categories (a quick sanity check follows this list):
No permission to access S3. Check the IAM configuration and the bucket's access control settings.
A class cannot be found. This is usually caused by the plugin not being set up correctly.
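To tell the two cases apart, a quick check from the EC2 host with the AWS CLI helps; the bucket name below is the one used later in this post:

```
# if this fails, the problem is IAM or the bucket policy, not Flink
$ aws s3 ls s3://yunzpeng-bucket/

# if listing works but Flink still complains about an unsupported file system
# scheme (s3/s3a) or a missing class, the s3 plugin is not loaded correctly
```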
Start the highly available cluster

```
./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3a://yunzpeng-bucket/flink-ha
```
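With Kubernetes HA enabled, Flink keeps leader election and job metadata in ConfigMaps and writes the larger state to the storageDir, so a quick way to confirm the HA service is active is to look at both (the exact ConfigMap names depend on the cluster-id):

```
# HA-related ConfigMaps created for cluster-id "my-session"
$ kubectl get configmaps -n flink-cluster | grep my-session

# HA metadata written under the configured storageDir on S3
$ aws s3 ls s3://yunzpeng-bucket/flink-ha/ --recursive
```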
Summary: how a session-mode cluster starts and jobs are submitted with Native Kubernetes
Phase one: start the Session Cluster. The Flink client has a Kubernetes client built in and asks the Kubernetes master to create the Flink Master Deployment, ConfigMap, and Service. Once these are created, the Flink Master is up and the session is deployed, but no TaskManagers are running yet.
Phase two: when a user submits a job, either through the Flink client or through the dashboard, the request reaches the Dispatcher via the Service, and the Dispatcher spawns a JobMaster. The JobMaster asks the KubernetesResourceManager for resources. Finding none available, the ResourceManager requests more from the Kubernetes master, which brings up new TaskManagers. Once a TaskManager is up, it registers back with the ResourceManager, which then requests slots from it on behalf of the JobMaster, and the JobMaster finally deploys the tasks onto the TaskManager. That completes the whole path from bringing up the session to running a user's job.
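One way to watch these two phases happen is to keep an eye on the pods while starting the session and then submitting a job; only the JobManager pod exists after phase one, and TaskManager pods appear only after submission:

```
# phase one: after kubernetes-session.sh only the jobmanager pod is running
$ kubectl get pods -n flink-cluster -w

# phase two: in another shell, submit a job and watch taskmanager pods appear
$ ./bin/flink run \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-session \
    ./examples/streaming/TopSpeedWindowing.jar
```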
Deploy a Flink cluster in Application Mode

Without HA

```
# run as root
# 1. build a docker image containing the flink job
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar

$ docker build -t pandafish1996/flink-demo .

# 2. push the image to an image registry
$ docker login --username=pandafish1996
# convention: docker push <registry username>/<image name>
# $ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-demo

# 3. start a flink application cluster
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=flink-cluster:flink

# without HA
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=pandafish1996/flink-demo \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.service-account=flink \
    -Dparallelism.default=2 \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/my-flink-job.jar

$ kubectl delete deployment/my-first-application-cluster -n flink-cluster
```
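To interact with the running application cluster from the client, the same cluster-id and namespace are passed to the usual flink commands, for example to list or cancel its job (the job id is a placeholder taken from the list output):

```
$ ./bin/flink list \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink-cluster

$ ./bin/flink cancel \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink-cluster \
    <jobId>
```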
HA version

```
# 1. build the image and push it to a remote registry
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar
RUN mkdir -p $FLINK_HOME/plugins/flink-s3-fs-hadoop
COPY ./opt/flink-s3-fs-hadoop-1.14.5.jar $FLINK_HOME/plugins/flink-s3-fs-hadoop/

$ docker build -t pandafish1996/flink-hademo .
$ docker login --username=pandafish1996
# convention: docker push <registry username>/<image name>
# $ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-hademo

# 2. start the cluster
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=ha-cluster1 \
    -Dkubernetes.container.image=pandafish1996/flink-hademo \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.service-account=flink \
    -Dparallelism.default=2 \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3://yunzpeng-bucket/flink-ha \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=10 \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    local:///opt/flink/usrlib/my-flink-job.jar
```
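A simple way to verify that HA actually works is to kill the JobManager pod and watch the Deployment recreate it while the job recovers from the HA metadata; the pod name below is a placeholder taken from kubectl get pods:

```
# find the jobmanager pod of the HA application cluster
$ kubectl get pods -n flink-cluster

# delete it; the deployment recreates it and the job restarts from the HA metadata in S3
$ kubectl delete pod ha-cluster1-<pod-suffix> -n flink-cluster
$ kubectl get pods -n flink-cluster -w
```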
References