Java并发编程--并发工具的使用和原理

Condition

我们在使用synchronized的时候,经常会用到wait/notify来实现线程间的通信,在J.U.C中也提供了锁的实现机制,那在J.U.C中是否也提供了类似的线程通信的工具呢?Condition就是J.U.C提供的一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件时,才会被唤醒。

Condition的基本使用
我们先写两个class,ConditionDemoWait 和ConditionDemoNotify:

public class ConditionDemoWait implements Runnable {
	private Lock lock;
	private Condition condition;
	public ConditionDemoWait(Lock lock, Condition condition) {
		this.lock = lock;
		this.condition = condition;
	}
	@Override
	public void run() {
		System.out.println("begin ConditionDemoWait");
		try {
			lock.lock();
			condition.await();
			System.out.println("end ConditionDemoWait");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
}
public class ConditionDemoNotify implements Runnable {
	private Lock lock;
	private Condition condition;
	public ConditionDemoNotify(Lock lock, Condition condition) {
		this.lock = lock;
		this.condition = condition;
	}
	@Override
	public void run() {
		System.out.println("begin ConditionDemoNotify");
		try {
			lock.lock();
			condition.signal();
			System.out.println("end ConditionDemoNotify");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
}

测试代码:

public class ConditionDemo {
	public static void main(String[] args) throws InterruptedException {
		Lock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		new Thread(new ConditionDemoWait(lock, condition)).start();
		TimeUnit.SECONDS.sleep(2);
		new Thread(new ConditionDemoNotify(lock, condition)).start();
	}
}

运行结果:
Java并发编程--并发工具的使用和原理
通过上面这个例子简单实现了wait和notify的功能,当调用await方法后,当前线程会释放锁并等待,而其他线程调用了condition的signal或者signalAll方法通知被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前释放的锁后继续执行,最后释放锁。所以Condition中两个最重要的方法:await和signal/signalAll:
await:把当前线程阻塞挂起
signal/signalAll:唤醒阻塞的线程

Condition源码分析
调用Condition方法的时候需要获得锁,所以意味着会存在一个AQS同步队列,上面的例子中,如果两个线程同时运行的话,AQS队列可能是下面这种情况:
Java并发编程--并发工具的使用和原理
这个时候,如果Thread A调用了condition.await()方法,它做了什么事情呢?

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变成等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

public final void await() throws InterruptedException {
	if (Thread.interrupted()) // 表示await允许被中断
		throw new InterruptedException();
	Node node = addConditionWaiter(); // 创建一个新节点,节点状态为Condition
	int savedState = fullyRelease(node); // 释放当前的锁,得到锁的状态
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
		}
		if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
			interruptMode = REINTERRUPT;
		if (node.nextWaiter != null) // clean up if cancelled
			unlinkCancelledWaiters();
		if (interruptMode != 0)
			reportInterruptAfterWait(interruptMode);
}

addConditionWaiter方法
这个方法的主要作用是把当前线程封装成一个Node,添加到等待队列中去;这里的等待队列不再是双向链表,而是单项链表

private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	if (t != null && t.waitStatus != Node.CONDITION) {
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

执行完addConditionWaiter方法后,就会产生下面这样一个等待队列:
Java并发编程--并发工具的使用和原理
接下来会执行fullyRelease方法,为什么叫fullyRelease呢?就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState(); // 获得重入锁的次数
            if (release(savedState)) { // 释放锁并唤醒下一个同步队列中的线程
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
}

此时,同步队列会触发锁的竞争,Thread B获得锁。
Java并发编程--并发工具的使用和原理
接下来会通过isOnSyncQueue(node)方法判断Thread A是否在同步队列中,返回false表示不在;返回true表示在同步队列中。
如果不在AQS同步队列中,说明当前节点没有被唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其它线程调用signal唤醒;如果在AQS同步队列中,意味着它需要去竞争同步锁去获得程序的执行权限。

为什么要做这个判断呢?因为在Condition等待队列中的节点会重新加入到AQS同步队列中去竞争锁,也就是当调用signal的时候,会把当前在等待队列中的节点从Condition等待队列转移到同步队列。
基于上面例子现在的逻辑结构,如何判断Thread A这个节点是否存在于AQS队列中呢?

  1. 如果Thread A的waitStatus是CONDITION,说明它一定在Condition队列,不在AQS队列中;因为AQS队列中节点的状态一定不能是CONDITION
  2. 如果node.prev为空,说明不存在于AQS队列中,因为prev=null在AQS队列中只有一种可能,就是它是head节点,但是head节点意味着它是获得锁的节点
  3. 如果node.next不等于空,说明一定在AQS队列中,因为只有AQS队列才会存在prev和next的关系
  4. findNodeFromTail表示从tail节点往前扫描AQS队列,一旦发现AQS中的节点和当前节点相等,说明节点一定存在于AQS队列中
final boolean isOnSyncQueue(Node node) {
	if (node.waitStatus == Node.CONDITION || node.prev == null)
		return false;
	if (node.next != null) // If has successor, it must be on queue
		return true;
	/*
	 * node.prev can be non-null, but not yet on queue because
	 * the CAS to place it on queue can fail. So we have to
	 * traverse from tail to make sure it actually made it.  It
	 * will always be near the tail in calls to this method, and
	 * unless the CAS failed (which is unlikely), it will be
	 * there, so we hardly ever traverse much.
	 */
	return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
	Node t = tail;
	for (;;) {
		if (t == node)
			return true;
		if (t == null)
			return false;
		t = t.prev;
	}
}

因为Thread A不在AQS队列中,所以isOnSyncQueue()方法返回false,接下来会执行LockSupport.park(this)将Thread A挂起并释放锁。

Condition.signal
当Thread A释放锁之后,Thread B会获得锁;在上面的例子中,thread B会执行condition.signal()方法去唤醒在等待队列中的节点:

public final void signal() {
	if (!isHeldExclusively()) // 判断当前线程是否获得了锁,如果没有则抛exception
		throw new IllegalMonitorStateException();
	Node first = firstWaiter; // 拿到Condition队列中的第一个节点
	if (first != null)
		doSignal(first);
}

protected final boolean isHeldExclusively() {
	// While we must in general read state before owner,
	// we don't need to do so to check if current thread is owner
	return getExclusiveOwnerThread() == Thread.currentThread();
}

private void doSignal(Node first) {
	do {
		// 从Condition队列中删除第一个节点
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		first.nextWaiter = null;
	} while (!transferForSignal(first) &&
			 (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	/*
	 * If cannot change waitStatus, the node has been cancelled.
	 */
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 通过cas更新节点状态为0, 如果更新失败,只有一种可能就是节点被Cancel了
		return false;

	/*
	 * Splice onto queue and try to set waitStatus of predecessor to
	 * indicate that thread is (probably) waiting. If cancelled or
	 * attempt to set waitStatus fails, wake up to resync (in which
	 * case the waitStatus can be transiently and harmlessly wrong).
	 */
	Node p = enq(node); // 调用enq方法,把当前节点加入到AQS同步队列,并返回当前节点的上一个节点;在本例子中也就是原tail节点
	int ws = p.waitStatus;
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果上一个节点的状态被取消了,或者尝试设置上一个节点的状态为SIGNAL失败,则唤醒节点上的线程
		LockSupport.unpark(node.thread);
	return true;
}

由上面代码可以看出,当Thread B调用了signal方法的时候,会获取等待队列中的第一个节点,并且调用doSignal方法;在doSignal方法中会把第一个节点从等待队列中删除,并通过transferForSignal方法将该节点转移到AQS同步队列中。

执行完doSignal方法后,会把Condition队列中的节点转移到AQS队列中去,逻辑结构图如下;这个时候会判断Thread A的prev节点也就是head节点的waitStatus,如果大于0或者设置SIGNAL失败,表示节点被设置成了CANCELLED状态,这时候需要唤醒Thread A,否则就基于AQS队列的机制唤醒,也就是等到Thread B释放锁之后来唤醒Thread A.
Java并发编程--并发工具的使用和原理
前面分析await方法的时候,thread A会在调用LockSupport.park()方法的时候被阻塞,而通过signal方法被唤醒之后会继续回到上次执行的逻辑中执行checkInterruptWhileWaiting(node)方法去判断Thread A在Condition队列被阻塞的过程中,是否被其它线程触发过中断请求:

public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	Node node = addConditionWaiter();
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

private int checkInterruptWhileWaiting(Node node) {
	return Thread.interrupted() ?
		(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
		0;
}

final boolean transferAfterCancelledWait(Node node) {
	// 使用CAS修改节点状态,如果还能修改成功,则说明线程被中断时,signal方法还没有被调用
	// 注意: 线程被唤醒,并不一定是在Java层面执行了LockSupport.unpark方法,也可能是调用了线程的interrupt()方法,这个方法会更新中断标识,并唤醒处于阻塞状态的线程
	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
		enq(node); // 如果CAS成功,则调用enq方法将当前node加入到AQS队列
		return true;
	}
	/*
	 * If we lost out to a signal(), then we can't proceed
	 * until it finishes its enq().  Cancelling during an
	 * incomplete transfer is both rare and transient, so just
	 * spin.
	 */
	 // 如果CAS失败,则判断当前节点是否已经在AQS队列中,如果不在则让给其它线程执行;当Node被触发了signal时,node就会被加入到AQS队列中
	while (!isOnSyncQueue(node))
		Thread.yield();
	return false;
}

如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续处理是应该抛出InterruptedException还是重新中断。

这里需要注意的是如果第一次CAS失败了,则不能判断当前线程是先进行了中断还是先进行了Signal方法的调用;可能是先执行了signal然后中断;也可能是先中断然后执行signal;这时需要做的就是等待当前线程的node被添加到AQS队列后,也就是enq方法返回后,返回false告诉checkInterruptWhileWaiting方法返回REINTERRUPT,后续进行重新中断。

简单来说,这个方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为true表示中断在signal调用之前,signal还未执行,那么这个时候会根据await的语义,在await时遇到中断需要抛出interruptedException,返回true就是告诉checkInterruptWhileWaiting方法返回THROW_IE。如果返回false,表示signal方法已经执行过了,只需要重新响应中断即可。

接下来会调用acquireQueued方法,这个方法是让当前被唤醒的节点Thread A去抢占同步锁,并要恢复到原本的重入次数状态。调用完这个方法后,将AQS队列中的head节点的status设置成SIGNAL,AQS队列的状态如下:
Java并发编程--并发工具的使用和原理

Condition总结
线程awaitThread先通过lock.lock方法获得锁后,调用condition.await()方法进入等待队列,而另一个线程signalThread通过lock.lock方法获得锁后调用了condition.signal方法,使得线程awaitThread能够有机会移入到同步队列,当其他线程释放lock后使得线程awaitThread能够有机会获得锁,从而使得awaitThread能够从await方法中退出并执行后续操作;如果awaitThread获取锁失败,则会直接进入到同步队列。
Java并发编程--并发工具的使用和原理

限制

J.U.C中提供了几个比较常用的并发工具类,比如CountDownLatch、Semaphore、CyclicBarrier。接下来我们会了解一下这些常用的API

CountDownLatch
CountDownLatch是一个同步工具类,它允许一个或者多个线程一直等待,直到其它线程的操作执行完毕再执行。从名字可以解读到CountDown是倒数的意思,类似我们倒计时的概念。
CountDownLatch提供了两个方法:一个是countDown,一个是await;CountDownLatch初始化的时候需要传入一个整数,在这个整数倒数到0之前,调用了await方法的线程都必须要等待,然后通过countDown方法来倒数。

public class CountDownLatchDemo {
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(3);
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName() + " - 执行中");
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() + "- 执行完毕");
		}, "t1").start();
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName() + " - 执行中");
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() + "- 执行完毕");
		}, "t2").start();
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName() + " - 执行中");
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() + "- 执行完毕");
		}, "t3").start();
		countDownLatch.await();
		System.out.println("所有线程执行完毕");
	}
}

