JDK类库源码分析系列2--ExecutorService线程池篇(1)ThreadPoolExecutor的结构解析
一、JDK线程池相关的一些介绍
1、ThreadPoolExecutor线程池定义的状态
ThreadPoolExecutor线程池总共有五种状态:
RUNNING:表示线程此正在运行,能接收新的线程任务,并处理任务队列中。
SHUTDOWN:这种状态就不会接收新的任务了,但会将已经在队列中的任务执行
STOP:不会接收新任务,也不会再处理队列中的任务,并且会再调用正在处理的interrupt()方法
TIDYING:所有的任务都终止了,工作任务记数归0,这是一种终止的中间状态,回去调用terminated()回调方法
TERMINATED:terminated()方法调用完成
2、ThreadPoolExecutor线程池的状态切换
RUNNABLE -> SHUTDOWN :执行shutdown()方法
RUNNABLE/SHUTDOWN -> STOP :执行shutdownNew()方法
SHUTDOWN -> TIDYING :当队列与池中的数据都为空(定义的成员变量是workQueue、workers)
STOP -> TIDYING :当workers中的数据为空了
TIDYING -> TERMINGTED :terminated()回调方法完成调用
3、顶层接口(Executor)
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
这个接口就是JDK中关于线程池相关内容的顶层接口,可以看到其只有一个接口,就是用来执行Runnable的方法execute(),这个方法可能会抛出NullPointerException、RejectedExecutionException异常:NullPointerException异常就是当传入的Runnable接口为空、RejectedExecutionException异常表示的的是这个线程池目前已经不能接收更多的Runnable任务了,所以拒绝处理传入的这个Runnable任务。
4、ExecutorService
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
可以看到这个ExecutorService接口定义了一些新接口,可以看到在这个接口中又定义了线程相关的另一种表现形式Future/Callable。不过我们在这里我们先着重了解下关于线程池关闭的方法。
1)、shutdown():
这个方法是用来关闭线程池的。调用这个方法之后就会将线程此的状态变为SHUTDOWN类型,会执行已经提交在池中的内容,不会接收新任务
2)、shutdownNow :
关闭线程池。会将此线程池状态设为STOP,会调用正在执行任务的interrupted()方法,同时提取返回在等待队列(workQueue)中的任务,不再执行等待队列中的任务
4、AbstractExecutorService
这个抽象类实现ExecutorService接口,这些我们在梳理另外的线程池实现类的时候在分析。
二、ThreadPoolExecutor介绍
1、首先我们来看其的主要成员变量
1)、ctl
我们可以看到其是用的AtomicInteger来保证原子性,同时这个字段主要有两种用途,一种是用来表示这个线程池的状态的,同时也可以通过这个ctl来得出当前正在执行的任务数,这个字段与其他表示状态的字段用的是位移运算
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }
2)、BlockingQueue<Runnable> workQueue
这个队列就是用来放那些需要等待运行的任务
3)、ReentrantLock mainLock
这个是用来保证并发操作
4)、HashSet<Worker> workers
这个就是用来放执行线程的(我们传入Runnable的实现,然后在线程池中会再封装到Worker对象,Worker实现了Runnable接口,如果Worker去执行传入的Runnable任务)。
5)、volatile int maximumPoolSize
这个表示线程池的workers个数的最大值,但同时也受到另外一个值CAPACITY的影响,下面的代码如果返回false就不会再添加新的Worker缓存到workers中了
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
6)、int largestPoolSize
这个是用来记录线程池中works的最大值是多少
int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;
7)、volatile long completedTasks
这个是用来记录每个Worker总共处理了多少个任务
8)、volatile ThreadFactory threadFactory
这个是用来创建Worker类中线程的工厂类
9:volatile RejectedExecutionHandler handler
这个就是当线程池不能再添加新任务的时候的拒绝处理类
10)、volatile long keepAliveTime
这个表示workers中的Woker对象的空闲存活时间,如果在keepAliveTime时间中这个Worker对象都没有处理task,就销毁,同时这个参数可以配合另外一个参数allowCoreThreadTimeOut(volatile boolean allowCoreThreadTimeOut)来决定是不是核心Worker也执行超时销毁策略。
11)、volatile int corePoolSize
核心Worker对象数
12)、static final RejectedExecutionHandler defaultHandler = new AbortPolicy():
默认的拒绝策略
2:、内部类Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; .......... Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } ........... }
可以看到这里主要是两个参数:一个是firstTask,这个就是我们再创建Worker的时候最初传入的task(command),thread这个thread其的Runnable实现就是这个Worker本身
3、构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3、一些其他的主要方法
1)、execute(Runnable command)
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; } final void reject(Runnable command) { handler.rejectedExecution(command, this); }
这个方法就是用来执行Runnable实现的。可以看到这里,首先是通过workerCountOf来获取当前正在执行的任务数,如果没有超过核心池的数量就直接调用addWorker方法,如果返回true就直接return。然后是在判断当前池的状态是不是还在运行,如果还在运行看能不能添加到等待队列中,如果添加成功,再进入此判断体,再检查一次,如果不是在运行状态了,就将本command从workQueue移除,并尝试去关闭本线程池。如果还在运作,再判断其运行的数量是不是为0,如果是则调用addWorker方法,这里command传的是null,这个是因为线程池执行command任务是先从worker对象本身中取,如果不能取到则再从workQueue中获取。
2)、addWorker(Runnable firstTask, boolean core)
这个方法我们进行拆解,一块一块来讲,避免像以前写笔记的时候将一个大方法整个放一起梳理
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
首先我们看这一部分,首先是通过runStateOf方法判断当前线程池是否>=SHUTDOWN,并且是[rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) ]都满足才继续,不然直接return,例如如果当前已经STOP了就不往下了。之后再通过workerCountOf获取当前正在执行的任务数,如果已经大于CAPACITY,或者在根据core,来判断是用corePoolSize限制还是maximumPoolSize,如果超过就也return false,表示没有完成该任务的添加执行。如果在限制内,就通过compareAndIncrementWorkerCount方法来加一跳出循环,如果失败则继续循环。
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); }
在经过前面的判断校验后,在这里就开始正式Worker创建流程。首先创建Worker,然后获取锁,进行一些同步操作,先判断线程池的状态,如果小于SHUTDOWN或者刚好在SHUTDOWN状态,并且firstTask为null,也进行下面的workers.add(w)操作。如果这里添加成功了,则调用创建的Worker的start方法来启动。如果这里没有创建成功,则会调用addWorkerFailed(w)方法,移除以及自减计数,并尝试去关闭这个线程池。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
3)、Worker的run()方法,通过前面的内容我们可以知道其是调用的runWorker方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } ....... } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
可以看到这里通过w.firstTask获取task如果能获取到,我们就直接运行Worker创建时直接初始化的task,或者就通过while (task != null || (task = getTask()) != null),(task = getTask())能一直获取到task就一种循环运行。然后进行判断,这里去掉!wt.isInterrupted()这个是否打断的判断,还有或判断。首先看(runStateAtLeast(ctl.get(), STOP) - 当前线程池已经STOP了,或者Thread.interrupted()(当前线程Worker)已经调用过打断了并且当前线程池已经调用打断了(这里用了双重检查),则调用当前线程的wt.interrupt()方法。如果不满足这进入下面正式用Worker去执行这个task。这里对于ThreadPoolExecutor类来说还有两个抽象方法beforeExecute、afterExecute用来给它的子类实现的,其本身并没有实现。
4)、getTask
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
这个方法的逻辑就是获取task的,首先也是判断状态,如果线程池处于STOP状态并且等待队列workQueue已经为空了,则先减计数,再返回null打断循环获取task。这里还会再次进行线程池运行数以及workQueue队列判断,不满足也return null。满足的换,即通过poll或者take获取,poll是等待keepAliveTime时间后不管有没有获取到直接return,take会一直阻塞获取。
5)、shutdown()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); ........... if (onlyOne) break; } }....... } private void advanceRunState(int targetState) { // assert targetState == SHUTDOWN || targetState == STOP; for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
可以看到如果是shutdown方法的话,改变状态为SHUTDOWN,然后再调用调用interruptIdleWorkers方法去调用闲置Worker的interrupt()方法。
6)、shutdownNow()
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
这里是将当前线程池置为STOP转态,这里不同的是通过drainQueue()方法会将workQueue中的未处理的task取出来并返回。