JUC包(三) 锁与锁实现(AQS)
前言
在原 JUC包 (一) 原子类 与 CAS操作中我们主要了解了什么是原子操作,以及原子操作的实现CAS(Compare ans Swap)
操作. 本章,我们继续接着介绍使用CAS原理
实现的锁操作. 锁在JUC包
中使用非常广泛, 其取代了传统的synchronized
关键字被广泛使用. 下文中,我们将一起了解锁的实现原理与基本使用场景.
队列同步器(AQS)
AQS,是(Abstract Queued Synchronizer)队列同步器, 简称同步器.它是用于构建锁和其他同步操作的基础框架.使用一个int
变量来标示同步操作. 就个人理解而言, 它就是一个模板类, 其中实现了一个同步队列的基本操作.
其实现与队列基本一致. 主要维护了3个线程状态的方法,分别为getState() / setState(int newState) 和 compareAndSetState()
.
同步器中其他可重写方法:
-
protected boolean tryAcquire(int args)
独占式获取同步状态 -
protected boolean tryRelease(int args)
独占式释放同步状态 -
protected int tryAcquireShared(int args)
共享式获取同步状态 -
protected boolean tryReleasedShared(int args)
共享式释放同步状态 -
protected boolean isHeldExclusively()
当前同步器是否被独占
其中的模板方法主要有: void acquire(int arg)
void acuqireInterruptibly(int arg)
void tryAcquireNanos(int arg, long naos)
void acquireShared(int arg)
void acquireSharedInterruptily(int arg)
boolean tryAcquireSharedNanos(int arg, long naos)
boolean release(int arg)
boolean releaseShared(int arg)
-
Collection<Thread> getQueuedThreads()
上述的模板方法主要包括2类,try/release
同步锁与非同步锁 及 获取队列所以线程的getQueuedThreads()
方法.
我们在自定义锁的构造时,通常是使用同步器来进行实现.使用同步器,我们可以构建自定义锁.(类似ReentrantLock
)
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock{
private static class Sync extends AbstractQueuedSynchronizer{
// 是否处于占用状态
protected boolean isHeldExclusively(){
// 判断状态是否为1
return getState()==1;
}
// CAS 尝试获取 成功返回true 失败返回false
public boolean tryAcquire(int acquires){
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
public boolean tryRelease(){
// 如果 状态为0 说明没有获取锁.(所以无法释放)
if(getState()==0)throw new IllegalStateException();
// 如果存在空 设置同步器的所有线程为空
setExclusiveOwnerThread(null);
// 设置当前同步器状态为0
setState(0);
//释放成功
return true;
}
Condition newCondition(){return new ConditionObject();}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public static void main(String[] args) {
Collections.synchronizedList(new ArrayList<>());
}
}
我们经常使用的ReentrantLock
就是靠类似上方的数据进行实现的.(感兴趣的可以读读文尾的附注部分.)
tryAcquire
与acquire()
的区别?tryAcquire()
是尝试一次获取锁, 获取成功返回true
. 否则, 返回false
. 而acquire()
是写成了一个死循环进行获取锁. 获取失败,则进入等待队列. 这是Java
内部一个用于提高运行效率的设计.
从本章开始, 我们具体讲解下同步器是如何具体实现这些需求的.(独占锁与共享锁)
在同步器内部维护了诸如上图的同步队列.同步器维护了两个分别指向head
与tail
结点的两个指针.各个结点分别记录了prev/next/状态
等多个信息.
Node内的是属性如下所示:
- int waitStatus(CANCELLED-(1)-中断/SIGNAL-(-1)-后继等待/CONDITION-(-2)-Condition等待/PROPAGATE-(-3)-共享式等待/INITAL-(0)-初始状态 )
- Node prev
- Node next
- Node nextWaiter(等待队列中的后继结点)
- Thread thread
在同步队列中,主要有两个主要操作: 设置头结点/设置尾结点.
- 设置头结点: 将当前
head
的下一个结点置为head
结点.(head = head->next
) 无多线程问题. - 设置尾结点: 使用
compareAndSetTail(Node expect, Noode update)
进行更新.由于是CAS
操作, 所以在尝试失败后会不断的使用重试机制进行重复操作. 会遇到多线程问题, 通过CAS进行解决.
OK, 我们下面看下源码:
compareAndSetHead() & compareAndSetTail()
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 使用Unsafe类,调用系统底层进行CAS操作.
acquire() -> tryAcquire/acquireQueued -> (addWaiter-> enq)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// AQS内并没有太多关于tryAcquire()的操作(后续给出一个ReentrantLock的例子)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 获取队列内的值
final boolean acquireQueued(final Node node, int arg) {
// 失败标示标为true
boolean failed = true;
try {
// 中断标示标为false
boolean interrupted = false;
// 自旋操作 阻塞当前线程
for (;;) {
// 获取前置结点
final Node p = node.predecessor();
// 如果前置结点为头结点(当前锁持有者) 并且获取锁成功
if (p == head && tryAcquire(arg)) {
// 如果完成上述条件 则将当前结点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 放弃获取
cancelAcquire(node);
}
}
// 增加等待结点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS进行将新结点设置到末尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果CAS失败 则进入enq()方法
enq(node);
return node;
}
// 如果第一次CAS失败 那么不断循环自旋,直到将结点添加到末尾为止
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上述到过程中, 当线程获取锁时. 线程会先通过tryAcquire()
获取线程当锁. 当尝试失败时, 线程会将当前线程通过addWaiter()
方法将当前线程放入同步队列中.(如果放入失败,则通过enq()
方法不断循环添加, 直到成功结束.) 对于放入成功的结点,则不断进行自旋for循环
, 直到当前线程获取到锁(成为同步队列的头结点为止).
下面我们来看下release()
释放锁的方法:
release() -> tryRelease() / unparkSuccessor() -> LockSupport.unpark(s.thread)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 模板方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 使用`LockSupport`的`unpark`方法唤醒后续结点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
本章中, 讲述了AQS
获取独占锁的获取和释放的过程:
- 获取: 在获取同步状态时, 同步器构建了一个同步队列, 当获取同步队列失败时, 会将当前线程放入队列的结尾, 并进行阻塞自旋操作.
- 释放: 移出队列的条件是当前结点的前结点是头结点. 将当前结点的后一个结点作为新的队首结点.
共享式同步状态
上文分析了独占式同步队列的基本实现. 本节讲述下共享式同步状态的获取和释放.
共享式同步状态的获取和释放和独占式不太一样. 值得一提的是, 独占式的同步状态与共享式同步状态不能同时存在. 两种模式式互斥的. 基本源码如下所示:
acquireShared() -> tryAcquireShared()/doAcquireShared() ->addWaiter()/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果当前结点为头结点 尝试获取
int r = tryAcquireShared(arg);
if (r >= 0) {
// r>=0 说明已经获取到锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //记录当前头节点
//设置新的头节点,即把当前获取到锁的节点设置为头节点
//注:这里是获取到锁之后的操作,不需要并发控制
setHead(node);
//这里意思有两种情况是需要执行唤醒操作
//1.propagate > 0 表示调用方指明了后继节点需要被唤醒
//2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
//后面详细说
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
doReleaseShared() -> unparkSuccessor()
// 这段没怎么看懂
private void doReleaseShared() {
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
共享式同步状态的获取和释放真的比较绕. 就个人理解而言,
- 共享的获取.在于将
Head
头结点的值进行了改变,导致了后续的结点也能够获取到了锁. (Head结点为volatile
类型变量) - 资源的释放,在于,释放自身资源的同时, 还要释放相同获取到资源的其他线程.
深入浅出AQS之共享锁模式
独占式超时获取同步状态
tryAcquireNanos -> doAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通过if (nanosTimeout <= 0)return false;
可以看出, 当超过时间界限, 会退出等待循环.
自定义等待组件 TwinsLock
TwinsLock
package com.yanxml.multithreading.art.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 同步式锁 允许2个线程进行访问
* */
public class TwinsLock implements Lock{
private final Sync sync = new Sync(2);
public static final class Sync extends AbstractQueuedSynchronizer{
public Sync(int count){
if(count<0){
throw new IllegalArgumentException();
}
setState(count);
}
// public int tryAcquireShared(int reduceCount){
public int tryAcquireShared(int reduceCount){
for(;;){
int current = getState();
int newCount = current - reduceCount;
if(newCount < 0 || compareAndSetState(current, newCount)){
// if(compareAndSetState(current, newCount)){
return newCount;
}
}
}
// public boolean tryReleaseShared(int returnCount){
public boolean tryReleaseShared(int returnCount){
for(;;){
int current = getState();
int newCount = current+returnCount;
if(compareAndSetState(current, newCount)){
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
TwinsLockTest 测试类
package com.yanxml.multithreading.art.lock;
import java.util.concurrent.locks.Lock;
import com.yanxml.multithreading.art.lock.TwinsLock;
/**
* 用于测试TwinsLock的功能.
*
* */
class Worker extends Thread{
Lock lock;
public Worker(Lock lock){
this.lock = lock;
}
public void run(){
while(true){
lock.lock();
try{
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
public class TwinsLockTest {
public static void main(String[] args) throws InterruptedException {
Lock lock = new TwinsLock();
for(int i=0;i<10;i++){
Worker w = new Worker(lock);
w.setDaemon(true);
w.start();
}
for(int i=0;i<10;i++){
Thread.sleep(3000);
System.out.println();
}
}
}
Condition及其实现
Condition常见API
-
await()
线程等待 -
awaitUninterruptibly()
等待状态直到被通知 -
awaitNanos(long nanosTimeout)
等待(时间段内未收到信息,唤醒) -
boolean awaitUntil(Date deadline)
等待(死亡时间) -
void signal()
唤醒一个等待Condition
的线程 -
void signalAll()
唤醒所有等待Condition
的线程
Condition实现(阻塞队列)
Condition原理
AQS的同步队列中, 同时维护一个等待队列.
- 当调用
Condition.await()
方法后,将其从AQS队列中取出,放入Condition的队列中. - 当调用
Condition.singnal()
方法后,将其从Condition的队列中拿出,放入AQS队列.
await /
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Tips
LockSupport工具
-
void park()
阻塞当前线程 -
void parkNanos(long nanos)
阻塞当前线程(时间间隔nanos) -
void parkUtil(long deadline)
阻塞当前线程(直到deadline) -
void unpark(Thread thread)
释放某个线程的锁定
双锁样例
Others
- 重入锁
获取时, 如果当前线程为锁的持有者,自增.
释放时, 需要进行多次释放.
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// CAS 获取锁 成功即为所有者
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程为线程所有者 nextc=c+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果c==0 说明全部释放成功了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 公平锁 / 非公平锁
ReentrantLoc
k内FairLock
与NoFairLock
的实现各不相同.
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 我还是看不懂 这个判断的幺蛾子.
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 读写锁
读写锁可以参考ReentrantWriteReadLock
.
Reference
[1] Java多线程核心
[2] Java多线程编程艺术
[3] Java并发之AQS详解
[4] 一步步透彻理解Lock的Acquire和Release原理源码
[5] JDK中多线程之JUC锁的JDK源码解读配合大神的一起看,秒懂。
[6] java并发编程的艺术——第五章总结(Lock锁与队列同步器)
[7] setState 和 compareAndSetState方法作用分析
[8] 深入浅出AQS之共享锁模式
[9] 锁与CAS介绍
[10] Java并发源码剖析(二)——AbstractQueuedSynchronizer共享模式
[11] 同步器节点的waitStatus解释
[12] AQS——同步队列共享模式
[13] Java中的锁——队列同步器
[14] Java线程并发中的锁——Lock(下)