AbstractQueuedSynchronizer抽象队列同步器的简单理解
自己对AQS还是特别不熟悉,可能会出现较大的错误。如果有错误还请纠正。
参考文章:Java并发之AQS详解
AQS(AbstractQueuedSynchronizer抽象队列同步器)
是一个抽象类,提供了一个框架,用于实现阻塞锁(ReentrantLock、ReetrantReadWriteLock,尽管这两个类不是直接继承自AQS,但是他们的内部类Sync继承了AQS)或是同步器工具(CountDownLatch、CyclicBarrier、Semaphore等)等依赖于等待队列的工具。
AQS在设计时使用了模板方法的设计模式,即:
父类中包含抽象方法,和一些模板方法(已经规定好的步骤),抽象方法提供给子类实现,在模板方法中调用一些抽象方法用于功能的实现。
所以在实现AQS时就应覆盖它一些抽象方法: tryAcquire(独占式获取锁)、tryRelease(独占式释放锁)、tryAcquireShared(共享式获取锁)、tryReleaseShared(共享式释放锁)、isHeldExclusively(同步器是否处于独占状态)。这些是需要我们去实现的。
模板方法:
独占式获取锁:acquire、acquireInterruptibly、tryAcquireNanos
共享式获取锁:acquireShared、acquireSharedInterruptibly、tryAcquireSharedNanos
独占式释放锁:release
共享式释放锁:releaseShared
AQS提供了两种修改状态的方式:
setState(int) 和 compareAndSetState(int, int)
独占锁
独占锁就是同一时刻最多有一个线程拥有该锁。
共享锁
共享锁最典型的例子就是读写锁。读数据时,允许多个读线程同时执行,会阻塞写线程;写数据时,读写线程都会被阻塞。
AQS中的数据结构
同步队列,以双向链表的数据结构表示
源码分析
acquire过程如下:(如有错误,请告知以纠正)
首先要注意的是状态state的含义:state可以表示任意状态,在ReentrantLock中,state表示获取锁的线程已经重复获取该锁的次数;在Semaphore中,用它来表示剩余的许可数量;在FutureTask中,表示任务的状态(尚未开始,正在运行,已完成,已取消等)。
public final void acquire(int arg) {//请求获取锁,获取不到时将阻塞,arg表示获取锁的次数
if (!tryAcquire(arg) &&//tryAcquire是需要自己实现的方法,下面是ReentrantLock中的非公平锁的获取为例子,这个方法表示的是尝试去获取锁,如果获取不到锁,就会执行下面的条件。
//这里首先addWaiter会创建arg个独占模式的锁
//
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {//acquires 获取锁的次数
final Thread current = Thread.currentThread();
int c = getState(); //获取当前的状态,即获取了多少次锁
if (c == 0) {//如果为0,说明当前无线程持有该锁
if (compareAndSetState(0, acquires)) {//CAS尝试修改同步状态,获取锁
setExclusiveOwnerThread(current);//获取成功,修改内置的独占式锁拥有者线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果当前线程就是锁拥有者
int nextc = c + acquires;//修改状态
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//因为现在所已经有线程所占用,那么该锁不会被竞争,可以使用setState不用CAS
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//创建节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//先尝试插入到tail后面
pred.next = node;
return node;//成功就直接返回
}
}
enq(node);
return node;
}
private Node enq(final Node node) {//因为在插入时可能有线程竞争,有可能CAS失败
for (;;) {//自旋进行节点插入
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//循环进行获取锁,直到获取成功
final Node p = node.predecessor();//拿到前驱节点
if (p == head && tryAcquire(arg)) {//如果前驱节点是头节点,那么就该自己去获取锁
//获取锁成功
setHead(node);//设置当前节点为头节点
p.next = null; // help GC
failed = false;
return interrupted;//返回等待过程中是否被中断过
}
if (shouldParkAfterFailedAcquire(p, node) &&//获取失败后判断线程是否应该被阻塞
parkAndCheckInterrupt())//阻塞,之后等待中断或前驱节点被释放 后唤醒
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/** waitStatus value to indicate thread has cancelled */
//static final int CANCELLED = 1; 线程被取消,中断或超时,需要从队列中被移除
/** waitStatus value to indicate successor's thread needs unparking */
//static final int SIGNAL = -1; 当其结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行
/** waitStatus value to indicate thread is waiting on condition */
//static final int CONDITION = -2; 线程处于等待队列
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//static final int PROPAGATE = -3; 共享模式中使用,表示状态向后传播
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //如果前驱节点的等待状态位SIGNAL,表示节点已经设置了通知后继节点唤醒的状态。
return true;
if (ws > 0) { //如果前驱节点的状态为cancelled状态
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);// 循环将所有的cancel状态的节点从队列中清除
pred.next = node; //将队列连接起来
} else {
// 如果waitStatus为0或者propagate时,会将状态改变为SIGNAL,让前驱节点释放资源后通知后继结点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞
return Thread.interrupted();
}
- 首先,尝试获取锁,获取成功则返回。不成功则进入第二步。
- 创建Node结点,并自旋加入到同步队列尾部。
- 自旋尝试获取锁,直到获取锁成功为止。不满足条件时会被阻塞,直至中断或前驱节点释放资源。
注意:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
release过程:(如有错误,请告知以纠正)
public final boolean release(int arg) {
if (tryRelease(arg)) {//判断是否已经完成释放
//如果释放了,我们就需要将剩下的阻塞线程唤醒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒同步队列中下一个线程
return true;
}
return false;
}
//ReentrantLock中Sync内部类的方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())//判断当前线程是不是锁拥有者
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//c==0表示锁被释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {//要唤醒node的后继结点
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); //把node的waitStatus置为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从tail往前找到最靠前的非cancelled节点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//将s节点唤醒
}
如果这个s节点是哪里阻塞的?就是acquireQueued中被阻塞的,然后s线程就会自旋获取锁,线程会根据shouldParkAfterFailedAcquire(p, node)再次调整,知道p == head满足且尝试获取锁成功。
propagate这个状态主要用于共享模式。表示可运行状态。
由于允许多个线程进行获取锁,如果资源有剩余,propagate会向后传播,使得等待的线程可以被唤醒且获取资源。
在共享模式中
和acquireQueued对应,多了一步setHeadAndPropagate(node, r)操作,主要是如果r>0且满足一定条件,那么就会唤醒下一个邻居线程。
以下剩余的学习之后再做更新。