《Java并发编程的艺术》第十章——Executor框架

Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程。当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常会把应用分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
其模型图如下:
《Java并发编程的艺术》第十章——Executor框架
Executor框架的结构
Executor框架主要又3大部分组成:
1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
2. 任务的执行:包括任务执行机制的核心接口Executor,及其子接口ExecutorService接口。
3. 异步计算的结果:包括接口Future和其实现类FutureTask类。
Executor框架的类与接口关系示意图:
《Java并发编程的艺术》第十章——Executor框架
Executor框架的成员
1. ThreadPoolExecutor
ThreadPoolExecutor是Executor框架最核心的类。主要由corePool(核心线程池大小)、maximumPool(最大线程池大小)、BlockingQueue(工作队列)和RejecteExecutionHandler(饱和策略)构成。
ThreadPoolExecutor通常使用Executors来创建。Executors可以创建3中类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
1.1 SingleThreadExecuto:是使用单个worker线程的Executor。
下面是其源代码:
《Java并发编程的艺术》第十章——Executor框架
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。并使用LinkedBlockingQueue*队列作为工作队列。
其运行示意图如下:
《Java并发编程的艺术》第十章——Executor框架
1>如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。
3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。
1.2 FixedThreadPool:被称为固定线程数的线程池。
下面是其源代码:
《Java并发编程的艺术》第十章——Executor框架
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
其运行示意图如下:
《Java并发编程的艺术》第十章——Executor框架
1>如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。
3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。
FixedThreadPool使用*队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。使用*队列作为工作队列会对线程池带来如下影响:
a>当线程池中的线程数达到corePoolSize后,新任务将在*队列中等待,因此线程池中的线程不会超过corePoolSize。
b>maximumPoolSize变为一个无效参数。
c>keepAliveTime也变为一个无效参数。
d>永远不会执行饱和策略。
1.3 CachedThreadPool:是一个会更加需要创建新线程的线程池。
下面是其源代码:
《Java并发编程的艺术》第十章——Executor框架
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool时*的。keepAliveTime被设置为60L,意味着空闲线程超过60秒后被终止。
CachedThread使用没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPool是*的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度是,CachedThreadPool会不断创建新的线程,极端情况下,会耗尽CPU和内存资源。
其执行示意图如下:
《Java并发编程的艺术》第十章——Executor框架
1>首先执行SynchronousQueue.offer(Runnable task)。如果当前有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),则配对成功,将任务交给空闲线程执行。
2>当没有空闲线程时,创建一个新线程执行任务。
3>线程在执行任务完毕后,执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),向队列请求任务,并等待60秒。如果60之后仍没有新任务,则被终止。如果有新任务则继续执行。
2.ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或定期执行任务。
ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor:
1>ScheduledThreadPoolExecutor适用于需要执行周期任务,同时为了满足资源管理的需求而限制后台线程的数量的应用场景。
2>SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各任务的应用场景。
其执行示意图如下:
《Java并发编程的艺术》第十章——Executor框架
1>当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或scheduleWithFixedDelay()时,会想ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。
2>线程池中的线程从DelayQueue中获取ScheduledFutureTask并执行。
2.1 ScheduledThreadPoolExecutor的实现
ScheduledThreadPoolExecutor把待调度的任务放在一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量:
long型变量time,表示这个任务将要被执行的具体时间。
long型变量sequenceNumbe,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
long型成员变量period,表示任务执行的间隔周期。
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对对列中的ScheduledFutureTask进行排序。time小的先执行,被排在前面。如果两个任务time相同则比较sequenceNumbe。
ScheduledThreadPoolExecutor执行任务的步骤如下:
《Java并发编程的艺术》第十章——Executor框架
1>线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2>线程1执行这个ScheduledFutureTask。
3>线程1修改ScheduledFutureTask的time变量为下次要被执行的时间。
4>线程1把修改time之后的ScheduledFutureTask放入DelayQueue(DelayQueue.add())。
以下是DelayQueue.take()方法源码:
《Java并发编程的艺术》第十章——Executor框架
DelayQueue.add()方法源码:
《Java并发编程的艺术》第十章——Executor框架
3. Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable接口或Callable接口的实现类通过submit()方法提交给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,会返回一个FutureTask对象。
【备注】:截止到JDK8为止,返回是FutureTask对象,但Java API只是保证返回一个实现Future接口的对象,并不一定是FutureTask。
3.1 FutureTask
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。FutureTask有未启动、已启动、已完成3种状态。
FutureTask的状态迁移示意图如下:
《Java并发编程的艺术》第十章——Executor框架
1>未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
2>已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
3>已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或FutureTask.run()时抛出异常而异常结束,FutureTask处于已完成状态。
下面是FutureTask在不同状态时调用FutureTask.get()及Future.cancel()方法的执行示意图:
《Java并发编程的艺术》第十章——Executor框架
4. Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThradPoolExecutor或ScheduledThreadPoolExecutor执行。他们之间的区别是Runnable不会返回结果,而Callable可以返回结果。
Executors提供把Runnable包装成一个Callable的方法:
《Java并发编程的艺术》第十章——Executor框架
及把一个Runnable和一个待返回的结果包装成一个Callable的方法:
《Java并发编程的艺术》第十章——Executor框架

【备注】:本文图片均摘自《Java并发编程的艺术》·方腾飞,若本文有错或不恰当的描述,请各位不吝斧正。谢谢!