购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

1.2 Spark原理及特点

1.2.1 Spark的核心优势

Spark为什么能在短时间内突然崛起?Spark相对Hadoop MapReduce有何优势?接下来,我们将介绍Spark相对于Hadoop MapReduce的3个核心优势——高性能、高容错性和通用性。

1.高性能

Spark继承了Hadoop MapReduce大数据计算的优点,但不同于MapReduce的是:MapReduce每次执行任务时的中间结果都需要存储到HDFS磁盘上,而Spark每次执行任务时的中间结果可以保存到内存中,因而不再需要读写HDFS磁盘上的数据,具体如图1-6所示。这里假设任务的计算逻辑需要执行两次迭代计算才能完成,在MapReduce任务的计算过程中,MapReduce任务首先从HDFS磁盘上读取数据,然后执行第一次迭代计算,等到第一次迭代计算完成后,才会将计算结果写入HDFS磁盘;当第二次迭代计算开始时,需要从HDFS磁盘上读取第一次迭代计算的结果并执行第二次迭代计算,并且等到第二次迭代计算完成后,才将计算结果写到HDFS磁盘上,此时整个迭代计算过程才完成。可以看出,在MapReduce任务的计算过程中,分别经历了两次HDFS磁盘上的数据读和两次HDFS磁盘上的数据写,而大数据计算产生的耗时很大一部分来自磁盘数据的读写,尤其是在数据超过TB(太字节)级别后,磁盘读写这个耗时因素将变得更加明显。

图1-6 对比MapReduce任务计算和Spark任务计算

为了解决数据读写磁盘慢的问题,Spark会将中间的计算结果保存到内存中(前提是内存中有足够的空间)。当后面的迭代计算需要用到这些数据时,Spark可直接从内存中读取它们。因为内存中数据的读写速度和磁盘上数据的读写速度不是一个级别,所以Spark通过从内存中读写数据,这样能够更快速地完成数据的处理。例如,对于同一个需要两次迭代计算的任务,在Spark任务的计算过程中,首先会从HDFS磁盘上读取数据并执行第一次迭代计算,在第一次迭代计算完成后,Spark会将计算结果保存到分布式内存中;等到执行第二次迭代计算时,Spark会直接从内存中读取第一次迭代计算的结果并执行第二次迭代计算,并在第二次迭代计算完成后,将最终结果写入HDFS磁盘。可以看出,Spark在任务执行过程中分别进行了一次HDFS磁盘读和一次HDFS磁盘写。也就是说,Spark仅在第一次读取源数据和最后一次将结果写出时,基于HDFS进行磁盘数据的读写,而计算过程中产生的中间数据都存放在内存中。因此,Spark的计算速度自然要比MapReduce快很多。

2.高容错性

对于任何一个分布式计算引擎来说,容错性都是必不可少的功能,因为几乎没有人能够忍受任务的失败和数据的错误或丢失。在单机环境下,开发人员可以通过锁、事务等方式保障数据的正确性。但是,对于分布式环境来说,既需要将数据打散分布在多个服务器上以并发执行,也需要保障集群中的每份数据都是正确的,后者相对来说实现难度就大多了。另外,由于网络故障、系统硬件故障等问题不可避免,因此分布式计算引擎还需要保障在系统发生故障时,能及时从故障中恢复并保障故障期间数据的正确性。

Spark从基于“血统”(lineage)的数据恢复和基于检查点(checkpoint)的容错两方面提高系统的容错性。

Spark引入了RDD的概念。RDD是分布在一个或多个节点上的只读数据的集合,这些集合是弹性的并且相互之间存在依赖关系,数据集之间的这种依赖关系又称为“血缘关系”。如果数据集中的一部分数据丢失,则可以根据“血缘关系”对丢失的数据进行重建。具体如图1-7所示,这里假设一个任务中包含了Map计算、Reduce计算和其他计算,当基于 Reduce计算的结果进行计算时,如果任务失败导致数据丢失,则可以根据之前Reduce计算的结果对数据进行重建,而不必从Map计算阶段重新开始计算。这样便根据数据的“血缘关系”快速完成了故障恢复。

