Flink集成了交互式Scala Shell,在本地模式和集群模式下都可以使用。在Flink Shell中可以直接编写Flink任务,然后提交到集群与分布式数据进行交互,并且可以立即查看输出结果。Flink Shell提供了一种学习Flink API的简单方式,可以使用Scala语言进行程序的编写。
进入Flink安装目录,执行以下命令,可以查看Flink Shell的相关使用说明:
$ bin/start-scala-shell.sh --help
启动Flink Shell的命令语法如下:
$ bin/start-scala-shell.sh [local|remote|yarn] [options] <args>...
· local:以本地模式启动Flink Shell。
· remote:连接到远程集群,启动Flink Shell(Flink Standalone模式)。
· yarn:连接到YARN集群,启动Flink Shell(Flink On YARN模式)。
1.本地模式启动
执行以下命令,以本地模式启动Flink Shell:
$ bin/start-scala-shell.sh local
启动过程如图3-31所示。
图3-31 Flink Shell启动过程
Flink Shell支持DataSet、DataStream、Table API和SQL。启动Flink Shell后会自动创建4个不同的执行环境来实现批处理或流处理程序(即自动创建4个变量)。使用benv和senv分别代表批处理环境(ExecutionEnvironment)和流处理环境(StreamExecutionEnvironment),使用btenv和stenv分别代表批处理表环境(BatchTableEnvironment)和流处理表环境(StreamTableEnvironment)。进一步讲,这4个变量分别是ExecutionEnvironment、StreamExecutionEnvironment、BatchTableEnvironment、StreamTableEnvironment的实例,可以在Flink Shell中直接使用,不需要再次创建。
本地模式启动后会在本地产生一个名为FlinkShell的进程。
此时在浏览器中访问当前主机的8081端口即可查看Flink的WebUI,此处访问http://192.168.170.133:8081/,如图3-32所示。
执行启动命令时,也可以使用-a或--addclasspath选项指定外部依赖的JAR包,在执行任务时将与Flink Shell程序一起发送给JobManager,例如以下命令:
$ bin/start-scala-shell.sh local -a /path/to/demo.jar
图3-32 Flink WebUI界面
2.Flink Standalone模式启动
假设Flink Standalone集群的JobManager位于centos01节点,JobManager的REST访问端口为8081(即WebUI端口),则在集群任一节点执行以下命令即可启动Flink Shell:
$ bin/start-scala-shell.sh remote centos01 8081
如果把上述主机名和端口写错了,也能启动Flink Shell,但是当在Flink Shell中执行任务时会抛出异常。
与本地模式一样,执行启动命令时,也可以使用-a或--addclasspath选项指定外部依赖的JAR包,在执行任务时将与Flink Shell程序一起发送给JobManager,例如以下命令:
$ bin/start-scala-shell.sh remote centos01 8081 -a /path/to/demo.jar
Flink Standalone模式启动的Flink Shell仍然会在启动节点产生一个名为FlinkShell的进程。
3.Flink On YARN模式启动
(1)连接到现有的Flink YARN Session集群
如果之前已经启动了Flink YARN Session集群,则执行以下命令可以启动Flink Shell并连接到上一次启动的Flink YARN Session集群:
$ bin/start-scala-shell.sh yarn
例如,启动一个Flink YARN Session集群,在YARN的WebUI中查看当前Flink YARN Session集群的运行状态,如图3-33所示。
图3-33 Flink YARN Session集群的运行状态
此时执行上述命令启动Flink Shell即可连接该集群,后续在Flink Shell中提交的作业将在该集群中运行。若退出Flink Shell,则不影响集群的运行。
(2)新启动一个Flink YARN Session集群
如果希望在启动Flink Shell时连接到一个新的Flink YARN Session集群,则可以添加表3-9所示的任意一个选项。
表3-9 Flink On YARN模式启动Flink Shell常用选项介绍
例如,启动Flink Shell连接到一个新的Flink YARN Session集群,并定义YARN中的应用程序名称为FlinkApp,命令如下:
$ bin/start-scala-shell.sh yarn -nm 'FlinkApp'
执行上述命令后,将启动一个Flink YARN Session集群,并使用Flink Shell进行连接。
此时查看YARN的WebUI,如图3-34所示。
图3-34 新启动的Flink YARN Session集群的运行状态
4.Flink Shell单词计数
Flink Shell启动后,可以使用Scala语言编写Flink应用程序并提交,例如编写一个批处理的单词计数程序,代码如下:
上述代码中,当执行print()算子时,将触发任务执行(把指定的任务发送到JobManager执行),并在Flink Shell中显示计算结果。
编写一个流处理的单词计数程序,代码如下:
在流处理程序中,print()算子不会自动触发任务执行,需要在最后调用execute()算子执行。