购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

3.2 Flink HA模式

Flink集群架构模式为经典的主从架构,与大部分主从架构一样,也存在单点故障问题。由于JobManager是整个Flink集群的管理节点,负责整个集群的任务调度和资源管理,默认情况下每个Flink集群只有一个JobManager实例,如果JobManager崩溃,则无法提交任何新应用程序,并且正在运行的应用程序也会失败。解决Flink集群单点故障的方式可以配置集群HA(High Availability,高可用性)。目前,Flink支持Flink Standalone和Flink On YARN两种模式配置集群HA。

3.2.1 Flink Standalone模式的HA架构

在Flink Standalone模式下,实现HA的方式可以利用ZooKeeper在所有正在运行的JobManager实例之间进行分布式协调,实现多个JobManager无缝切换,类似于HDFS的NameNode HA或YARN的ResourceManager HA。Flink Standalone模式的HA架构如图3-18所示。

图3-18 Flink Standalone模式的HA架构

可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager,整个恢复过程大约在1分钟之内,如图3-19所示。

图3-19 JobManager故障恢复过程

此外,活动状态的JobManager在工作时会将其元数据(JobGraph、应用程序JAR文件等)写入一个远程持久化存储系统(例如HDFS)中,还会将元数据存储的位置和路径信息写入ZooKeeper存储,以便能够进行故障恢复,如图3-20所示。

图3-20 JobManager元数据存储

当活动状态的JobManager出现故障时,备用状态的JobManager会向ZooKeeper请求元数据存储位置,然后从HDFS中获取JobGraph、应用程序JAR文件、相应状态信息等元数据,以便接管发生故障的JobManager。

3.2.2 Flink Standalone模式HA集群搭建

下面仍然在3个节点(centos01、centos02、centos03)上进行Flink Standalone模式的HA集群搭建,在前面已经搭建好的Flink Standalone集群上进行操作,集群角色分配如表3-5所示。

表3-5 Flink Standalone HA集群角色分配

搭建步骤如下。

1.修改masters文件

Flink的masters文件用于配置所有需要启动的JobManager节点以及每个JobManager的WebUI绑定的端口。

进入centos01节点的Flink安装主目录,修改conf/masters文件,内容如下:

     centos01:8081
     centos02:8081

上述配置表示在集群centos01和centos02节点上启动JobManager,并且每个JobManager的WebUI访问端口分别为8081。

2.修改flink-conf.yaml文件

进入centos01节点的Flink安装主目录,修改conf/flink-conf.yaml文件,添加以下内容:

3.修改zoo.cfg文件

Flink内置了ZooKeeper服务和相关脚本文件,如果你的集群中没有安装ZooKeeper,则可以通过修改zoo.cfg文件配置Flink内置的ZooKeeper。生产环境建议使用独立的外部ZooKeeper。

进入centos01节点的Flink安装主目录,修改conf/zoo.cfg文件,添加以下内容,配置ZooKeeper启动节点与选举相关端口:

     server.1=centos01:2888:3888
     server.2=centos02:2888:3888
     server.3=centos03:2888:3888

上述配置表示在centos01、centos02和centos03节点上启动ZooKeeper服务,其中1、2、3表示每个ZooKeeper服务器的唯一ID。

zoo.cfg文件中的其他配置属性保持默认即可。

4.复制Flink安装文件到其他节点

将centos01节点配置好的Flink安装文件复制到centos02、centos03节点:

     $ scp -r flink-1.13.0/ centos02:/opt/modules/
     $ scp -r flink-1.13.0/ centos03:/opt/modules/

5.启动HDFS集群

由于在flink-conf.yaml文件中配置了用于持久化JobManager元数据的HDFS地址,因此需要启动HDFS集群。

6.启动ZooKeeper集群

如果使用Flink内置的ZooKeeper,在centos01节点执行以下命令,即可启动整个ZooKeeper集群:

     $ bin/start-zookeeper-quorum.sh

启动过程如图3-21所示。

图3-21 ZooKeeper集群的启动过程

启动成功后,在每个Flink节点上都会产生一个名为FlinkZooKeeperQuorumPeer的进程,该进程是ZooKeeper服务的守护进程。

如果使用独立外部ZooKeeper,在每个ZooKeeper节点上执行以下命令启动ZooKeeper集群(需要提前安装配置好ZooKeeper):

     $ bin/zkServer.sh start

本例使用Flink内置的ZooKeeper,关于ZooKeeper,此处不做详细讲解。

7.启动Flink Standalone HA集群

在centos01节点上执行以下命令,启动Flink Standalone HA集群:

     $ bin/start-cluster.sh

启动过程如图3-22所示。此时使用jps命令查看每个节点的JVM进程,如图3-23~图3-25所示。

图3-22 Flink Standalone HA集群的启动过程

图3-23 centos01节点的JVM进程

