Java并发编程--并发工具的使用和原理
Condition
我们在使用synchronized的时候,经常会用到wait/notify来实现线程间的通信,在J.U.C中也提供了锁的实现机制,那在J.U.C中是否也提供了类似的线程通信的工具呢?Condition就是J.U.C提供的一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件时,才会被唤醒。
Condition的基本使用
我们先写两个class,ConditionDemoWait 和ConditionDemoNotify:
public class ConditionDemoWait implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end ConditionDemoWait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionDemoNotify implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin ConditionDemoNotify");
try {
lock.lock();
condition.signal();
System.out.println("end ConditionDemoNotify");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
测试代码:
public class ConditionDemo {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(new ConditionDemoWait(lock, condition)).start();
TimeUnit.SECONDS.sleep(2);
new Thread(new ConditionDemoNotify(lock, condition)).start();
}
}
运行结果:
通过上面这个例子简单实现了wait和notify的功能,当调用await方法后,当前线程会释放锁并等待,而其他线程调用了condition的signal或者signalAll方法通知被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前释放的锁后继续执行,最后释放锁。所以Condition中两个最重要的方法:await和signal/signalAll:
await:把当前线程阻塞挂起
signal/signalAll:唤醒阻塞的线程
Condition源码分析
调用Condition方法的时候需要获得锁,所以意味着会存在一个AQS同步队列,上面的例子中,如果两个线程同时运行的话,AQS队列可能是下面这种情况:
这个时候,如果Thread A调用了condition.await()方法,它做了什么事情呢?
调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变成等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 表示await允许被中断
throw new InterruptedException();
Node node = addConditionWaiter(); // 创建一个新节点,节点状态为Condition
int savedState = fullyRelease(node); // 释放当前的锁,得到锁的状态
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法
这个方法的主要作用是把当前线程封装成一个Node,添加到等待队列中去;这里的等待队列不再是双向链表,而是单项链表
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
执行完addConditionWaiter方法后,就会产生下面这样一个等待队列:
接下来会执行fullyRelease方法,为什么叫fullyRelease呢?就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); // 获得重入锁的次数
if (release(savedState)) { // 释放锁并唤醒下一个同步队列中的线程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
此时,同步队列会触发锁的竞争,Thread B获得锁。
接下来会通过isOnSyncQueue(node)方法判断Thread A是否在同步队列中,返回false表示不在;返回true表示在同步队列中。
如果不在AQS同步队列中,说明当前节点没有被唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其它线程调用signal唤醒;如果在AQS同步队列中,意味着它需要去竞争同步锁去获得程序的执行权限。
为什么要做这个判断呢?因为在Condition等待队列中的节点会重新加入到AQS同步队列中去竞争锁,也就是当调用signal的时候,会把当前在等待队列中的节点从Condition等待队列转移到同步队列。
基于上面例子现在的逻辑结构,如何判断Thread A这个节点是否存在于AQS队列中呢?
- 如果Thread A的waitStatus是CONDITION,说明它一定在Condition队列,不在AQS队列中;因为AQS队列中节点的状态一定不能是CONDITION
- 如果node.prev为空,说明不存在于AQS队列中,因为prev=null在AQS队列中只有一种可能,就是它是head节点,但是head节点意味着它是获得锁的节点
- 如果node.next不等于空,说明一定在AQS队列中,因为只有AQS队列才会存在prev和next的关系
- findNodeFromTail表示从tail节点往前扫描AQS队列,一旦发现AQS中的节点和当前节点相等,说明节点一定存在于AQS队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
因为Thread A不在AQS队列中,所以isOnSyncQueue()方法返回false,接下来会执行LockSupport.park(this)将Thread A挂起并释放锁。
Condition.signal
当Thread A释放锁之后,Thread B会获得锁;在上面的例子中,thread B会执行condition.signal()方法去唤醒在等待队列中的节点:
public final void signal() {
if (!isHeldExclusively()) // 判断当前线程是否获得了锁,如果没有则抛exception
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 拿到Condition队列中的第一个节点
if (first != null)
doSignal(first);
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
private void doSignal(Node first) {
do {
// 从Condition队列中删除第一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 通过cas更新节点状态为0, 如果更新失败,只有一种可能就是节点被Cancel了
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 调用enq方法,把当前节点加入到AQS同步队列,并返回当前节点的上一个节点;在本例子中也就是原tail节点
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果上一个节点的状态被取消了,或者尝试设置上一个节点的状态为SIGNAL失败,则唤醒节点上的线程
LockSupport.unpark(node.thread);
return true;
}
由上面代码可以看出,当Thread B调用了signal方法的时候,会获取等待队列中的第一个节点,并且调用doSignal方法;在doSignal方法中会把第一个节点从等待队列中删除,并通过transferForSignal方法将该节点转移到AQS同步队列中。
执行完doSignal方法后,会把Condition队列中的节点转移到AQS队列中去,逻辑结构图如下;这个时候会判断Thread A的prev节点也就是head节点的waitStatus,如果大于0或者设置SIGNAL失败,表示节点被设置成了CANCELLED状态,这时候需要唤醒Thread A,否则就基于AQS队列的机制唤醒,也就是等到Thread B释放锁之后来唤醒Thread A.
前面分析await方法的时候,thread A会在调用LockSupport.park()方法的时候被阻塞,而通过signal方法被唤醒之后会继续回到上次执行的逻辑中执行checkInterruptWhileWaiting(node)方法去判断Thread A在Condition队列被阻塞的过程中,是否被其它线程触发过中断请求:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 使用CAS修改节点状态,如果还能修改成功,则说明线程被中断时,signal方法还没有被调用
// 注意: 线程被唤醒,并不一定是在Java层面执行了LockSupport.unpark方法,也可能是调用了线程的interrupt()方法,这个方法会更新中断标识,并唤醒处于阻塞状态的线程
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 如果CAS成功,则调用enq方法将当前node加入到AQS队列
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果CAS失败,则判断当前节点是否已经在AQS队列中,如果不在则让给其它线程执行;当Node被触发了signal时,node就会被加入到AQS队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续处理是应该抛出InterruptedException还是重新中断。
这里需要注意的是如果第一次CAS失败了,则不能判断当前线程是先进行了中断还是先进行了Signal方法的调用;可能是先执行了signal然后中断;也可能是先中断然后执行signal;这时需要做的就是等待当前线程的node被添加到AQS队列后,也就是enq方法返回后,返回false告诉checkInterruptWhileWaiting方法返回REINTERRUPT,后续进行重新中断。
简单来说,这个方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为true表示中断在signal调用之前,signal还未执行,那么这个时候会根据await的语义,在await时遇到中断需要抛出interruptedException,返回true就是告诉checkInterruptWhileWaiting方法返回THROW_IE。如果返回false,表示signal方法已经执行过了,只需要重新响应中断即可。
接下来会调用acquireQueued方法,这个方法是让当前被唤醒的节点Thread A去抢占同步锁,并要恢复到原本的重入次数状态。调用完这个方法后,将AQS队列中的head节点的status设置成SIGNAL,AQS队列的状态如下:
Condition总结
线程awaitThread先通过lock.lock方法获得锁后,调用condition.await()方法进入等待队列,而另一个线程signalThread通过lock.lock方法获得锁后调用了condition.signal方法,使得线程awaitThread能够有机会移入到同步队列,当其他线程释放lock后使得线程awaitThread能够有机会获得锁,从而使得awaitThread能够从await方法中退出并执行后续操作;如果awaitThread获取锁失败,则会直接进入到同步队列。
限制
J.U.C中提供了几个比较常用的并发工具类,比如CountDownLatch、Semaphore、CyclicBarrier。接下来我们会了解一下这些常用的API
CountDownLatch
CountDownLatch是一个同步工具类,它允许一个或者多个线程一直等待,直到其它线程的操作执行完毕再执行。从名字可以解读到CountDown是倒数的意思,类似我们倒计时的概念。
CountDownLatch提供了两个方法:一个是countDown,一个是await;CountDownLatch初始化的时候需要传入一个整数,在这个整数倒数到0之前,调用了await方法的线程都必须要等待,然后通过countDown方法来倒数。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行中");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "- 执行完毕");
}, "t1").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行中");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "- 执行完毕");
}, "t2").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行中");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "- 执行完毕");
}, "t3").start();
countDownLatch.await();
System.out.println("所有线程执行完毕");
}
}
运行结果:
上面的例子从代码上看,有点类似join的功能,但是比join更加灵活。CountDownLatch构造函数会接收一个int类型的参数作为计数器的初始值,当调用countDown方法的时候,这个计数器就会减一。使用CountDownLatch可以模拟高并发场景:
public class CountDownLatchDemo extends Thread {
static CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void run() {
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Thread:" + Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new CountDownLatchDemo().start();
}
System.out.println("所有线程阻塞中........");
TimeUnit.SECONDS.sleep(5);
System.out.println("线程阻塞开关打开");
countDownLatch.countDown();
}
}
运行结果:
总的来说,凡是涉及到需要指定某个任务在执行之前,要等到某个前置任务执行完毕后才执行的场景,都可以使用到CountDownLatch。
CountDownLatch源码分析
对于CountDownLatch我们只需要关注两个方法,一个是countDown()方法,另一个是await()方法。countDown()方法每次调用都会将state的值减一,直到state的值为0;而await方法是一个阻塞方法,当state=0的时候await方法才会返回。await方法可以被多个线程调用,所有调用了await方法的线程都阻塞在AQS的阻塞队列中,等待条件满足时,将线程一个一个从队列中唤醒。
CountDownLatch也用到了AQS,在CountDownLatch内部写了一个Sync并且集成了AQS这个抽象类,重写了AQS中的共享锁方法。
当调用了CountDownLatch.await方法的时候,代码如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
通过上面的代码可以看到,首先要判断当前线程是否获得了共享锁,如果state不等于0, 说明当前线程要加入到共享锁队列中。
doAcquireSharedInterruptibly方法的代码如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 由于state != 0,所以返回值是-1
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- addWaiter方法创建了一个SHARED模式的节点并且加入到AQS的队列中
- 由于这个时候state不为0,所以肯定会去执行if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())这个判断,在partAndCheckInterrput方法中挂起线程
这个时候所有线程都调用了await方法,由于state的值现在还不为0,所以这些线程都会加入到AQS队列中,并且都处于阻塞状态:
当其他线程调用CountDownLatch.countDown方法的时候,我们看看它做了什么事情:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} // 这个CAS失败的场景是:执行到这里的时候,恰好有一个节点入队,入队会将这个ws设置为-1
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
由上面代码可以看出,tryReleaseShared方法使用自旋的方式去实现state减一,当state为0 的时候,调用doReleaseShared方法唤醒处于await状态下的线程。
共享锁的释放和独占锁的释放有一定的差别,在doReleaseShared方法中,先判断头结点的状态是否为SIGNAL,如果是,将状态改成0;修改成功之后调用unparkSuccessor方法唤醒头结点的下一个节点(在我们的例子中是 thread 1)。
一旦Thread 1被唤醒,代码又会回到doAcquireSharedInterruptibly方法中来执行,如果满足state=0,会执行setHeadAndPropagate方法;
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
这个方法的主要作用是把被唤醒的节点设置成head节点,然后继续唤醒该节点的下一个节点 thread 2。一次循环,thread 2会唤醒thread 3…
Semaphore
Semaphore也就是我们常说的信号灯,semaphore可以控制同时访问的线程个数,通过acquire获取一个许可,如果没有就等待,通过release释放一个许可,有点类似限流的作用。
叫做信号灯的原因也和它的用处相关,比如m某商场就有5个停车位,如果这个时候来了10辆车,必须要等前面有空的车位才能进入。
public class SemaphoreDemoTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Car(i, semaphore).start();
}
}
static class Car extends Thread {
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();
System.out.println("第" + num + "占用一个车位");
TimeUnit.SECONDS.sleep(2);
System.out.println("第" + num + "辆车开走了");
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行结果:
第0占用一个车位
第2占用一个车位
第3占用一个车位
第4占用一个车位
第1占用一个车位
第1辆车开走了
第0辆车开走了
第4辆车开走了
第3辆车开走了
第2辆车开走了
第8占用一个车位
第6占用一个车位
第5占用一个车位
第7占用一个车位
第9占用一个车位
第8辆车开走了
第7辆车开走了
第6辆车开走了
第5辆车开走了
第9辆车开走了
Semaphore源码分析
从Semaphore的功能来看,我们可以猜测到它的底层实现y一定是基于AQS的共享锁,因为需要实现多个线程共享一个令牌池。
创建Semaphore实例的时候,需要一个参数permits,这个基本上可以确定是设置给AQS的stated的,然后每个线程调用acquire的时候,执行state=state-1,release的时候执行state=state+1;当然acquire的时候,如果state=0,说明没有资源了需要等待其他线程release。
Semaphore分公平策略和非公平策略,区别就在于是不是会先判断是否有线程再排队,然后才进行CAS减操作
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
CyclicBarrier
CyclicBarrier的字面意思是可循环使用的屏障,它要做的事情是让一组线程达到一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数,每个线程调用await方法告诉CyclicBarrier当前线程已经到达了屏障,然后当前线程被阻塞。
使用线程
当存在需要所有的子任务都完成时,才执行主任务,这个时候j就可以选择使用CyclicBarrier
使用案例:
public class DataImportThread extends Thread {
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
public void run() {
System.out.println("开始导入" + path + " 位置的数据");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierDemo extends Thread {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new CyclicBarrierDemo());
new Thread(new DataImportThread(cyclicBarrier, "file1")).start();
new Thread(new DataImportThread(cyclicBarrier, "file2")).start();
new Thread(new DataImportThread(cyclicBarrier, "file3")).start();
}
public void run() {
System.out.println("开始分析数据");
}
}
运行结果:
注意:
- 对于指定计数值parties,若由于某种原因,没有足够的线程调用CyclicBarrier的await方法,则所有调用await的线程都会被阻塞
- CyclicBarrier也可以调用await(timeout, unit),设置超时时间,在设定时间内,如果没有足够的线程,则接触阻塞状态继续工作
- 通过reset重置计数,会使得进入awaitd的线程c出现BrokenBarrierException
- 如果采用的是CyclicBarrier(int parties, Runnable barrierAction)构造方法,执行barrierAction操作的是最后一个到达的线程。