运行结果:
Java并发编程--并发工具的使用和原理
上面的例子从代码上看,有点类似join的功能,但是比join更加灵活。CountDownLatch构造函数会接收一个int类型的参数作为计数器的初始值,当调用countDown方法的时候,这个计数器就会减一。使用CountDownLatch可以模拟高并发场景:

public class CountDownLatchDemo extends Thread {
	static CountDownLatch countDownLatch = new CountDownLatch(1);
	@Override
	public void run() {
		try {
			countDownLatch.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("Thread:" + Thread.currentThread().getName());
	}
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 1000; i++) {
			new CountDownLatchDemo().start();
		}
		System.out.println("所有线程阻塞中........");
		TimeUnit.SECONDS.sleep(5);
		System.out.println("线程阻塞开关打开");
		countDownLatch.countDown();
	}
}

运行结果:
Java并发编程--并发工具的使用和原理
总的来说,凡是涉及到需要指定某个任务在执行之前,要等到某个前置任务执行完毕后才执行的场景,都可以使用到CountDownLatch。

CountDownLatch源码分析
对于CountDownLatch我们只需要关注两个方法,一个是countDown()方法,另一个是await()方法。countDown()方法每次调用都会将state的值减一,直到state的值为0;而await方法是一个阻塞方法,当state=0的时候await方法才会返回。await方法可以被多个线程调用,所有调用了await方法的线程都阻塞在AQS的阻塞队列中,等待条件满足时,将线程一个一个从队列中唤醒。

