Java中的Condition
文章目录
Condition接口
Condition接口介绍
对于synchronized锁,线程之间的通信方式是wait和notify。对于Lock接口,线程间就是通过Condition方式通信的。它比wait/notify更强大的是,它支持等待过程中不响应中断、多个等待队列和在指定时间苏醒。
Condition接口使用示例如下:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
使用Condition实现wait/notify的弹夹射击例子
/**
* 使用Lock / Condition实现之前wait/notify例子:
* 生产者线程(装弹):在弹夹不满时,装弹,弹夹满了时,通知消费者线程射击
* 消费者线程(射击):在弹夹不为空时,射击一轮,射击完成,通知生产者线程装弹
* @author xu hao
* @createTime 2019/4/23 0:17.
*/
public class ConditionTest {
// 弹夹,生产者和消费者通过此通信
private static Lock clipLock = new ReentrantLock();
// 射击信号的等待队列
private static Condition shootSignal = clipLock.newCondition();
// 装弹信号的等待队列
private static Condition reloadSignal = clipLock.newCondition();
// 弹夹容量
private static final int MAX_SIZE = 20;
// 模拟弹夹容器
private static LinkedList<Integer> clipList = new LinkedList<>();
static {
// 初始化时,将弹夹填满
for (int i = 0; i < MAX_SIZE; i++) {
clipList.addFirst(1);
}
}
static class Producer implements Runnable {
@Override
public void run() {
while (true) {
clipLock.lock();
try {
while (clipList.size() < MAX_SIZE) {
clipList.addFirst(1);
System.out.println("生产者线程:向弹夹填充子弹,当前弹夹子弹数:" + clipList.size());
}
System.out.println("生产者线程:当前弹夹已满,通知消费者线程射击子弹");
// 通知在射击队列等待线程可以射击了
shootSignal.signal();
System.out.println("生产者线程:等待装弹信号");
// 在装弹队列等待
reloadSignal.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
clipLock.unlock();
}
}
}
}
static class Customer implements Runnable {
@Override
public void run() {
while (true) {
clipLock.lock();
try {
if (clipList.size() < MAX_SIZE) {
System.out.println("消费者线程:本次射击完成,让枪支休眠2秒后,通知生产者线程填充子弹");
Thread.sleep(2000);
// 通知装弹队列等待的线程可以装弹了
reloadSignal.signal();
System.out.println("消费者线程:休眠完成,等待射击信号");
// 进入射击队列等待
shootSignal.await();
} else {
int random = new Random().nextInt(clipList.size());
for (int i = 0; i < random && random <= clipList.size(); i++) {
clipList.removeFirst();
System.out.println("消费者线程:射击子弹,当前弹夹子弹数:" + clipList.size());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
clipLock.unlock();
}
}
}
}
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Customer()).start();
}
}
生产者线程:当前弹夹已满,通知消费者线程射击子弹
生产者线程:等待装弹信号
消费者线程:射击子弹,当前弹夹子弹数:19
消费者线程:射击子弹,当前弹夹子弹数:18
消费者线程:射击子弹,当前弹夹子弹数:17
消费者线程:射击子弹,当前弹夹子弹数:16
消费者线程:射击子弹,当前弹夹子弹数:15
消费者线程:射击子弹,当前弹夹子弹数:14
消费者线程:射击子弹,当前弹夹子弹数:13
消费者线程:射击子弹,当前弹夹子弹数:12
消费者线程:本次射击完成,让枪支休眠2秒后,通知生产者线程填充子弹
消费者线程:休眠完成,等待射击信号
生产者线程:向弹夹填充子弹,当前弹夹子弹数:13
生产者线程:向弹夹填充子弹,当前弹夹子弹数:14
生产者线程:向弹夹填充子弹,当前弹夹子弹数:15
Condition源码分析
Condition接口的实现类ConditionObject,是AbstractQueuedSrnchronizer的静态内部类。它提供了await和signal来实现等待通知。
等待队列
ConditionObject的等待队列,和AQS同步器的阻塞队列类似,都是用的AQS的内部类Node。在ConditionObject类中,维护了一个firstWaiter指向等待队列的头,lastWaiter指向等待队列的尾。
Condition等待队列是一个FIFO的队列,每次有新的线程在Condition等待,都构造一个新的Node节点,插入到队列的尾部。
Condition的await()等待
调用Condition的await()方法,经历以下几个过程:
-
构造新节点加入到Condition等待队列的尾部,同时清理Condition队列的CANCELLED节点
-
释放当前线程持有的锁(Lock对象)并且唤醒同步队列时头节点的后继节点
-
循环的判断当前节点是否已经进入同步队列,如果没有,就阻塞自己,直到被唤醒
-
在线程苏醒之后,不断地尝试将自己加入到同步队列的尾部
-
已经进入同步队列后,就开始不断地自旋尝试获得锁
-
从同步队列退出后,如果在等待队列或者同步队列被中断了,就自己中断自己或者抛出异常。
由于只有具有锁的线程节点才能调用await()进入等待队列,而持有同步资源的线程是同步队列的head节点,它不会直接将同步队列的head节点转移到等待队列的尾部,而是新建一个节点插入到等待队列的尾部。在被唤醒后,进入同步队列的尾部等待获取锁。
public final void await() throws InterruptedException {
// 在等待前被中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 构造一个节点,添加到等待队列的尾部同时清理Condition队列的CANCELLED节点
Node node = addConditionWaiter();
// 释放当前线程持有的lock
int savedState = fullyRelease(node);
int interruptMode = 0;
// 循环判断自己是否进入了阻塞队列
while (!isOnSyncQueue(node)) {
// 没有就等待着
LockSupport.park(this);
// 被唤醒后,进入阻塞队列并且标记自己在等待过程中是否被中断过
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 进入阻塞队列,尝试获得锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 拿到锁后,清除等待队列中的CANCELLED节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果中间被打断过,就抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Condition的signal()唤醒
通过调用Condition的signal()唤醒等待在等待队列的第一个线程。过程如下:
- 首先判断当前线程是否持有锁,没有的话抛出异常
- 将等待队列中第一个不为CANCELLED的节点移出,循环CAS尝试将此节点加入到同步队列的尾部
- 然后唤醒此线程,这个线程从await()方法醒来,判断自己已经进入到同步队列后,调用acquireQueued加入到锁的竞争中
signalAll()方法就是循环执行signal()方法,唤醒所有在等待队列等待的线程,然后加入到同步队列中
public final void signal() {
// 当前线程没有持有锁的话,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 将等待队列第一个线程转移到同步队列中,同时将其唤醒,加入到锁的竞争中
doSignal(first);
}