图1-7 Spark基于“血缘关系”进行数据恢复

Spark任务在进行RDD计算时,可以通过检查点来实现容错。例如,当编写一个Spark Stream程序时,我们可以为其设置检查点,这样当出现故障时,便可以根据预先设置的检查点从故障点进行恢复,从而避免数据的丢失和保障系统的安全升级等。

如图1-8所示,这里通过val ssc = new StreamingContext(conf, Seconds(10))定义了一个名为ssc的StreamingContext,然后通过ssc.checkpoint(checkpointDir)设置了检查点。其中,checkpointDir为检查点的存储路径,当任务发生错误时,可从检查点恢复任务,从而有效保障了任务的安全性。

图1-8 Spark检查点容错

3.通用性

Spark是通用的大数据计算框架,这主要表现在两个方面:一是Spark相对于Hadoop来说支持更多的数据集操作,二是Spark支持更丰富的计算场景。

Hadoop只支持Map和Reduce操作,而Spark支持的数据集操作类型丰富得多,具体分为Transformation操作和Action操作两种。Transformation操作包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort和PartitionBy等操作,Action操作则包括Collect、Reduce、Lookup和Save等操作。另外,Spark的计算节点之间的通信模型不但支持Shuffle操作,而且支持用户命名、物化视图、控制中间结果的存储、数据分区等,具体如图1-9所示。

图1-9 Spark支持的数据集操作

缘于卓越的性能,Spark被广泛应用于复杂的批数据处理(batch data processing),这种场景下的数据延迟一般要求在几十分钟或几分钟;基于历史数据的交互式查询(interactive query)这种场景下的数据延迟一般也要求在几十分钟或几分钟;而基于实时数据流的数据处理(streaming data processing)场景下的数据延迟通常要求在数百毫秒到数秒之间。Spark还被广泛应用于图计算和机器学习领域。Spark常见的应用场景如图1-10所示。

图1-10 Spark常见的应用场景

上面总结了Spark 相对于Hadoop MapReduce都有哪些核心优势。表1-1从数据存储结构、编程范式、数据读写性能和任务执行方式的角度分别对比了Hadoop MapReduce和Spark的差别。

表1-1 Hadoop MapReduce和Spark的差别

1.2.2 Spark生态介绍

Spark生态也称为BDAS(伯克利数据分析栈),它由伯克利APMLab实验室打造,目标是在算法(algorithm)、机器(machine)和人(people)之间通过大规模集成来构建大数据应用的一个平台,具体关系如图1-11所示。BDAS通过对通信、大数据、机器学习、云计算等技术的运用以及资源的整合,试图通过对人类生活中海量的不透明数据进行收集、存储、分析和计算,来使人类从数字化的角度更好地理解我们自身所处的世界。

图1-11 Spark生态

从Spark生态的概念中可以看出,Spark生态的范围是十分广泛的。Spark生态中到底使用了哪些具体的技术呢?接下来我们从多语言支持、多调度框架的运行、多组件支撑下的多场景应用、多种存储介质、多数据格式等角度介绍Spark生态中一些常用的技术,具体如图1-12所示。

图1-12 Spark生态中常用的技术

1.2.3 Spark模块的组成

Spark基于Spark Core建立了Spark SQL、Spark Streaming、GraphX、Spark MLlib、SparkR等核心组件,基于不同的组件可以实现不同的计算任务。

Spark模块的组成如图1-13所示。

图1-13 Spark模块的组成

从运行模式看,Spark任务的运行模式有本地模式、独立模式、Mesos模式、YARN模式和Kubernetes模式。

从数据源看,Spark任务的计算可以基于HDFS、AWS S3、ElasticSearch、HBase或Cassandra等多种数据源。

1.Spark Core

Spark Core的核心组件包括基础设施、存储系统、调度系统和计算引擎,具体如图1-14所示。其中,基础设施包括SparkConf(配置信息)、SparkContext(上下文信息)、Spark RPC(远程过程调用)、ListenerBus(事件监听总线)、MetricsSystem(度量系统)和SparkEvn(环境变量);存储系统包括内存和磁盘等;调度系统包括DAG调度器和任务调度器等;而计算引擎包括内存管理器、任务管理器和Shuffle管理器等。