CountDownLatch也用到了AQS,在CountDownLatch内部写了一个Sync并且集成了AQS这个抽象类,重写了AQS中的共享锁方法。
当调用了CountDownLatch.await方法的时候,代码如下:

public void await() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	if (tryAcquireShared(arg) < 0)
		doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
	return (getState() == 0) ? 1 : -1;
}

通过上面的代码可以看到,首先要判断当前线程是否获得了共享锁,如果state不等于0, 说明当前线程要加入到共享锁队列中。
doAcquireSharedInterruptibly方法的代码如下:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
				int r = tryAcquireShared(arg); // 由于state != 0,所以返回值是-1
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
  1. addWaiter方法创建了一个SHARED模式的节点并且加入到AQS的队列中
  2. 由于这个时候state不为0,所以肯定会去执行if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())这个判断,在partAndCheckInterrput方法中挂起线程

这个时候所有线程都调用了await方法,由于state的值现在还不为0,所以这些线程都会加入到AQS队列中,并且都处于阻塞状态:
Java并发编程--并发工具的使用和原理


当其他线程调用CountDownLatch.countDown方法的时候,我们看看它做了什么事情:

public void countDown() {
	sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		doReleaseShared();
		return true;
	}
	return false;
}

protected boolean tryReleaseShared(int releases) {
	// Decrement count; signal when transition to zero
	for (;;) {
		int c = getState();
		if (c == 0)
			return false;
		int nextc = c-1;
		if (compareAndSetState(c, nextc))
			return nextc == 0;
	}
}

