JDK源码解析之AbstractQueuedSynchronizer
前言:
JDK中的锁大致可以分为两类:Synchronized和CAS。
CAS的使用中,有直接使用的,比如AtomicInteger;有间接使用的比如ReentrantLock。关于AtomicInteger的分析可参考笔者的上一篇博客:
CAS间接实现的典型代表是ReentrantLock和ReentrantReadWriteLock。
本文先不具体介绍ReentrantLock的实现方式,而且先介绍一下其基础类AbstractQueuedSynchronizer类,该类作为CAS的抽象类实现类,是并发包(java.util.concurrent)下重要的基础类。
1.AbstractQueuedSynchronizer(AQS)使用范例
AQS是一个用来构建锁和同步器的框架。
以下范例来自于网络(有删减):https://www.cnblogs.com/chengxiao/p/7141160.html
1)创建Mutex类用于实现AQS
public class Mutex implements java.io.Serializable {
//同步对象完成一系列复杂的操作,我们仅需指向它即可
private final Sync sync = new Sync();
//加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
//释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
/** 注意这个静态内部类 */
//静态内部类,继承AQS
private static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁,CAS操作成功,则state状态为1,
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将同步状态置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
}
总结:本例中使用静态内部值Sync实现AQS,主要实现其tryAcquire()和tryRelease()方法
2)测试类
public class TestMutex {
private static CyclicBarrier barrier = new CyclicBarrier(31);
private static int a = 0;
private static Mutex mutex = new Mutex();
public static void main(String []args) throws Exception {
//说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000;
//加锁后
barrier.reset();//重置CyclicBarrier
a=0;
for(int i=0;i<30;i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10000;i++){
increment2();//a++采用Mutex进行同步处理
}
try {
barrier.await();//等30个线程累加完毕
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
barrier.await();
System.out.println("加锁后,a="+a);
}
/**
* 没有同步措施的a++
* @return
*/
public static void increment1(){
a++;
}
/**
* 使用自定义的Mutex进行同步处理的a++
*/
public static void increment2(){
mutex.lock();
a++;
mutex.unlock();
}
}
测试结果表明自定义的Sync拥有锁的特性,实现了多线程安全
2.AbstractQueuedSynchronizer结构
1)AQS的结构分析
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 头结点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 状态值
private volatile int state;
//
这个头结点和尾节点是做什么的呢?感觉像双端链表,先放一下,我们先看一下Node的结构
2)Node结构
static final class Node {
// Node的两种模式
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// Node的状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 从这里可以看出Node本身是一个双端链表结构
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// Node的三种构造方法
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
以上都是通过源码推测出来的,到这里我们还是云里雾里的
那么接着来看源码
3.Mutex.lock()加锁方法
public void lock() {
// 具体方法在AQS中
sync.acquire(1);
}
// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
// tryAcquire方法是被Sync重写的
// tryAcquire(1)方法将state设置为1,如果设置成功则直接返回true
// 设置失败则执行acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// Thread.currentThread().interrupt();执行当前线程中断
selfInterrupt();
}
// acquireQueued()
1)tryAcquire()方法在Mutex中被重写
// Mutex.tryAcquire()
public boolean tryAcquire(int acquires) {
// AbstractQueuedSynchronizer.compareAndSetState()
// 使用CAS来原子性的设置state为1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
2)AbstractQueuedSynchronizer.addWaiter()
tryAcquire方法(设置state=1)失败后,则创建节点并添加到队列尾部
// AbstractQueuedSynchronizer.addWaiter()
private Node addWaiter(Node mode) {
// 1.创建新节点node,mode=null,此时node.nextWaiter=null
Node node = new Node(Thread.currentThread(), mode);
// 2.获取尾节点
Node pred = tail;
if (pred != null) {
// 3.将node设置为尾节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 4.如果将node设置尾节点失败,则执行enq方法
enq(node);
return node;
}
// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
// 通过无限循环的方式坚持不懈的设置node节点为尾节点
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3)AbstractQueuedSynchronizer.acquireQueued()
// AbstractQueuedSynchronizer.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 1.开启一个无限循环
for (;;) {
// 2.获取当前节点的前节点
final Node p = node.predecessor();
// 3.只有前节点为head时,执行tryAcquire方法
// 也就是继续设置状态,继续尝试锁
if (p == head && tryAcquire(arg)) {
// 如果尝试成功,则设置当前节点为head,返回false
// 返回false之后就不会再执行线程interrupt操作
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 4.如果尝试锁失败,或者当前节点前节点不是head
// 则判断当前线程是否应该阻塞,如果应该的话则执行线程中断操作
// 继续循环,当线程被中断时则跳出循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// shouldParkAfterFailedAcquire()
// 判断当前线程能够阻塞休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前节点的状态时SIGNAL,则当前节点可以休息
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 状态>0,说明是CANCELLED状态,则说明前节点已经无效,则从后向前遍历,找到一个非CANCELLED状态
// 的节点,并将当前节点设置为其后置节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 其他情况(CONDITION、PROPAGATE)这两种状态的前节点,将前节点设置为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 检查线程是否被中断过,如果被设置过则返回true
private final boolean parkAndCheckInterrupt() {
// 当前线程进入wait
LockSupport.park(this);
return Thread.interrupted();
}
总结:加锁过程可总结为以下三步
* 首先通过tryAcquire()方法来获取同步状态,成功则返回,失败进行下一步骤操作
* 创建Node节点,加入同步队列中
* 加入队列的Node节点,如果其前节点是head,则再次调用tryAcquire()方法来获取同步状态。否则当其前节点的状态为SIGNAL时,线程便进入阻塞状态,直至被中断或唤醒
用图再来说明一下整个过程,图片来自(http://www.cnblogs.com/waterystone/p/4920797.html ):
做一个比喻来说:
我们去医院排队挂号,等医生看病,这个资源就是医生本身,医生每一次只能给一位挂号的病人看病,目前在看病的也就是head节点。
1)又来了一位病人
他去尝试获取state,也就是让医生帮看病,发现资源被人占用(因为head在医生那正看着呢),所以他就去排队,排到队尾,也就是tail节点,然后就去休息(执行了LockSupport.park(this)方法)
2)第一个病人看好了病之后
他去释放锁,也就是从医生那出来,(喊了一嗓子,大家可以去医生那看病啦,我看好了,执行LockSupport.unpark方法),然后休息的线程都醒来了,去竞争资源,但是只有等待线程2竞争资源成功,那么他接着进去看病,他就被设置为head
3.Mutex.unlock()释放锁
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
// tryRelease方法调用的是Mutex.tryRelease()
if (tryRelease(arg)) {
// 获取头结点,如果头结点状态不为0,目前是SIGNAL=-1
Node h = head;
if (h != null && h.waitStatus != 0)
// 则执行unparkSuccessor方法
unparkSuccessor(h);
return true;
}
return false;
}
1) AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果head的状态为SIGNAL,则设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 非正常情况下,head的next结点状态为CANCELLED,则从tail结点向前寻找,
// 找到一个正常结点后将其设置为head的next结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常情况下,head的next结点不为null,则执行对其唤醒操作,提醒其应该去竞争锁
if (s != null)
LockSupport.unpark(s.thread);
}
一句话来总结就是:head结点设置状态为0,然后唤醒其后置结点
总结:通过刚才的示例Mutex,我们分析了其Sync的lock和unlock方法,进而分析了AQS的源码
实际AQS就是CAS(Compare And Swap)+CLH(内置的同步队列)的格式,通过队列和CAS的方式来实现加锁和释放锁的功能
以上所说的锁是一种独占锁,AQS还提供了另一种锁:共享锁
关于共享锁,本文不再分析,等到后面分析ReentrantReadWriteLock的时候,再详述
参考:
https://www.cnblogs.com/chengxiao/p/7141160.html
http://www.cnblogs.com/waterystone/p/4920797.html