图3-24 centos02节点的JVM进程

图3-25 centos03节点的JVM进程

可以发现,centos01节点和centos02节点都产生了一个JobManager进程,即StandaloneSessionClusterEntrypoint;而centos02节点和centos03节点都产生了一个TaskManager进程,即TaskManagerRunner。

此时分别访问以下网址:

     http://centos01:8081/
     http://centos02:8081/

两个页面都可以查看Flink集群的WebUI,如图3-26和图3-27所示。

图3-26 在centos01节点查看Flink集群的WebUI

图3-27 在centos02节点查看Flink集群的WebUI

8.测试Flink Standalone HA

在centos01节点上执行以下命令,向集群提交Flink自带的单词计数程序(默认使用Flink单词计数程序中自带的单词数据,计数结果将打印到控制台):

     $ bin/flink run ./examples/batch/WordCount.jar

若控制台打印出单词计数结果,则说明提交成功。

此时使用kill -9命令杀掉centos01节点的JobManager进程,然后刷新两个节点的WebUI,发现centos01节点的WebUI已不可访问;而centos02节点的WebUI仍可成功访问(若centos02节点的JobManager是Standby状态,会有一个切换为Active的时间间隔,切换完毕后即可成功访问centos02节点的WebUI)。

接下来在centos01节点上再次提交Flink自带的单词计数程序,发现仍然能够提交成功。

9.停止Flink Standalone HA集群

若要停止Flink Standalone HA集群,在centos01节点上首先执行以下命令停止整个Flink集群:

     $ bin/stop-cluster.sh

然后执行以下命令,停止ZooKeeper集群:

     $ bin/stop-zookeeper-quorum.sh

3.2.3 Flink On YARN模式HA集群搭建

在Flink On YARN模式下,HA集群的实现主要依赖YARN的任务恢复机制,因为YARN本身对运行在YARN上的应用程序具有一定的容错保证。同时,仍然需要使用ZooKeeper,Flink在任务恢复时需要使用HDFS中存储的JobManager元数据(JobGraph、应用程序JAR文件等),而ZooKeeper存储元数据在HDFS中的位置路径信息。从2.1.3节的Flink On YARN运行架构中可以看出,Flink JobManager实际上运行在ApplicationMaster所在的Container(容器)中。

对于Flink On YARN模式下的HA集群,YARN只能运行一个JobManager实例,而不能运行多个。当Flink YARN Session集群中的JobManager出现故障时,YARN将对ApplicationMaster进行重启来恢复JobManager,通过这种方式达到HA的目的。

Flink On YARN模式下HA的搭建步骤如下。

1.修改yarn-site.xml文件

配置YARN集群的yarn-site.xml文件中的yarn.resourcemanager.am.max-attempts属性,该属性的含义是允许ApplicationMaster的最大启动上限,即最大启动次数(包含初始启动),默认值为2(表示可以容忍一次JobManager故障)。例如在yarn-site.xml文件中添加以下内容:

2.修改flink-conf.yaml文件

除了3.2.2节的Flink Standalone模式下flink-conf.yaml文件的HA配置内容外,还必须在flink-conf.yaml文件中配置JobManager最大启动次数。例如配置最大启动次数为10(包含初始启动),内容如下:

     yarn.application-attempts: 10

上述配置表示允许JobManager能够启动的次数为10(9次重试+1次初始尝试)。如果YARN的其他操作需要进行重启,例如节点硬件故障、与NodeManager重新同步等,则这些重启不计入yarn.application-attempts设置的次数。

需要注意的是,YARN中设置的yarn.resourcemanager.am.max-attempts是整个应用程序重新启动的上限,Flink中设置的yarn.application-attempts不能超过YARN中的设置,二者的关系如下:

     yarn.application-attempts<=yarn.resourcemanager.am.max-attempts

flink-conf.yaml文件中需要添加的完整内容如下:

上述配置与Flink Standalone模式下flink-conf.yaml文件的HA配置内容相比,去掉了用于配置集群唯一标识的high-availability.cluster-id属性,原因是在Flink On YARN模式下,如果不配置该属性,会默认使用YARN的applicationId,从而保证全局唯一性;否则在提交作业时就要手动保证。

3.修改zoo.cfg文件

ZooKeeper的配置见3.2.2节的Flink Standalone模式HA集群的搭建。

4.启动集群

1)启动HDFS集群。

2)启动YARN集群。

3)启动ZooKeeper集群。

ZooKeeper集群的启动见3.2.2节的Flink Standalone模式HA集群的搭建。

至此,Flink On YARN模式HA集群搭建完毕。 CdpKKj5CrkJGiPuziTK+wJAez3D59r8mKVFfqadAuf9F+JRf7PG/8/FxcR4mR2a3

点击中间区域
呼出菜单
上一章
目录
下一章
×