购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.3 ThreadPoolExecutor线程池

Executor和ExecutorService接口定义了任务提交的相关方法,是Executor线程池框架的抽象定义,而ThreadPoolExecutor是Executor线程池框架的具体实现,是我们在应用程序中定义线程池时最常使用的一个类,是Executor线程池框架提供给应用程序使用的一个核心类。ThreadPoolExecutor的类继承体系结构如图4.1所示。

图4.1 ThreadPoolExecutor的类继承体系结构

在内部实现层面,ThreadPoolExecutor维护一个Thread线程池和任务等待队列。当应用程序通过调用ThreadPoolExecutor对象实例的execute方法或者submit方法提交一个任务时,ThreadPoolExecutor对象实例会在内部调度一个空闲的线程来执行该任务。

如果内部的线程池当前没有空闲线程,即所有线程都在执行其他任务,则将该任务放到任务等待队列中去排队等待。在之后线程池存在空闲线程时,会自动从该队列中获取任务并执行。还有一种极端情况是,既没有空闲线程,任务等待队列也没有空闲空间,则使用相应的拒绝策略来拒绝执行这个任务。

4.3.1 用法

在使用方面,主要是需要关注ThreadPoolExecutor类的构造函数的相关参数的含义,因为需要通过指定这些参数的值来定制一个线程池实现。

ThreadPoolExecutor根据需要指定的参数的数量的不同,定义了多个版本的构造函数。其中需要指定所有参数,即没有默认参数值的构造函数如下,其他版本的构造函数也是基于这个构造函数来定义的。

构造函数的各参数含义如下。

(1)corePoolSize:线程池的核心大小,即任务需要排队之前的线程池的最大值,如果corePoolSize个线程全部繁忙,则新来的任务需要在任务等待队列排队等待。

(2)maximumPoolSize:线程池的最大值,即线程池最多可以创建这么多个线程。不过如果workQueue使用的是无界队列实现,则该参数无效。

(3)keepAliveTime:当keepAliveTime的值大于0时,如果线程池的线程数量超过corePoolSize个且存在空闲线程,则空闲时间超过了keepAliveTime的线程需要被销毁回收。如果值为0,则在corePoolSize范围之外的线程,只要空闲会被立即销毁回收。在默认情况下,corePoolSize范围内的线程一旦创建,即使是空闲也不会销毁回收的。

(4)workQueue:当前线程池没有空闲线程时,新提交的任务放在该任务等待队列,排队等待空闲线程来执行。

(5)threadFactory:线程池的线程创建工厂类,默认实现类为DefaultThreadFactory。通常需要在该工厂类为线程池的线程指定一个名字,这样在通过jstack命令查看线程堆栈时就可以找到该线程池的线程,方便问题的定位和排查。

(6)RejectedExecutionHandler:任务拒绝策略,是指当线程池没有空闲线程且无法继续创建线程(线程池的线程数量达到maximumPoolSize),队列也排满等待执行的任务,没有空闲空间时,新提交任务的拒绝策略。这是策略模式的一种实现,拒绝策略类型后面会具体分析。

以上构造函数需要传入所有的这些参数,ThreadPoolExecutor类还包含其他几个简化版的构造函数,这些简化版的构造函数,一般都提供了默认的线程创建工厂和默认的任务拒绝策略,具体可以参考官方API文档。

ThreadPoolExecutor的使用例子如下,定义一个线程池,该线程池只包含一个工作线程。定义一个Runnable任务并提交给该线程池执行,具体实现如下:

使用方法很简单,先创建一个ThreadPoolExecutor类的对象实例来定义一个线程池,然后调用该对象实例的execute或者submit方法来提交任务到该线程池执行。下面具体分析内部核心组件的实现机制。

4.3.2 工作线程池

在ThreadPoolExecutor的内部实现中,工作线程池主要通过集合类HashSet来实现的。我们知道HashSet不是线程安全的,这里使用HashSet而不会存在线程安全问题,是因为在ThreadPoolExecutor内部对HashSet集合内部的数据节点的增删,即线程节点的增删,都是需要通过一个ReentrantLock来加锁的。

工作线程池的定义如下:

工作线程主要是通过定义一个Worker包装类来实现,实现如下:

分析:继承于AQS,即队列线程同步器AbstractQueuedSynchronizer,实现了Runnable接口。

(1)继承AQS的主要目的:基于AQS的状态变量state来定义当前线程的工作状态,并且可以以线程安全的方式来对该状态进行检查和更新。具体为根据状态变量state是否等于1来判断当前worker线程是在处理任务还是空闲。

(2)实现Runnable接口的主要目:将该worker线程对象自身作为一个task放到Worker内部的线程对象thread去执行。在run方法中定义该工作线程的工作逻辑,具体为调用runWorker(this)方法,在runWorker方法的实现中,从任务等待队列获取任务并执行。

4.3.3 任务的提交

在使用层面,可以根据是否需要获取任务的执行结果来选择调用execute或submit来提交任务,即如果不需要任务执行结果,则调用execute方法,execute方法的返回值为void,没有返回值;如果需要获取任务执行结果,则可以使用submit方法来提交任务,这样可以获取一个Future对象返回值,通过该对象来跟踪这个任务的执行和获取执行结果。

在内部实现层面,任务的提交主要是基于ThreadPoolExecutor的execute方法实现的。submit方法也是调用了execute来提交任务到线程池。

1.任务提交execute方法定义

execute方法的定义如下:

在execute方法中,完成了Executor框架设计中对corePoolSize、任务等待队列,maximumPoolSize、拒绝策略相关语义的实现。即对一个新任务的提交,按照以下顺利处理。

