大数据Flink进阶(十六):Flink HA搭建配置

2023-04-10 17:39:38 来源:腾讯云

Flink HA搭建配置

默认情况下,每个Flink集群只有一个JobManager,这将导致单点故障(SPOF,single point of failure),如果这个JobManager挂了,则不能提交新的任务,并且运行中的程序也会失败,这是我们可以对JobManager做高可用(High Availability,简称HA),JobManager HA集群当Active JobManager节点挂掉后可以切换其他Standby JobManager成为主节点,从而避免单点故障。用户可以在Standalone、Flink on Yarn、Flink on K8s集群模式下配置Flink集群HA,Flink on K8s集群模式下的HA将单独在K8s里介绍。

一、Flink基于Standalone HA

Standalone模式下,JobManager的高可用性的基本思想是,任何时候都有一个Alive JobManager和多个Standby JobManager。Standby JobManager可以在Alive JobManager挂掉的情况下接管集群成为Alive JobManager,这样避免了单点故障,一旦某一个Standby JobManager接管集群,程序就可以继续运行。Standby JobManagers和Alive JobManager实例之间没有明确区别,每个JobManager都可以成为Alive或Standby。

1、Standalone HA配置

Standalone集群部署下实现JobManager HA 需要依赖ZooKeeper和HDFS,Zookeeper负责协调JobManger失败后的自动切换,HDFS中存储每个Flink任务的执行流程数据,因此要有一个ZooKeeper集群和Hadoop集群。这里我们选择3台节点作为Flink的JobManger,如下:


【资料图】

节点IP

节点名称

JobManager

TaskManager

192.168.179.4

node1

192.168.179.5

node2

192.168.179.6

node3

以上node1、node2、node3都是JobManager,同时只能有1个JobManager为Active主节点,其他为StandBy备用节点,配置JobManager HA 步骤如下:

所有Flink节点配置hadoop classpath

由于Flink JobManager HA 中需要连接HDFS存储job数据,所以Flink所有节点必须配置hadoop classpath 环境变量,在node1-3所有节点上配置/etc/profile配置环境变量:

#配置/etc/profileexport HADOOP_CLASSPATH=`hadoop classpath`#执行生效source /etc/profile
配置masters文件

需要在所有Flink集群节点上配置$FLINK_HOME/conf/master文件,配置上所有的JobManager节点信息:

#node1,node2,node3节点上配置$FLINK_HOME/conf/master文件node1:8081node2:8081node3:8081
配置flink-conf.yaml

需要在所有Flink集群节点上配置$FLINK_HOME/conf/flink-conf.yaml文件,这里在node1-3节点上配置,配置内容如下:

#要启用高可用,选主协调者为zookeeper,zk存储一些ck记录及选举信息high-availability: zookeeper#storageDir存储恢复JobManager失败所需的所有元数据,如:job dataflow信息high-availability.storageDir: hdfs://mycluster/flink-standalone-ha/#分布式协调器zookeeper集群high-availability.zookeeper.quorum: node3:2181,node4:2181,node5:2181#根ZooKeeper节点,所有集群节点都位于根节点之下。high-availability.zookeeper.path.root: /flink-standalone-ha#给当前集群指定cluster-id,集群所有需要的协调数据都放在该节点下。high-availability.cluster-id: /standalone-cluster

2、启动测试

Standalone HA 配置完成后,按照如下步骤进行测试:

启动Zookeeper,启动HDFS
#在 node3、node4、node5节点启动zookeeper[root@node3 ~]#  zkServer.sh start[root@node4 ~]#  zkServer.sh start[root@node5 ~]#  zkServer.sh start#在node1启动HDFS集群[root@node1 ~]# start-all.sh
启动Flink Standalone HA集群
#在node1 节点启动Flink Standalone HA集群[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./start-cluster.shStarting HA cluster with 3 masters.Starting standalonesession daemon on host node1.Starting standalonesession daemon on host node2.Starting standalonesession daemon on host node3.Starting taskexecutor daemon on host node1.Starting taskexecutor daemon on host node2.Starting taskexecutor daemon on host node3.

启动Standaloe集群时同时会在node2、node3节点上启动JobManager。

访问Flink WebUI

