java多线程知识点 - AQS - CountDownLatch 基于源码剖析。
CountDownLatch
他被用来同步一个或者多个任务,强制他们等待由其他任务执行的一组操作完成。
CountDownLatch实现的是共享锁。
我们可以想CountDownLatch插入一个初始计数值,任何对象调用这个wait()方法都将阻塞,直至这个计数器的值等于0。
其他任务没有结束工作时,可以在该对象上调用countdown方法来减小这个计数值。
原则:CountDownLatch只会被触发一次,并且在调用的时候计数器的值不能被重置。(可重置的计数器CyclicBarrier);
注意:countdown为0的时候,才会解除主线程的阻塞状态。继续下面的程序,不说废话直接上代码。
private final static int threadCount = 10; //定义主函数 public static void main(String[] args) throws Exception { // 构造一个线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 构造一个countdown计数器 final CountDownLatch countDownLatch = new CountDownLatch(threadCount); //编写一个并发的操作,在for循环中循环10次,每次都要执行test方法产出一个log日志,每个for //循环结束都要执行countdowm操作 也就是-1 for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); }
//当程序减一到0的时候,会执行await方法,主线程不会被阻塞,不再执行countDownLatch.await(); log.info("finish"); exec.shutdown();}private static void test(int threadNum) throws Exception { log.info("{}", threadNum);}
上述例子的输出结果
再举一个countdown设置超时时间的例子,可以用在某些任务必须在规定时间内执行,过时不候的需求:
private final static int threadCount = 10; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } //为countDownLatch设置时间,10毫秒之后就进入await,继续执行主线程 countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); }
输出结果
注意在这里 即使调用exec.shutdown();,未执行完的线程依旧会被继续执行。
源码赏析
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
首先new一个CountDownLatch
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }其次new一个Sync对象
观察sync对象内部
Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
await源码赏析:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
源码也是非常简单的,首先判断了一下,当前线程是否有被中断,如果没有的话,就调用tryAcquireShared(int arg)方法,判断一下当前线程是否还需要“阻塞”。其实这里调用的tryAcquireShared方法。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
当没有执行countdown方法时候,输入的是1,则tryAcquireShared(1),则return -1,则-1<0为true,则执行doAcquireSharedInterruptibly(1)方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }请注意:当执行这个方法时内部有一个for死循环,在这个死循环里面,会不断的进行判断,通过调用tryAcquireShared方法,不断判断我们上面说的那个计数器
但是死循环这种操作还是会多多少少影响性能的
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
,看看它的值是否为0了,如果是为0的话,tryAcquireShared就会返回1,然后跳出循环,也就不再阻塞当前主线程了。
非常有趣的一段代码。
countdown内部实现原理还未完全搞懂。。水平有限