Flink集群架构模式为经典的主从架构,与大部分主从架构一样,也存在单点故障问题。由于JobManager是整个Flink集群的管理节点,负责整个集群的任务调度和资源管理,默认情况下每个Flink集群只有一个JobManager实例,如果JobManager崩溃,则无法提交任何新应用程序,并且正在运行的应用程序也会失败。解决Flink集群单点故障的方式可以配置集群HA(High Availability,高可用性)。目前,Flink支持Flink Standalone和Flink On YARN两种模式配置集群HA。
在Flink Standalone模式下,实现HA的方式可以利用ZooKeeper在所有正在运行的JobManager实例之间进行分布式协调,实现多个JobManager无缝切换,类似于HDFS的NameNode HA或YARN的ResourceManager HA。Flink Standalone模式的HA架构如图3-18所示。
图3-18 Flink Standalone模式的HA架构
可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager,整个恢复过程大约在1分钟之内,如图3-19所示。
图3-19 JobManager故障恢复过程
此外,活动状态的JobManager在工作时会将其元数据(JobGraph、应用程序JAR文件等)写入一个远程持久化存储系统(例如HDFS)中,还会将元数据存储的位置和路径信息写入ZooKeeper存储,以便能够进行故障恢复,如图3-20所示。
图3-20 JobManager元数据存储
当活动状态的JobManager出现故障时,备用状态的JobManager会向ZooKeeper请求元数据存储位置,然后从HDFS中获取JobGraph、应用程序JAR文件、相应状态信息等元数据,以便接管发生故障的JobManager。
下面仍然在3个节点(centos01、centos02、centos03)上进行Flink Standalone模式的HA集群搭建,在前面已经搭建好的Flink Standalone集群上进行操作,集群角色分配如表3-5所示。
表3-5 Flink Standalone HA集群角色分配
搭建步骤如下。
1.修改masters文件
Flink的masters文件用于配置所有需要启动的JobManager节点以及每个JobManager的WebUI绑定的端口。
进入centos01节点的Flink安装主目录,修改conf/masters文件,内容如下:
centos01:8081 centos02:8081
上述配置表示在集群centos01和centos02节点上启动JobManager,并且每个JobManager的WebUI访问端口分别为8081。
2.修改flink-conf.yaml文件
进入centos01节点的Flink安装主目录,修改conf/flink-conf.yaml文件,添加以下内容:
3.修改zoo.cfg文件
Flink内置了ZooKeeper服务和相关脚本文件,如果你的集群中没有安装ZooKeeper,则可以通过修改zoo.cfg文件配置Flink内置的ZooKeeper。生产环境建议使用独立的外部ZooKeeper。
进入centos01节点的Flink安装主目录,修改conf/zoo.cfg文件,添加以下内容,配置ZooKeeper启动节点与选举相关端口:
server.1=centos01:2888:3888 server.2=centos02:2888:3888 server.3=centos03:2888:3888
上述配置表示在centos01、centos02和centos03节点上启动ZooKeeper服务,其中1、2、3表示每个ZooKeeper服务器的唯一ID。
zoo.cfg文件中的其他配置属性保持默认即可。
4.复制Flink安装文件到其他节点
将centos01节点配置好的Flink安装文件复制到centos02、centos03节点:
$ scp -r flink-1.13.0/ centos02:/opt/modules/ $ scp -r flink-1.13.0/ centos03:/opt/modules/
5.启动HDFS集群
由于在flink-conf.yaml文件中配置了用于持久化JobManager元数据的HDFS地址,因此需要启动HDFS集群。
6.启动ZooKeeper集群
如果使用Flink内置的ZooKeeper,在centos01节点执行以下命令,即可启动整个ZooKeeper集群:
$ bin/start-zookeeper-quorum.sh
启动过程如图3-21所示。
图3-21 ZooKeeper集群的启动过程
启动成功后,在每个Flink节点上都会产生一个名为FlinkZooKeeperQuorumPeer的进程,该进程是ZooKeeper服务的守护进程。
如果使用独立外部ZooKeeper,在每个ZooKeeper节点上执行以下命令启动ZooKeeper集群(需要提前安装配置好ZooKeeper):
$ bin/zkServer.sh start
本例使用Flink内置的ZooKeeper,关于ZooKeeper,此处不做详细讲解。
7.启动Flink Standalone HA集群
在centos01节点上执行以下命令,启动Flink Standalone HA集群:
$ bin/start-cluster.sh
启动过程如图3-22所示。此时使用jps命令查看每个节点的JVM进程,如图3-23~图3-25所示。
图3-22 Flink Standalone HA集群的启动过程
图3-23 centos01节点的JVM进程
图3-24 centos02节点的JVM进程
图3-25 centos03节点的JVM进程
可以发现,centos01节点和centos02节点都产生了一个JobManager进程,即StandaloneSessionClusterEntrypoint;而centos02节点和centos03节点都产生了一个TaskManager进程,即TaskManagerRunner。
此时分别访问以下网址:
http://centos01:8081/ http://centos02:8081/
两个页面都可以查看Flink集群的WebUI,如图3-26和图3-27所示。
图3-26 在centos01节点查看Flink集群的WebUI
图3-27 在centos02节点查看Flink集群的WebUI
8.测试Flink Standalone HA
在centos01节点上执行以下命令,向集群提交Flink自带的单词计数程序(默认使用Flink单词计数程序中自带的单词数据,计数结果将打印到控制台):
$ bin/flink run ./examples/batch/WordCount.jar
若控制台打印出单词计数结果,则说明提交成功。
此时使用kill -9命令杀掉centos01节点的JobManager进程,然后刷新两个节点的WebUI,发现centos01节点的WebUI已不可访问;而centos02节点的WebUI仍可成功访问(若centos02节点的JobManager是Standby状态,会有一个切换为Active的时间间隔,切换完毕后即可成功访问centos02节点的WebUI)。
接下来在centos01节点上再次提交Flink自带的单词计数程序,发现仍然能够提交成功。
9.停止Flink Standalone HA集群
若要停止Flink Standalone HA集群,在centos01节点上首先执行以下命令停止整个Flink集群:
$ bin/stop-cluster.sh
然后执行以下命令,停止ZooKeeper集群:
$ bin/stop-zookeeper-quorum.sh
在Flink On YARN模式下,HA集群的实现主要依赖YARN的任务恢复机制,因为YARN本身对运行在YARN上的应用程序具有一定的容错保证。同时,仍然需要使用ZooKeeper,Flink在任务恢复时需要使用HDFS中存储的JobManager元数据(JobGraph、应用程序JAR文件等),而ZooKeeper存储元数据在HDFS中的位置路径信息。从2.1.3节的Flink On YARN运行架构中可以看出,Flink JobManager实际上运行在ApplicationMaster所在的Container(容器)中。
对于Flink On YARN模式下的HA集群,YARN只能运行一个JobManager实例,而不能运行多个。当Flink YARN Session集群中的JobManager出现故障时,YARN将对ApplicationMaster进行重启来恢复JobManager,通过这种方式达到HA的目的。
Flink On YARN模式下HA的搭建步骤如下。
1.修改yarn-site.xml文件
配置YARN集群的yarn-site.xml文件中的yarn.resourcemanager.am.max-attempts属性,该属性的含义是允许ApplicationMaster的最大启动上限,即最大启动次数(包含初始启动),默认值为2(表示可以容忍一次JobManager故障)。例如在yarn-site.xml文件中添加以下内容:
2.修改flink-conf.yaml文件
除了3.2.2节的Flink Standalone模式下flink-conf.yaml文件的HA配置内容外,还必须在flink-conf.yaml文件中配置JobManager最大启动次数。例如配置最大启动次数为10(包含初始启动),内容如下:
yarn.application-attempts: 10
上述配置表示允许JobManager能够启动的次数为10(9次重试+1次初始尝试)。如果YARN的其他操作需要进行重启,例如节点硬件故障、与NodeManager重新同步等,则这些重启不计入yarn.application-attempts设置的次数。
需要注意的是,YARN中设置的yarn.resourcemanager.am.max-attempts是整个应用程序重新启动的上限,Flink中设置的yarn.application-attempts不能超过YARN中的设置,二者的关系如下:
yarn.application-attempts<=yarn.resourcemanager.am.max-attempts
flink-conf.yaml文件中需要添加的完整内容如下:
上述配置与Flink Standalone模式下flink-conf.yaml文件的HA配置内容相比,去掉了用于配置集群唯一标识的high-availability.cluster-id属性,原因是在Flink On YARN模式下,如果不配置该属性,会默认使用YARN的applicationId,从而保证全局唯一性;否则在提交作业时就要手动保证。
3.修改zoo.cfg文件
ZooKeeper的配置见3.2.2节的Flink Standalone模式HA集群的搭建。
4.启动集群
1)启动HDFS集群。
2)启动YARN集群。
3)启动ZooKeeper集群。
ZooKeeper集群的启动见3.2.2节的Flink Standalone模式HA集群的搭建。
至此,Flink On YARN模式HA集群搭建完毕。