购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.2 Flink任务调度原理

2.2.1 任务链

根据前面的讲解我们已经知道,Flink中的每一个操作算子称为一个Task(任务),算子的每个具体实例则称为SubTask(子任务),SubTask是Flink中最小的处理单元,多个SubTask可能在不同的机器上执行。一个TaskManager进程包含一个或多个执行线程,用于执行SubTask。TaskManager中的一个Task Slot对应一个执行线程,一个执行线程可以执行一个或多个SubTask,如图2-5所示。

由于每个SubTask只能在一个线程中执行,为了能够减少线程间切换和缓冲的开销,在降低延迟的同时提高整体吞吐量,Flink可以将多个连续的SubTask链接成一个Task在一个线程中执行。这种将多个SubTask连在一起的方式称为任务链。在图2-6中,一个Source类算子的SubTask和一个map()算子的SubTask连在了一起,组成了任务链。

图2-5 TaskManager的工作方式

图2-6 Flink任务链

任务链的好处是,同一任务链内的SubTask可以彼此直接传递数据,而无须通过序列化或Flink的网络栈。

2.2.2 并行度

Flink应用程序可以在分布式集群上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下会在不同的机器上运行。

为了充分利用计算资源,提高计算效率,可以增加算子的实例数(SubTask数量)。一个特定算子的SubTask数量称为该算子的并行度,且任意两个算子的并行度之间是独立的,不同算子可能拥有不同的并行度。例如,将Source算子、map()算子、keyby()/window()/apply()算子的并行度设置为2,Sink算子的并行度设置为1,运行效果如图2-7所示。

图2-7 算子最高并行度为2

由于一个Task Slot对应一个执行线程,因此并行度为2的算子的SubTask将被分配到不同的Task Slot中执行。

假设一个作业图(JobGraph)有A、B、C、D、E五个算子,其中A、B、D的并行度为4,C、E的并行度为2,该作业在TaskManager中的详细数据流程可能如图2-8所示。

图2-8 算子并行度执行图

Flink中并行度的设置有4种级别:算子级别、执行环境(Execution Environment)级别、客户端(命令行)级别、系统级别。

(1)算子级别

每个算子、Source和Sink都可以通过调用setParallelism()方法指定其并行度。例如以下代码设置flatMap()算子的并行度为2:

     data.flatMap(_.split(" ")).setParallelism(2)

(2)执行环境级别

调用执行环境对象的setParallelism()方法可以指定Flink应用程序中所有算子的默认并行度,代码如下:

     val env=ExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(2)

(3)客户端(命令行)级别

在向集群提交Flink应用程序时使用-p选项可以指定并行度。例如以下提交命令:

     bin/flink run -p 2 WordCount.jar

(4)系统级别

影响所有运行环境的系统级别的默认并行度可以在配置文件flink-conf.yaml中的parallelism.default属性中指定,默认为1。

4种并行度级别的作用顺序为:算子级别>执行环境级别>客户端级别>系统级别。

2.2.3 共享Task Slot

默认情况下,Flink允许SubTask之间共享Task Slot,即使它们是不同Task(算子)的SubTask,只要它们来自同一个作业(Job)即可。在没有共享Task Slot的情况下,简单的SubTask(source()、map()等)将会占用和复杂的SubTask(keyBy()、window()等)一样多的资源,通过共享Task Slot可以充分利用Task Slot的资源,同时确保繁重的SubTask在TaskManager之间公平地获取资源。例如,将图2-7中的算子并行度从2增加到6,并行效果如图2-9所示。

图2-9 算子最高并行度为6

Flink集群的Task Slot数量最好与作业中使用的最高并行度一致,这样不需要计算作业总共包含多少个具有不同并行度的Task。

在图2-9中,最左侧的Task Slot负责作业的整个管道(Pipeline)。所谓管道,即同一作业的一个数据传输通路,它用于连接多个算子,将一个算子的执行结果输出给下一个算子。例如,算子Source[1]、map()[1]、keyBy()/window()/apply()[1]、Sink[1]可连接成一个管道。这种方式类似于Linux系统中的管道模式,将多个独立命令内聚在一起执行,形成一个工作流,上一个命令的输出可作为下一个命令的输入,以降低命令之间的耦合与协调编程的难度。例如以下Linux命令中,管道将ls -al的输出作为下一个命令less的输入,以方便浏览。

     $ ls -al /etc | less

2.2.4 数据流

一个Flink应用程序会被映射成逻辑数据流(Dataflow),而Dataflow都是以一个或多个Source开始、以一个或多个Sink结束的,且始终包括Source、Transformation、Sink三部分。Dataflow描述了数据如何在不同算子之间流动,将这些算子用带方向的直线连接起来会形成一个关于计算路径的有向无环图,称为DAG(Directed Acyclic Graph,有向无环图)或Dataflow图。各个算子的中间数据会被保存在内存中,如图2-10所示。