图1-14 Spark Core的核心组件

1)Spark基础设施

Spark基础设施为其他组件提供最基础的服务,是Spark中最底层、最常用的一类组件,具体包括如下组件。

2)Spark存储系统

Spark存储系统用于管理Spark运行过程中数据的存储方式和存储位置。Spark存储系统如图1-15所示。Spark存储系统的设计采用内存优先的原则。Spark存储系统首先会将各个计算节点产生的数据存储在内存中,当内存不足时就将数据存储到磁盘上。这种内存优先的存储策略,使得Spark的计算性能无论是在实时流计算还是在批量计算的场景下都表现十分良好,同时使Spark的内存空间和磁盘存储空间得到了灵活控制。除此之外,Spark还可以通过网络将结果存储到远程存储(比如HDFS、AWS S3、阿里云OSS等)中,以实现分离计算和存储的目的。

图1-15 Spark存储系统

3)Spark调度系统

Spark调度系统主要由DAG调度器和任务调度器组成,如图1-16所示。DAG调度器的主要功能是创建作业(job),将DAG中的RDD划分到不同的Stage中,为Stage创建对应的任务(task)、批量提交任务等。任务调度器的主要功能是对任务进行批量调度。Spark使用的调度算法有先进先出(FIFO)、公平调度等。

图1-16 Spark调度系统

4)Spark计算引擎

Spark计算引擎由内存管理器、作业管理器、任务管理器、Shuffle管理器等组成。Spark计算引擎主要负责集群任务计算过程中内存的分配、作业和任务的运行、作业和任务状态的监控及管理等。

2.Spark SQL

Spark提供了两个抽象的编程对象,分别叫作DataFrame(数据框)和Dataset(数据集),它们是分布式SQL查询引擎的基础,Spark正是基于它们构建了基于SQL的数据处理方式,具体如图1-17所示。这使得分布式数据的处理变得十分简单,开发人员只需要将数据加载到Spark中并映射为表,就可以通过SQL语句来实现数据的分析。

图1-17 Spark SQL的构建

1)DataFrame

DataFrame是Spark SQL对结构化数据所做的抽象,可简单理解为DataFrame就是Spark中的数据表,DataFrame相比RDD多了数据的结构信息,即Schema信息。DataFrame的数据结构如下:DataFrame(表)= Data(表数据)+ Schema(表结构信息)。如图1-18所示,其中,DataFrame有Name、Legs、Size三个属性,第一条数据中的Name为pig,第二条数据中的Name为cat,第三条数据中的Name为dog。

图1-18 DataFrame的数据结构

在Spark中,RDD表示分布式数据集,而DataFrame表示分布式数据框,数据集和数据框最大的差别就在于数据框中的数据是结构化的。因此,基于数据框中的数据结构,Spark可以根据不同的数据结构对数据框上的运算自动进行不同维度的优化,从而避免不必要的数据读取等问题,提高程序的运行效率。

RDD和DataFrame的数据结构对比如图1-19所示。这里假设有一个Animal数据集,开发人员从RDD的角度仅能看到每条数据,但从DataFrame的角度能看到每条数据的内部结构,比如Name字段为string类型,Legs字段为int类型,Size字段为double类型。其中,Name字段表示动物的名称,Legs字段表示动物有几条腿,Size字段表示动物的体型大小。这样当Spark程序在DataFrame上对每条数据执行运算时,便可以有针对性地进行优化。例如,要读取Legs等于4的数据,Spark在Legs字段上进行逻辑运算时就会使用int类型的函数进行运算。在Java中,int型数据的存储结构和优化空间相比string型数据要好很多,因此执行效率也会高很多。

图1-19 对比RDD和DataFrame的数据结构

在Spark中,DataFrame可以通过多种方式来构建。例如,开发人员可通过Spark RDD构建DataFrame,可通过Hive读取数据并将它们转换为DataFrame,可通过读取CSV、JSON、XML、Parquet等文件并将它们转换为DataFrame,可通过读取RDBMS中的数据并将它们转换为DataFrame。除此之外,开发人员还可通过Cassandra或HBase这样的列式数据库来构建DataFrame。构建好DataFrame之后,开发人员便可以直接将DataFrame映射为表并在表上执行SQL语句以完成数据分析,如图1-20所示。