登录Flink WebUI (https://node1:8081/https://node2:8081/https://node3:8081),无论登录node1,node2,node3节点任意一台节点的WebUI 页面都相同:

在WebUI中无法看到哪个节点是Active JobManager,我们也可以通过zookeeper查看当前Active JobManager节点,命令如下:

#登录zookeeper 客户端[root@node5 ~]# zkCli.sh#查看对应节点路径信息[zk: localhost:2181(CONNECTED) 1] get /flink-standalone-ha/standalone-cluster/leader/dispatcher/connection_info ...w42akka.tcp://flink@node1:33274/user/rpc/dispatcher_1srjava.util.UUID...
测试JobManager切换

我们可以在Flink Standalone集群中提交一个Flink 任务,提交之后无论在通过哪个节点的8081WebUI都可以看到此任务。提交任务命令如下:

#在node5节点启动 socket服务[root@node5 ~]# nc -lk 9999#在node4客户端向Standalone集群提交任务[root@node4 ~]# cd /software/flink-1.16.0/bin[root@node4 bin]# ./flink run -m node1:8081 -d -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

通过https://node1:8081、https://node2:8081、https://node3:8081 WebUI都可以看到提交的任务信息:

在HDFS中也可以看到提交的任务信息:

将node1节点上的JobManager进程kill掉,查看Active JobManager是否变化:

#kill node1 JobManager进程[root@node1 bin]# jps...16309 StandaloneSessionClusterEntrypoint...[root@node1 bin]# kill -9 16309

将Active JobManager kill之后访问各个节点的WebUI可以看到短暂的不可用,稍等一会就可以看到正常可以访问除node1之外的其他节点WebUI,通过查询Zookeeper中节点信息,可以看到Active JobManager 节点切换成了其他节点:

#zookeeper查询命令[zk: localhost:2181(CONNECTED) 1] get /flink-standalone-ha/standalone-cluster/leader/dispatcher/connection_info...w42akka.tcp://flink@node2:35581/user/rpc/dispatcher_1srjava.util.UUID...

通过以上测试Flink Standalone HA 生效,如果想要把在node1上kill掉的JobManager启动起来,需要手动执行如下命令:

#在node1启动JobManager[root@node1 bin]# ./jobmanager.sh start

被kill的JobManager重新启动后作为备用的JobManager也可以访问WebUI查看集群中执行的任务。

二、Flink 基于Yarn HA

正常基于Yarn提交Flink程序,无论使用哪种模式提交任务都会启动JobManager角色,JobManager角色是哪个进程可以通过Yarn WebUI查看对应的ApplicationID启动所在节点的对应进程, Yarn Session提交任务模式中该角色进程为"YarnSessionClusterEntrypoint"、Yarn Per-Job提交任务模式中该角色进程为"YarnJobClusterEntrypoint"、Yarn Application提交任务模式中该角色进程为"YarnApplicationClusterEntryPoint"。

当JobManager进程挂掉后,也就是Yarn Application任务失败后默认不会进行任务重试,所以Flink 基于Yarn JobManager HA的本质是当Yarn Application程序失败后重试启动JobManager,实际上就是通过配置Yarn重试次数来实现高可用。JobManager重试过程需要借助zookeeper 协调JobManger失败后的切换,进而进行恢复对应的任务,同时需要HDFS存储每个Flink任务的执行流程数据。

1、Yarn HA配置

Yarn HA配置步骤如下:

Hadoop中所有节点的yarn-site.xml

在所有Hadoop节点上配置$HADOOP_HOME/etc/hadoop/yarn-site.xml文件,配置应用程序失败后最大尝试次数,以下该参数默认值为2,不配置也可以:

#设置提交应用程序的最大尝试次数,建议不低于4,这里重试的是ApplicationMaster  yarn.resourcemanager.am.max-attempts  4
配置flink-conf.yaml文件

只需要在向Yarn提交任务的客户端节点上配置Flink的flink-conf.yaml文件。未来我们在node5节点上来基于Yarn 各种模式提交任务,所以这里我们在node5节点上配置$FLINK_HOME/conf/flink-conf.yaml文件,配置内容如下:

#要启用高可用,选主协调者为zookeeper,zk存储一些ck记录及选举信息high-availability: zookeeper#storageDir存储恢复JobManager失败所需的所有元数据,如:job dataflow信息high-availability.storageDir: hdfs://mycluster/flink-yarn-ha/#分布式协调器zookeeper集群high-availability.zookeeper.quorum: node3:2181,node4:2181,node5:2181#根ZooKeeper节点,所有集群节点都位于根节点之下。high-availability.zookeeper.path.root: /flink-yarn-ha#给当前集群指定cluster-id,集群所有需要的协调数据都放在该节点下。high-availability.cluster-id: /yarn-cluster#该参数同yarn-site.xml中yarn.resourcemanager.am.max-attempts参数,指向yarn提交一个application重试的次数,也可以不设置,非高可用默认为1,高可用默认为2,建议不大于yarn.resourcemanager.am.max-attempts参数,否则会被yarn.resourcemanager.am.max-attempts替换掉。yarn.application-attempts: 4

2、启动测试

启动Zookeeper和HDFS
#在 node3、node4、node5节点启动zookeeper[root@node3 ~]#  zkServer.sh start[root@node4 ~]#  zkServer.sh start[root@node5 ~]#  zkServer.sh start#在node1启动HDFS集群[root@node1 ~]# start-all.sh
node5节点向Yarn提交任务

这里以在node5节点上以Yarn Application模式提交任务为例,命令如下:

#在node5节点启动 socket服务[root@node5 ~]# nc -lk 9999#以Application模式提交任务,命令如下[root@node5 ~]# cd /software/flink-1.16.0/bin/[root@node5 bin]# ./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上任务提交后可以在Yarn WebUI中看到对应的Application信息:

测试Flink Yarn HA

在Yarn WebUI中进入到FlinkWebUi页面,查看该JobManager启动所在的节点:

进入JobManager所在节点,并kill对应的JobManager进程,模拟JobManager进程意外中断,在Yarn WebUI中可以看到对应的Yarn ApplicationID重试执行,点击该ApplicatID 可以看到该任务重试信息:

通过以上测试,Flink Yarn HA 生效。

标签:

国美与阿里云达成战略合作

8月12日,据新华网报道,国美零售与阿里云签订战略合作协议。双方将基于云计算、物联网、数据技术,在零...

2022-08-15 15:07:54

唐山加快验收时间 为建筑工地开复工保驾护航

唐山市住建局坚持两手抓、两手硬,统筹疫情防控和经济发展,想尽一切办法,简化一切手续,实现一路绿灯...

2022-03-19 15:15:42

消费者“身材焦虑” 减肥市场疯狂吸金

三月不减肥,四月徒伤悲。冬去春来之际,为了能穿上各种漂亮轻薄的服装,变得更瘦成为了许多人的追求,...

2022-03-19 15:14:49

2月份秦皇岛新建商品住宅销售价格环比下降0.2% 同比下降4%

3月16日,国家统计局公布2月份70个大中城市商品住宅销售价格变动情况,河北省入统的石家庄、唐山、秦皇...

2022-03-19 15:13:01

深圳技术进出口全年合同数量共1347项 同比增长2.51%

科技部火炬中心近日公布了2021年度全国技术合同交易数据。截至2021年12月31日,全国共登记技术合同67050...

2022-03-19 15:10:37

邢台柏乡:打造羊肚菌产业示范带 引领村级集体经济发展

我们通过打造食用菌产业高标准试验示范园,不仅盘活了闲置土地,还进一步增加了群众和村集体经济收入。...

2022-03-19 15:09:40

胡金秋32分17板 浙江广厦男篮“双杀”稠州金租

CBA浙江德比二番战昨晚开打,再度狭路相逢的浙江广厦男篮与浙江稠州金租男篮表现难分伯仲,比赛直到最后...

2022-03-19 15:07:37

黑龙江新增本土核酸检测初筛阳性人员5例 均在讷河市

  中新网12月13日电 据黑龙江省卫生健康委员会网站消息,2021年12月12日0-24时,黑龙江省无新增确诊...

2021-12-13 08:35:21

“恋爱盲盒”抽的不是爱情,是急功近利的心

  越是急功近利,越是焦虑,对恋爱关系的处理就越可能出现问题。  “玩法”简单,最低只需花费0 99...

2021-12-13 08:35:20

北京12月12日新增2例境外输入确诊病例

  中新网12月13日电 据北京市卫生健康委员会官方微博消息,北京12月12日新增2例境外输入确诊病例。 ...

2021-12-13 08:35:20
x 广告
x 广告

Copyright ©  2015-2022 亚太经营网版权所有  备案号:沪ICP备2020036824号-11   联系邮箱: 562 66 29@qq.com