AQS:ReentrantLock源码分析
接着上一篇的
关于java线程(4)----JUC之 AQS 状态依赖的抽象
看一下ReentrantLock的源码,这里只是从AQS的角度出发,并不是从Lock的角度来看,那个以后再分析把
从AQS的状态角度,代码整体结构上是这样的:
//检查状态
while(!checkAndChangeState){
enque(currentthread) //将当前线程加入等待队列
park(currentThread) //挂起当前线程
}
//do sth
…….
//释放锁,恢复状态
changeState(){
Dequeue(current) //出对
Unpark(queue.thread)//唤醒其他等待中的线程
}
和真正的代码比起来主要是一些小地方的优化,比如一些自旋操作,特别是对于很小范围内的锁,另外,就是队列中的线程可能是已经取消的,这也要做相应的处理,下面就具体分析下
Lock的结构体系成对的主要有lock和unlock,其他方法到时候再看把:
我们看下lock方法:
public void lock() {
sync.lock();
}
//先暂且不管公平锁还是非公平锁,从独占锁的角度来说,就是请求一个状态
final void lock() {
acquire(1);
}
看下acquire的代码,完全就是上面伪代码的结构:
public final void acquire(int arg) {
//判断状态是否ok(自己实现),不ok的话就加入队列,并且阻塞(父类实现)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再来看下状态判断,这个是需要每个synchronizer自己做的:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//这里初看起来会有并发问题,但是下面的cas操作保证了不会有并发问题,并且state字段是volatile的
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//重入锁,单个线程内,不会有并发性的,所以直接set就好
setState(nextc);
return true;
}
return false;
}
这个方法如果返回true的话,代表条件允许,这样线程也就获得到锁了,其他啥也不用做了,等着释放锁,但是如果返回false,表示其他线程占用着,就需要加入等待队列,并且阻塞线程:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
先看addWaiter:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 这里假设队列非空,并且没有太多的并发性,尝试快速入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列为空,或者存在高并发,入队的时候失败了,需要用while尝试入队
enq(node);
return node;
}
// 一个完整的入队操作,需要在一个循环里面判断队列是否为空、高并发等情况
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,需要初始化
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {//设置队列头,原子操作
tail = node;
return h;
}
}
else {// 队列非空,直接插到最后面
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//在入队之后,在看下真正阻塞线程的acquireQueued,这里是一个循环操作,唤醒的时候也是重新去竞争,不是立刻获得锁
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//在真正阻塞线程之前,如果发现自己前面的节点是head,那还要再尝试下去获取锁,获取到以后就把head踢了,自己当head,因为head的意义表示正在占用锁的节点(某些同步范围很短的锁很有用)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
//该方法判断是否需要阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //park当前线程
interrupted = true; //默认不处理interrupted
}
} catch (RuntimeException ex) {
cancelAcquire(node); //发送异常的情况下,取消当前节点,并且如果当前节点是head,或者当前节点是等待唤醒状态,那么,还要尝试唤醒后面的节点
throw ex;
}
}
//决定是否需要阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
//如果当前节点的前一个节点的状态小于0(signal/condition),表示前面那个节点也在等待唤醒,果断把自己挂起
if (s < 0)
return true;
//如果前面节点的状态大于0,(cancelled),表示前面的线程已经取消,向前遍历,直到找到状态不大于0的节点
if (s > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else
/*
* 前面一个节点的状态为0,那么就是前一个节点认为他后面没有节点需要唤醒 * 啦,这时候要果断把他的状态改为SIGNAL,因为SIGNAL状态表示后面有阻塞
线程需要唤醒.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
/**
在state>=0的情况下,会走到这里,这里不能返回true把自己挂起,
因为这时候线程切换,占用锁的线程A已经结束(并且发出了unpark信号),如果这时候线程B直接返回true阻塞自己,那可能会因为错失信号B永远无法唤醒;返回false,当前线程会去再次尝试获取锁,如果还是不能获取,则阻塞;
*/
return false;
}
//挂起当前线程,因为park会相应中断,但是不是抛出异常,因此这里将是否中断作为boolean类型返回,交给外部代码处理
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
//并不是只有unpark和interrupt才能唤醒他,参考前面的关于LockSupport的讲解;唤醒的线程,重新去竞争锁,在高并发的情况下,有可能另一个节点正好也在请求lock,那么他刚好tryAcquire成功了,则当前线程又会重新阻塞!
}
释放锁
/**
* 释放锁主要做下面两件事情:
* 1.修改状态,改为可以获取锁
* 2.如果有等待的线程,则唤醒一个
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//如果waitStatus为0,则表示后面没阻塞线程了,没必要进行唤醒了
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//先看修改状态的方法,这里只修改锁的状态,不修改队列的任何东西
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//进行唤醒操作
private void unparkSuccessor(Node node) {
/*
* 把自己状态改为0,表示后面没有节点需要唤醒;如果后面有需要唤醒的节点,在请求锁的时候会把他的状态改为SIGNAL,不是很明白为什么这么做,难道是一种小小的优化?
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//取队列里面状态不大于0(cancelled)的节点,如果大于0,则从队列移除,然后从后往前找,找到最前面的一个非cancelled节点,并且唤醒这个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结
1.通过AQS的state字段来表示锁是否被占用,特别需要注意的就是他的操作都是原子的
2.通过队列来保存阻塞的线程
3.通过LockSupport来进行挂起和唤醒操作(LockSupport的注意点可以看上一篇)
从挂起的时机上,AQS加了一些自旋操作,不是每次发现不能获取锁就挂起,而是后来会再次尝试获取锁,这样,对那些同步范围非常小,时间非常短的锁,应该是一种性能的提高。
另外,对于队列的操作,从我个人的角度来看,每次取当前正在跑的节点,跑完以后再取下一个节点,如果是取消状态,那么再去下一个,最后找到一个ok的唤醒,这样就可以了。不过AQS增加了一些状态判断,比如通过waitStatus来判断后面是否有节点需要唤醒,从角度来看,这完全增加了成本,不过性能提高多少就难说喽!