购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

3.2 Spark逻辑处理流程生成方法

我们在写程序时会想到类似图3.1的逻辑处理流程。然而,Spark实际生成的逻辑处理流程图往往比我们头脑中想象的更加复杂,例如,会多出几个RDD,每个RDD会有不同的分区个数,RDD之间的数据依赖关系不同,等等。对于Spark来说,需要有一套通用的方法,其能够将应用程序自动转化为确定性的逻辑处理流程,也就是RDD及其之间的数据依赖关系。因此,需要解决以下3个问题。

①根据应用程序如何产生RDD,产生什么样的RDD?

②如何建立RDD之间的数据依赖关系?

③如何计算RDD中的数据?

3.2.1 根据应用程序如何产生RDD,产生什么样的RDD

我们能想到的一种简单解决方法是对程序中的每一个数据进行操作,也就是用transformation()方法返回(创建)一个新的RDD。这种方法的主要问题是只适用于逻辑比较简单的transformation(),如在rdd1上使用map(func)进行操作时,是对rdd1中每一个元素执行func()函数得到一个新的元素,因此只会生成一个rdd2。然而,一些复杂的transformation(),如join()、distinct()等,需要对中间数据进行一系列子操作,那么一个transformation()会创建多个RDD。例如,rdd3=rdd1.join(rdd2)需要先将rdd1和rdd2中的元素聚合在一起,然后使用笛卡儿积操作生成关联后的结果,在这个过程中会生成多个RDD。Spark依据这些子操作的顺序,将生成的多个RDD串联在一起,而且只返回给用户最后生成的RDD。这就是Spark实际创建出的RDD个数比我们想象的多一些的原因。

我们在第2章中看到RDD的类型有多种,如ParallelCollectionRDD、MapPartitionsRDD、ShuffledRDD等。为什么会有这么多不同类型的RDD,应该产生哪些RDD?虽然我们用RDD来对输入/输出、中间数据进行统一抽象,但这些数据本身可能具有不同的类型,而且是由不同的计算逻辑得到的,可能具有不同的依赖关系。因此,我们需要多种类型的RDD来表示这些不同的数据类型、不同的计算逻辑,以及不同的数据依赖。Spark实际产生的RDD类型和个数与transformation()的计算逻辑有关,官网上 [69] 也给出了典型的transformation()操作及其创建的RDD。然而,只看官网上的解释,很难理解某些操作的真正含义,我们会在3.3节中通过图示详细介绍每个操作的含义及产生的RDD。

3.2.2 如何建立RDD之间的数据依赖关系

我们已经知道transformation()操作会形成新的RDD,那么接下来的问题就是如何建立RDD 之间的数据依赖关系?数据依赖关系包括两方面:一方面是RDD之间的依赖关系,如一些transformation()会对多个RDD进行操作,则需要建立这些RDD之间的关系。另一方面是RDD本身具有分区特性,需要建立RDD自身分区之间的关联关系。具体地,我们需要解决以下3个问题。

①如何建立RDD之间的数据依赖关系?例如,生成的RDD是依赖于一个parent RDD,还是多个parent RDD?

②新生成的RDD应该包含多少个分区?

③新生成的RDD与其parent RDD中的分区间是什么依赖关系?是依赖parent RDD中的一个分区还是多个分区呢?

第1个问题可以很自然地解决,对于一元操作,如rdd2=rdd1.transformation()可以确定rdd2只依赖rdd1,所以关联关系是“rdd1=>rdd2”。对于二元操作,如rdd3=rdd1.join(rdd2),可以确定rdd3同时依赖rdd1和rdd2,关联关系是“(rdd1,rdd2)=>rdd3”。二元以上的操作可以类比二元操作。

第2个问题是如何确定新生成的RDD的分区个数?在Spark中,新生成的RDD的分区个数由用户和parent RDD共同决定,对于一些transformation(),如join()操作,我们可以指定其生成的分区的个数,如果个数不指定,则一般取其parent RDD的分区个数最大值。还有一些操作,如map(),其生成的RDD的分区个数与数据源的分区个数相同,会在后面详细讨论。

第3个问题比较复杂,分区之间的依赖关系既与transformation()的语义有关,也与RDD的分区个数有关。例如,在执行rdd2=rdd1.map()时,map()对rdd1的每个分区中的每个元素进行计算,可以得到新的元素,类似一一映射,所以并不需要改变分区个数,即rdd2的每个分区唯一依赖rdd1中对应的一个分区。而对于groupByKey()之类的聚合操作,在计算时需要对parent RDD中各个分区的元素进行计算,需要改变分区之间的依赖关系,使得RDD中的每个分区依赖其parent RDD中的多个分区,后面会详细展示。

