



在第2章中,我们了解了Spark在运行应用前,首先需要将应用程序转化为逻辑处理流程(Logical plan)。这一章我们将详细讨论这个转化过程。为了解释一些概念,我们假设Spark已经为一个典型应用生成了逻辑处理流程,如图3.1所示。图3.1表示了从数据源开始经过了哪些处理步骤得到最终结果,还有中间数据及其依赖关系。
这个典型的逻辑处理流程主要包含四部分。
(1)数据源(Data blocks):数据源表示的是原始数据,数据可以存放在本地文件系统和分布式文件系统中,如HDFS、分布式Key-Value数据库(HBase)等。在IntelliJ IDEA中运行单机测试时,数据源可以是内存数据结构,如list(1,2,3,4,5);对于流式处理来说,数据源还可以是网络流等。这里我们只讨论批式处理,所以限定数据源是静态数据。
图3.1 一个典型的Spark逻辑处理流程
(2)数据模型:确定了数据源后,我们需要对数据进行操作处理。首要问题是如何对输入/输出、中间数据进行抽象表示,使得程序能够识别处理。在使用普通的面向对象程序(C++/Java程序)处理数据时,我们将数据抽象为对象(object),如doubleObject=new Double(2.0)、listObject=new ArrayList()。然后,我们可以在对象上定义数据操作,如doubleObject.longValue()可以将数据转化为long类型,listObject.add( i ,Value)可以在list的第 i 个位置插入一个元素Value。Hadoop MapReduce框架将输入/输出、中间数据抽象为<;K,V>;record,这样map()/reduce()按照<;K,V>;record的形式读入并处理数据,最后输出为<;K,V>;record形式。这种数据表示方式的优点是简单易操作,缺点是过于细粒度。没有对这些<;K,V>;record进行更高层的抽象,导致只能使用map(K,V)这样的固定形式去处理数据,而无法使用类似面向对象程序的灵活数据处理方式,如records.operation()。
Spark认知到了这个缺点,将输入/输出、中间数据抽象表示为统一的数据模型(数据结构),命名为RDD(Resilient Distributed Datasets)。每个输入/输出、中间数据可以是一个具体的实例化的RDD,如第2章介绍的ParallelCollectionRDD等。RDD中可以包含各种类型的数据,可以是普通的Int、Double,也可以是<;K,V>;record等。RDD与普通数据结构(如ArrayList)的主要区别有两点:
■RDD只是一个逻辑概念,在内存中并不会真正地为某个RDD分配存储空间(除非该RDD需要被缓存)。RDD中的数据只会在计算中产生,而且在计算完成后就会消失,而ArrayList等数据结构常驻内存。
■RDD可以包含多个数据分区,不同数据分区可以由不同的任务(task)在不同节点进行处理。
(3)数据操作:定义了数据模型后,我们可以对RDD 进行各种数据操作,Spark将这些数据操作分为两种:transformation()操作和action()操作。两者的区别是action()操作一般是对数据结果进行后处理(post-processing),产生输出结果,而且会触发Spark提交job真正执行数据处理任务(在下一章中详细介绍)。transformation()操作和action()操作的使用方式分别为rdd.transformation()和rdd.action(),如rdd2=rdd1.map(func)表示对rdd1进行map()操作得到新的rdd2;还有二元操作,如rdd3=rdd1.join(rdd2)表示对rdd1和rdd2进行join()操作得到rdd3。这里读者可能会问一个问题:为什么操作叫作transformation()?transformation这个词隐含了一个意思是单向操作,也就是rdd1使用transformation()后,会生成新的rdd2,而不能对rdd1本身进行修改。在普通C++/Java程序中,我们既可以对ArrayList上的数据进行统计分析再生成新的ArrayList,也可以对ArrayList中的数据进行修改,而且可以对每个元素进行细粒度的修改,如ArrayList[ i ]=ArrayList[ i ]+1。然而,在Spark中,因为数据操作一般是单向操作,通过流水线执行(下一章介绍),还需要进行错误容忍等,所以RDD被设计成一个不可变类型,可以类比成一个不能修改其中元素的ArrayList。后续我们会更深入讨论这个问题。一直使用transformation()操作可以不断生成新的RDD,而action()操作用来计算最后的数据结果,如rdd1.count()操作可以统计rdd1中包含的元素个数,rdd1.collect()操作可以将rdd1中的所有元素汇集到Driver节点,并进行进一步处理。
(4)计算结果处理:由于RDD实际上是分布在不同机器上的,所以大数据应用的结果计算分为两种方式:一种方式是直接将计算结果存放到分布式文件系统中,如rdd.save(“hdfs://file_location”),这种方式一般不需要在Driver端进行集中计算;另一种方式是需要在Driver端进行集中计算,如统计RDD中元素数目,需要先使用多个task统计每个RDD中分区(partition)的元素数目,然后将它们汇集到Driver端进行加和计算。例如,在图3.1中,每个分区进行action()操作得到部分计算结果result,然后将这些result发送到Driver端后对其执行 f ()函数,得到最终结果。