java多线程知识点 - AQS - CountDownLatch 基于源码剖析。

CountDownLatch

他被用来同步一个或者多个任务,强制他们等待由其他任务执行的一组操作完成。

CountDownLatch实现的是共享锁。

我们可以想CountDownLatch插入一个初始计数值,任何对象调用这个wait()方法都将阻塞,直至这个计数器的值等于0。

其他任务没有结束工作时,可以在该对象上调用countdown方法来减小这个计数值。

原则:CountDownLatch只会被触发一次,并且在调用的时候计数器的值不能被重置。(可重置的计数器CyclicBarrier);

注意:countdown为0的时候,才会解除主线程的阻塞状态。继续下面的程序,不说废话直接上代码。

private final static int threadCount = 10;
//定义主函数
public static void main(String[] args) throws Exception {
    // 构造一个线程池
    ExecutorService exec = Executors.newCachedThreadPool();
    // 构造一个countdown计数器
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    //编写一个并发的操作,在for循环中循环10次,每次都要执行test方法产出一个log日志,每个for
    //循环结束都要执行countdowm操作 也就是-1
    for (int i = 0; i < threadCount; i++) {
        final int threadNum = i;
        exec.execute(() -> {
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
//当程序减一到0的时候,会执行await方法,主线程不会被阻塞,不再执行
countDownLatch.await(); log.info("finish"); exec.shutdown();}private static void test(int threadNum) throws Exception { log.info("{}", threadNum);}

上述例子的输出结果

java多线程知识点 - AQS - CountDownLatch 基于源码剖析。


再举一个countdown设置超时时间的例子,可以用在某些任务必须在规定时间内执行,过时不候的需求:

private final static int threadCount = 10;
public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
        final int threadNum = i;
        exec.execute(() -> {
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    //countDownLatch设置时间,10毫秒之后就进入await继续执行主线程
    countDownLatch.await(10, TimeUnit.MILLISECONDS);
    log.info("finish");
    exec.shutdown();
}

private static void test(int threadNum) throws Exception {
    Thread.sleep(100);
    log.info("{}", threadNum);
}

输出结果

java多线程知识点 - AQS - CountDownLatch 基于源码剖析。

注意在这里 即使调用exec.shutdown();,未执行完的线程依旧会被继续执行。


源码赏析


final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
首先new一个CountDownLatch

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
其次new一个Sync对象

观察sync对象内部
Sync(int count) {
    setState(count);
}

int getCount() {
    return getState();
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}


await源码赏析:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

源码也是非常简单的,首先判断了一下,当前线程是否有被中断,如果没有的话,就调用tryAcquireShared(int arg)方法,判断一下当前线程是否还需要“阻塞”。其实这里调用的tryAcquireShared方法。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

当没有执行countdown方法时候,输入的是1,则tryAcquireShared(1),则return -1,则-1<0为true,则执行doAcquireSharedInterruptibly(1)方法


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
请注意:当执行这个方法时内部有一个for死循环,在这个死循环里面,会不断的进行判断,通过调用tryAcquireShared方法,不断判断我们上面说的那个计数器

但是死循环这种操作还是会多多少少影响性能的

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

,看看它的值是否为0了,如果是为0的话,tryAcquireShared就会返回1,然后跳出循环,也就不再阻塞当前主线程了。

非常有趣的一段代码。

countdown内部实现原理还未完全搞懂。。水平有限