(1)当线程池线程数量少于corePoolSize时,则创建一个新的线程来执行这个任务,此时不管当前线程池是否存在空闲线程。

(2)当线程池线程数量达到corePoolSize时,则将该任务放到任务等待队列workQueue中。这样当线程池中之后存在空闲线程时,空闲线程会从这个队列取出任务并处理。

(3)当任务等待队列workQueue也满了时,如果线程池当前的工作线程数量少于maximumPoolSize个,则可以继续创建新的线程来执行这个任务。

如果工作线程数量超过了maximumPoolSize,则此时说明提交了太多任务,线程数量也已经达到极限,无法再创建新线程来处理新提交的任务了,则调用reject方法,根据具体的任务拒绝策略处理这个任务。任务拒绝策略默认为抛异常实现AbortPolicy,即在execute方法抛异常,所以如果可能存在这种情况,则一般需要在应用代码中捕获该异常,关于任务拒绝策略的更多实现在后面详细分析。

2.addWorker方法定义:工作线程的创建

由以上代码分析可知,在任务提交时,如果没有空闲线程,则会调用addWorker方法来创建新的工作线程。addWorker方法的具体实现如下:

首先,在第一个自旋中通过比较当前线程池线程数量和corePoolSize与maximumPoolSize来确定是否可以继续创建线程。

其次,如果可以,则创建Worker工作线程包装类对象实例,并在使用mainLock加锁的情况下,将该工作线程添加到线程池workers中。

最后,调用该工作线程内部的线程thread的start的方法,开始这个工作线程的执行。

4.3.4 任务的执行

工作线程Worker主要是在run方法中调用runWorker方法来定义工作线程的具体工作逻辑,即执行创建该工作线程时提交的第一个任务和从任务等待队列workQueue获取其他等待执行的任务并执行。

任务执行对应的runWorker方法,主要是在while循环中调用getTask方法从任务等待队列获取任务。其中getTask为阻塞执行的,即如果等待队列没有任务,则阻塞等待。核心源码实现如下:

由以上逻辑分析可知,runWorker方法主要是在while循环中调用getTask方法从任务等待队列获取任务并执行,如果当前队列不存在任务,则阻塞等待;否则取出该任务,调用其run方法,从而完成任务的业务逻辑的执行。

在getTask方法中,实现从任务等待队列workQueue阻塞等待获取任务。同时这里也是实现了不在corePoolSize范围内的线程的超时清理回收机制的核心。

getTask方法的具体实现如下所示:

如果线程池当前工作线程的数量超过了corePoolSize,则调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,即阻塞keepAliveTime时间,如果时间超时后还是没有任务,则返回null。此时在runWorker方法中获取到getTask方法的返回值null,退出while循环,进入finally代码块,调用processWorkerExit方法来关闭这个空闲的工作线程。否则调用工作队列的take方法,无限阻塞等待任务队列有任务过来,线程自身不会被销毁回收。

4.3.5 任务的执行结果

如果需要获取任务的执行结果,则需要通过ThreadPoolExecutor的submit方法来提交任务。submit方法会返回一个Future接口的实现类对象,具体为FutureTask。submit方法在ThreadPoolExecutor的基类,即抽象类AbstractExecutorService中定义,如下所示:

分析:将任务task包装成了FutureTask对象实例,然后通过execute方法提交到线程池,最后返回这个FutureTask对象实例给应用代码。关于Future和FutureTask的设计,如何通过get获取执行结果,通过cannel取消任务执行的实现原理,在后面会详细分析。

4.3.6 任务拒绝策略

当线程池无法继续创建新的工作线程(此时线程池的线程数量达到了maximumPoolSize)并且任务等待队列都满了,没有空闲空间时,对于新提交的任务需要通过调用reject方法来使用对应的拒绝策略来处理。

ThreadPoolExecutor提供了以下四种任务拒绝策略。

(1)抛Abort异常:AbortPolicy;

(2)在主线程中直接执行该任务:CallerRunsPolicy;

(3)默默丢弃不做任务操作:DiscardPolicy;

(4)从任务等待队列移除等待最久的任务:DiscardOldestPolicy,即移除任务等待队列的队列头的任务。

默认任务拒绝策略是抛Abort异常AbortPolicy。以上这四种策略的源码实现如下:

4.3.7 线程池的关闭

线程池的关闭主要包括平滑关闭和暴力关闭两种,对应的方法分别为shutdown和shutdownNow。在这两个方法的内部主要是通过runState状态变量来做控制的,具体分析如下。

1.shutdown方法:平滑关闭

平滑关闭,停止新任务的提交,但是会等待正在排队的任务和正在执行的任务都执行完成。源码实现如下:

将runState设置为SHUTDOWN,从而可以和其他方法进行协作,如处理新提交任务的execute方法会检查runState的状态,如果发现是SHUTDOWN则不会再接收这个新任务。同时会中断空闲线程。

2.shutdownNow:暴力关闭

暴力关闭,停止新任务的提交,中断正在执行任务的线程,停止排队任务的执行和尝试停止正在执行的任务。源码实现如下:

与shutdown方法只是先停止新任务的提交,但是会等待所有已成功提交的任务都执行完不同的是,shutdownNow方法会中断所有的工作线程,包括正在执行任务的线程,所以会导致已成功提交的任务也无法执行完成。 v4cypmhgNBkMGyZrkueyLMq+/ShlTT8Eu+3bS/FcUfhYdAOnZNPjDwuSV98FZjso

点击中间区域
呼出菜单
上一章
目录
下一章
×