购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.2 Spark系统架构

如第1章所介绍,与Hadoop MapReduce的结构类似,Spark也采用Master-Worker结构。如果一个Spark集群由4个节点组成,即1个Master节点和3个Worker节点,那么在部署Standalone版本后,Spark部署的系统架构图如图2.1所示。简单来说,Master节点负责管理应用和任务,Worker节点负责执行任务。

图2.1 Spark部署的系统架构图

我们接下来先介绍Master节点和Worker节点的具体功能,然后介绍一些Spark系统中的基本概念,以及一些实现细节。

Master节点和Worker节点的职责如下所述。

■Master节点上常驻Master进程。该进程负责管理全部的Worker节点,如将Spark任务分配给Worker节点,收集Worker节点上任务的运行信息,监控Worker节点的存活状态等。

■Worker节点上常驻Worker进程。该进程除了与Master节点通信,还负责管理Spark任务的执行,如启动Executor来执行具体的Spark任务,监控任务运行状态等。

启动Spark集群时(使用Spark部署包中start-all.sh脚本),Master节点上会启动Master进程,每个Worker节点上会启动Worker进程。启动Spark集群后,接下来可以提交Spark应用到集群中执行,如用户可以在Master节点上使用

来提交一个名为SparkPi的应用。Master节点接收到应用后首先会通知Worker节点启动Executor,然后分配Spark计算任务(task)到Executor上执行,Executor接收到task后,为每个task启动1个线程来执行。这里有几个概念需要解释一下。

■Spark application,即Spark应用,指的是1个可运行的Spark程序,如WordCount.scala,该程序包含main()函数,其数据处理流程一般先从数据源读取数据,再处理数据,最后输出结果。同时,应用程序也包含了一些配置参数,如需要占用的CPU个数,Executor内存大小等。用户可以使用Spark本身提供的数据操作来实现程序,也可以通过其他框架(如Spark SQL)来实现应用,Spark SQL框架可以将SQL语句转化成Spark程序执行。

■Spark Driver,也就是Spark驱动程序,指实际在运行Spark应用中main()函数的进程,官方解释是“The process running the main() function of the application and creating the SparkContext”,如运行SparkPi应用main()函数而产生的进程被称为SparkPi Driver。在图2.1中,运行在Master节点上的Spark应用进程(通常由SparkSubmit脚本产生)就是Spark Driver,Driver独立于Master进程。如果是YARN集群,那么Driver也可能被调度到Worker节点上运行。另外,也可以在自己的PC上运行Driver,通过网络与远程的Master进程连接,但一般不推荐这样做,一个原因是需要本地安装一个与集群一样的Spark版本,另一个原因是自己的PC一般和集群不在同一个网段,Driver和Worker节点之间的通信会很慢。简单来说,我们可以在自己的IntelliJ IDEA中运行Spark应用,IDEA会启动一个进程既运行应用程序的main()函数,又运行具体计算任务task,即Driver和task共用一个进程。

■Executor,也称为Spark执行器,是Spark计算资源的一个单位。Spark先以Executor为单位占用集群资源,然后可以将具体的计算任务分配给Executor执行。由于Spark是由Scala语言编写的,Executor在物理上是一个JVM进程,可以运行多个线程(计算任务)。在Standalone版本中,启动Executor实际上是启动了一个名叫CoarseGrainedExectuorBackend的JVM进程。之所以起这么长的名字,是为了不与其他版本中的Executor进程名冲突,如Mesos、YARN等版本会有不同的Executor进程名。Worker进程实际只负责启停和观察Executor的执行情况。

■task,即Spark应用的计算任务。Driver在运行Spark应用的main()函数时,会将应用拆分为多个计算任务,然后分配给多个Executor执行。task是Spark中最小的计算单位,不能再拆分。task以线程方式运行在Executor进程中,执行具体的计算任务,如map算子、reduce算子等。由于Executor可以配置多个CPU,而1个task一般使用1个CPU,因此当Executor具有多个CPU时,可以运行多个task。例如,在图2.1中Worker节点1有8个CPU,启动了2个Executor,每个Executor可以并行运行4个task。Executor的总内存大小由用户配置,而且Executor的内存空间由多个task共享。

如果上述解释不够清楚,那么我们可以用一个直观例子来理解Master、Worker、Driver、Executor、task的关系。例如,一个农场主(Master)有多片草场(Worker),农场主要把草场租给3个牧民来放马、牛、羊。假设现在有3个项目(application)需要农场主来运作:第1个牧民需要一片牧场来放100匹马,第2个牧民需要一片牧场来放50头牛,第3个牧民需要一片牧场来放80只羊。每个牧民可以看作是一个Driver,而马、牛、羊可以看作是task。为了保持资源合理利用、避免冲突,在放牧前,农场主需要根据项目需求为每个牧民划定可利用的草场范围,而且尽量让每个牧民在每个草场都有一小片可放牧的区域(Executor)。在放牧时,每个牧民(Driver)只负责管理自己的动物(task),而农场主(Master)负责监控草场(Worker)、牧民(Driver)等状况。

回到Spark技术点讨论,这里有个问题是Spark为什么让task以线程方式运行而不以进程方式运行。在Hadoop MapReduce中,每个map/reduce task以一个Java进程(命名为Child JVM)方式运行。这样的好处是task之间相互独立,每个task独享进程资源,不会相互干扰,而且监控管理比较方便,但坏处是task之间不方便共享数据。例如,当同一个机器上的多个map task需要读取同一份字典来进行数据过滤时,需要将字典加载到每个map task进程中,则会造成重复加载、浪费内存资源的问题。另外,在应用执行过程中,需要不断启停新旧task,进程的启动和停止需要做很多初始化等工作,因此采用进程方式运行task会降低执行效率。为了数据共享和提高执行效率,Spark采用了以线程为最小的执行单位,但缺点是线程间会有资源竞争,而且Executor JVM的日志会包含多个并行task的日志,较为混乱。更多关于内存资源管理和竞争的问题将在后续章节进行阐述。

在图2.1中还有一些实现细节。

■每个Worker进程上存在一个或者多个ExecutorRunner对象。每个ExecutorRunner对象管理一个Executor。Executor持有一个线程池,每个线程执行一个task。

■Worker进程通过持有ExecutorRunner对象来控制CoarseGrainedExecutorBackend进程的启停。

■每个Spark应用启动一个Driver和多个Executor,每个Executor里面运行的task都属于同一个Spark应用。 KdxB2cynFvNHp51u9e9sn8G2hA2FJ+I/3yfmNUbsg0eHNl59aNUh8vJC7ubQWZdj

点击中间区域
呼出菜单
上一章
目录
下一章
×