购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

1.2 Spark大数据处理框架

1.2.1 Spark速度为何如此之快

1.统一的RDD抽象和操作

Spark作为一个通用的大数据计算平台,基于“One Stack to rule them all”的理念成功成为了一体化、多元化的大数据处理平台,轻松应对大数据处理中的实时流计算、SQL交互式查询、机器学习和图计算等,如图1-23所示。

图1-23 One Stack to rule them all的理念

Spark速度快的一个核心原因就是统一的RDD抽象,基于该抽象,使得Spark的框架可轻而易举地使用Spark Core中所有的内容,并且各个框架可以在内存中无缝地集成和完成系统任务。

基于统一的技术堆栈,Spark目前已经成为大数据通用计算平台。

2.基于内存的迭代式计算

首先我们看一下Hadoop经典的处理过程,如图1-24所示。

图1-24 Hadoop经典的处理过程

MapReduce在每次执行时都要从磁盘读数据,计算完毕后都要把数据存放到磁盘上,如图1-25所示。

图1-25 MapReduce执行过程

而Spark是基于内存的,执行过程如图1-26所示。

图1-26 基于内存的Spark执行过程

3.DAG

另一方面,DAG也是Spark速度快的极为重要的原因,图1-27是一张DAG图的示例。

图1-27 一张DAG图的示例

基于DAG,Spark具备了非常精致的作业调度系统,如图1-28所示。

图1-28 Spark的作业调度系统

DAG中的依赖有宽依赖和窄依赖之分,如图1-29所示。

在DAG图中可以根据依赖对pipeline等进行优化操作,如图1-30所示。

基于RDD和DAG,并行计算整个Job,如图1-31所示。

图1-29 DAG中的宽依赖和窄依赖

图1-30 根据依赖对pipeline等进行优化操作

图1-31 并行计算整个Job

4.出色的容错机制

Spark之所以快,还有一个原因就是其容错机制,基于DAG图,lineage是轻量级且高效的。

操作之间相互具备lineage的关系,每个操作只关心其父操作,各个分片的数据之间互不影响,出现错误时只要恢复单个Split的特定部分即可,如图1-32所示。

图1-32 操作之间的逻辑关系

1.2.2 RDD:分布式函数式编程

1.RDD的基本概念

RDD是什么?RDD的全称是Resilient Distributed Dataset(弹性分布式数据集)。在Spark中,一个RDD就是一个分布式对象集合。每个RDD可分为多个片(partitions)。而分片可以在集群环境的不同结点上计算。RDD兼容Python、Java或者Scala的对象,包括用户定义的类。

用户可以通过两种方式创建RDD:加载外部数据集或者在驱动程序中部署对象集合。

我们可以通过SparkContext.textFile()来加载一个将文本文件作为字符串的RDD:

RDD被创建后,提供了两种类型的操作:转换(transformation)和动作(action)。转换是将原来的RDD构建成新的RDD。比如说:

而动作是通过RDD来计算的结果,并且将结果返回给驱动程序或者保存到外部存储系统(如HDFS)。例子如下:

上面的first()返回RDD的第一个元素。

动作和转换的不同之处取决于Spark计算RDD的方式。你可以在任何时间创建新的RDD,Spark采用一种慵懒的态度计算它们——第一次使用它们中的动作时才计算。这种方法可能有点令人不习惯,但很有意义。当进行大数据处理工作时,例如,考虑上面的例子,在这里我们定义了一个文本文件,然后过滤含有“right”的行。如果Spark加载和存储所有中间过程,就会浪费大量的存储空间,因为我们随后会立即过滤掉一部分不需要的行。相反,一旦Spark看到整个变换链,它可以计算仅需要其结果的数据。事实上,对于first()动作,Spark只扫描文件,直到它找到第一个匹配的行,它甚至不读整个文件!

最后,Spark的RDDS在默认情况下每次运行它们都要进行重新计算。如果需要重用在多个动作中,你可以使用Spark的持久化方法:RDD.persist()。计算它的第一次后,Spark将RDD内容存储在内存中(整个机器的集群分区),并在未来的动作中重新使用它们。持久化在磁盘上,而不是存储在内存中也是可行的。默认情况下的动作不进行持久化是很有意义的,使用一个很有意义的大数据集:如果你不重用RDD,没有理由浪费过多的存储空间。

实际上,一般会经常使用持久化去加载数据集到内存中,以便可以重复地查询和使用。比如,如果想计算多个关于“right”的结果,会像下面这样写:

2.创建RDD

Spark提供两种创建RDD的方式:加载外部数据集和在驱动程序中平行化集合。