private void doReleaseShared() {
	/*
	 * Ensure that a release propagates, even if there are other
	 * in-progress acquires/releases.  This proceeds in the usual
	 * way of trying to unparkSuccessor of head if it needs
	 * signal. But if it does not, status is set to PROPAGATE to
	 * ensure that upon release, propagation continues.
	 * Additionally, we must loop in case a new node is added
	 * while we are doing this. Also, unlike other uses of
	 * unparkSuccessor, we need to know if CAS to reset status
	 * fails, if so rechecking.
	 */
	for (;;) {
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			if (ws == Node.SIGNAL) {
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				unparkSuccessor(h);
			} // 这个CAS失败的场景是:执行到这里的时候,恰好有一个节点入队,入队会将这个ws设置为-1
			else if (ws == 0 &&
					 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		if (h == head)                   // loop if head changed
			break;
	}
}

private void unparkSuccessor(Node node) {
	/*
	 * If status is negative (i.e., possibly needing signal) try
	 * to clear in anticipation of signalling.  It is OK if this
	 * fails or if status is changed by waiting thread.
	 */
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/*
	 * Thread to unpark is held in successor, which is normally
	 * just the next node.  But if cancelled or apparently null,
	 * traverse backwards from tail to find the actual
	 * non-cancelled successor.
	 */
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
}

由上面代码可以看出,tryReleaseShared方法使用自旋的方式去实现state减一,当state为0 的时候,调用doReleaseShared方法唤醒处于await状态下的线程。

共享锁的释放和独占锁的释放有一定的差别,在doReleaseShared方法中,先判断头结点的状态是否为SIGNAL,如果是,将状态改成0;修改成功之后调用unparkSuccessor方法唤醒头结点的下一个节点(在我们的例子中是 thread 1)。


一旦Thread 1被唤醒,代码又会回到doAcquireSharedInterruptibly方法中来执行,如果满足state=0,会执行setHeadAndPropagate方法;

private void setHeadAndPropagate(Node node, int propagate) {
	Node h = head; // Record old head for check below
	setHead(node);
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
		(h = head) == null || h.waitStatus < 0) {
		Node s = node.next;
		if (s == null || s.isShared())
			doReleaseShared();
	}
}

这个方法的主要作用是把被唤醒的节点设置成head节点,然后继续唤醒该节点的下一个节点 thread 2。一次循环,thread 2会唤醒thread 3…
Java并发编程--并发工具的使用和原理