图1-20 DataFrame的构建

2)Dataset

Dataset是数据的分布式集合。Dataset结合了RDD强类型化的优点和Spark SQL优化后执行引擎的优点。可以从JVM对象构建Dataset,然后使用Map()、FlatMap()、Filter()等函数对其进行操作。此外,Spark还提供对Hive SQL的支持。

3.Spark Streaming

Spark Streaming为Spark提供了流式计算的能力。Spark Streaming支持从Kafka、HDFS、Twitter、AWS Kinesis、Flume和TCP服务等多种数据源获取数据,然后利用Spark计算引擎,在数据经过Spark Streaming的微批处理后,最终将计算结果写入Kafka、HDFS、Cassandra、Redis和Dashboard(报表系统)。此外,Spark Streaming还提供了基于时间窗口的批量流操作,用于对一定时间周期内的流数据执行批量处理。图1-21展示了Spark Streaming的流式计算架构。

图1-21 Spark Streaming的流式计算架构

4.GraphX

GraphX用于分布式图计算。利用Pregel提供的API,开发人员可以快速实现图计算的功能。

5.Spark MLlib

Spark MLlib(见图1-22)是Spark的机器学习库。Spark MLlib提供了统计、分类、回归等多种机器学习算法的实现,其简单易用的API降低了机器学习的门槛。

图1-22 Spark MLlib

6.SparkR

SparkR(见图1-23)是一个R语言包,它提供了一种轻量级的基于R语言使用Spark的方式。SparkR实现了分布式的数据框,支持类似于查询、过滤及聚合这样的操作,功能类似于R语言中的DataFrame包dplyr。SparkR使得Spark能够基于R语言更方便地处理大规模的数据集,同时SparkR还支持机器学习。

图1-23 SparkR架构

1.2.4 Spark运行模式

Spark运行模式指的是Spark在哪个资源调度平台上以何种方式(一般分单机和集群两种方式)运行。Spark运行模式主要包括local(本地模式)、standalone(独立模式)、on YARN、on Mesos、on Kubernetes以及on Cloud(运行在AWS等公有云平台上),如表1-2所示。

表1-2 Spark运行模式

1.2.5 Spark集群的角色组成

Spark集群主要由集群管理器(cluster manager)、工作节点(worker)、执行器(executor)、Spark应用程序(application)和驱动器(driver)5部分组成,如图1-24所示。

图1-24 Spark集群的角色组成

1.集群管理器

集群管理器用于Spark集群资源的管理和分配。

2.工作节点

工作节点用于执行提交到Spark中的任务。工作节点的工作职责和交互流程如图1-25所示。

图1-25 工作节点的工作职责和交互流程

(1)工作节点通过注册机制向集群管理器汇报自身的CPU和内存等资源使用情况。

(2)工作节点在Spark主节点的指示下创建并启动执行器,执行器是真正执行计算任务的组件。

(3)Spark主节点将任务分配给工作节点上的执行器并运行。

(4)工作节点同步资源信息和执行器状态信息给集群管理器。

3.执行器

执行器是真正执行计算任务的组件,它在工作节点上以一个进程的形式存在,这个进程负责任务的运行并将运行结果保存到内存中或磁盘上。

4.Spark应用程序

Spark应用程序是基于Spark API编写的,其中包括用于实现驱动器功能的驱动程序以及运行在集群的多个节点上的执行器程序。Spark应用程序由一个或多个作业组成,如图1-26所示。

图1-26 Spark应用程序

5.驱动器

驱动器包含了运行应用程序的主函数和构建SparkContext实例的程序。Spark应用程序通过驱动器来与集群管理器和执行器进行通信。驱动器既可以运行在应用程序节点上,也可以由应用程序提交给集群管理器,再由集群管理器安排给工作节点运行。当执行器运行完毕后,驱动器负责将SparkContext关闭。驱动器的主要职责如下。

