多线程学习篇之AQS解析和相关实现
参考:http://www.cnblogs.com/waterystone/p/4920797.html
java并发编程艺术
AQS 即 AbstractQueuedSynchronizer(队列同步器)
第一次看到这个类 一脸懵逼, 它其实就是一个抽象类,它是用来构建锁和其他同步组件的基础框架。
如果我们想实现一个自定义的lock锁 CustomLock 只需要在它里面定义一个继承了这个同步器的静态内部类 重写它指定的部分方法 。然后在获得锁释放锁相关的方法中,直接调用同步器的模板方法,在模板方法中会调用我们自己重写的部分方法。
同步器时实现锁(包括任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义,
锁面向使用者,同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程排队的若干操作。
——java并发编程的艺术
可以看出 同步器(AQS)是基于模板方法模式的,它本身并没有实现同步接口,那它是怎么实现同步的呢?
它的内部包含着一个同步队列,利用该队列实现了一套线程阻塞等待以及被唤醒时锁分配的机制,其中包含2个结点类型的引用,一个指向头结点代表当前获取到同步资源的结点,一个指向尾节点,当有一个线程获取同步资源失败就会加入尾部和尾节点形成联系
CAS提供获取同步资源的方式有2种
- 独占式 同一时刻只能有一个线程获取到同步资源
- 共享式 同一时刻可以有多个线程获取到同步资源
简单介绍一下 以独占式为例:
- 当有一个线程需要获取同步资源时,首先尝试将同步状态设为1,如果成功将被构造一个当前线程的头节点,如果失败,会构造成一个当前线程的同步结点利用cas机制加入队列尾部,然后以自旋的方式根据前置节点状态状态判断是否应该继续获取资源,如果获取不到则被阻塞等待
- 当有一个线程需要释放同步资源时,将同步状态设为0,并唤醒后继有效阻塞等待结点。
图示:
当我们需要实现一个自定义同步组件时只需要重写对同步状态的获取和释放的方式,至于具体线程同步队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了
下图中为CAS的几个实例变量 head 和 tail 代表指向头节点和尾节点的引用,state 代表同步状态
Node节点类以静态内部类的方式存在于CAS中 主要有以下几个实例变量
// 等待状态 初始值为0 代表初始状态
volatile int waitStatus;
// 等待状态取值
// 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点
// 代表结束状态,进入该状态后的结点将不会再变化。
static final int CANCELLED = 1;
// 等待状态取值 后继节点处于等待,如果当前结点释放同步资源 将会通知后继节点
static final int SIGNAL = -1;
// 等待状态取值 与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,
// 当其他线程调用了Condition的signal()方法后,
// CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
static final int CONDITION = -2;
// 等待状态取值 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
static final int PROPAGATE = -3;
// 前驱结点
volatile Node prev;
// 后继节点
volatile Node next;
// 绑定在该节点的线程
volatile Thread thread;
定义了3个对state的操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
供我们对同步状态的获取和释放方式重写的方法为下面几个:
//该线程是否正在独占资源。只有用到condition才需要去实现它。
boolean sHeldExclusively()
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
int tryAcquireShared(int)
//共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
boolean tryReleaseShared(int)
这些不是抽象方法 默认抛出异常。
为什么不定义为抽象方法,因为获取资源模式分为独占和共享
想实现独占只需重写 独占相关
想实现共享只需重写 共享相关
同步器提供的维护队列的模板方法主要为下面几个,在模板方法中会调用我们重写的方法:
// 独占式获取同步状态,会调用重写的tryAcquire(int arg)方法
public final void acquire(int arg)
// 共享式获取同步状态,会调用重写的tryAcquireShared(int arg)方法
public final void acquireShared(int arg)
// 独占式释放同步状态,会调用重写的tryRelease(int arg)方法
public final boolean release(int arg)
// 共享式释放同步状态,会调用重写的tryAcquireShared(int arg)方法
public final boolean releaseShared(int arg)
// 获取等待在同步队列上的线程集合
public final Collection<Thread> getQueuedThreads()
可以看到主要分为独占式获取释放 和 共享式获取与释放,还有就是查询队列情况。
首先从独占式获取与释放同步状态开始看起:
acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这是独占式获取资源的入口,当我们需要实现自己的lock加锁方法时,一般是直接调用这个模板方法。
- 首先直接调用tryAcquire方法获取同步状态,需要我们自己实现,成功的话后面的都不用执行了
- 如果失败 首先调用addWaiter(Node.EXCLUSIVE))方法构造同步节点加入队列尾部 并返回该结点
- 调用acquireQueued(final Node node, int arg) 使该结点以自旋的方式再次尝试获得同步状态 获取不到则进入阻塞,等待被唤醒再次尝试获取( 获取同步状态和进入等待也是有条件的 后面细讲)
- 如果acquireQueued() 返回true 说明阻塞过程中被中断过,调用selfInterrupt()补中断状态
接下来分析具体步骤
第一步:需要自己实现的tryAcquire方法就不看了 默认抛出UnsupportedOperationException异常,所以子类想要调用必须重写 其实就是设置同步状态 state 的过程
第二步:addWaiter(Node.EXCLUSIVE))
Node.EXCLUSIVE 其实就是一个常量null 代表以独占模式构造结点
private Node addWaiter(Node mode) {
// 创建一个新的代表当前线程的结点,模式为独占
Node node = new Node(Thread.currentThread(), mode);
// 获取到指向尾节点的引用
Node pred = tail;
if (pred != null) {
// 将新节点的前驱指向尾节点
node.prev = pred;
// 利用cas将当前尾节点tail引用指向新的结点,第一个参数为预期值,第二个为需要修改值
if (compareAndSetTail(pred, node)) {
// 设置成功
// 将原来尾节点的后继指向新的尾节点
pred.next = node;
return node;
}
}
// 当前尾节点为null 或者设置新的尾节点失败就调用这个方法 下面详解
enq(node);
return node;
}
在addWaiter方法中如果当前尾节点为null并且cas操作如果失败是什么也不做的,enq方法用了死循环的方式保证结点能正确被添加到队列尾部。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果当前尾节点为Null 说明队列为空,cas设置一个空的标志结点作为head结点 预期值为null
// 并将tail也指向它。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 此方法对应 addWaiter方法中cas设置失败,此时重新设置
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这个enq方法是一直循环下去的 ,可以看出这个方法让并发添加结点的请求变得串行化了。
设置完尾节点之后进入第3步。
第三步:acquireQueued(final Node node, int arg)
当前驱结点为头节点才能再次获取同步资源 保证了FIFO
当前驱结点不是头节点并且等待状态为SIGNAL才能放心阻塞进入等待状态
当被唤醒后继续获取同步资源成功时才会退出自旋
否则一直自旋 一直等待被唤醒尝试 等待被唤醒尝试。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否拿到资源 failed : 失败
boolean failed = true;
try {
// 阻塞等待过程中是否被中断
boolean interrupted = false;
for (;;) {
// 拿到前驱结点
final Node p = node.predecessor();
// 如果前驱结点为头节点则调用tryAcquire尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取成功 设置当前结点为头节点
setHead(node);
// 将原来的头节点的next置为Null 方便垃圾回收 意味着之前拿完资源的头结点出队
p.next = null; // help GC
// 成功获取资源
failed = false;
// 返回等待过程是否被中断
return interrupted;
}
// shouldParkAfterFailedAcquire如果前驱结点不是头节点或者获取资源失败判断是否进入阻塞等待
// parkAndCheckInterrupt 进入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 等待时被中断
interrupted = true;
}
} finally {
if (failed)
// 发生异常没拿到资源
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node)用来判断啥时候可以进入阻塞等待
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的等待状态
int ws = pred.waitStatus;
// 前驱结点知道自己释放后要唤醒自己则放心进入阻塞等待
if (ws == Node.SIGNAL)
return true;
// 前驱结点是个等待超时或中断的无效结点
if (ws > 0) {
/*
* 往前找到最近一个正常等待的状态,排在它的后边。
* 注意:那些放弃的结点形成一个无引用链会被GC回收
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 利用cas设置前驱结点的等待状态SIGNAL 告诉它释放后要通知自己可以继续尝试获取同步资源了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果可以进入等待 调用parkAndCheckInterrupt()进入阻塞等待状态 等待被唤醒再次获取
private final boolean parkAndCheckInterrupt() {
// 进入等待
LockSupport.park(this);
// 等待被唤醒 判断是不是被中断唤醒的 这个方法会清除中断标志位 所以后面要补中断。
return Thread.interrupted();
}
第四步:如果acquireQueued() 返回true 说明等待过程中被中断过,调用selfInterrupt()补中断状态
总结下调用流程:
release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这是独占式释放资源的入口,当我们需要实现自己的unlock释放锁方法时,一般是直接调用这个模板方法
它用来释放同步状态并唤醒后继结点
- 调用我们重写的释放资源方法 主要是设置同步状态为0
- 如果成功 调用unparkSuccessor 唤醒下一个阻塞等待线程结点
private void unparkSuccessor(Node node) {
// 获取当前结点的等待状态
int ws = node.waitStatus;
if (ws < 0)
// 将当前结点等待状态设为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 找到下一个需要唤醒的结点s 并且有效 (waitStatus <=0 )
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}
当调用 LockSupport.unpark(s.thread);唤醒后继线程结点后,它就又会进入 acquireQueued()尝试获取同步状态直到获取成功。
接下来是共享式获取和释放资源
acquireShared(int arg)
这是共享式获取资源的入口
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
因为我们重写的tryAcquireShared方法的返回值中:负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 (tryAcquireShared需要自旋设置想要的同步状态)
所以这里当返回值<0时表示获取失败,然后调用了doAcquireShared(arg)方法
这个方法和独占式中的实现没什么太大区别 只是当同步资源还有剩余时自己也可以唤醒后续结点
并且将补中断放在这里面
private void doAcquireShared(int arg) {
// 创建一个共享模式的结点 添加到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 当前驱为头节点尝试获取同步资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将头节点指向自己 并当同步资源还有时 继续唤醒后续结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否可以等待 parkAndCheckInterrupt为进入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 抛出异常 获取资源失败
if (failed)
cancelAcquire(node);
}
}
releaseShared(int arg)
共享式释放同步状态
public final boolean releaseShared(int arg) {
// 设置同步状态成功
if (tryReleaseShared(arg)) {
// 唤醒后续结点
doReleaseShared();
return true;
}
return false;
}
这里注意重写tryReleaseShared的方法与独占式tryRelease的区别在于必须保证同步状态安全释放,一般通过循环和CAS来保证,因为释放同步状态的操作可能会来自多个线程
接下来实现一个自定义的共享式获取同步资源的同步组件演示一下
这个同步组件TwinsLock 在同一个时刻只允许最多2个线程同时执行。
实现其实很简单 只需要像上面所说的那样 定义一个静态内部类继承AQS类 重写相关方法实现对同步状态的修改操作。然后在获得锁和释放锁的方法中直接调用AQS类的模板方法。
因为是共享式 允许最多2个线程 需要重写acquireShared 和 releaseShared 并且把同步状态初始值定为2
TwinsLock 需要实现Lock接口 自定义锁
/**
* Created by 周大侠
* 2019-04-18 11:14
*/
public class TwinsLock implements Lock {
// 创建Sync类初始化同步状态为2
private static final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
if (count < 0) {
throw new IllegalArgumentException("初始化资源小于0");
}
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
// for循环利用cas设置同步状态
for (;;) {
int current = getState();
int newstate = current - arg;
// 小于0 获取失败直接返回小于0的数表示失败 或者设置成功返回剩余的同步状态数
if (newstate < 0 || compareAndSetState(current, newstate)) {
return newstate;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
// 因为是释放同步资源 所有需要把原来的同步状态增加
for(;;) {
int current = getState();
int newstate = current + arg;
// 设置成功 return true;
if (compareAndSetState(current, newstate)) {
return true;
}
}
}
}
@Override
public void lock() {
// 直接调用模板方法
sync.acquireShared(1);
}
@Override
public void unlock() {
// 直接调用模板方法
sync.releaseShared(1);
}
// 其他相关方法就不实现了
测验一下
public static void main(String[] args) {
Lock twinsLock = new TwinsLock();
Runnable runnable = () ->{
twinsLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "当前时刻" + System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
twinsLock.unlock();
}
};
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
}
可以看到同一时刻只有2个线程在运行
Mutex(互斥锁)
Mutex是一个独占式的的同步组件 和上面的TwinsLock一样 只是重写的是独占式相关方法。
同步状态只有两种状态:0表示未锁定,1表示锁定。下边是Mutex的核心源码:
class Mutex implements Lock, java.io.Serializable {
// 真正同步类的实现都依赖继承于AQS的自定义同步器!
private final Sync sync = new Sync();
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取资源,立即返回。成功则返回true,否则false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}
// 尝试释放资源,立即返回。成功则为true,否则false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定为1个量
if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//释放资源,放弃占有状态
return true;
}
}
//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}
//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
public boolean tryLock() {
return sync.tryAcquire(1);
}
//unlock<-->release。两者语文一样:释放资源。
public void unlock() {
sync.release(1);
}
//锁是否占有状态
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,不同的地方就在获取-释放资源的方式tryAcquire-tryRelelase。