那么Spark是怎么设计出一个通用的方法来解决第3个问题,即建立分区之间的依赖关系的呢?

理论上,分区之间的数据依赖关系可以灵活自定义,如一一映射、多对一映射、多对多映射或者任意映射等。但实际上,常见数据操作的数据依赖关系具有一定的规律,Spark通过总结这些数据操作的数据依赖关系,将其分为两大类,具体如下所述。

1)窄依赖(NarrowDependency)

窄依赖的官方解释是:“Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD.Narrow dependencies allow for pipelined execution。”

中文意思:“如果新生成的child RDD中每个分区都依赖parent RDD中的一部分分区,那么这个分区依赖关系被称为NarrowDependency。”

RDD及其分区之间的数据依赖关系类型如图3.2所示。窄依赖可以进一步细分为4种依赖。

图3.2 RDD及其分区之间的数据依赖关系类型

■一对一依赖(OneToOneDependency):一对一依赖表示child RDD和parent RDD中的分区个数相同,并存在一一映射关系。典型的transformation()包括map()、fliter()等。

■区域依赖(RangeDependency):表示child RDD和parent RDD的分区经过区域化后存在一一映射关系。典型的transformation()包括union()等。

■多对一依赖(ManyToOneDependency):表示child RDD中的一个分区同时依赖多个parent RDD中的分区。典型的transformation()包括具有特殊性质的cogroup()、join()等,这个特殊性质将在3.3节中详细介绍。

■多对多依赖(ManyToManyDependency):表示child RDD中的一个分区依赖parent RDD中的多个分区,同时parent RDD中的一个分区被child RDD中的多个分区依赖。典型的transformation()是cartesian()。

注意:为了区别不同种类的依赖关系,本书定义了两种新的窄依赖关系,即ManyToOneDependency和ManyToManyDependency,实际上,在Spark代码中没有对这两种依赖关系进行命名,只是统称为NarrowDependency。另外,至于窄依赖为什么可以方便地进行流水线执行,将在下一章中介绍。

2)宽依赖(ShuffleDependency)

宽依赖的官方解释是:“Represents a dependency on the output of a shuffle stage。”这个解释是从实现角度来讲的,如果从数据流角度解释,宽依赖表示新生成的child RDD中的分区依赖parent RDD中的每个分区的一部分。什么是“依赖partent RDD中的每个分区的一部分”呢?我们对比图3.2中的ManyToManyDependency和ShuffleDependency,发现ShuffleDependency中RDD 2的每个分区虽然依赖RDD 1中的所有分区,但只依赖这些分区中id为1或2的部分,而ManyToManyDependency中RDD 2的每个分区依赖RDD 1中每个分区的所有部分。实际上,在ManyToManyDependency中,RDD 1中每个分区被依赖了2次,而在ShuffleDependency中每个分区只被依赖了1次。通常,parent RDD中的分区只需要被使用(处理)1次,因此ShuffleDependency更加常用。与NarrowDependency类似,ShuffleDependency也包含多个parent RDD的情况。在图3.2及后面的图中,我们用红色箭头来表示ShuffleDependency。

总的来说,窄依赖、宽依赖的区别是child RDD的各个分区是否完全依赖parent RDD的一个或者多个分区。根据数据操作语义和分区个数,Spark可以在生成逻辑处理流程时就明确child RDD是否需要parent RDD的一个或多个分区的全部数据。如果parent RDD的一个或者多个分区中的数据需要全部流入child RDD的某一个或者多个分区,则是窄依赖。如果parent RDD分区中的数据需要一部分流入child RDD的某一个分区,另外一部分流入child RDD的另外分区,则是宽依赖。

读者可能会问,“对数据依赖(Dependency)进行分类有什么用处”?这样做首先可以明确RDD分区之间的数据依赖关系,在执行时Spark可以确定从哪里获取数据,输出数据到哪里。其次,对数据依赖进行分类有利于生成物理执行计划。NarrowDependency在执行时可以在同一个阶段进行流水线(pipeline)操作,不需要进行Shuffle,而ShuffleDependency需要进行Shufle(将在下一章的物理执行计划中详细介绍)。最后,对数据依赖进行分类有利于代码实现,如OneToOneDependency可以采用一种实现方式,而ShuffleDependency采用另一种实现方式。这样,Spark可以根据transformation()操作的计算逻辑选择合适的数据依赖进行实现。

