购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

3.5 Flink Shell的使用

Flink集成了交互式Scala Shell,在本地模式和集群模式下都可以使用。在Flink Shell中可以直接编写Flink任务,然后提交到集群与分布式数据进行交互,并且可以立即查看输出结果。Flink Shell提供了一种学习Flink API的简单方式,可以使用Scala语言进行程序的编写。

进入Flink安装目录,执行以下命令,可以查看Flink Shell的相关使用说明:

     $ bin/start-scala-shell.sh --help

启动Flink Shell的命令语法如下:

     $ bin/start-scala-shell.sh [local|remote|yarn] [options] <args>...

· local:以本地模式启动Flink Shell。

· remote:连接到远程集群,启动Flink Shell(Flink Standalone模式)。

· yarn:连接到YARN集群,启动Flink Shell(Flink On YARN模式)。

1.本地模式启动

执行以下命令,以本地模式启动Flink Shell:

     $ bin/start-scala-shell.sh local

启动过程如图3-31所示。

图3-31 Flink Shell启动过程

Flink Shell支持DataSet、DataStream、Table API和SQL。启动Flink Shell后会自动创建4个不同的执行环境来实现批处理或流处理程序(即自动创建4个变量)。使用benv和senv分别代表批处理环境(ExecutionEnvironment)和流处理环境(StreamExecutionEnvironment),使用btenv和stenv分别代表批处理表环境(BatchTableEnvironment)和流处理表环境(StreamTableEnvironment)。进一步讲,这4个变量分别是ExecutionEnvironment、StreamExecutionEnvironment、BatchTableEnvironment、StreamTableEnvironment的实例,可以在Flink Shell中直接使用,不需要再次创建。

本地模式启动后会在本地产生一个名为FlinkShell的进程。

此时在浏览器中访问当前主机的8081端口即可查看Flink的WebUI,此处访问http://192.168.170.133:8081/,如图3-32所示。

执行启动命令时,也可以使用-a或--addclasspath选项指定外部依赖的JAR包,在执行任务时将与Flink Shell程序一起发送给JobManager,例如以下命令:

     $ bin/start-scala-shell.sh local -a /path/to/demo.jar

图3-32 Flink WebUI界面

2.Flink Standalone模式启动

假设Flink Standalone集群的JobManager位于centos01节点,JobManager的REST访问端口为8081(即WebUI端口),则在集群任一节点执行以下命令即可启动Flink Shell:

     $ bin/start-scala-shell.sh remote centos01 8081

如果把上述主机名和端口写错了,也能启动Flink Shell,但是当在Flink Shell中执行任务时会抛出异常。

与本地模式一样,执行启动命令时,也可以使用-a或--addclasspath选项指定外部依赖的JAR包,在执行任务时将与Flink Shell程序一起发送给JobManager,例如以下命令:

     $ bin/start-scala-shell.sh remote centos01 8081 -a /path/to/demo.jar

Flink Standalone模式启动的Flink Shell仍然会在启动节点产生一个名为FlinkShell的进程。

3.Flink On YARN模式启动

(1)连接到现有的Flink YARN Session集群

如果之前已经启动了Flink YARN Session集群,则执行以下命令可以启动Flink Shell并连接到上一次启动的Flink YARN Session集群:

     $ bin/start-scala-shell.sh yarn

例如,启动一个Flink YARN Session集群,在YARN的WebUI中查看当前Flink YARN Session集群的运行状态,如图3-33所示。

图3-33 Flink YARN Session集群的运行状态

此时执行上述命令启动Flink Shell即可连接该集群,后续在Flink Shell中提交的作业将在该集群中运行。若退出Flink Shell,则不影响集群的运行。

(2)新启动一个Flink YARN Session集群

如果希望在启动Flink Shell时连接到一个新的Flink YARN Session集群,则可以添加表3-9所示的任意一个选项。

表3-9 Flink On YARN模式启动Flink Shell常用选项介绍

例如,启动Flink Shell连接到一个新的Flink YARN Session集群,并定义YARN中的应用程序名称为FlinkApp,命令如下:

     $ bin/start-scala-shell.sh yarn -nm 'FlinkApp'

执行上述命令后,将启动一个Flink YARN Session集群,并使用Flink Shell进行连接。

此时查看YARN的WebUI,如图3-34所示。

图3-34 新启动的Flink YARN Session集群的运行状态

4.Flink Shell单词计数

Flink Shell启动后,可以使用Scala语言编写Flink应用程序并提交,例如编写一个批处理的单词计数程序,代码如下:

上述代码中,当执行print()算子时,将触发任务执行(把指定的任务发送到JobManager执行),并在Flink Shell中显示计算结果。

编写一个流处理的单词计数程序,代码如下:

在流处理程序中,print()算子不会自动触发任务执行,需要在最后调用execute()算子执行。 Drppi07Q6xDMwuutLd/fLTUkgcU+wEzXIqIlTREtL8EU8CSq9kKjX29bHAWgZCqe

点击中间区域
呼出菜单
上一章
目录
下一章
×