假设一个Flink应用程序在读取数据后先对数据进行了map()操作,然后进行了keyBy()/window()/apply()操作,最后将计算结果输出到了指定的文件中,则该程序的Dataflow图如图2-11所示。

图2-10 Flink算子组成的DAG

图2-11 程序Dataflow图

假设该程序的Source、map()、keyBy()/window()/apply()算子的并行度为2,Sink算子的并行度为1,则该程序的逻辑数据流图、物理(并行)数据流图和Flink优化后的数据流图如图2-12所示。

图2-12 程序逻辑、物理(并行)和优化后的数据流图

Flink应用程序在执行时,为了降低线程开销,会将多个SubTask连接在一起组成任务链,在一个线程中运行。对于图2-12的物理(并行)数据流来说,Flink执行时会对其进行优化,将Source[1]和map()[1]、Source[2]和map()[2]分别连接成一个任务,这是因为Source和map()之间采用了一对一的直连模式,而且没有任何的重分区(重分区往往发生在聚合阶段,类似于Spark的Shuffle。关于分区,将在2.3节详细讲解),它们之间可以直接通过缓存进行数据传递,而不需要通过网络或序列化(如果不使用任务链,Source和map()可能在不同的机器上,它们之间的数据传递就需要通过网络)。这种优化在很大程度上提升了Flink的执行效率。

2.2.5 执行图

Flink应用程序执行时会根据数据流生成多种图,每种图对应了作业的不同阶段,根据不同图的生成顺序,主要分为4层:StreamGraph→JobGraph→ExecutionGraph→物理执行图,如图2-13所示。

图2-13 Flink层次执行图

· StreamGraph:流图。使用DataStream API编写的应用程序生成的最初的图代表程序的拓扑结构,描述了程序的执行逻辑。StreamGraph在Flink客户端中生成,在客户端应用程序最后调用execute()方法时触发StreamGraph的构建。

· JobGraph:作业图。所有高级别API都需要转换为JobGraph。StreamGraph经过优化(例如任务链)后生成了JobGraph,以提高执行效率。StreamGraph和JobGraph都是在本地客户端生成的数据结构,而JobGraph需要被提交给JobManager进行解析。

· ExecutionGraph:执行图。JobManager对JobGraph进行解析后生成的并行化执行图是调度层最核心的数据结构。它包含对每个中间数据集或数据流、每个并行任务以及它们之间的通信的描述。

· 物理执行图:JobManager根据ExecutionGraph对作业进行调度后,在各个TaskManager上部署Task后形成的“图”。物理执行图并不是一个具体的数据结构,而是各个Task分布在不同的节点上所形成的物理上的关系表示。

DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成JobGraph之前的实现差别很大。流处理程序会生成StreamGraph,然后在StreamGraph的基础上生成JobGraph;而批处理程序则先生成计划(Plan),然后由优化器对其进行优化并生成优化计划(Optimized Plan),优化计划准确描述了应用程序应该如何执行。在优化计划的基础上生成对应的JobGraph。

2.2.6 执行计划

Flink的优化器会根据数据量或集群机器数等的不同自动地为程序选择执行策略。因此,准确地了解Flink如何执行编写的应用程序是很有必要的。

接下来我们对1.7.4节流处理单词计数例子的代码进行更改,从执行环境级别设置并行度为2,代码如下:

     val env=StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(2)

将最后的触发任务执行代码env.execute("StreamWordCount")改为:

     println(env.getExecutionPlan)//打印计划描述

直接在本地运行程序,将从控制台打印应用程序的逻辑执行计划对应的JSON描述,JSON字符串内容如下:

Flink为执行计划提供了可视化工具,它可以把用JSON格式表示的作业执行计划以图的形式展现,并且其中包含完整的执行策略标注。

将上述JSON字符串粘贴到可视化工具网址(http://flink.apache.org/visualizer/)提供的文本框中,可将JSON字符串解析为可视化图,该可视化图对应的是StreamGraph,如图2-14所示。

图2-14 执行计划可视化图

将应用程序提交到Flink集群后(关于应用程序的提交,将在3.4节讲解),在Flink的WebUI中还可以看到另一张可视化图,即JobGraph,如图2-15所示。

此处的单词数据来源于HDFS存储系统,而不是本地文件。

图2-15 作业的JobGraph可视化图 sz+oviPx9BAr31QxxeI5mkqRepwdTo9iRbkNRF6I+x0y5AVQ24f6BgE6cIF7cvwk

点击中间区域
呼出菜单
上一章
目录
下一章
×