JDK Class Library Source Analysis Series 2 -- ExecutorService Thread Pools (1): The Structure of ThreadPoolExecutor

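I. Background on JDK thread pools

  1. States defined by ThreadPoolExecutor

    ThreadPoolExecutor defines five states in total:

      RUNNING: the pool is running. It accepts new tasks and processes the tasks in the queue.

      SHUTDOWN: no new tasks are accepted, but tasks already in the queue are still executed.

      STOP: no new tasks are accepted, queued tasks are not processed, and interrupt() is called on the threads that are running tasks.

      TIDYING: all tasks have terminated and the worker count is 0. This is an intermediate state on the way to termination; the thread that moves the pool to TIDYING runs the terminated() hook.

      TERMINATED: terminated() has completed.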

  2. State transitions in ThreadPoolExecutor

    RUNNING -> SHUTDOWN: when shutdown() is called

    RUNNING/SHUTDOWN -> STOP: when shutdownNow() is called

    SHUTDOWN -> TIDYING: when both the queue and the pool are empty (the fields workQueue and workers)

    STOP -> TIDYING: when the pool (workers) is empty

    TIDYING -> TERMINATED: when the terminated() hook has completed
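    To make the transitions above concrete, here is a small model of the lifecycle. It is a hypothetical sketch: PoolState, next() and canAdvanceTo() are illustrative names, not JDK API (the real pool encodes the state as an int inside ctl). It encodes only the direct transitions listed above, and none of them moves backwards.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of ThreadPoolExecutor's lifecycle; not part of the JDK.
enum PoolState {
    RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED;

    // Direct transitions, as listed in the ThreadPoolExecutor source comments.
    Set<PoolState> next() {
        switch (this) {
            case RUNNING:  return EnumSet.of(SHUTDOWN, STOP);   // shutdown() / shutdownNow()
            case SHUTDOWN: return EnumSet.of(STOP, TIDYING);    // shutdownNow() / queue and pool empty
            case STOP:     return EnumSet.of(TIDYING);          // pool empty
            case TIDYING:  return EnumSet.of(TERMINATED);       // terminated() hook has returned
            default:       return EnumSet.noneOf(PoolState.class); // TERMINATED is final
        }
    }

    boolean canAdvanceTo(PoolState target) {
        return next().contains(target);
    }
}
```

    Because the real run state only ever increases, ThreadPoolExecutor can compare states with plain < and >=, which is what runStateAtLeast(c, STOP) does later in this article.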

  3. The top-level interface (Executor)

public interface Executor {
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

    Executor is the top-level interface of the JDK thread pool API. It declares a single method, execute(), which runs a Runnable. The method may throw two exceptions: NullPointerException if the Runnable passed in is null, and RejectedExecutionException if the executor cannot accept any more tasks at the moment, so the task passed in is rejected.
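    The contract is small enough that an Executor does not even need a thread pool. The two classes below follow the examples in the Executor Javadoc (the names DirectExecutor and ThreadPerTaskExecutor come from there); the null checks are added here to honor the NullPointerException clause. ThreadPoolExecutor is the pooled, queued and possibly rejecting variant of the same interface.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Runs each task synchronously in the caller's thread: the simplest legal Executor.
class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Objects.requireNonNull(command, "command"); // honor the NullPointerException contract
        command.run();
    }
}

// Starts a new thread for every task: no pooling, no queue, never rejects.
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Objects.requireNonNull(command, "command");
        new Thread(command).start();
    }
}
```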

  4. ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

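    ExecutorService adds a number of methods on top of Executor, and it introduces another way of representing asynchronous work: Future/Callable. For now, though, we focus on the methods that shut a pool down.

    1) shutdown():

        Shuts the pool down. After the call the state becomes SHUTDOWN: tasks that were already submitted, including those still in the queue, are executed, but no new tasks are accepted. The method does not wait for those tasks to finish; awaitTermination() does that.

    2) shutdownNow():

        Shuts the pool down immediately. The state is set to STOP, interrupt() is called on the threads that are executing tasks, and the tasks still waiting in the queue (workQueue) are removed and returned; they will not be executed. Stopping running tasks is best effort: a task that ignores interrupts keeps running.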
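    In practice the two methods are often combined: call shutdown() first, wait, and fall back to shutdownNow() if the tasks do not finish in time. The sketch below is adapted from the two-phase shutdown example in the ExecutorService Javadoc; the class name PoolShutdown and the timeout parameters are choices made here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class PoolShutdown {
    // Returns true if the pool fully terminated within the time allowed.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // SHUTDOWN: reject new tasks, keep running the queued ones
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // STOP: interrupt running tasks, drop the queue
                pool.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();                 // re-cancel if we were interrupted while waiting
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return pool.isTerminated();
    }
}
```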

  5. AbstractExecutorService


    This abstract class implements ExecutorService. We will analyze it when we go through the other thread pool implementations.

II. ThreadPoolExecutor

  1. Main fields


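    1) ctl

        ctl is an AtomicInteger, so it can be read and updated atomically. It packs two values into one int: the run state of the pool in the high 3 bits, and the worker count (the number of Worker threads, not the number of tasks being executed) in the low 29 bits. The state constants are defined with shifts, and the two values are extracted with bit masks: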

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
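private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private boolean compareAndIncrementWorkerCount(int expect) {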
    return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
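    To see the packing in isolation, the sketch below copies the constants above into a standalone class, together with the runStateOf, workerCountOf, ctlOf and isRunning helpers that the JDK 8 source defines next to them (the class name CtlBits is hypothetical). Because RUNNING is negative and the other states increase from 0, a plain integer comparison on ctl is enough to compare states.

```java
// Standalone copy of ThreadPoolExecutor's ctl packing helpers (JDK 8 layout).
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;      // low 29 bits: workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF

    // runState lives in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; }  // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 3);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 3
    }
}
```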

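    2) BlockingQueue<Runnable> workQueue

        Holds the tasks that are waiting to be executed.

    3) ReentrantLock mainLock

        The lock held when accessing the workers set and the related bookkeeping (such as largestPoolSize).

    4) HashSet<Worker> workers

        Holds the pool's worker threads. Each thread is wrapped in a Worker object; Worker implements Runnable, and it is the Worker that runs the Runnable tasks we submit.

    5) volatile int maximumPoolSize

        The maximum number of workers. The effective limit is also bounded by CAPACITY: when the check below (from addWorker) returns false, no new Worker is created and added to workers.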

if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

    6) int largestPoolSize

        Records the largest size that workers has ever reached. It is updated in addWorker:

int s = workers.size();
if (s > largestPoolSize)
    largestPoolSize = s;

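    7) volatile long completedTasks (in Worker)

        The per-Worker count of completed tasks. This field is declared in the Worker inner class (see below), not directly in ThreadPoolExecutor. The pool itself keeps a separate long completedTaskCount, to which a Worker's completedTasks is added when that Worker exits.

    8) volatile ThreadFactory threadFactory

        The factory used to create the thread inside each Worker.

    9) volatile RejectedExecutionHandler handler

        The handler invoked when the pool cannot accept a new task (the rejection policy).

    10) volatile long keepAliveTime

        The idle timeout for workers, stored in nanoseconds. A worker that waits keepAliveTime without receiving a task exits. By default this applies only to workers beyond corePoolSize; setting allowCoreThreadTimeOut (volatile boolean allowCoreThreadTimeOut) to true makes core workers time out as well.

    11) volatile int corePoolSize

        The core pool size: the number of workers kept alive even when idle, unless allowCoreThreadTimeOut is set.

    12) static final RejectedExecutionHandler defaultHandler = new AbortPolicy():

        The default rejection policy. AbortPolicy throws a RejectedExecutionException.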
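    keepAliveTime, allowCoreThreadTimeOut and corePoolSize interact as described above. The following sketch (CoreTimeoutDemo and idlePoolSize are hypothetical names; the 50 ms timings are arbitrary) starts two core workers, lets them go idle, and reports how many remain. It polls getPoolSize() instead of sleeping a fixed time, because the exact moment a worker exits depends on scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // Returns the pool size observed after idle workers had time to expire.
    static int idlePoolSize(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout); // requires keepAliveTime > 0
        CountDownLatch done = new CountDownLatch(2);
        try {
            pool.execute(done::countDown); // starts core worker 1
            pool.execute(done::countDown); // starts core worker 2
            done.await();
            // Poll for up to about 2 s; core workers only time out if allowCoreTimeout is true.
            for (int i = 0; i < 40 && pool.getPoolSize() > 0; i++) {
                Thread.sleep(50);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

    With allowCoreTimeout set to false the two core workers stay blocked in workQueue.take() and the pool keeps 2 threads; with true they time out in poll() and the pool shrinks to 0. Note that allowCoreThreadTimeOut(true) throws IllegalArgumentException when keepAliveTime is 0.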

  2. The inner class Worker

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    final Thread thread;
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    ..........
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    ...........
}

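    Two fields matter most here. firstTask is the task (command) passed in when the Worker is created, and it may be null. thread is created by the thread factory with the Worker itself as its Runnable, so starting thread calls Worker.run(), which calls runWorker(this). The setState(-1) call in the constructor keeps the worker from being interrupted until runWorker starts.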
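    The key trick is that a Worker is both a Runnable and the owner of the thread that runs it. The class below is a heavily simplified, hypothetical MiniWorker that shows only this part: it passes itself to ThreadFactory.newThread, so starting thread calls its own run(). It leaves out the AQS-based lock and the getTask() loop of the real Worker.

```java
import java.util.concurrent.ThreadFactory;

// Hypothetical, heavily simplified Worker: it is its own Runnable and owns the thread that runs it.
// The real Worker also extends AbstractQueuedSynchronizer and keeps looping over getTask().
final class MiniWorker implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks; // written only by this worker's own thread

    MiniWorker(Runnable firstTask, ThreadFactory factory) {
        this.firstTask = firstTask;
        this.thread = factory.newThread(this); // the thread's Runnable is the worker itself
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null; // drop the reference, as runWorker does
        if (task != null) {
            task.run();
            completedTasks++;
        }
    }
}
```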

  3. Constructor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
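    A usage sketch of the seven-argument constructor. The class PoolConfig, the thread name prefix, the queue capacity of 100 and the choice of CallerRunsPolicy are illustrative assumptions; the argument checks (for example maximumPoolSize < corePoolSize) are the ones in the constructor above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfig {
    // Names threads prefix-1, prefix-2, ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.incrementAndGet());
    }

    static ThreadPoolExecutor newBoundedPool(int core, int max) {
        return new ThreadPoolExecutor(
                core, max,
                30, TimeUnit.SECONDS,                        // stored internally as nanoseconds
                new ArrayBlockingQueue<>(100),               // bounded work queue
                namedFactory("app-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy());  // run rejected tasks in the caller
    }
}
```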

  4. Other key methods

    1) execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

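    This method runs a submitted Runnable in three steps. First, workerCountOf(c) gives the current number of workers; if it is below corePoolSize, addWorker(command, true) starts a new core worker with command as its first task, and the method returns if that succeeds. Second, if the pool is still running, it tries to offer command to workQueue. If that succeeds it re-reads ctl: if the pool is no longer running, it removes command from workQueue (remove() also calls tryTerminate()) and rejects it; otherwise, if the worker count is 0, it calls addWorker(null, false). The null is deliberate: a worker first runs its own firstTask and, when it has none, takes tasks from workQueue, so the new worker will pick up the queued command. Third, if the task could not be queued (the pool is not running or the queue is full), it tries addWorker(command, false) to start a non-core worker up to maximumPoolSize, and calls reject(command) if that fails as well.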
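    The three steps can be observed with a tiny pool: corePoolSize 1, maximumPoolSize 2 and an ArrayBlockingQueue of capacity 1 (these sizes, and the class ExecuteOrderDemo, are chosen here for illustration). Every task blocks on a latch, so nothing finishes while we submit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    // Returns {poolSize, queuedTasks, rejectedTasks} after submitting 4 blocking tasks.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until we are done observing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int rejected = 0;
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

    The expected result is {2, 1, 1}: the first task starts the core worker, the second waits in the queue, the third starts a non-core worker only because the queue is full, and the fourth is rejected by the default AbortPolicy. A consequence worth remembering: with an unbounded queue such as LinkedBlockingQueue the offer never fails, so the third step is never reached and maximumPoolSize has no effect.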

    2) addWorker(Runnable firstTask, boolean core)

     We will split this method into pieces and go through them one at a time, instead of discussing the whole method in one go.

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

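    Start with this first part. runStateOf(c) extracts the run state. If the pool is at least SHUTDOWN, addWorker returns false immediately, with one exception: the state is exactly SHUTDOWN, firstTask is null and workQueue is not empty. That case is still allowed because a SHUTDOWN pool must keep processing its queue and may need a worker with no first task to do so. If the pool is already STOP, for example, nothing more happens. Next, workerCountOf(c) gives the current number of workers. If it is at least CAPACITY, or at least corePoolSize or maximumPoolSize (chosen by the core flag), the method returns false, meaning the worker was not added. Otherwise compareAndIncrementWorkerCount(c) tries to add 1 to the worker count with CAS; on success, break retry leaves both loops. If the CAS fails, ctl is re-read: if the run state changed, continue retry restarts the outer loop and rechecks the state; otherwise only the worker count changed, and the inner loop retries.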
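    The inner loop is a standard CAS retry loop. The hypothetical BoundedCounter below isolates that pattern: read the value, check the bound, try compareAndSet, and loop again if another thread got there first. It leaves out the outer retry: loop, which in addWorker also rechecks the run state.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) { this.limit = limit; }

    // Same shape as addWorker's inner loop: read, check the bound, CAS, retry on contention.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;             // like "wc >= maximumPoolSize": give up
            if (count.compareAndSet(c, c + 1))
                return true;              // like "break retry": we own the new slot
            // CAS failed: another thread changed the count; re-read and try again
        }
    }

    int get() { return count.get(); }
}
```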

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);   // roll back: remove w, decrement the count, tryTerminate()
}
return workerStarted;
}

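    Once those checks pass, the Worker is actually created. A new Worker is constructed and mainLock is acquired for the bookkeeping. Under the lock the run state is read again: the worker is added with workers.add(w) only if the pool is RUNNING (rs < SHUTDOWN), or if it is exactly SHUTDOWN and firstTask is null. If the thread is already alive, an IllegalThreadStateException is thrown, and largestPoolSize is updated if workers has grown past it. If the worker was added, its thread is started with t.start() after the lock is released. If the thread was not started for any reason, addWorkerFailed(w) removes the worker from workers, decrements the worker count and calls tryTerminate() in case this worker was holding up termination.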
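    The second half of addWorker follows a pattern that is useful on its own: register under a lock, start the thread outside the lock, and roll back in a finally block if the start did not happen. The hypothetical StartUnderLockSketch below reproduces that shape with a plain Set<Thread>; like the real code, it treats a ThreadFactory that returns null as a failure. Unlike the real pool it keeps no worker count and never removes finished threads.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;

class StartUnderLockSketch {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<Thread> threads = new HashSet<>();
    private final ThreadFactory factory;

    StartUnderLockSketch(ThreadFactory factory) { this.factory = factory; }

    boolean add(Runnable task) {
        boolean started = false;
        Thread t = factory.newThread(task); // a ThreadFactory may return null
        try {
            if (t != null) {
                boolean added = false;
                mainLock.lock();
                try {
                    threads.add(t); // bookkeeping under the lock
                    added = true;
                } finally {
                    mainLock.unlock();
                }
                if (added) {
                    t.start(); // start outside the lock
                    started = true;
                }
            }
        } finally {
            if (!started)
                rollback(t); // like addWorkerFailed(w)
        }
        return started;
    }

    private void rollback(Thread t) {
        mainLock.lock();
        try {
            if (t != null)
                threads.remove(t);
        } finally {
            mainLock.unlock();
        }
    }

    int size() {
        mainLock.lock();
        try {
            return threads.size();
        } finally {
            mainLock.unlock();
        }
    }
}
```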

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

    3) Worker.run(): as shown earlier, it simply calls runWorker(this).

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                }
                       .......
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

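    runWorker first takes w.firstTask; if there is one, the task the Worker was created with runs first. After that, while (task != null || (task = getTask()) != null) keeps fetching tasks for as long as getTask() returns one. Before each task, the interrupt check makes sure that a worker in a pool that is at least STOP is interrupted, and that a worker in a running pool does not carry a stale interrupt into the next task. In detail: if runStateAtLeast(ctl.get(), STOP) holds, or if Thread.interrupted() returns true (which also clears the flag) and a second read shows the pool is at least STOP (a recheck that covers a race with shutdownNow()), and the thread is not already interrupted (!wt.isInterrupted()), then wt.interrupt() is called. The task then runs while the Worker's lock is held. ThreadPoolExecutor also calls two hook methods around task.run(): beforeExecute and afterExecute. They are not abstract; they are protected methods with empty bodies in ThreadPoolExecutor, meant to be overridden by subclasses.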
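    Because beforeExecute and afterExecute are protected hooks, a subclass can observe every task without touching runWorker itself. The CountingPool class below is a hypothetical example that counts started, finished and failed tasks; the hook signatures are the real ThreadPoolExecutor ones.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Subclass that uses the protected hooks called by runWorker().
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();    // runs in the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();   // runs in the finally block after task.run()
        if (t != null)
            failed.incrementAndGet(); // t is the exception that escaped run(), if any
    }
}
```

    afterExecute only receives a non-null Throwable for tasks passed to execute(). Tasks passed to submit() are wrapped in a FutureTask, which catches the exception itself, so t is null and the error is visible only through the Future.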

    4) getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

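   getTask() fetches the next task for a worker. It first checks the state: if the pool is at least STOP, or it is SHUTDOWN and workQueue is empty, it decrements the worker count and returns null, which ends the worker's loop in runWorker. Next it decides whether this worker waits with a timeout: timed is true if allowCoreThreadTimeOut is set or there are more workers than corePoolSize. If there are too many workers (more than maximumPoolSize) or a timed wait has already timed out, and this worker is not the last one while tasks are still queued (wc > 1 || workQueue.isEmpty()), it decrements the count with CAS and returns null so the worker exits. Otherwise it fetches a task: poll(keepAliveTime, ...) waits at most keepAliveTime and returns null on timeout (timedOut is then set and the loop checks again), while take() blocks until a task is available. An InterruptedException resets timedOut and the loop retries.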
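    Two decisions in getTask() are easy to isolate: whether a worker waits with a timeout, and how it waits. The hypothetical helpers below (GetTaskSketch, isTimed and fetch are not JDK API) reproduce just those two lines; the state checks and the worker-count CAS are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers isolating two decisions made inside getTask().
class GetTaskSketch {
    // Should this worker wait with a timeout (and so be allowed to exit when idle)?
    static boolean isTimed(boolean allowCoreThreadTimeOut, int workerCount, int corePoolSize) {
        return allowCoreThreadTimeOut || workerCount > corePoolSize;
    }

    // Timed workers poll and may get null back; untimed (core) workers block in take().
    static Runnable fetch(BlockingQueue<Runnable> q, boolean timed, long keepAliveNanos)
            throws InterruptedException {
        return timed ? q.poll(keepAliveNanos, TimeUnit.NANOSECONDS) : q.take();
    }
}
```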

    5) shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
               ...........
            if (onlyOne)
                break;
        }
    }.......
}
private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

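    shutdown() advances the state to SHUTDOWN with advanceRunState, then calls interruptIdleWorkers() (the no-argument version calls interruptIdleWorkers(false)) to interrupt idle Workers: a worker is interrupted only if its lock can be acquired with w.tryLock(), that is, if it is not currently running a task. Finally tryTerminate() tries to move the pool towards TERMINATED.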
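    From the caller's side, SHUTDOWN means "finish what is queued, accept nothing new". The hypothetical ShutdownDemo below queues three tasks on a single-threaded pool, calls shutdown(), and then tries to submit a fourth.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns {tasksRun, rejectedAfterShutdown}.
    static int[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < 3; i++)
            pool.execute(ran::incrementAndGet); // submitted before shutdown
        pool.shutdown();                        // state -> SHUTDOWN
        int rejected = 0;
        try {
            pool.execute(ran::incrementAndGet); // new work is refused
        } catch (RejectedExecutionException e) {
            rejected++;
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // queued tasks still drain
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { ran.get(), rejected };
    }
}
```

    It returns {3, 1}: the three queued tasks still run, and the task submitted after shutdown() is rejected by the default AbortPolicy.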

    6) shutdownNow()

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

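    shutdownNow() advances the state to STOP and calls interruptWorkers(), which interrupts every started worker through interruptIfStarted(), whether or not it is idle. The other difference from shutdown() is drainQueue(): it removes the unprocessed tasks from workQueue and returns them to the caller. It tries drainTo() first and then removes any remaining elements one by one, for queues (such as DelayQueue) whose drainTo() may not remove everything.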
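    The difference from shutdown() shows up when a worker is busy. In the hypothetical ShutdownNowDemo below, the single worker is stuck in a long sleep while three more tasks wait in the queue; shutdownNow() interrupts the sleeping task and hands the three queued tasks back.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static final AtomicBoolean runningTaskInterrupted = new AtomicBoolean();

    // Returns how many queued tasks shutdownNow() handed back unexecuted.
    static int drainedCount() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {                      // occupies the only worker
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                runningTaskInterrupted.set(true); // shutdownNow() interrupted us
            }
        });
        try {
            started.await();                      // the worker is busy, so the next tasks queue up
            for (int i = 0; i < 3; i++)
                pool.execute(() -> { });
            List<Runnable> neverRun = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return neverRun.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

    drainedCount() returns 3, and the running task observes the interrupt. A task that ignored interrupts would keep running, which is why shutdownNow() makes no guarantee that running tasks actually stop.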