Semaphore
Semaphore也就是我们常说的信号灯,semaphore可以控制同时访问的线程个数,通过acquire获取一个许可,如果没有就等待,通过release释放一个许可,有点类似限流的作用。

叫做信号灯的原因也和它的用处相关,比如m某商场就有5个停车位,如果这个时候来了10辆车,必须要等前面有空的车位才能进入。

public class SemaphoreDemoTest {
	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(5);
		for (int i = 0; i < 10; i++) {
			new Car(i, semaphore).start();
		}
	}
	
	static class Car extends Thread {
		private int num;
		private Semaphore semaphore;
		public Car(int num, Semaphore semaphore) {
			this.num = num;
			this.semaphore = semaphore;
		}
		
		public void run() {
			try {
				semaphore.acquire();
				System.out.println("第" + num + "占用一个车位");
				TimeUnit.SECONDS.sleep(2);
				System.out.println("第" + num + "辆车开走了");
				semaphore.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

运行结果:

第0占用一个车位
第2占用一个车位
第3占用一个车位
第4占用一个车位
第1占用一个车位
第1辆车开走了
第0辆车开走了
第4辆车开走了
第3辆车开走了
第2辆车开走了
第8占用一个车位
第6占用一个车位
第5占用一个车位
第7占用一个车位
第9占用一个车位
第8辆车开走了
第7辆车开走了
第6辆车开走了
第5辆车开走了
第9辆车开走了

Semaphore源码分析
从Semaphore的功能来看,我们可以猜测到它的底层实现y一定是基于AQS的共享锁,因为需要实现多个线程共享一个令牌池。
创建Semaphore实例的时候,需要一个参数permits,这个基本上可以确定是设置给AQS的stated的,然后每个线程调用acquire的时候,执行state=state-1,release的时候执行state=state+1;当然acquire的时候,如果state=0,说明没有资源了需要等待其他线程release。

Semaphore分公平策略和非公平策略,区别就在于是不是会先判断是否有线程再排队,然后才进行CAS减操作

static final class NonfairSync extends Sync {
	private static final long serialVersionUID = -2694183684443567898L;
	NonfairSync(int permits) {
		super(permits);
	}
	protected int tryAcquireShared(int acquires) {
		return nonfairTryAcquireShared(acquires);
	}
}

/**
 * Fair version
 */
static final class FairSync extends Sync {
	private static final long serialVersionUID = 2014338818796000944L;
	FairSync(int permits) {
		super(permits);
	}
	protected int tryAcquireShared(int acquires) {
		for (;;) {
			if (hasQueuedPredecessors())
				return -1;
			int available = getState();
			int remaining = available - acquires;
			if (remaining < 0 ||
				compareAndSetState(available, remaining))
				return remaining;
		}
	}
}

CyclicBarrier
CyclicBarrier的字面意思是可循环使用的屏障,它要做的事情是让一组线程达到一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数,每个线程调用await方法告诉CyclicBarrier当前线程已经到达了屏障,然后当前线程被阻塞。

使用线程
当存在需要所有的子任务都完成时,才执行主任务,这个时候j就可以选择使用CyclicBarrier

使用案例:

public class DataImportThread extends Thread {
	private CyclicBarrier cyclicBarrier;
	private String path;
	public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
		this.cyclicBarrier = cyclicBarrier;
		this.path = path;
	}
	public void run() {
		System.out.println("开始导入" + path + " 位置的数据");
		try {
			cyclicBarrier.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

public class CyclicBarrierDemo extends Thread {
	public static void main(String[] args) {
		CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new CyclicBarrierDemo());
		new Thread(new DataImportThread(cyclicBarrier, "file1")).start();
		new Thread(new DataImportThread(cyclicBarrier, "file2")).start();
		new Thread(new DataImportThread(cyclicBarrier, "file3")).start();
	}
	public void run() {
		System.out.println("开始分析数据");
	}
}

运行结果:
Java并发编程--并发工具的使用和原理
注意

  1. 对于指定计数值parties,若由于某种原因,没有足够的线程调用CyclicBarrier的await方法,则所有调用await的线程都会被阻塞
  2. CyclicBarrier也可以调用await(timeout, unit),设置超时时间,在设定时间内,如果没有足够的线程,则接触阻塞状态继续工作
  3. 通过reset重置计数,会使得进入awaitd的线程c出现BrokenBarrierException
  4. 如果采用的是CyclicBarrier(int parties, Runnable barrierAction)构造方法,执行barrierAction操作的是最后一个到达的线程。