CountDownLatch 和 CyclicBarrier
一、CountDownLatch
其实要完成这种某个线程等待其他线程结果才能开始任务的业务,直接在需要准备的线程中join()依赖的线程就能完成要求,但是在博客的上一篇《三个线程顺序输出》中也说到过,join的线程返回,必须是子线程已经结束。而CountDownLatch提供了更灵活的方案,可在子线程完成好其他线程依赖的工作后调用countDown()方法主动减少计数,同时继续做线程间业务无依赖的其他工作。
这里CountDownLatch只提供了一个构造方法,参数为大于0整数:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
具体演示如下:
public class CountDownLatchTest { private static CountDownLatch counter = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { Thread threadA = new Thread(new Runnable() { @Override public void run() { System.out.println("子线程:" + Thread.currentThread().getName() + " start working!"); try { Thread.sleep(2000); counter.countDown(); System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!"); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread_A"); Thread threadB = new Thread(new Runnable() { @Override public void run() { System.out.println("子线程:" + Thread.currentThread().getName() + " start working!"); try { Thread.sleep(1000); counter.countDown(); System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!"); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread_B"); threadA.start(); threadB.start(); counter.await(); System.out.println("A、B准备就绪!"); // do something C... } }
结果:
可以看到在counter.countDown()被调用两次后,主线程便继续执行,同时子线程A、B仍可继续完成各自的工作。需要注意的是,这里初始化的计数值为2,调用2次正好返回,如果初始值为3,而实际只调用2次,那么程序就会在await()处无限等待。因此await()也提供了带等待时间的使用方法:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
例如把测试程序中的await()改成如下:
counter.await(1000,TimeUnit.MILLISECONDS);
可以看到结果中,await()在没有等到计数归0,就回到主线程继续执行:
另外CountDownLatch也提供了getCount()方法随时获取当前剩余计数。
二、CyclicBarrier
译作同步屏障,提供了2种初始化方法,第一种为简单int参数:
public CyclicBarrier(int parties) { this(parties, null); }
即挂起当前线程,直到所有设定的线程都到达Barrier状态,再一起执行后续任务。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
第二种方法可以设定一个任务,在程序都到达设定的barrier后,再执行barrierAction,截取部分源码如下:
if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
可以看到在index为0时,如果command不为null,会执行run()方法。注意run()方法并不是start(),也就是并没有再起一个线程,而是在最后到达barrier的线程中继续执行barrierCommand,再返回。
这里直接演示第二种初始化下的使用,代码如下:
public class CyclicBarrierTest { private static int N = 3; private static CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println("barrierAction:" + Thread.currentThread().getName() + " 开始工作"); try { Thread.sleep(9000); System.out.println("barrierAction:" + Thread.currentThread().getName() + " 结束工作"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier barr) { this.cyclicBarrier = barr; } @Override public void run() { System.out.println("线程" + Thread.currentThread().getName() + "开始写入数据到文件..."); long sleepTime = (long) (Math.random() * 1000 + 1000); try { Thread.sleep(sleepTime); System.out .println("线程" + Thread.currentThread().getName() + "用时" + sleepTime + "ms写入数据完毕,等待其他线程完成写入..."); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } public static void main(String[] args) { for (int i = 0; i < N; i++) { new Writer(barrier).start(); } } }
运行结果:
特意多运行了几次,从结果中可以看出:
1、在barrier处等待的线程数到达设定值(这里为3)前,当前线程会在cyclicBarrier.await()处挂起;
2、如果设定了barrierAction任务,当最后到达barrier处的线程在执行await()方法时,会执行barrierAction任务,图中打出了每个任务的执行时间,可以看到每次都是执行任务时间最长,即最后到达barrier处的线程,会继续执行barrierAction方法;
3、虽然到达了设定的await()线程数,但是必须在最后一个到达的线程执行完barrierAction方法,等await()返回后,所有线程才会继续执行后续的代码,示例中barrierActio()方法故意设定了较长的等待时间,但是程序依然在等待await()返回。
另外,为了避免await()无休止的等待,JDK同样提供了带等待时间的await方法,观察源码片段:
if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); }
这里要特别注意的时当某个线程抛出TimeoutException异常后会调用breakBarrier()方法把broken值设置为true:
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
之所以这里提到这个,是因为在dowait()方法中,如果broken为真,函数将直接返回,抛出BrokenBarrierException,造成结果:
1、如果使用了带barrierCommand方法的构造函数,这里barrierCommand将不会执行;
2、如果多个线程设置的await()时间不一样,例如A设置的等待时间较短,假设1秒后超时返回,那么B、C即使设置更长的等待时间,他们也不会在await()处等待,而是在运行到await()后立刻抛出BrokenBarrierException。
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
使用带等待参数的await(),运行上文中的代码,故意使等待超时,结果如下:
另外CyclicBarrier是可以重用的,这里直接给出@海子博客中的例子:
public class Test { public static void main(String[] args) { int N = 2; CyclicBarrier barrier = new CyclicBarrier(N); for (int i = 0; i < N; i++) { new Writer(barrier).start(); } try { Thread.sleep(25000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for (int i = 0; i < N; i++) { new Writer(barrier).start(); } } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据..."); try { Thread.sleep(5000); // 以睡眠来模拟写入数据操作 System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务..."); } } }
运行效果如图:
三、CountDownLatch 和 CyclicBarrier区别
1、从使用上可以发现二者侧重点不同,在CountDownLatch使用时其实是存在主线程和子线程的概念,子线程在准备好主线程需要的资源后,主线程结束等待,继续剩下的工作;而在CyclicBarrier使用中,并不存在主和次的说法,更像是一组线程在互相等待后,然后在同一时间,继续后面的操作,可以类比于现实中跑步比赛的场景,一组运动员在各自准备好起跑动作后,由裁判员发令后,统一起跑(额,这里想不到合适的业务场景,尴想了一个强行解释,或许,在优化资源利用时,设置一定的请求数,攒满了后再一起请求?有实际的应用场景欢迎补充)。
2、等待超时CountDownLatch会直接返回,继续后续工作,CyclicBarrier首先会抛TimeoutException,同时如果barrier要等的线程数大于1,其他线程不会按设定的等待时间等待,而是抛出BrokenBarrierException后直接返回,所以使用CyclicBarrier要注意异常处理的逻辑。
3、使用细节差异如下表:
4、最后要提的是CountDownLatch 和 CyclicBarrier虽然基本都是在多线程中使用,但是同一个线程多次调用countDown()和await()其实也会使计数改变。
参考资料:
1、http://www.importnew.com/21889.html
2、https://blog.****.net/tolcf/article/details/50925145