CountDownLatch源码分析

简介

        CountDownLatch常用于多个线程同步,经典用法的套路就是主线程起多个子线程,主线程初始化一个对应数字的CountDownLatch,子线程调用countDown()方法表示自己执行完毕,主线程阻塞到await()方法,等待所有子线程都调用countDown()方法完成。达到主线程等待多个子线程任务执行完毕的目的。

        CountDownLatch类加注释才300多行,纯代码可能50行都不到,非常简单,简单前提是你已经理解了AbstractQueuedSynchronizer(下文中直接叫它AQS)。我们直接进入CountDownLatch的源码分析吧。

类成员

private static final Sync静态final子类

CountDownLatch源码分析

        加上注释空格也只有区区30行,继承AQS,内部重写了AQS的tryAcquireShared()和tryReleaseShared()方法。这个CountDownLatch就是使用这个子类的实例来进行多线程间的同步处理的。这两个方法我们放到后面分析CountDownLatch的countDown()和await()方法时再进行分析,这样从调用入口处一步一步向下看比较清晰。

         CountDownLatch使用AQS来进行同步的时候是使用AQS的共享模式来进行同步;之前分析的ReentranLock则是使用AQS的独占模式来进行同步。所谓独占模式就是同时只有一个线程获取锁;而共享模式则是多个线程可以同时获取锁。这两个模式的主要区别总结下来就是:独占模式除了判断锁的state值,还会设置锁当前占用的线程,尝试获取锁时这两个条件都会判断;而共享模式下只会判断锁的state值是否是0,不会记录锁持有线程,并且释放锁时操作可能会在等待锁链表中传递的唤醒多个等待线程,让他们去拿锁。

构造函数

构造函数代码如下,处理过程非常简单,就是使用count做参数初始化一个Sync的实例:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count); //调用Sync内部类的构造函数,使用count设置Sync的state字段
}
Sync(int count) {
    setState(count);
}

源码分析

await()方法

代码注释说明:

功能:导致当前线程处于等待状态直到sync的state为0,除非当前线程被中断。

调用此方法后,如果sync的state大于0会导致当前调用线程被禁止线程调度,并且在以下两种情况发生之前会处于休眠状态:

      1. sync的state通过调用countDown()被设为0

      2. 当前线程被其它线程中断

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1); // 调用AQS的方法尝试获取锁同步状态
}
方法非常简单,主要做三件事:
  1. 判断当前线程是否被中断了,被中断了就抛InterruptedException异常
  2. 判断当前锁state是否为0,为0则方法结束,当前线程继续执行下去
  3. 当前锁state不为0,将当前线程作为一个新节点加入到等待锁队列中,自旋的获取锁
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 判断当前线程是否被其它线程中断了,如果被中断抛出异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) // 尝试获取锁状态
        doAcquireSharedInterruptibly(arg); // 尝试获取锁状态失败,将当前线程加入锁等待队列中自旋的获取锁
}

tryAcquireShared方法很简单,它的参数没有用,只是为了重新父类AQS才带的参

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 判断当前sync的state是否等于0,等于0返回1,否则返回-1
}

        doAcquireSharedInterruptibly()这个方法和acquireQueued()、doAcquireNanos()这三个方法的实现非常相似,基本的功能也都是一样的。在ReentranLock源码分析中已经对这两个方法进行分析过,如果想了解这acquireQueued()、doAcquireNanos()这两个方法的实现,可以转去看之前写的ReentranLock源码分析。下面我们分析一下doAcquireSharedInterruptibly()方法。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 将当前线程以共享模式创建一个等待节点,并加入等待队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true; // 用于记录是否在调用tryAcquireShared()方法是否抛出异常
    try {
        for (;;) {
            /**
             * 自旋的等待当前线程等待节点的前序节点为head节点;结束自旋的方式有两个,1、当前节点前驱节点就是head,2、调用LockSupport.park()中断循环,等待被unpark()唤醒再次进入循环
             * 如果当前等待线程节点前序节点为head节点时,使用tryAcquireShared()方法尝试获取锁,
             * 获取成功将当前线程等待节点设置为head节点,并且如果后继节点为共享状态节点,则唤醒它们,让它们自旋的等待获取锁
             */
            final Node p = node.predecessor();
            // 如果当前线程等待节点前驱节点为head节点
            if (p == head) {
                int r = tryAcquireShared(arg); // 尝试获取锁
                if (r >= 0) { // 获取成功
                    setHeadAndPropagate(node, r); // 将当前线程等待节点设置为head节点,并且如果后继节点为共享状态节点,则唤醒它们,让它们自旋的等待获取锁
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 判断当前线程是否应该被park()中断于此
                    parkAndCheckInterrupt()) // 调用LockSupport.park(this)阻塞当前获取锁的线程,并且调用Thread.interrupted()返回线程是否被中断了
                throw new InterruptedException(); // 如果线程被中断了抛出InterruptedException异常
        }
    } finally {
        // 如果AQS子类实现的tryAcquireShared()方法抛异常了,failed不会被置false,这时就执行cancelAcquire方法
        if (failed)
            cancelAcquire(node);
    }
}

countdown()方法

代码注释说明:

        功能:减少锁的计数,如果锁的计数达到0就释放所有等待的线程。

        如果锁当前计数大于0就减一,锁计数被减为0,所有等待这个锁的线程都会被重新调度执行。

        如果当前锁计数等于0,调用它不会发生任何事。

        代码很简单,就是简单调用sync的releaseShared()方法,这个方法是直接继承AQS来的。以共享的模式释放锁状态。

public void countDown() {
    sync.releaseShared(1); // 对锁计数减一
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 调用CountDownLock.Sync重写的tryReleaseShared()方法尝试把锁state减1,锁state通过减1变为0后需要执行doReleaseShared()方法(即表达锁刚刚被当前线程释放了)
        doReleaseShared(); // 传递的唤醒后面共享模式等待节点,让它们接着在doAcquireSharedInterruptibly()方法自旋的获取锁
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) // 锁state已经是0了,直接返回
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc)) // 对锁state原子的减1,返回值为当前锁state状态是否是0
            return nextc == 0;
    }
}