【搞定Java并发】 第10章 Executor框架
文章目录
- 1、Executor 框架简介
- 1.2、Executor 框架的结构与成员
- 1.2.1、Executor 框架的结构
- 1.2.2、Executor 框架的成员
- 1、ThreadPoolExecutor
- 2.1、FixedThreadPool:创建固定线程数的线程池。【corePoolSize 和 maxiumPoolSize 都被设置nThreads。使用 LinkedBlockingQueue无界队列】
- 2.2、SingleThreadExecutor:使用单个工作线程执行任务。【corePoolSize 和 maxiumPoolSize 都被设置1。使用 LinkedBlockingQueue无界队列】【由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。】
- 2. 3、CachedThreadPool:是一个”无限“容量的线程池,它会根据需要创建新线程。【corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即是无界的;使用没有容量的SynchronousQueue作为主线程池的工作队列,每个插入操作必须等待另一个线程的对应移除操作,这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源;keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。】
- 2.4、具体应用案例
- 2、ScheduledThreadPoolExecutor :可另行安排在给定的延迟后运行命令,或者定期执行命令
- 3、FutureTask 详解
Java中的线程即是工作单元也是执行机制,从JDK 5后,工作单元与执行机制被分离。工作单元包括Runnable和Callable,执行机制由JDK 5中增加的java.util.concurrent包中Executor框架提供。
1、Executor 框架简介
1.1、Executor 框架的两级调度模型
-
在 HotSpot VM 的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
-
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定的数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。从图中可以看出,应用程序通过 Executor 框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
1.2、Executor 框架的结构与成员
1.2.1、Executor 框架的结构
Executor 框架主要由3大部分组成如下:
-
1、任务:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口;
-
2、任务的执行:包括任务执行机制的核心接口Executor,以及继承自 Executor 的 ExecutorService 接口。Executor 框架有两个关键类实现了 ExecutorService 接口,即:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor;
-
3、异步计算的结果:包括接口 Future 和实现 Future 接口的 FutureTask 类。
-
1、Executor:是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来;
-
2、ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务;
-
3、ScheduledThreadPoolExecutor:是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更加灵活,功能更强大;
-
4、Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果;
-
5、Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
Executor 框架的使用示意图如下图所示:
Executor 框架的使用示意图
-
主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。
-
然后可以把Runnbale对象直接交给 ExecutorService 执行(ExecutorService.execute(Runnable command));或者也可以把Runnable或者Callable对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task) 或 ExcutorService.submit(Callable task))。
-
如果执行ExecutorService.submit( … ),ExecutorService 将返回一个实现 Future 接口的对象。由于 FutureTask 实现了 Runnable,程序员也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
-
最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。
1.2.2、Executor 框架的成员
Executor 框架的主要成员包括:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。
1、ThreadPoolExecutor
ThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建3种类型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。 【源码在上一节已经讲过了】
2.1、FixedThreadPool:创建固定线程数的线程池。【corePoolSize 和 maxiumPoolSize 都被设置nThreads。使用 LinkedBlockingQueue无界队列】
- 创建固定长度的线程池,每次提交任务创建一个线程,直到达到线程池的最大数量,线程池的大小不再变化。特点就是可以重用固定数量线程的线程池。它的构造源码如下:
ExecutorService executorService = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool运行图如下:
执行过程如下:
- 1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。
- 2.当线程池的工作中的线程数量达到了corePoolSize,则将任务加入LinkedBlockingQueue(一个无界队列,Integer.MAX_VALUE),无界队列的影响如下
- 1)由于无界队列,线程池中的线程数不会超过corePoolSize,因为当等于corePoolSize后,新任务将进入无界队列,所以maxiumPoolSize(最大线程数量)变成了一个无效的参数;keepAliveTime(多余的空闲线程等待新任务的最长时间)也会变成一个无效参数,因为这个参数对核心线程数量内的线程无效【而由于无界队列,我们的线程数量不会超过corePoolSize】
- 2)由于无界队列,运行中的FixedThreadPool(未执行shutdown或shutdownNow)不会执行拒绝策略,因为不会发生情况2【执行拒绝策略有两种情况: 1、入队前还是RUNNING,入队后发现线程池现已不处于 RUNNING 状态(即交给队列时,线程池都已经关闭了),那么移除已经入队的这个任务,并且执行拒绝策略;2、如果 workQueue 队列满了,则 以 maximumPoolSize 为界创建新的 worker, 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略】
- 3.线程执行完1中的任务后会在循环中从队列中去获取任务。
2.2、SingleThreadExecutor:使用单个工作线程执行任务。【corePoolSize 和 maxiumPoolSize 都被设置1。使用 LinkedBlockingQueue无界队列】【由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。】
SingleThreadExecutor是使用单个worker线程的Executor。它的构造源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService //这个多了一个finalize方法来关闭线程池
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的 corePoolSize 和 maxiumPoolSize 都被设置1。其他参数均与 FixedThreadPool 相同,其运行图如下:
执行过程如下:
-
1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。
-
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
-
3.线程执行完1中的任务后会从队列中去任务。
2. 3、CachedThreadPool:是一个”无限“容量的线程池,它会根据需要创建新线程。【corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即是无界的;使用没有容量的SynchronousQueue作为主线程池的工作队列,每个插入操作必须等待另一个线程的对应移除操作,这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源;keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。】
- 特点是可以根据需要来创建新的线程执行任务,下面是它的构造方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
执行过程如下:
-
1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。execute()方法执行成功,否则执行步骤2;
-
2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务;
-
3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。
SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。
2.4、具体应用案例
1、newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若低于处理需要,则新建线程。示例代码如下:
public class Demo1 {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for(int i = 0; i < 10; i++){
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable(){
@Override
public void run() {
System.out.println(index);
}
});
}
}
}
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
2、newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
public class Demo5 {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
结果依次输出,相当于顺序执行各个任务。现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。
3、newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
public class Demo3 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable(){
@Override
public void run() {
System.out.println("延迟3秒");
}
}, 3, TimeUnit.SECONDS);
}
}
定期执行示例代码如下:
public class Demo4 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
System.out.println("延迟1秒,每3秒执行1次");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
表示延迟1秒后每3秒执行一次。ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。
2、ScheduledThreadPoolExecutor :可另行安排在给定的延迟后运行命令,或者定期执行命令
ScheduledThreadPoolExecutor,继承ThreadPoolExecutor且实现了ScheduledExecutorService接口,它就相当于提供了“延迟”和“周期执行”功能的ThreadPoolExecutor。在JDK API中是这样定义它的:ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ScheduledThreadPoolExecutor具有额外的灵活性或功能时,此类要优于 Timer。 一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。
2.1 ScheduledThreadPoolExecutor 的运行机制
- DelayQueue 是一个无界队列,所以 ThreadPoolExecutor 的 maximumPoolSize 和keepAliveTime在 ScheduledThreadPoolExecutor 中没有什么意义
- ScheduledThreadPoolExecutor提供了如下四个方法,也就是四个调度器:
- schedule(Callable callable, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的 ScheduledFuture。
- schedule(Runnable command, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的一次性操作。
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
- 第一、二个方法差不多,都是一次性操作,只不过参数一个是Callable,一个是Runnable。稍微分析下第三(scheduleAtFixedRate)、四个(scheduleWithFixedDelay)方法,加入initialDelay = 5,period/delay = 3,unit为秒。如果每个线程都是都运行非常良好不存在延迟的问题,那么这两个方法线程运行周期是5、8、11、14、17…,但是如果存在延迟呢?比如第三个线程用了5秒钟,那么这两个方法的处理策略是怎样的?第三个方法(scheduleAtFixedRate)是周期固定,也就说它是不会受到这个延迟的影响的,每个线程的调度周期在初始化时就已经绝对了,是什么时候调度就是什么时候调度,它不会因为上一个线程的调度失效延迟而受到影响。但是第四个方法(scheduleWithFixedDelay),则不一样,它是每个线程的调度间隔固定,也就是说第一个线程与第二线程之间间隔delay,第二个与第三个间隔delay,以此类推。如果第二线程推迟了那么后面所有的线程调度都会推迟,例如,上面第二线程推迟了2秒,那么第三个就不再是11秒执行了,而是13秒执行。
2.2 ScheduledThreadPoolExecutor 的实现:使用 DelayQueue 作为任务队列;
ScheduledThreadPoolExecutor 会把调度的任务(ScheduledFutureTask)放到一个DelayQueue中。下面来看下ScheduledFutureTask主要包含的3个成员变量:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private long time;
private final long period;
...
}
- 1、time:表示这个任务将要执行的具体时间;2、sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;3、period,表示任务执行的间隔周期。
ScheduledThreadPoolExecutor 中的线程1执行某个周期任务的4个步骤:
DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledFutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面,也就是说,如果两个任务的执行时间相同,那么先执行提交早的任务。
-
1、线程1从 DelayQueue 中获取已到期的 ScheduledFutureTask(DealyQueue.take())。到期任务是指 ScheduledFutureTask 的 time 大于等于当前时间;
-
2、线程1执行这个 ScheduledFutureTask;
-
3、线程1修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
-
4、线程1把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
DelayQueue.take()方法的源代码实现:获取ScheduledFutureTask,即上面步骤1:
?
- 1、获取Lock;
- 2、获取周期任务;
- 2.1、如果 PriorityQueue 为空,当前线程到 Condition 中等待,否则执行下面的2.2;
- 2.2、如果 PriorityQueue 的头元素的 time 时间比当前时间大,到 Condition 中等待到 time 时间,否则执行2.3;
- 2.3、获取 PriorityQueue 的头元素,如果 PriorityQueue 不为空,则唤醒在 Condition 中等待的所有线程。
- 3、释放Lock。
- ScheduledThreadFutureTask 在一个循环中执行步骤2,直到线程从 PriorityQueue 获取到一个元素之后,才会退出无限循环。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
public RunnableScheduledFuture take() throws InterruptedException {
// 获取lock
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0]; // 获取任务
if (first == null)
available.await(); // 如果队列为空,则等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
DelayQueue.add() 的源代码实现::线程把 ScheduledFutureTask 放入 DelayQueue 中的过程
- 如上图所示,添加任务分为3大步骤:
- 1、获取 Lock;
- 2、添加任务;
- 2.1、向 PriorityQueue 添加任务;
- 2.2、如果在上面2.1 中添加的任务是 PriorityQueue 的头元素,则唤醒在 Conditon 中等待的所有线程;
- 3、释放 Lock。
3、FutureTask 详解
3.1FutureTask 简介
Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。
FutureTask 除了实现了 Future 接口外,还实现了 Runnable接口。因此,FutureTask 可以交给 Executor 执行,也可以调用线程直接执行(FutureTask.run())。
-
根据 FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态:
- 1、未启动:FutureTask.run()方法还没有被执行之前,FutureTask 处于未启动状态;
- 2、已启动:FutureTask.run()方法被执行的过程中,FutureTask 处于已启动状态;
- 3、已完成:FutureTask.run()方法执行完成后正常结束,或被取消而结束(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask 处于已完成状态。
FutureTask 的状态迁移的示意图如下所示:
FutureTask 的 get 和 cancel 的执行示意图如下所示:
3.2、FutureTask 的实现:java1.7的实现
-
从 FutureTask 的源码中可以看出来,它的实现是基于 AbstractQueuedSynchronizer 。AQS 是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch 和 FutureTask。
-
每一个基于 AQS 实现的同步器都会包含两种类型的操作,如下:
-
1、至少一个 acquire 操作:这个操作阻塞调用线程,除非 / 直到 AQS 的状态允许这个线程继续执行。 FutureTask 的 acquire 操作为 get() / get(long timeout, TimeUnit unit)方法调用;
-
2、至少一个 release 操作:这个操作改变 AQS 的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask 的 release 操作包括 run() 方法和 cancel(…) 方法。
-
-
基于“复合优先继承”的原则,FutureTask 声明了一个内部私有的继承于 AQS 的子类 Sync,对 FutureTask 所有公有方法的调用都会委托给这个内部子类。
-
AQS 被作为“模板方法模式”的基础类提供给 FutureTask 的内部子类 Sync,这个内部子类只需要实现状态检测和状态更新的方法即可,这些方法将控制 FutureTask 的获取和释放操作。具体来说,Sync实现了 AQS 的 tryAcquireShared(int)方法和 tryReleaseShared(int)方法,Sync 通过这两个方法来检查和更新同步状态。
FutureTask 的设计示意图如下图所示:
3.3、FutureTask.get() 方法
- 第1步:调用 FutureTask 中的 get() 方法,它会调用 Sync 中的 innerGet()方法
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
- 第2步:调用 AQS.acquireSharedInterruptibly(int args)方法。这个方法首先会在子类 Sync 中实现的 tryAcquireShared()方法来判断 acquire 操作是否可以成功,acquire 操作可以成功的条件为:state 为执行完成状态RAN 或取消状态 CANCELLED,且 runner 不为null。如果成功则get()方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 第3步:当其他线程执行 release 操作(比如:FutureTask.run() 或 FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行 tryAcquiredShared() 将返回正值 1,当前线程将离开线程等待队列,并唤醒它的后继节点线程。
- 第4步:最后返回计算的结果或者抛出异常。
3.4 FutureTask.run() 方法
- 第1步:调用了 FutureTask.run() 方法,内部调用了 Sync.innerRun() 方法。
public void run() {
sync.innerRun();
}
- 第2步:Sync.innerRun() 方法中以原子的方式更新同步状态(调用AQS.compareAndSetState(READY, RUNNING),将 state 值设置为 RUNNING 状态)。如果这个原子操作成功,就设置代表计算结果的变量 result 的值为 Callable.call() 的返回值,然后调用AQS.releaseShared(int args)方法。
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
- 第3步:AQS.releaseShared(int args)首先会回调子类 Sync 中实现的 tryReleaseShared(int args)方法来执行 release操作【设置运行任务线程 runner 为 null,然后返回 true。】。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
- 第4步:调用AQS.doReleaseShared() 方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
唤醒线程等待队列中的第一个线程。