Java中的Condition

Condition接口

Condition接口介绍

对于synchronized锁,线程之间的通信方式是wait和notify。对于Lock接口,线程间就是通过Condition方式通信的。它比wait/notify更强大的是,它支持等待过程中不响应中断、多个等待队列和在指定时间苏醒。

Java中的Condition

Condition接口使用示例如下:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
    	condition.await();
    } finally {
    	lock.unlock();
    }
}
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
    	condition.signal();
    } finally {
    	lock.unlock();
    }
}

使用Condition实现wait/notify的弹夹射击例子

/**
 * 使用Lock / Condition实现之前wait/notify例子:
 * 生产者线程(装弹):在弹夹不满时,装弹,弹夹满了时,通知消费者线程射击
 * 消费者线程(射击):在弹夹不为空时,射击一轮,射击完成,通知生产者线程装弹
 * @author xu hao
 * @createTime 2019/4/23 0:17.
 */
public class ConditionTest {
	// 弹夹,生产者和消费者通过此通信
    private static Lock clipLock = new ReentrantLock();
	// 射击信号的等待队列
    private static Condition shootSignal = clipLock.newCondition();
	// 装弹信号的等待队列
    private static Condition reloadSignal = clipLock.newCondition();
	// 弹夹容量
    private static final int MAX_SIZE = 20;
	// 模拟弹夹容器
    private static LinkedList<Integer> clipList = new LinkedList<>();

    static {
        // 初始化时,将弹夹填满
        for (int i = 0; i < MAX_SIZE; i++) {
            clipList.addFirst(1);
        }
    }

    static class Producer implements Runnable {

        @Override
        public void run() {
            while (true) {
                clipLock.lock();
                try {
                    while (clipList.size() < MAX_SIZE) {
                        clipList.addFirst(1);
                        System.out.println("生产者线程:向弹夹填充子弹,当前弹夹子弹数:" + clipList.size());
                    }
                    System.out.println("生产者线程:当前弹夹已满,通知消费者线程射击子弹");
                    // 通知在射击队列等待线程可以射击了
                    shootSignal.signal();
                    System.out.println("生产者线程:等待装弹信号");
                    // 在装弹队列等待
                    reloadSignal.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    clipLock.unlock();
                }
            }
        }
    }

    static class Customer implements Runnable {

        @Override
        public void run() {
            while (true) {
                clipLock.lock();
                try {
                    if (clipList.size() < MAX_SIZE) {
                        System.out.println("消费者线程:本次射击完成,让枪支休眠2秒后,通知生产者线程填充子弹");
                        Thread.sleep(2000);
                        // 通知装弹队列等待的线程可以装弹了
                        reloadSignal.signal();
                        System.out.println("消费者线程:休眠完成,等待射击信号");
                        // 进入射击队列等待
                        shootSignal.await();
                    } else {
                        int random = new Random().nextInt(clipList.size());
                        for (int i = 0; i < random && random <= clipList.size(); i++) {
                            clipList.removeFirst();
                            System.out.println("消费者线程:射击子弹,当前弹夹子弹数:" + clipList.size());
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    clipLock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Customer()).start();
    }
}


生产者线程:当前弹夹已满,通知消费者线程射击子弹
生产者线程:等待装弹信号
消费者线程:射击子弹,当前弹夹子弹数:19
消费者线程:射击子弹,当前弹夹子弹数:18
消费者线程:射击子弹,当前弹夹子弹数:17
消费者线程:射击子弹,当前弹夹子弹数:16
消费者线程:射击子弹,当前弹夹子弹数:15
消费者线程:射击子弹,当前弹夹子弹数:14
消费者线程:射击子弹,当前弹夹子弹数:13
消费者线程:射击子弹,当前弹夹子弹数:12
消费者线程:本次射击完成,让枪支休眠2秒后,通知生产者线程填充子弹
消费者线程:休眠完成,等待射击信号
生产者线程:向弹夹填充子弹,当前弹夹子弹数:13
生产者线程:向弹夹填充子弹,当前弹夹子弹数:14
生产者线程:向弹夹填充子弹,当前弹夹子弹数:15

Condition源码分析

Condition接口的实现类ConditionObject,是AbstractQueuedSrnchronizer的静态内部类。它提供了await和signal来实现等待通知。

等待队列

ConditionObject的等待队列,和AQS同步器的阻塞队列类似,都是用的AQS的内部类Node。在ConditionObject类中,维护了一个firstWaiter指向等待队列的头,lastWaiter指向等待队列的尾。

Java中的Condition

Condition等待队列是一个FIFO的队列,每次有新的线程在Condition等待,都构造一个新的Node节点,插入到队列的尾部。

Condition的await()等待

调用Condition的await()方法,经历以下几个过程:

  • 构造新节点加入到Condition等待队列的尾部,同时清理Condition队列的CANCELLED节点

  • 释放当前线程持有的锁(Lock对象)并且唤醒同步队列时头节点的后继节点

  • 循环的判断当前节点是否已经进入同步队列,如果没有,就阻塞自己,直到被唤醒

  • 在线程苏醒之后,不断地尝试将自己加入到同步队列的尾部

  • 已经进入同步队列后,就开始不断地自旋尝试获得锁

  • 从同步队列退出后,如果在等待队列或者同步队列被中断了,就自己中断自己或者抛出异常。

由于只有具有锁的线程节点才能调用await()进入等待队列,而持有同步资源的线程是同步队列的head节点,它不会直接将同步队列的head节点转移到等待队列的尾部,而是新建一个节点插入到等待队列的尾部。在被唤醒后,进入同步队列的尾部等待获取锁。

public final void await() throws InterruptedException {
    // 在等待前被中断,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 构造一个节点,添加到等待队列的尾部同时清理Condition队列的CANCELLED节点
    Node node = addConditionWaiter();
    // 释放当前线程持有的lock
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环判断自己是否进入了阻塞队列
    while (!isOnSyncQueue(node)) {
        // 没有就等待着
        LockSupport.park(this);
        // 被唤醒后,进入阻塞队列并且标记自己在等待过程中是否被中断过
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 进入阻塞队列,尝试获得锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 拿到锁后,清除等待队列中的CANCELLED节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果中间被打断过,就抛出异常
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

Java中的Condition

Java中的Condition

Condition的signal()唤醒

通过调用Condition的signal()唤醒等待在等待队列的第一个线程。过程如下:

  • 首先判断当前线程是否持有锁,没有的话抛出异常
  • 将等待队列中第一个不为CANCELLED的节点移出,循环CAS尝试将此节点加入到同步队列的尾部
  • 然后唤醒此线程,这个线程从await()方法醒来,判断自己已经进入到同步队列后,调用acquireQueued加入到锁的竞争中

signalAll()方法就是循环执行signal()方法,唤醒所有在等待队列等待的线程,然后加入到同步队列中

public final void signal() {
    // 当前线程没有持有锁的话,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 将等待队列第一个线程转移到同步队列中,同时将其唤醒,加入到锁的竞争中
        doSignal(first);
}