(1)驱动器包含运行应用程序的主函数。

(2)在Spark中,SparkContext是在驱动器中创建的。SparkContext负责和集群管理器通信,进行资源的申请以及任务的分配和监控等。

(3)Spark在驱动器中划分RDD并生成DAG。

(4)Spark在驱动器中构建作业并将每个作业划分为多个Stage,各个Stage相互独立。作业是由多个Stage构建的并行计算任务,具体由Spark中的Action操作(如Count、Collect、Save等操作)触发。

(5)驱动器能与Spark中的其他组件进行资源协调。

(6)Spark在驱动器中生成任务并将任务发送到执行器上运行。

下面展示了一个读取JSON文件的简单Spark程序。

public class RDDSimple {
        //定义运行应用程序的主函数
    public static void main(String[] args) {
        //初始化SparkConf实例
        SparkConf conf = new
             SparkConf().setAppName(RDDSimple.class.getName()).setMaster("local");
        //初始化JavaSparkContext实例
        JavaSparkContext sc = new JavaSparkContext(conf);
        String filepath = "your_file_path/temp.json"; 
        //读取文件到Spark中
        JavaRDD<String> lines = sc.textFile(filepath);
        JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
        //通过reduce算子触发Action操作
        int totalLength = lineLengths.reduce((a, b) -> a + b);
        System.out.println("[ spark map reduce operation: ] count is:"+totalLength);
        //关闭JavaSparkContext并释放资源
         sc.close();
        }
 }       

上面这个简单的Spark程序包含了运行应用程序的主函数,主函数中定义了SparkConf实例conf和JavaSparkContext实例sc,可通过JavaSparkContext实例sc的textFile()方法读取名为temp.json 的JSON文件。读取结果为JavaRDD类型的数据,可通过调用RDD的map()方法来计算每条数据的长度并通过reduce()方法将所有的长度加起来。其中,RDD的reduce()方法会触发Action操作。统计完之后,需要调用JavaSparkContext实例sc的close()方法以关闭JavaSparkContext并释放资源。

在上面的Spark程序中,SparkConf和JavaSparkContext的初始化代码为驱动程序,运行在驱动器节点上,代码lines.map(s -> s.length())则运行在集群中一个或多个节点的执行器上。

1.2.6 Spark核心概念

1.SparkContext

SparkContext是整个Spark应用程序中非常重要的对象之一。SparkContext是应用程序和Spark集群交互的通道,如图1-27所示,主要用于初始化运行Spark应用程序所需的基础组件,具体包括如下组件。

另外,SchedulerBackend是调度器的通信终端,主要负责运行任务所需资源的申请。

图1-27 SparkContext

同时,SparkContext还负责向Spark管理节点注册应用程序等。

2.RDD

RDD是弹性分布式数据集,它是Spark对数据和计算模型所做的统一抽象。也就是说,RDD中既包含了数据,也包含了针对数据执行操作的算子。RDD可通过在其他RDD上执行算子操作转换而来,RDD之间是相互依赖的,从而形成了RDD之间的“血缘关系”,这又称为RDD之间的DAG。开发人员可通过一系列算子对RDD进行操作,比如进行Transformation操作和Action操作。

观察图1-28,开发人员可通过sc.textFile()方法从HDFS读取数据到Spark中并将其转换为RDD,然后在RDD上分别执行flatMap、map、reduceByKey算子操作,从而对RDD上的数据进行计算,计算完成后,可通过调用saveAsTextFile()方法将计算结果写到HDFS中。

图1-28 Spark RDD

具体的代码实现片段如下:

val rdd-0 = sc.textFile("your_text_file_path")
val rdd-1 = rdd-0.flatMap(x=> {x.split(" ")})
val rdd-2 = rdd-1.map(x => (x,1))
val rdd-3 = rdd-2.reduceByKey((pre, after) => pre + after)
rdd-3.saveAsTextFile(basePath+"simple-1")

从上述代码可以看出,rdd-0是通过调用sc.textFile()方法转换而来的,rdd-1是通过调用rdd-0的flatMap算子转换而来的,rdd-2是通过调用rdd-1的map算子转换而来的,rdd-3是通过调用rdd-2的reduceByKey算子转换而来的。RDD之间可以相互转换,从而形成了DAG。