了解RDD之间的分区依赖关系后,我们还需要解决的一个问题是如何对RDD内部的数据进行分区?常用的数据分区方法(Partitioner)包括3种:水平划分、Hash划分(HashPartitioner)和Range划分(RangePartitioner)。Spark采用了这3种分区方法,具体如下所述。

(1)水平划分:按照record的索引进行划分。例如,我们经常使用的sparkContext.parallelize(list(1,2,3,4,5,6,7,8,9),3),就是按照元素的下标划分,(1,2,3)为一组,(4,5,6)为一组,(7,8,9)为一组。这种划分方式经常用于输入数据的划分,如使用Spark处理大数据时,我们先将输入数据上传到HDFS上,HDFS自动对数据进行水平划分,也就是按照128MB为单位将输入数据划分为很多个小数据块(block),之后每个Spark task可以只处理一个数据块。

(2)Hash划分(HashPartitioner):使用record的Hash值来对数据进行划分,该划分方法的好处是只需要知道分区个数,就能将数据确定性地划分到某个分区。在水平划分中,由于每个RDD中的元素数目和排列顺序不固定,同一个元素在不同RDD中可能被划分到不同分区。而使用HashPartitioner,可以根据元素的Hash值,确定性地得出该元素的分区。该划分方法经常被用于数据Shuffle阶段。

(3)Range划分(RangePartitioner):该划分方法一般适用于排序任务,核心思想是按照元素的大小关系将其划分到不同分区,每个分区表示一个数据区域。例如,我们想对一个数组进行排序,数组里每个数字是[0,100]中的随机数,Range划分首先将上下界[0,100]划分为若干份(如10份),然后将数组中的每个数字分发到相应的分区,如将18分发到(10,20]的分区,最后对每个分区进行排序,这个排序过程可以并行执行,排序完成后是全局有序的结果。Range划分需要提前划分好数据区域,因此需要统计RDD中数据的最大值和最小值。为了简化这个统计过程,Range划分经常采用抽样方法来估算数据区域边界。

3.2.3 如何计算RDD中的数据

在上面两小节中,我们理解了如何生成RDD,以及建立RDD之间的数据依赖关系,但还有一个问题是,如何计算RDD中的数据?RDD中的每个分区中包含 n 条数据,我们需要计算其中的每条数据,那么怎么计算这些数据呢?

在确定了数据依赖关系后,相当于我们知道了child RDD中每个分区的输入数据是什么,那么只需要使用transformation(func)处理这些输入数据,将生成的数据推送到child RDD中对应的分区即可。在普通程序中,我们得到输入数据后,可以写任意的控制逻辑程序进行处理。例如,输入一个数组,我们可以对数组进行前向迭代、后向迭代或者循环处理等。然而,Spark中的大多数transformation()类似数学中的映射函数,具有固定的计算方式(控制流),如map(func)操作需要每读入一个record,就进行处理,然后输出一个record。reduceByKey(func)操作中的func对中间结果和下一个record进行聚合计算并输出结果。当然,有些大数据应用需要更灵活的控制流,Spark也提供了一些类似普通程序的操作,如mapPartitions()可以对分区中的数据进行多次操作后再输出结果。

图3.3展示了数据操作map()和mapPartitions()的区别,rdd 2=rdd1.map(func)和rdd2=rdd1.mapPartitions(func)都会生成一个新的rdd 2,而且rdd 2和rdd 1之间是OneToOneDependency,不同的是,两个func的控制流不一样。假设rdd 1中某个分区的数据是[1,2,3,4,5],rdd2=rdd1.map(func)的计算逻辑类似于下面的单机程序:

rdd2=rdd1.mapPartitions(func)的计算逻辑类似于下面的单机程序:

Spark中mapPartitions()的计算逻辑更接近Hadoop MapReduce中的map()和cleanup()函数。在Hadoop MapReduce中,map()函数对每个到来的<K,V>record都进行处理,等对这些record处理完成后,使用cleanup()对处理结果进行集中输出。同样,Spark中的mapPartitions()可以在对分区中所有record处理后,再集中输出。

图3.3 RDD中数据的计算过程示例

至此,我们了解了逻辑处理流程的生成过程,接下来我们详细讨论常用的transformation()的语义、计算逻辑、产生的RDD及数据依赖关系,理解这些知识,将有助于开发Spark应用,并进行性能调优等。 KdxB2cynFvNHp51u9e9sn8G2hA2FJ+I/3yfmNUbsg0eHNl59aNUh8vJC7ubQWZdj

点击中间区域
呼出菜单
上一章
目录
下一章
×