创建RDD最简单的方法就是采用现有的内存集合并把它传递给SparkContext的并行化方法。在学习Spark时,这样的做法是非常有用的。因为可以在shell中快速创建自己的RDD并对其执行相关的操作。

创建RDD一个更常见的方法是加载外部存储数据。我们已经看到了加载一个文本文件字符串的方法,用SparkContext.textFile()来创建一个RDD:

3.RDD操作

RDD支持两种类型的操作:转换和动作。转换是上一个RDD返回一个新的RDD,如映射和过滤器。动作是返回结果并将结果返回给驱动程序或写入到存储,并进行计算,如count()和first()。Spark对待转换和动作非常不同,所以理解正在执行的是哪一类操作类型是非常重要的。如果感到困惑,不知道一个给定的function是转换还是动作,你可以看看它的返回类型:转换返回RDD而动作返回一些其他数据类型。

1)转换

转换是RDD操作的一种类型,每次进行转换操作都会返回一个新的RDD对象。转换RDD的计算方式是惰性的,只有在动作中使用它们才会进行计算。

注意,filter()的操作不改变现有的inputRDD。相反,它返回一个全新的RDD。inputRDD仍可以在以后的程序重新使用,例如,要搜索“error”以外的其他词语,使用inputRDD再次搜索含有“warning”的行,然后使用另一种转型union(),打印出含有“error”或“warning”的行。例子如下:

Union()和filter()有点不同,因为它操作在两个RDD之上而不是一个RDD。转换可以在任意数量的RDD上进行操作。最后,当相互转换得到并使用新的RDD时,Spark跟踪记录且设定不同的RDD之间的依赖关系,这种关系称为血统图。它使用这个信息来按照需求计算每个RDD,以及恢复持续化的RDD丢失的那一部分数据。

图1-33给出了血统图的例子,包括inputRDD和结束的两个动作。需要注意的是,每次我们调用一个新的动作,整个RDD必须“从头开始”计算。为了提高效率,用户可以将这些中间结果持久化。

图1-33 血统图

2)动作

我们已经看到如何从相互转换中创造RDD,但在某些时候,我们会对数据集做一些真正有用的操作。动作是第二类RDD操作,它们返回一个最终值到驱动程序或返回数据写入到一个外部存储系统。动作迫使所需的转换被调用执行,因为它们需要实际产生输出。

我们可能想打印出一些关于badLinesRDD的信息。为了做到这一点,我们使用两个动作,count(),它返回计数,为一个数字;take()包含了大量来自RDD的元素。

在这个例子中,我们使用take()在驱动程序中检索一个小数目的RDD元素。然后,我们在驱动程序中遍历它们并在本地打印出它们的信息。RDDS还有一个collect()函数,用来获取整个RDD。程序中的filterRDDs会降到一个非常小的数量,并且可以通过本地方式对付它。需要记住,整个数据集必须在一台机器上装入内存中再使用collect(),因此collect()不应该用在大型数据集上。

在大多数情况下,RDD不能仅仅被collect()到驱动,因为它们太大。在这些情况下,常见的做法是把数据写到一个分布式的存储系统中,如HDFS或Amazon S3。RDD的内容可以使用saveAsTextFile()或者saveAsSequenceFile()以及其他动作来保存。

3)惰性评估(Lazy Evaluation)

RDDS在转换过程中是惰性的,这意味着Spark一开始不会执行直到它看到一个动作。这可能对一些新用户是有些反直观的。惰性评估意味着,当我们调用RDD的转换时,不立即执行该操作。相反,Spark在内部记录元数据以表明该操作已被请求,而不是考虑RDD包含的具体的数据。最好是把每个RDD的转换操作看作关于如何计算数据的指令。将数据加载到RDD中也是以同样的方式(惰性的方式)转换的。因此,当我们执行sc.textFile()时,数据并没有被加载,直到它有动作需要执行时才加载数据。

转换是惰性的,迫使Spark在任何时间只能通过运行一个动作来执行它们,如count()。这是一个用来测试一部分程序的简单方法。Spark通过使用惰性评估,以减少其在各种转换操作中所需要储存的中间数据。在MapReduce系统中,如Hadoop,开发者往往要花掉很多时间考虑如何把一些操作组合到一起,以尽量减少MapReduce的数据量。因此,用户可以自由地安排自己的程序,并将其转换成更小、更容易管理的操作。 pIkiBvm12fvxM49mlr6IsrjzrCF5WgOCni/CDpj2k0eSz/U7TC24DeWY1+FMA9z6

点击中间区域
呼出菜单
上一章
目录
下一章
×