在上述操作过程中,textFile、flatMap和map操作属于Transformation操作,reduceByKey和saveAsTextFile操作属于Action操作。在实践中,我们一般不会定义这么多RDD,而是通过链式编程一气呵成,具体的代码实现片段如下:

val rdd-0 = sc.textFile("your_text_file_path")
rdd-0.flatMap(x=> {x.split(" ")}).map(x => (x,1)).reduceByKey((pre, after) => pre + after)
     .saveAsTextFile(basePath+"simple-1")

3.DAG

DAG是有向无环图,通常用于建模。Spark是通过DAG对RDD之间的关系进行建模的。也就是说,DAG描述了RDD之间的依赖关系,这种依赖关系也叫作血缘关系。Spark通过Dependency对象来维护RDD之间的依赖关系。

当处理数据时,Spark会将RDD之间的依赖关系转换为DAG。基于DAG的血缘关系,当计算发生故障时,Spark便能够对RDD快速地进行数据恢复。观察图1-29,这里一共有4个RDD——RDD0、RDD1、RDD2、RDD3。其中,RDD0由外部数据源“数据输入1”转换而来;RDD1由外部数据源“数据输入2”转换而来;RDD2由RDD0和RDD1转换而来,并且在转换过程中发生了数据的Shuffle操作;RDD2在经过转换后生成了RDD3;RDD3执行完毕后,就会将计算结果写入“数据输出”。这几个RDD之间的依赖关系是:RDD3依赖于RDD2,RDD2依赖于RDD0和RDD1。因此,如果在执行RDD3计算时发生故障,那么只需要从RDD2开始重新计算RDD3,而不必从“数据输入1”和“数据输入2”重新开始计算。

图1-29 Spark DAG

4.DAG调度器

DAG调度器面向Stage级别,执行逻辑层面的调度。DAG调度器主要负责Stage的划分、提交、状态跟踪以及结果的获取,如图1-30所示。

图1-30 Spark DAG调度器

5.任务调度器

任务调度器的主要职责包括物理资源调度管理、任务集调度管理、任务执行、任务状态跟踪以及将任务的运行结果汇报给DAG调度器,如图1-31所示。

图1-31 Spark任务调度器

6.作业

Spark应用程序通常包含一个或多个作业。Spark将根据Action操作(如saveAsTextFile、Collect)划分作业并触发作业的执行,而一个作业又分为一个或多个可以并行计算的Stage(至于是否可以并行计算,则需要根据Stage的依赖关系来定)。Stage是根据Shuffle操作来划分的,一个Stage和一个任务集对应,任务集是多个任务的集合,每个任务则对应RDD中某个分区上数据的处理,如图1-32所示。

图1-32 Spark作业

7.Stage

DAG调度器会把DAG划分为相互依赖的多个Stage,Stage的划分依据则是RDD之间依赖的宽窄。当遇到宽依赖(数据发生了Shuffle操作)时,就划分出一个Stage,每个Stage中则包含一个或多个任务。然后,DAG调度器会将这些任务以任务集的形式提交给任务调度器并运行。和RDD之间的依赖关系类似,Stage之间也存在依赖关系。Spark中的Stage分为ShuffleMapStage和ResultStage两种类型。在Spark应用程序中,最后一个Stage为ResultStage,其他的Stage均为ShuffleMapStage。

观察图1-33,其中包含3个Stage,分别为Stage-1、Stage-2和Stage-3。其中,Stage-3依赖于Stage-1和Stage-2。由于A和B的依赖关系为宽依赖,也就是说,从A到B会发生数据的Shuffle操作,因此划分出一个Stage;由于F和G的依赖关系为宽依赖,因此同样划分出一个Stage;由于C和D、D和F、E和F的依赖关系均为窄依赖,因此它们都被划分到同一个Stage中,也就是划分到Stage-2中。

图1-33 Spark Stage

8.任务集

