ThreadPool 线程池详解
在很多并发或异步场景中,我们总能看到线程池的身影,它几乎是Java 中运用的最多的并发框架,也是面试的常考点。但是我对它的运行机制、实现原理一直比较模糊,所以在此总结。
- 本文观点多来自《并发编程的艺术》,这是一本学习并发的好书,即容易理解,也不失深度。
- 这本书我看了三遍。书读百遍,其义自见。书中写得不清楚的内容,我加入了自己的理解,如有不正,请留言指正。
- 码字不易,喜欢的朋友点个赞呗????
一、使用线程池的好处
合理的使用线程池能够带来3 个好处:
- 降低资源消耗。线程池可以重复使用已经创建的线程,来降低线程创建和销毁带来的资源消耗。
- 提高响应速度。当有一个新任务到达时,一般情况下任务不需要等待线程的创建就能够立即执行。
- 提高线程的可管理性。我们知道一个线程的生成其实算是一个代价比较高的操作,如果无限制的生成不但会消耗系统资源,还会降低系统的稳定性,而使用线程池可以对线程进行统一分配、调优和监控。
二、怎样得到一个线程池
要理解线程池的实现原理,先要知道一个线程池的来龙去脉。
- 自定义生成线程池
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 线程存活时间
TimeUnit unit,// 存活时间的单位
BlockingQueue<Runnable> workQueue, // 采用的阻塞队列
ThreadFactory threadFactory, // 线程的创建工厂
RejectedExecutionHandler handler) // 线程池的拒绝策略
//默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//默认线程创建工厂
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();}
线程池有4 个构造函数,简单的意思写在注释里面了,没有的参数采用默认值,下面详细介绍一下各参数的意义或取值:
- corePoolSize:核心线程数。当一个任务提交给线程池后,如果当前线程的数量小于这个值,不管线程池中是否有空闲的线程,都会创建一个线程。可以调用prestartAllCoreThreads() 方法让线程池提前创建好所有的核心线程。
- maximumPoolSize:线程池中允许创建的最大线程数。
- keepAliveTime:线程池中的线程完成任务后,最多能够存活的时间。
- unit:存活时间的时间单位,可以选的有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)、千分之毫秒和纳秒(NANOSECONDS)。
-
workQueue:存储任务的阻塞队列,可选的队列有
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。可用于周期任务。
- SynchronousQueue:一个不存储元素的阻塞队列。一个任务必须等待一个线程获取任务。队列本身不存储任务,像手把手传递。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- ThreadFactory:用于设置创建线程的工厂。
-
RejectedExecutionHandler:当线程池无法再接受任务后的拒绝策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理不丢掉
也可以自定义实现拒绝策略
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
通过这些参数我们就可以设计出最适合生产环境的线程池
- 通过Executor 框架获得
除了自定义生成线程池外,Java 也提供了一些设计好的线程池。
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, // 核心线程和最大线程数相同
0L, TimeUnit.MILLISECONDS, // 线程存活时间为0
new LinkedBlockingQueue<Runnable>());// 采用有界链表阻塞队列(最多存储Integer.MAX_VALUE)
//采用默认拒绝策略——直接抛出异常
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
FixedThreadPool 适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,适用于负载比较重的服务器
- SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // 核心线程和最大线程数都为1
0L, TimeUnit.MILLISECONDS, // 线程存活时间为0
new LinkedBlockingQueue<Runnable>()));/ 采用有界链表阻塞队列(最多存储Integer.MAX_VALUE)
//采用默认拒绝策略——直接抛出异常
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
SingleThreadExecutor 适用于需要保证顺序地执行各个任务;并且任意时间点,不会有多个线程是活动的应用场景。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,// 核心线程为0,最大线程数都为Integer.MAX_VALUE
60L, TimeUnit.SECONDS,//每个线程存活60分钟
new SynchronousQueue<Runnable>());//不存储元素的阻塞队列
//采用默认拒绝策略——直接抛出异常
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
CachedThreadPool适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器
- ScheduledThreadPool
ScheduledThreadPool 有两种实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
// 指定核心线程大小,最大线程数为Integer.MAX_VALUE
// 线程无存活时间
//DelayedWorkQueue 多用于处理周期任务
//采用默认拒绝策略——直接抛出异常
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
多个后台线程执行周期任务,主要为了满足资源管理的要求而需要限制后台进程的数量
单个后台线程执行周期任务,主要为了保证顺序执行各个任务的应用场景
三、线程池的执行流程及源码解读
通过前面的介绍我们对线程池应该有一个大致的了解,下面我们具体来看看一个任务是怎么被添加。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
...
在执行代码之前注释给了三步走,有道翻译了一下:
- 如果小于正在执行的核心线程数量,就开一个新的线程执行这个任务(需要获取锁)。包装成工作线程要检查运行状态和工作数量。避免在线程不应该被添加时添加。
- 如果任务能够成功入队,我们还是需要二次检验是否应该添加,因为可能在上一次检查过后有新的空闲线程,或者线程池在我们进入方法后已经关闭。。。
- 如果不能入队,那么我们要尝试添加一个线程(需要获取锁)。
- 如果失败,我们就知道线程池是关闭了还是饱和了还是这个任务被拒绝了。
总结一下就是下面这张图:
继续看源码是怎么操作的:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池线程数量(见代码解释①)
int c = ctl.get();
//第一步,小于核心线程数尝试添加核心线程
if (workerCountOf(c) < corePoolSize) {
//(addWorker见代码解释②)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二步,添加队列
//如果线程池还在执行并且添加阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检验
// 如果线程池没有执行并且删除任务成功,拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还在执行或者删除任务失败,并且重新检验线程数量为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三步,添加一个线程,失败则拒绝该任务
else if (!addWorker(command, false))
reject(command);
}
- 代码解释①
The main pool control state, ctl, is an atomic integer packing two conceptual fields
ctl 是线程池用来表示线程池状态和线程池线程数量的一个字段,如何实现的有兴趣的朋友可以去了解一下。
//线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- 代码解释②
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 当线程池处于关闭状态,并且队列为空
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 大于线程池容量或大于核心/ 最大线程数量添加失败
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程数CAS+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 添加线程操作
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker 有两个参数
core:为true 代表添加核心线程,false 代表添加核心线程以外的线程
firstTask:执行的线程,可为空