Spark对内存数据的抽象,即为RDD。RDD是一种分布式、多分区、只读的数组,Spark相关操作都是基于RDD进行的。Spark可以将HDFS块文件转换成RDD,也可以由一个或多个RDD转换成新的RDD。基于这些特性,RDD在分布式环境下能够被高效地并行处理。
PySpark是Apache Spark社区发布的一个工具,让熟悉Python的开发人员可以方便地使用Spark组件。PySpark借助Py4j库,可以使用Python编程语言处理RDD。
PySpark的基本数据流架构如图2.6所示。
由图2.6可知,PySpark首先利用Python创建Spark Context对象,然后用Socket与JVM上的Spark Context通信,当然,这个过程需要借助Py4J库。JVM上的Spark Context负责与集群上的Spark Worker节点进行交互。
在PySpark中,对RDD提供了Transformation和Action两种操作类型。Transformation操作非常丰富,采用延迟执行的方式,在逻辑上定义了RDD的依赖关系和计算逻辑,但并不会真正触发执行动作,只有等到Action操作才会触发真正执行操作。Action操作常用于最终结果的输出。
图2.6 PySpark的数据流架构示意图
常用的Transformation操作及其描述如下:
· map: map接收一个处理函数并行处理源RDD中的每个元素,返回与源RDD元素一一对应的新RDD。
· filter: filter并行处理源RDD中的每个元素,接受一个函数,并根据定义的规则对RDD中的每个元素进行过滤处理,返回处理结果为true的元素重新组成新的RDD。
· flatMap: flatMap是map和flatten的组合操作,与map函数相似,不过map函数返回的新RDD包含的元素可能是嵌套类型。
· mapPartitions:与map函数应用于RDD中的每个元素不同,mapPartitions应用于RDD中的每个分区。mapPartitions函数接受的参数为一个函数,该函数的参数为每个分区的迭代器,返回值为每个分区元素处理之后组成的新的迭代器,该函数会作用于分区中的每一个元素。
· mapPartitionsWithIndex:作用与mapPartitions函数相同,只是接受的参数(一个函数)需要传入两个参数,分区的索引作为第一个参数传入,按照分区的索引对分区中元素进行处理。
· Union:将两个RDD进行合并,返回结果为RDD中元素(不去重)。
· Intersection:对两个RDD进行取交集运算,返回结果为RDD无重复元素。
· Distinct:对RDD中元素去重。
· groupByKey:在键值对(K-V)类型的RDD中按Key分组,将相同Key的元素聚集到同一个分区内,此函数不能接受函数作为参数,只接受一个可选参数,即任务数。
· reduceByKey:对K-V类型的RDD按Key分组,它接受两个参数,第一个参数为处理函数,第二个参数为可选参数,用于设置reduce的任务数。reduceByKey函数能够在RDD分区本地提前进行聚合运算,这有效减少了shuffle过程传输的数据量。相对于groupByKey函数更简洁高效。
· aggregateByKey:对K-V类型的RDD按Key分组进行reduce计算,可接受三个参数,第一个参数是初始化值,第二个参数是分区内处理函数,第三个参数是分区间处理函数。
· sortByKey:对K-V类型的RDD内部元素按照Key进行排序,排序过程会涉及Shuffle操作。
· join:对K-V类型的RDD进行关联操作,它只能处理两个RDD之间的关联,超过两个RDD关联需要多次使用join函数。另外,join操作只会关联出具有相同Key的元素,相当于SQL语句中的inner join。
· cogroup:对K-V类型的RDD进行关联,cogroup在处理多个RDD的关联上比join更加优雅,它可以同时传入多个RDD作为参数进行关联。
· coalesce:对RDD重新分区,将RDD中的分区数减小到参数numPartitions个,不会产生shuffle。在较大的数据集中使用filer等过滤操作后可能会产生多个大小不等的中间结果数据文件,重新分区并减小分区可以提高作业的执行效率,是Spark中常用的一种优化手段。
· repartition:对RDD重新分区,接受一个参数,即numPartitions分区数,它是coalesce函数设置shuffle为true的一种实现形式。
常用的Action操作及其描述如下:
· reduce:处理RDD两两之间元素的聚集操作。
· collect:返回RDD中所有数据元素。
· Count:返回RDD中元素个数。
· First:返回RDD中的第一个元素。
· Take:返回RDD中的前N个元素。
· saveAsTextFile:将RDD写入文本文件,保存至本地文件系统或者HDFS中。
· saveAsSequenceFile:将K-V类型的RDD写入Sequence File文件,保存至本地文件系统或者HDFS中。
· countByKey:返回K-V类型的RDD,这个RDD中数据为每个Key包含的元素个数。
· Foreach:遍历RDD中所有元素,接受参数为一个函数,常用操作是传入println函数打印所有元素。
从HDFS文件生成RDD,经过map、filter、join等多次Transformation操作,最终调用saveAsTextFile操作,将结果输出到HDFS中,并以文件形式保存。RDD操作的基本流程示意图如图2.7所示。
在PySpark中,RDD可以缓存到内存或者磁盘上,提供缓存的主要目的是减少同一数据集被多次使用的网络传输次数,提高PySpark的计算性能。PySpark提供对RDD的多种缓存级别,可以满足不同场景对RDD的使用需求。RDD的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。
图2.7 RDD操作的基本流程示意图
在代码中可以使用persist()方法或cache()方法缓存RDD。cache()方法默认将RDD缓存到内存中,cache()方法和persist()方法都可以用unpersist()方法来取消RDD缓存。具体如下所示:
rdd= sc.textFile("hdfs://data/hadoop/test.text") rdd.cache() // 缓存RDD到内存
或者
rdd.persist(StorageLevel.MEMORY_ONLY) rdd.unpersist() // 取消缓存
Spark各缓存级别及其描述:
· MEMORY_ONLY: RDD仅缓存一份到内存,此为默认级别。
· MEMORY_ONLY_2:将RDD分别缓存在集群的两个节点上,RDD在集群内存中保存两份。
· MEMORY_ONLY_SER:将RDD以Java序列化对象的方式缓存到内存中,有效减少了RDD在内存中占用的空间,不过读取时会消耗更多的CPU资源。
· DISK_ONLY: RDD仅缓存一份到磁盘。
· MEMORY_AND_DISK: RDD仅缓存一份到内存,当内存中空间不足时,会将部分RDD分区缓存到磁盘。
· MEMORY_AND_DISK_2:将RDD分别缓存在集群的两个节点上,当内存中空间不足时,会将部分RDD分区缓存到磁盘,RDD在集群内存中保存两份。
· MEMORY_AND_DISK_SER:将RDD以Java序列化对象的方式缓存到内存中,当内存中空间不足时,会将部分RDD分区缓存到磁盘,有效减少了RDD在内存中占用的空间,不过读取时会消耗更多的CPU资源。
· OFF_HEAP:将RDD以序列化的方式缓存到JVM之外的存储空间中,与其他缓存模式相比,减少了JVM垃圾回收开销。
Spark的操作还有很多,具体可以参考Spark官网的相关文档。这里只是给出了基本操作的语言描述,因此会比较难于理解,后续会详细通过示例来说明每个操作的具体用法。