一组任务就是一个任务集,对应一个Stage。任务集中包含多个任务,并且同一任务集中的所有任务之间不会发生数据的Shuffle操作,因此,同一任务集中的所有任务可以相互不受影响地并行执行,如图1-34所示。

图1-34 Spark任务集

9.任务

任务是Spark中独立的工作单元,它以线程的方式在执行器上运行。一般情况下,一个任务对应一个线程,负责处理RDD中某个分区上的数据。任务根据返回类型的不同,又分为ShuffleMapTask和ResultTask两种。

10.总结

图1-35对Spark核心概念做了总结。开发人员构建出来的、可运行的Spark项目称为Spark应用程序,Spark应用程序包含了驱动程序,而驱动程序包含了SparkConf、SparkContext等核心组件的初始化代码。同时,SparkContext又包含了DAG调度器和任务调度器两个核心组件。在执行Spark应用程序的过程中,Spark会根据Action操作将Spark 应用程序划分为多个作业并交给DAG调度器处理,DAG调度器负责将作业构建为DAG并划分Stage,同时提交Stage到任务调度器。任务调度器负责加载并注册任务集到集群管理器,集群管理器负责集群管理、资源分配、任务分配并跟踪作业的提交和执行。实际情况是,任务是在工作节点的执行器上执行的。

图1-35 Spark核心概念

1.2.7 Spark作业运行流程

1.Spark作业运行流程简述

Spark应用程序以进程集合为单位运行在分布式集群上,可通过驱动程序的主函数创建SparkContext对象并通过SparkContext对象与集群进行交互,如图1-36所示。

图1-36 Spark作业运行流程

(1)Spark通过SparkContext向集群管理器申请运行应用程序所需的资源(CPU、内存等资源)。

(2)集群管理器分配执行应用程序所需的资源,并在工作节点上创建执行器。

(3)SparkContext将程序代码和任务发送到执行器上运行并收集运行结果到驱动程序节点上。程序代码一般为Jar包或Python文件。

2.Spark RDD迭代过程

Spark数据计算主要通过RDD迭代来完成,RDD是弹性分布式数据集,可以看作对各种数据计算模型所做的统一抽象。在Spark RDD迭代过程中,数据被分到多个分区以进行并行计算,分区的数量取决于应用程序对此是如何设定的。每个分区里的数据只会在一个任务上计算,所有分区可在多个机器节点的执行器上并行执行。

Spark RDD迭代过程如图1-37所示。

(1)SparkContext创建RDD对象,计算RDD之间的依赖关系并由此生成DAG。

(2)DAG调度器将DAG划分为多个Stage,并将Stage对应的任务集提交到集群管理器。Stage的划分依据就是RDD之间依赖的宽窄。当遇到宽依赖时,就划分出一个Stage,每个Stage包含一个或多个任务。

(3)任务调度器通过集群管理器为每个任务申请系统资源并将任务提交到工作节点以执行。

(4)工作节点上的执行器负责执行具体的任务。

图1-37 Spark RDD迭代过程

3.Spark作业运行的详细流程

在简要了解了Spark作业运行的流程之后,接下来介绍Spark作业运行的详细流程,如图1-38所示。

(1)SparkContext向资源管理器注册任务。

(2)资源管理器申请运行任务所需的执行器。

(3)资源管理器分配执行器。

(4)资源管理器启动执行器。

(5)执行器发送心跳到资源管理器。

(6)SparkContext根据代码构建DAG。

(7)DAG调度器将DAG划分为Stage。

(8)DAG调度器将Stage以任务集的方式发送给任务调度器。

(9)执行器向SparkContext申请任务。

图1-38 Spark作业运行的详细流程

(10)SparkContext发送应用程序代码到执行器。

(11)任务调度器将任务发送给执行器运行。

(12)在执行器上运行任务。

(13)应用程序完成运行,释放资源。

4.YARN资源管理器

YARN是一种分布式资源管理和任务调度框架,由资源管理器、节点管理器和应用程序管理器3个核心模块组成。其中,资源管理器负责集群资源的管理、监控和分配;节点管理器负责节点的维护;应用程序管理器负责具体应用程序的调度和协调。由于资源管理器负责所有应用程序的控制以及资源的分配权,因此每个应用程序管理器都会与资源管理器协商资源,同时与节点管理器通信并监控任务的运行。YARN 资源管理器如图1-39所示。

