jdk1.8 ReentrantLock以及Condition分析
ReentrantLock
ReentrantLock是可重入锁,所谓可重入锁,就是说线程可以进入任何一个它已经拥有的锁所同步着的代码块,底层是基于AQS实现的并发控制,可以选择是公平的还是不公平的
属性
private final Sync sync;
下面分析Sync及其三种子类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 这里实现的是不公平的获取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 状态为0代表资源可用
if (c == 0) {
// 使用cas将状态设置为1
// 并且设置当前独占资源的线程为当前线程
// setExclusiveOwnerThread是实现可重入锁的关键
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前状态为1,代表不能使用资源
// 首先判断当前线程是否是独占资源的线程,如果是,那么仍然可以使用资源
// 这里就是可重入的体现
else if (current == getExclusiveOwnerThread()) {
// 更新对应的状态
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果既不是独占资源的线程,当前也不能获取资源,那么返回false
return false;
}
// 释放资源
protected final boolean tryRelease(int releases) {
// 更新state
int c = getState() - releases;
// 释放资源的线程必须是当前独占资源的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果资源量是0,那么代表当前线程不再独占资源
// 因为存在重入的情况,所以c的值并不是0和1,还有可能是2,3,4,5等等
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新状态
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
公平的同步,套路就是在获取资源的时候,都会判断等待队列中是否有等待任务
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c等于0代表资源可获取
if (c == 0) {
// 没有等待任务,或者当前线程时等待队列中的下一个任务
// 如果成功获取资源,将当前线程设置为独占资源的线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
不公平的同步
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 不理会等待队列,直接尝试获取资源
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
获取和释放
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
Condition
下面分析下Condition
Condition内部也会维护一个等待队列,会将所有在当前Condition上调用await方法的线程加入到等待队列中
首先放一张原理示意图
public Condition newCondition() {
return sync.newCondition();
}
实际上调用的sync的newCondition方法,再看sync的newCondition方法
final ConditionObject newCondition() {
return new ConditionObject();
}
从源码中可以知道ConditionObject实现了Condition接口
属性
// 等待队列中的第一个结点
private transient Node firstWaiter;
// 等待队列中的最后一个节点
private transient Node lastWaiter;
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 删除队列中Cancelled状态的节点,并且将一个代表当前线程的节点添加到队列的末尾
Node node = addConditionWaiter();
// 释放当前线程获取到的ReentrantLock锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在aqs的等待队列中
// 如果不在就阻塞并且就算被唤醒如果不在aqs的等待队列中也会接着阻塞
// 如果在等待的过程被中断了,那么就会跳出循环
// 如果加入到aqs的等待队列中了,也会跳出循环
// 这里的流程是这样的
// 当线程调用condition.await时,会加入到这个condition的condition 队列中,注意这个队列和aqs的等待队列是不同,condition 队列中的节点表示等待被其他线程唤醒的线程
// 而aqs的等待队列中是等待获取锁的线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 下面的步骤就是竞争锁
// 如果竞争到了锁,那么线程就会跳出await方法继续执行
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 将队列中所有状态为Cancelled的节点从队列中删除,同时更新队列尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个代表当前线程的节点添加到等待队列的末尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter就是将当前线程封装成一个Node,然后加入到等待队列的末尾
这里的Node就是AQS中的Node,不了解的可以看之前关于AQS源码的分析
private void unlinkCancelledWaiters() {
// 当前节点
Node t = firstWaiter;
// 上一个遍历的节点
Node trail = null;
// 从头往后遍历
while (t != null) {
// 下一个节点
Node next = t.nextWaiter;
// 如果当前节点的状态是CANCELLED
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 如果前一个节点是Null,代表当前节点是头结点
// 更新头结点为下一个节点
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
unlinkCancelledWaiters方法就是将队列中所有已经取消的任务从队列中删除
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 释放锁,因为能够执行到condition的await方法,之前一定获取到了ReentrantLock的锁
// 这里底层调用的是ReentrantLock的tryRelease
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
// 如果当前节点的状态是CONDITION或者当前节点是头结点,那么判断为不在同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 从aqs的等待队列中从后往前判断当前节点是否在aqs的等待队列中
// 对于ReentrantLock来说,aqs就是它的属性sync
return findNodeFromTail(node);
}
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 从头遍历condition 等待队列
// 将第一个没有取消的任务节点从conditiion 等待队列中删除,并且添加到aqs的等待队列中,代表该节点已经被其他线程唤醒,去竞争aqs的锁了
// 每遇到一个取消的任务节点,就直接从等待队列中删除了
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 将condition 等待队列的头结点从队列中取出,令下一个节点为队列的头结点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 如果无法更改节点的状态为0,那么代表节点对饮的任务已经被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将当前节点添加到aqs的等待队列的末尾,并且返回在aqs的等待队列中当前节点前面一个节点
// 运行到这里该节点已经从condition的等待队列中移除了,并且该节点对应的任务并没有取消
Node p = enq(node);
int ws = p.waitStatus;
// 如果在aqs队列中,当前节点的前一个节点取消了,或者将前一个节点的状态设置为SINGAL失败了,那么直接唤醒当前节点,节点被唤醒之后,就会执行await方法跳出循环之后竞争锁的代码
// 如果之前的节点被取消或者不能设置为SINGAL,那么代表当前节点在aqs队列中不会被唤醒,所以我们就需要直接唤醒
// 如果之前的节点的状态是不是取消,那么就代表当前节点可以被之前的节点唤醒,所以我们就不用再这里进行唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}