Flink是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行Flink安装部署的学习时,需要准备3台Linux机器。具体要求如下:
· 系统环境为CentOS 7.5版本。
· 安装Java 8。
· 安装Hadoop集群,Hadoop建议选择Hadoop 2.7.5以上版本。
· 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。
本书中3台服务器的具体设置如下:
· 节点服务器1,IP地址为192.168.10.102,主机名为hadoop102。
· 节点服务器2,IP地址为192.168.10.103,主机名为hadoop103。
· 节点服务器3,IP地址为192.168.10.104,主机名为hadoop104。
最简单的启动方式其实是不搭建集群,直接本地启动。本地部署非常简单,直接解压安装包就可以使用,不用进行任何配置;一般用来做一些简单的测试。
具体安装步骤如下所示。
1.下载安装包
进入Flink官网,下载1.13.0版本安装包flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应Scala版本为scala 2.12的安装包。
2.解压
在hadoop102节点服务器上创建安装目录/opt/module,将flink安装包放在该目录下,并执行解压命令,解压至当前目录。
3.启动
进入解压后的目录,执行启动命令,并查看进程。
4.访问Web UI
启动成功后,访问http://hadoop102:8081,可以对Flink集群和任务进行监控管理,如图3-2所示。
图3-2 Flink Web UI页面
5.关闭集群
如果想让Flink集群停止运行,可以执行如下命令:
可以看到,Flink本地启动非常简单,直接执行start-cluster.sh就可以了。如果我们想要扩展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。
Flink是典型的Master-Slave架构的分布式数据处理框架,其中Master角色对应着JobManager,Slave角色则对应TaskManager。我们对三台节点服务器的角色分配如表3-1所示。
表3-1 集群角色分配
具体安装部署步骤如下所示。
1.下载并解压安装包
具体操作与3.1.2节相同。
2.修改集群配置
(1)进入conf目录下,修改flink-conf.yaml文件,修改jobmanager.rpc.address参数为hadoop102,如下所示:
这就指定了hadoop102节点服务器为JobManager节点。
(2)修改workers文件,将另外两台节点服务器添加为本Flink集群的TaskManager节点,具体修改如下:
这样就指定了hadoop103和hadoop104为TaskManager节点。
(3)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下所示。
· jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600MB,可以根据集群规模进行适当调整。
· taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600MB,可以根据集群规模进行适当调整。
· taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的slots数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓slots就是TaskManager中具体运行一个任务所分配的计算资源。
· parallelism.default:Flink任务执行的默认并行度配置,优先级低于代码中进行的并行度配置和任务提交时使用参数进行的并行度配置。
关于slots和并行度的概念,我们会在下一章做详细讲解。
3.分发安装目录
配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
4.启动集群
(1)在hadoop102节点服务器上执行start-cluster.sh启动Flink集群。
(2)查看进程情况。
5.访问Web UI
启动成功后,同样可以访问http://hadoop102:8081对Flink集群和任务进行监控管理,如图3-3所示。
图3-3 集群启动后的Web UI页面
这里可以明显地看到,当前集群的TaskManager数量为2;由于默认每个TaskManager的slots数量为1,所以slots总数和可用slots数都为2。
在第2章中,我们已经编写了词频统计的批处理和流处理的示例程序,并在开发环境的模拟集群上做了运行测试。现在既然已经有了真正的集群环境,那接下来我们就要把作业提交上去执行了。
本节我们将以流处理的程序为例,演示如何将任务提交到集群中执行。具体步骤如下。
1.程序打包
(1)为方便自定义结构和定制依赖,我们可以引入插件maven-assembly-plugin进行打包。在FlinkTutorial项目的pom.xml文件中添加打包插件的配置,具体如下:
(2)在插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。
打包完成后,在target目录下即可找到所需的jar包,jar包有两个:FlinkTutorial-1.0-SNAPSHOT.jar和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因为集群中已经具备任务运行需要的所有依赖,所以建议使用FlinkTutorial-1.0-SNAPSHOT.jar。
2.在Web UI上提交作业
(1)在任务打包完成后,我们打开Flink的Web UI页面,在左侧导航栏点击“Submit New Job”选项,然后点击“+Add New”按钮,选择要上传运行的jar包,如图3-4所示。上传完成后,如图3-5所示。
图3-4 Web UI页面任务提交入口
图3-5 jar包上传完成
(2)点击该jar包,出现任务配置页面,进行相应的配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如图3-6所示,配置完成后,即可点击“Submit”按钮,将任务提交到集群运行。
图3-6 任务提交参数配置
(3)任务提交成功后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况,如图3-7所示。
图3-7 任务运行列表
(4)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”按钮结束任务运行,如图3-8所示。
图3-8 查看任务运行的具体情况
Flink的Web UI页面设计非常简洁明了,读者可以自行尝试其余操作。
3.命令行提交作业
除了通过Web UI界面提交任务,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.13.0下。
(1)首先需要启动集群。
(2)在hadoop102中执行以下命令启动netcat。
(3)进入Flink的安装路径下,在命令行使用flink run命令提交作业。
这里的参数-m指定了提交到的JobManager,-c指定了入口类。
(4)在浏览器中打开Web UI,http://hadoop102:8081查看应用执行情况,如图3-9所示。
图3-9 Web UI界面查看任务运行状况
用netcat输入数据,可以在TaskManager的标准输出中看到对应的统计结果。
(5)在log日志中,也可以查看执行结果,需要找到执行该数据任务的TaskManager节点查看日志。