图1-39 YARN资源管理器

1)资源管理器

资源管理器负责整个集群的资源管理和分配。节点管理器以心跳的方式向资源管理器汇报CPU内存等资源的使用情况。资源管理器接收节点管理器的资源汇报信息,具体的资源处理则交给节点管理器负责。

2)节点管理器

节点管理器负责具体节点的资源管理和任务分配,相当于资源管理器,用来管理节点的代理节点,主要负责节点程序的运行以及资源的管理与监控。YARN集群的每个节点上都运行着一个节点管理器。

节点管理器定时向资源管理器汇报节点资源的使用情况和容器的运行状态。当资源管理器宕机时,节点管理器会自动连接资源管理器的备用节点。同时,节点管理器还会接收并处理来自应用程序管理器的容器启动、停止等请求。

3)应用程序管理器

每个应用程序都有一个应用程序管理器。应用程序管理器的主要职责如下。

4)容器

容器是对YARN集群中物理资源的抽象,它封装了每个节点上的资源(如内存、CPU、磁盘、网络等)信息。当应用程序管理器向资源管理器申请资源时,资源管理器为应用程序管理器返回的资源就是以容器表示的。YARN会将任务分配到容器中运行,同时任务只能使用容器中描述的资源,从而达到隔离资源的目的。

5.YARN任务的提交和运行流程

YARN任务的提交由客户端向资源管理器发起,然后由资源管理器启动应用程序管理器并为其分配用于运行作业的容器资源,应用程序管理器收到容器资源后便初始化容器,最后交由节点管理器启动容器并运行具体的任务。这里的任务既可以是MapReduce任务,也可以是Spark任务或Flink任务。任务运行完之后,应用程序管理器向资源管理器申请注销自己并释放资源。YARN任务的提交和运行流程如图1-40所示。

(1)客户端向资源管理器提交任务,其中包括启动应用程序必需的信息。

(2)资源管理器启动一个容器,并在这个容器中启动应用程序管理器。

(3)启动中的应用程序管理器向资源管理器注册自己,并在启动成功后与资源管理器保持心跳。

(4)应用程序管理器向资源管理器发送请求,申请相应数目的容器。

(5)资源管理器返回应用程序管理器申请的容器信息。

图1-40 YARN任务的提交和运行流程

(6)申请成功的容器由应用程序管理器进行初始化。

(7)在初始化容器的启动信息后,应用程序管理器与对应的节点管理器通信,要求节点管理器启动容器。

(8)应用程序管理器与节点管理器保持定时心跳,以便实时对节点管理器上运行的任务进行监控和管理。

(9)容器在运行期间,通过RPC协议向对应的应用程序管理器汇报自己的进度和状态等信息,应用程序管理器对容器进行监控。

(10)在应用程序运行期间,客户端通过RPC协议与应用程序管理器通信以获取应用程序的运行状态、进度更新等信息。

(11)应用程序完成运行,应用程序管理器向资源管理器申请注销自己,并释放占用的容器资源。

6.Spark应用程序在YARN上的执行流程

Spark应用程序在生产环境中一般运行在YARN上。下面介绍Spark应用程序在YARN 上的执行流程,如图1-41所示。

图1-41 Spark应用程序在YARN上的执行流程

(1)提交Spark应用程序和相关依赖到YARN资源管理器。

(2)Spark引擎加载应用程序管理器。

(3)Spark驱动器开始执行。

(4)SparkContext向应用程序管理器申请资源。

(5)应用程序管理器向YARN资源管理器申请容器资源。

(6)YARN节点管理器启动容器。

(7)YARN节点管理器启动Spark执行器。

(8)将执行器注册到Spark驱动器。

(9)SparkContext加载并运行任务。 eb26dOERvoRE1vFzougCBsk6Eh9dUre83292QVBXvAyKWFZDA8DUfNhV7zdG3RHr

点击中间区域
呼出菜单
上一章
目录
下一章
×