本节在Flink Standalone集群模式下讲解已经打包好的Flink应用程序的提交。
1.命令行界面提交
例如,将Flink自带的批处理单词计数程序提交到Flink集群,执行如下命令:
$ bin/flink run examples/batch/WordCount.jar
控制台输出的部分结果(由于结果太多,只显示其中一部分)如下:
(a,5) (action,1) (after,1) (against,1) (all,2) (and,12) (arms,1) ...
若单词数据来源于外部存储系统(例如HDFS),则需要下载Hadoop依赖库flink-shaded-hadoop-2-uber-2.8.3-10.0.jar,并将该JAR包上传到集群每个节点的Flink安装主目录的lib文件夹中,然后重启Flink集群。
例如,word.txt单词数据内容如下:
hello hadoop hello java hello scala java
将word.txt上传到HDFS的/input目录中,然后将1.7.3节的批处理单词计数程序的数据读取部分改为以下代码:
readTextFile("hdfs://centos01:9000/input/word.txt")
接下来将项目导出为flink.demo.jar,上传到centos01节点的/opt/softwares目录中,执行以下命令提交应用程序到Flink集群:
$ bin/flink run -c flink.demo.WordCount /opt/softwares/flink.demo.jar
可见控制台输出结果如下:
(hadoop,1) (hello,3) (java,2) (scala,1)
2.WebUI界面提交
除了上述提交方式外,也可以在Flink提供的WebUI中提交Flink应用程序。单击WebUI的Submit New Job菜单,在右侧的页面中单击Add New按钮,在弹出的窗口中选择本地要提交的JAR包即可,如图3-28所示。
图3-28 通过Flink集群的WebUI提交应用程序
选择完本地的JAR包后,JAR包将被上传到服务器。单击列表中的JAR包名称,下方将出现多个文本框,用于添加相关参数,此处填写执行的主类为flink.demo.WordCount。也可以添加并行度、程序参数值、Savepoint(在4.13.4节将详细讲解)路径等,如图3-29所示。
单击Submit按钮即可提交flink.demo.jar应用程序到集群运行。
单击Submit按钮左侧的Show Plan按钮,可以查看当前程序的执行计划图,如图3-30所示。
图3-29 Flink WebUI添加提交参数
图3-30 Flink WebUI查看应用程序执行计划