AQS源码解读三 Condition
在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait()、notify()系列方法可以实现等待/通知模式。在Java SE5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。下图是Condition与Object的监视器方法的对比(摘自《Java并发编程的艺术》):
Condition提供了一系列的方法来对阻塞和唤醒线程:
await() :当前线程在接到信号或被中断之前一直处于等待状态。
await(long time, TimeUnit unit) :当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
awaitNanos(long nanosTimeout) :当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
awaitUninterruptibly() :当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
awaitUntil(Date deadline) :当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
Conditon如何实现
Lock的接口里有个newConditon方法
public interface Lock { |
---|
看ReentrantLock实现类的方法
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { |
---|
可知创建了一个ConditonObject类
接着看ConditonObject类
1、ConditonObject类是AbstractQueuedSynchronizer下的一个内部类
2、每个Conditon对象都包含这一个FIFO队列,该队列是Conditon对象实现通知/等待功能的关键,队列中的每个节点都是一个线程的
引用线程就是在该Conditon对象中等待的线程,一个图足已说明Condition的实现原理
public class ConditionObject implements Condition, java.io.Serializable { } |
---|
从上图可以看出,该FIFO队列有个首节点 firstWaiter 和 尾节点 lastWaiter。 每个线程调用await方法就创建一个节点,放在该队列的尾部
signal方法会将队首的节点拷贝到CLH队列的尾部
Node里面包含了当前线程的引用。Node定义与AQS的CLH同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node静态内部类)
Condition的队列结构比CLH同步队列的结构简单些,新增过程较为简单只需要将原尾节点的nextWaiter指向新增节点,然后更新lastWaiter即可。
等待:
ConditonObject的await方法
public final void await() throws InterruptedException { /** 释放lock的锁 跟reentrantLock里的Lock方法是一样的逻辑,就是把头节点释放 并唤醒 头节点的后续节点 第一个waitStatus不是cancelled的节点获取锁**/ int interruptMode = 0; 不是头节点就阻塞自己 是就尝试更改同步状态并将该节点设置为头节点) |
---|
此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的
线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。
加入FIFO队列(addConditionWaiter())源码如下:
该方法主要是将当前线程加入到Condition条件队列中。当然在加入到尾节点之前会清楚所有状态不为Condition的节点。
private Node addConditionWaiter() { //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点 if (t != null && t.waitStatus != Node.CONDITION) { t = lastWaiter; //当前线程新建节点,waitstatus是CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) |
---|
fullyRelease(node) 释放该线程持有的锁
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } |
---|
getState 和 release 似曾相识,是 ReentrantLock里的释放锁的方法
fullyRelease主要步骤就是 获取同步状态 并更改同步状态为0 并将头节点的后续第一个不是取消的后续节点唤醒
isOnSyncQueue(Node node): fullyRelease方法当前线程所在CLH中的节点释放了isOnSyncQueue方法是判断
该节点是否又在同步队列中了
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; } |
---|
此线程在一直判断被释放的节点是不在CLH队列的时候阻塞挂起当前线程 如果当前线程在被阻塞的时候被中断,看执行的
逻辑是什么样子的
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果挂起中途被阻塞就退出自旋操作 checkInterruptWhileWaiting(node)方法 final boolean transferAfterCancelledWait(Node node) { |
---|
线程被中断 后续的操作cas 把waitStatus状态由 Codition改为 0,就是 入队CLH队列 返回true
如果cas操作更改状态失败 说明当前线程正在被signal的过程中(至于为什么,在讲到signal的时候会讲解到)
则等sinal操作执行完之前 一直yield自己(让出cpu)return false 然后退出while循环
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; 如果发现CLH队列里有该线程的节点了,就继续抢占锁(此步骤在AQS源码解读一里又讲过,就不讲了) 如果interruptMode != THROW_IE 说明线程的中断是在被signal的时候中断的 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); 看 reportInterruptAfterWait方法 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } 如果 interruptMode == THROW_IE 就抛出 InterruptedException异常 如果 interruptMode == REINTERRUPT 就执行 selfInterrupt() static void selfInterrupt() {
Thread.currentThread().interrupt();
}
interrupt只是将中断位设置为true 至于线程是否中断自己 取决于程序本身,可以通过使用Thread.currentThread().isInterrupted()方法 去中断自己(抛出中断异常即可) |
---|
通知:
调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到
CLH同步队列中。
public final void signal() { doSignal(Node first):唤醒头节点 private void doSignal(Node first) { |
---|
doSignal(Node first)主要是做两件事:1.修改头节点,2.调用transferForSignal(Node first) 方法将节点移动到CLH同步队列中。
transferForSignal(Node first)源码如下:
public final void signal() { doSignal(Node first):唤醒头节点 private void doSignal(Node first) { |
---|
整个通知的流程如下:
判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
如果线程已经获取了锁,则将唤醒条件队列的首节点
唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。
public final void signalAll() { |
---|
唤醒条件队列里的所有的节点 每个节点逻辑跟signal一样
总结:
个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)
方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是
否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调
用acquireQueued()方法竞争同步状态。
有界队列使用实例:
public class BoundedQueue<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex,removeIndex,count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition(); //是否为空的标记条件
private Condition notFull = lock.newCondition(); //是否满的条件标记
public BoundedQueue( int size){
items = new Object[size];
}
/**
* 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
* @param t
* @throws InterruptedException
*/
public void add(T t) throws InterruptedException{
lock.lock();
try {
while (count == items.length){
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0 ;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
* @return
* @throws InterruptedException
*/
@SuppressWarnings ( "unchecked" )
public T remove() throws InterruptedException{
lock.lock();
try {
while (count == 0 )
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0 ;
--count;
notFull.signal();
return (T)x;
} finally {
lock.unlock();
}
}
} |
---|
我的技术公众号 敬请关注