



了解了Spark的系统部署之后,我们接下来先给出一个Spark应用的例子,然后通过分析该应用的运行过程来学习Spark框架是如何运行应用的。
我们以Spark自带的example包中的GroupByTest.scala为例,这个应用模拟了SQL中的GroupBy语句,也就是将具有相同Key的<;Key,Value>;record(其简化形式为<;K,V>;record)聚合在一起。输入数据由GroupByTest程序自动生成,因此需要提前设定需要生成的<;K,V>;record个数、Value长度等参数。假设在Master节点上提交运行GroupByTest,具体参数和执行命令如下:
该命令启动GroupByTest应用,该应用包括3个map task,每个task随机生成4个<;K,V>;record,record中的Key从[0,1,2,3]中随机抽取一个产生,每个Value大小为1000 byte。由于Key是随机产生的,具有重复性,所以可以通过GroupBy将具有相同Key的record聚合在一起,这个聚合过程最终使用2个reduce task并行执行。这里虽然指定生成3个map task,但需要注意的是我们一般不需要在编写应用时指定map task的个数,因为map task的个数可以通过“输入数据大小/每个分片大小”来确定。例如,HDFS上的默认文件block大小为128MB,假设我们有1GB的文件需要处理,那么系统会自动算出需要启动1GB/128MB=8个map task。reduce task的个数一般在使用算子时通过设置partition number来间接设置。更多的例子会在第3章中看到,我们这里主要关注应用的基本运行流程。
GroupByTest具体代码如下,为了方便阅读和调试进行了一些简化。
阅读代码后,对照GroupByTest代码和图2.2,我们分析一下代码的具体执行流程。
图2.2 GroupByTest应用的计算逻辑图
■初始化SparkSession,这一步主要是初始化Spark的一些环境变量,得到Spark的一些上下文信息sparkContext,使得后面的一些操作函数(如flatMap()等)能够被编译器识别和使用,这一步同时创建GroupByTest Driver,并初始化Driver所需要的各种对象。
■设置参数numMappers=3,numKVPairs=4,valSize=1000,numReducers=2。
■使用sparkContext.parallelize(0 until numMappers,numMappers)将[0,1,2]划分为3份,也就是每一份包含一个数字 p ={ p =0, p =1, p =2}。接下来flatMap()的计算逻辑是对于每一个数字 p (如 p =0),生成一个数组arr1:Array[(Int,Byte[])],数组长度为numKVPairs=4。数组中的每个元素是一个(Int,Byte[])对,其中Int为[0,3]上随机生成的整数,Byte[]是一个长度为1000的数组。因为 p 只有3个值,所以该程序总共生成3个arr1数组,被命名为pairs1,pairs1被声明为需要缓存到内存中。
■接下来执行一个action()操作pairs1.count(),来统计pairs1中所有arr1中的元素个数,执行结果应该是numMappers*numKVPairs=3×4=12。这一步除了计算count结果,还将每个pairs1中的3个arr1 数组缓存到内存中,便于下一步计算。需要注意的是,缓存操作在这一步才执行,因为pairs1实际在执行action()操作后才会被生成,这种延迟(lazy)计算的方式与普通Java程序有所区别。action()操作的含义是触发Spark执行数据处理流程、进行计算的操作,即需要输出结果,更详细的含义将在下一章中介绍。
■执行完pair1.count()后,在已经被缓存的pairs1上执行groupByKey()操作,groupByKey()操作将具有相同Key的<;Int,Byte[]>;record聚合在一起,得到<;Int,list (Byte[1000],Byte[1000],…,Byte[1000])>;,总的结果被命名为results。Spark实际在执行这一步时,由多个reduce task来完成,reduce task的个数等于numReducers。
■最后执行results.count(),count()将results中所有record个数进行加和,得到结果4,这个结果也是pairs1中不同Key的总个数。
在探讨GroupByTest应用如何在Spark中执行前,我们先思考一下使用Spark编程与使用普通语言(如C++/Java/Python)编写数据处理程序的不同。使用普通语言编程时,处理的数据在本地,程序也在本地进程中运行,我们可以随意定义变量、函数、控制流(分支、循环)等,编程灵活、受限较少,且程序按照既定顺序执行、输出结果。在Spark程序中,首先要声明SparkSession的环境变量才能够使用Spark提供的数据操作,然后使用Spark操作来定义数据处理流程,如flatMap(func).groupByKey()。此时,我们只是定义了数据处理流程,而并没有让Spark真正开始计算,就像在一个画布上画出了数据处理流程,包括哪些数据处理步骤,这些步骤如何连接,每步的输入和输出是什么(如flatMap()中的=>;)。至于这些步骤和操作如何在系统中并行执行,用户并不需要关心。这有点像SQL语言,只需要声明想要得到的数据(select,where),以及如何对这些数据进行操作(GroupBy,join),至于这些操作如何实现,如何被系统执行,用户并不需要关心。在Spark中,唯一需要注意声明的数据处理流程在使用action()操作时,Spark才真正执行处理流程,如果整个程序没有action()操作,那么Spark并不会执行数据处理流程。在普通程序中,程序一步步按照顺序执行,并没有这个限制。Spark这样做与其需要分布式运行有关,更详细的内容在后续章节中介绍。
了解了Spark应用的计算逻辑后,我们接下来研究Spark应用如何执行的问题。正如第1章介绍的,Spark的实际执行流程比用户想象的要复杂,需要先建立DAG型的逻辑处理流程(Logical plan),然后根据逻辑处理流程生成物理执行计划(Physical plan),物理执行计划中包含具体的计算任务(task),最后Spark将task分配到多台机器上执行。
为了获得GroupByTest的逻辑处理流程,我们可以使用toDebugString()方法来打印出pairs1和results的产生过程,进而分析GroupByTest的整个逻辑处理流程。在这之前,我们先分析GroupByTest产生的job个数。由于GroupByTest进行了两次action()操作:pairs1.count()和results.count(),所以会生成两个Spark作业(job),如图2.3所示。接下来,我们分析pairs1和results的产生过程,即这两个job是如何产生的。
图2.3 GroupByTest应用生成的两个job
1.pairs1.toDebugString()的执行结果
第一行的“(3) MapPartitionsRDD[1]”表示的是pairs1,即pairs1的类型是MapPartitions-RDD,编号为[1],共有3个分区(partition),这是因为pairs1中包含了3个数组。由于设置了pairs1.cache,所以pairs1中的3个分区在计算时会被缓存,其类型是CachedPartitions。那么pairs1是怎么生成的呢?我们看到描述“MapPartitionsRDD[1] at flatMap at GroupByTest.scala:41”,即pairs1是由flatMap()函数生成的,对照程序代码,可以发现确实是input.parallelize().flatMap()生成的。接着出现了“ParallelCollectionRDD[0]”,根据描述是由input.parallelize()函数生成的,编号为[0],因此,我们可以得到结论:input.parallelize()得到一个ParallelCollectionRDD,然后经过flatMap()得到pairs1:MapPartitionsRDD。
2.results.toDebugString()的执行结果
同样,第1行的“(2) ShuffledRDD[2]”表示的是results,即results的类型是ShuffledRDD,由groupByKey()产生,共有两个分区(partition),这是因为在groupByKey()中,设置了partition number=numReducers=2。接着出现了“MapPartitionsRDD [1]”,这个就是之前生成的pairs1。接下来的ParallelCollectionRDD由input.parallelize()生成。
我们可以将上述过程画成逻辑处理流程图,如图2.4所示。
图2.4 GroupByTest的逻辑处理流程图
图2.4 GroupByTest的逻辑处理流程图(续)
图2.4展示了从input到最终结果的数据处理流程,即需要进行哪些操作,生成哪些中间数据,以及这些数据间的关联关系。Spark在执行到action()操作时,会根据程序中的数据操作,自动生成这样的数据流程图,这里我们根据图2.4进一步解释GroupByTest生成的两个job,并探讨其中涉及的概念。
第1个job,即pairs1.count()的执行流程如下所述。
■input是输入一个[0,1,2]的普通Scala数组。
■执行input.parallelize()操作产生一个ParrallelCollectionRDD,共3个分区,每个分区包含一个整数 p 。这一步的重要性在于将input转化成Spark系统可以操作的数据类型ParrallelCollectionRDD。也就是说,input数组仅仅是一个普通的Scala变量,并不是Spark可以并行操作的数据类型。在对input进行划分后生成了ParrallelCollectionRDD,这个RDD是Spark可以识别和并行操作的类型。可以看到input没有分区概念,而ParrallelCollectionRDD可以有多个分区,分区的意义在于可以使不同的task并行处理这些分区。RDD(Resilient Distributed Datasets)的含义是“并行数据集的抽象表示”,实际上是Spark对数据处理流程中形成的中间数据的一个抽象表示或者叫抽象类(abstract class),这个类就像一个“分布式数组”,包含相同类型的元素,但元素可以分布在不同机器上。例如,ParrallelCollectionRDD中的每个元素是一个整数,这些元素具有3个分区,最多可以分布在3台机器上。RDD还有一些其他特性,如不可改变性(immutable),这些特性会在后续章节中介绍。
■在ParrallelCollectionRDD上执行flatMap()操作,生成MapPartitionsRDD,该RDD同样包含3个分区,每个分区包含一个通过flatMap()代码生成的arr1数组。
■执行paris1.count()操作,先在MapPartitionsRDD的每个分区上进行count(),得到部分结果,然后将结果汇总到Driver端,在Driver端进行加和,得到最终结果。
■由于MapPartitionsRDD被声明要缓存到内存中,因此这里将里面的分区都换成了黄色表示。缓存的意思是将某些可以重用的输入数据或中间计算结果存放到内存中,以减少后续计算时间。
第2个job,即results.count()的执行流程如下所述。
■在已经被缓存的MapPartitionsRDD上执行groupByKey()操作,产生了另外一个名为ShuffledRDD的中间数据,也就是results,产生这个RDD的原因会在后面章节中讨论。这里我们将ShuffledRDD换了一种颜色表示,是因为ShuffledRDD与MapPartitionsRDD具有不同的分区个数,这样MapPartitionsRDD与ShuffledRDD之间的分区关系就不是一对一的,而是多对多的了,是多对多关系的原因会在后续章节中讨论。
■ShuffledRDD中的数据是MapPartitionsRDD中数据聚合的结果,而且在不同的分区中具有不同Key的数据。
■执行results.count(),首先在ShuffledRDD中每个分区上进行count()的运算,然后将结果汇总到Driver端进行加和,得到最终结果。
经过以上分析,我们会有很多疑问,如RDD到底是一个什么概念?为什么要引入RDD?为什么会产生各种不同的RDD,如ParrallelCollectionRDD、MapPartitionsRDD?这些RDD之间又有什么区别?为什么RDD之间的依赖关系有一对一、多对多,等等?这些问题我们将会在后续章节中详细解释。
这里我们只关心一个问题:有了这个逻辑处理流程图是不是就可以执行计算任务,算出结果了?答案是否定的,这个逻辑处理流程图只是表示输入/输出、中间数据,以及它们之间的依赖关系,并不涉及具体的计算任务。当然,我们可以简单地将每一个数据操作,如map()、flatMap()、groupByKey()、count(),都作为一个计算任务,但是执行效率太低、内存消耗大,而且可靠性低。我们在第4章会详细分析该方案的问题。接下来,我们分析一下Spark是怎么根据逻辑处理流程生成物理执行计划,进而得到计算任务的。
我们在分析了GroupByTest应用的逻辑处理流程后,明白该处理流程图表示的是输入/输出、中间数据及其之间的依赖关系,而不是计算任务的执行图。那么Spark是如何执行这个处理流程的,也就是如何生成具体执行任务的?
Spark采用的方法是根据数据依赖关系,来将逻辑处理流程(Logical plan)转化为物理执行计划(Physical plan),包括执行阶段(stage)和执行任务(task)。具体包括以下3个步骤。
(1)首先确定应用会产生哪些作业(job)。在GroupByTest中,有两个count()的action()操作,因此会产生两个job。
(2)其次根据逻辑处理流程中的数据依赖关系,将每个job的处理流程拆分为执行阶段(stage)。如图2.4所示,在GroupByTest中,job 0中的两个RDD虽然是独立的,但这两个RDD之间的数据依赖是一对一的关系。因此,如图2.5所示,可以将这两个RDD放在一起处理,形成一个stage,编号为stage 0。在job 1中,MapPartitionsRDD与ShuffledRDD之间是多对多的关系,Spark将这两个RDD分别处理,形成两个执行阶段stage 0和stage 1。为什么这么拆分,以及对于一般的应用怎么拆分将在后续章节中详细介绍。
(3)最后,对于每一个stage,根据RDD的分区个数确定执行的task个数和种类。对于GroupByTest应用来说,job 0中的RDD包含3个分区,因此形成3个计算任务(task)。如图2.5所示,首先,每个task从input中读取数据,进行flatMap()操作,生成一个arr1数组,然后,对该数组进行count()操作得到结果4,完成计算。最后,Driver将每个task的执行结果收集起来,加和计算得到结果12。对于job 1,其中stage 0只包含MapPartitionsRDD,共3个分区,因此生成3个task。每个task从内存中读取已经被缓存的数据,根据这些数据Key的Hash值将数据写到磁盘中的不同文件中,这一步是为了将数据分配到下一个阶段的不同task中。接下来的stage 1只包含ShuffledRDD,共两个分区,也就是生成两个task,每个task从上一阶段输出的数据中根据Key的Hash值得到属于自己的数据。图2.5中,stage 1中的第1个task只获取并处理Key为0和2的数据,第2个task只获取并处理Key为1和3的数据。从stage 0到stage 1的数据分区和获取的过程称为Shuffle机制,也就是数据经过了混洗、重新分配,并且从一个阶段传递到了下一个阶段。关于Shuffle机制如何设计和实现将在后续章节中介绍。stage 1中的task将相同Key的record聚合在一起,统计Key的个数作为count()的结果,完成计算。Driver再将所有task的结果进行加和输出,完成计算。有关task的更多细节,如task的种类,将在后续章节中介绍。
图2.5 GroupByTest的物理执行计划
生成执行任务task后,我们可以将task调度到Executor上执行,在同一个stage中的task可以并行执行。
至此,我们基本明白了Spark是如何根据应用程序代码一步步生成逻辑处理流程和物理执行计划的。然而,物理执行计划中还有很多问题我们没有探讨,如为什么要拆分为执行阶段?如何有一套通用的方法来将任意的逻辑处理流程拆分为执行阶段?task执行的时候是否会保存每一个RDD的中间数据?Shuffle机制如何实现?这里我们讨论一下第1个问题,其他问题会在后续章节中详细讨论。
为什么要拆分为执行阶段?在2.3.2节中我们讨论过,如果将每个操作都当作一个任务,那么效率太低,而且错误容忍比较困难。将job划分为执行阶段stage后,第1个好处是stage中生成的task不会太大,也不会太小,而且是同构的,便于并行执行。第2个好处是可以将多个操作放在一个task里处理,使得操作可以进行串行、流水线式的处理,这样可以提高数据处理效率。第3个好处是stage可以方便错误容忍,如一个stage失效,我们可以重新运行这个stage,而不需要运行整个job。在后续章节中,我们将会看到,如果stage划分不当,则会带来性能和可靠性的问题。
我们在2.3.2节和2.3.3节中分析了GroupByTest应用的执行过程,手工画出了该应用的逻辑处理流程和物理执行计划,这个过程费时费力,而且对于更复杂的应用来说,自己画出的逻辑处理流程和物理执行计划不一定正确,那么如何快速获得一个Spark应用的逻辑处理流程和物理执行计划呢?答案是根据Spark提供的执行界面,即job UI来进行分析。
对于GroupByTest应用,我们通过分析用户代码可以知道有两个action()操作,会形成两个job。我们也可以通过Spark的job UI(应用运行输出提示Spark UI地址)看到生成的job。接下来我们来观察这两个job生成的stage。
分析job 0及其包含的stage,单击job UI中的“count at GroupByTest.scala:52”进入Details for job 0界面。如图2.6所示,可以看到job 0包含一个stage,该stage执行了两个操作parallelize()和flatMap()。
图2.6 GroupByTest中job 0包含的stage
为了进一步分析该stage中的数据关联关系和生成的task,我们可以单击图2.6中的“count at GroupByTest.scala:52”进入Details for stage 0界面。如图2.7所示,发现stage 0中包含两个RDD,parallelize()操作生成了ParallelCollectonRDD,flatMap()操作生成了MapRartitionsRDD,并对该RDD进行缓存。cached说明该RDD已经被缓存到内存中。这里没有显示这些RDD有几个分区,但是我们看到该stage有3个task,可以断定分区个数为3。task中还有一些属性,如Attempt、Locality Level、GC Time等,我们在后续章节中再详细讨论。
图2.7 job 0中stage 0的逻辑处理流程和生成的task
分析job 1及其包含的stage,单击Spark job页面(见图2.3)中的“count at GroupByTest.scala:56”进入Details for job 1界面。如图2.8所示,可以看到job 1包含两个stage,其中stage 1执行了两个操作parallelize()和flatMap(),stage 2执行了一个操作groupByKey()。
图2.8 job 1中stage 1包含的两个stage
对于stage 1,单击图2.8中的“flatMap at GroupByTest.scala:41”进入Details for stage 1界面。如图2.9所示,发现stage 1中包含的两个RDD与上一个job中stage 0包含的RDD相同。与我们在2.3.2节中给出的GroupByTest的逻辑处理流程图相比,这里多了一个ParallelCollectonRDD。这里多出现一个RDD是因为paris1:MapPartitionsRDD是由input.parallelize().flatMap()得到的,也就是先生成ParallelCollectonRDD,然后再生成MapPartitionsRDD。在执行job 1时,MapPartitionsRDD是已经被缓存的。在真正计算时,ParallelCollectonRDD没有参与计算,因此我们在2.3.2节的图2.4中没有再次画出ParallelCollectonRDD。假设MapPartitionsRDD没有被缓存,我们就需要画出ParallelCollectonRDD。这里读者可能有一个疑问:为什么在没有被缓存的情况下,第2个job又从ParallelCollectonRDD开始计算了呢?这是因为Spark需要用户自己设定中间数据是否被缓存,如果没有被缓存,则会利用数据依赖关系计算得到所需数据,即先计算得到ParallelCollectonRDD,再计算得到MapPartitionsRDD。更多的细节将在后续章节中介绍。
图2.9 job 1中stage 1的逻辑处理流程和生成的task
对于stage 1生成的task,发现相比stage 0和stage 1中的3个task多了Input Size/Records、Write Time和Shuffle Write Size/Records 3个属性。这是因为stage 1中的task是从缓存(MapPartitionsRDD)中读取数据进行处理的,所以有Input Size属性,而stage0中的task是根据数字 p 自动生成其他数据的,没有真正的读取动作,所以没有Input Size。同样,stage 0中的task的结果直接通过网络返回给Driver端,没有磁盘写入和Shuffle动作,也就没有Write Time和Shuffle Write Size等属性。前面介绍过,stage 1中的task需要进行Shuffle,把具有不同Hash值的数据Key写入不同的磁盘文件中,因而有Write Time和Shuffle Write Size。
对于stage 2,单击图2.8中的“count at GroupByTest.scala:56”进入Details for stage 2界面,得到图2.10。与我们在2.3.2节中给出的逻辑处理流程一致,在进行groupByKey()操作后生成了ShuffledRDD。stage 2包含两个task,每个task包含一个Shuffle Read Size的属性,表示从stage 1的输出结果中Shuffle Read的数据。每个task获取了6个record,与我们画出的物理执行计划一致。
图2.10 job 1中stage 2的逻辑处理流程和生成的task
至此,我们已经学会利用Spark界面给出的图示来分析Spark应用的逻辑处理流程和物理执行计划了。与我们在2.3.2节和2.3.3节中手工画出的图的区别是,Spark界面给出的图不能展示出每个RDD产生的具体数据。为了让读者更容易理解逻辑处理流程图,我们在手工画出的图中加入了每个RDD应该会产生的数据,而在实际运行时Spark并不关心这些数据具体是什么,也不会存储每个RDD中的数据,所以也就无法图示出RDD中的数据。当然,我们可以用一些强制输出的办法输出其中我们感兴趣的RDD中的数据,具体方法将在后续章节中介绍。