并发编程之AQS(AbstractQueuedSynchronizer)详解
ReentrantLock实现了一个内部类Sync,该内部类继承了AbstractQueuedSynchronizer,所有锁机制的实现都是依赖于Sync内部类,也可以说ReentrantLock的实现就是依赖于AbstractQueuedSynchronizer类。于此类似,CountDownLatch, CyclicBarrier, Semaphore这些类也是采用同样的方式来实现自己对于锁的控制。可见,AbstractQueuedSynchronizer是这些类的基石。那么AQS内部到底实现了什么以至于所有这些类都要依赖于它呢?可以这样说,AQS为这些类提供了基础设施,也就是提供了一个密码锁,这些类拥有了密码锁之后可以自己来设置密码锁的密码。
此外,AQS还提供了一个排队区,并且提供了一个线程训导员,我们知道线程就像一个原始的野蛮人,它不懂得讲礼貌,它只会横冲直撞,所以你得一步一步去教它,告诉它什么时候需要去排队了,要到哪里去排队,排队前要做些什么,排队后要做些什么。这些教化工作全部都由AQS帮你完成了,从它这里教化出来的线程都变的非常文明懂礼貌,不再是原始的野蛮人,所以以后我们只需要和这些文明的线程打交道就行了,千万不要和原始线程有过多的接触!
一、AbstractQueuedSynchronizer密码锁
AbstractQueuedSynchronizer源码如下:
private transient volatile Node head;//同步队列的头结点
private transient volatile Node tail;//同步队列的尾结点
private volatile int state;//同步状态
protected final boolean compareAndSetState(int expect, int update) {//以CAS方式设置同步状态 // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
这三个成员变量都使用了volatile关键字进行修饰,这就确保了多个线程对它的修改都是内存可见的。整个类的核心就是这个同步状态,可以看到同步状态其实就是一个int型的变量,大家可以把这个同步状态看成一个密码锁,而且还是从房间里面锁起来的密码锁,state具体的值就相当于密码控制着密码锁的开合。当然这个锁的密码是多少就由各个子类来规定了。
例如在ReentrantLock中,state等于0表示锁是开的,state大于0表示锁是锁着的,而在Semaphore中,state大于0表示锁是开的,state等于0表示锁是锁着的。
AbstractQueuedSynchronizer内部其实有两个排队区,一个是同步队列,一个是条件队列。
同步队列只有一条,而条件队列可以有多条。同步队列的结点分别持有前后结点的引用(类似于双向链表),而条件队列的结点只有一个指向后继结点的引用(类似于单向链表)。
图中T表示线程,每个结点包含一个线程,线程在获取锁失败后首先进入同步队列排队,而想要进入条件队列该线程必须持有锁才行。队列中每个结点的结构如下:
static final class Node {//同步队列的结点 /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node();//表示当前线程以共享模式持有锁 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;//表示当前线程以独占模式持有锁 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1;//表示当前结点已经取消获取锁 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1;//表示后继结点的线程需要运行 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2;//表示当前结点在条件队列中排队 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;//表示后继结点可以直接获取锁 volatile int waitStatus;//表示当前结点的等待状态 volatile Node prev;//表示同步队列中的前继结点 volatile Node next;//表示同步队列中的后继结点 volatile Thread thread;//当前结点持有的线程引用 Node nextWaiter;//表示条件队列中的后继结点 /** * Returns true if node is waiting in shared mode. */ final boolean isShared() {//当前结点状态是否是共享模式 return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException {//返回当前结点的前继结点 Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {//构造器1 } Node(Thread thread, Node mode) {//构造器2, 默认用这个构造器 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { //构造器3, 只在条件队列中用到 this.waitStatus = waitStatus; this.thread = thread; } }
Node代表同步队列和条件队列中的一个结点,它是AbstractQueuedSynchronizer的内部类。Node有很多属性,比如持有模式,等待状态,同步队列中的前继和后继,以及条件队列中的后继引用等等。
可以把同步队列和条件队列看成是排队区,每个结点看成是排队区的座位,将线程看成是排队的客人。客人刚来时会先去敲敲门,看看锁有没有开,如果锁没开它就会去排队区领取一个号码牌,声明自己想要以什么样的方式来持有锁,最后再到队列的末尾进行排队。
二、等待状态
等待状态分为CANCELLED(1),SIGNAL(-1),CONDITION(-2),PROPAGATE(-3)四种状态
可以将这个等待状态看作是挂在座位旁边的牌子,标识当前座位上的人的等待状态。这个牌子的状态不仅自己可以修改,其他人也可以修改。
例如当这个线程在排队过程中已经打算放弃了,它就会将自己座位上的牌子设置为CANCELLED,这样其他人看到了就可以将它清理出队列。还有一种情况是,当线程在座位上要睡着之前,它怕自己睡过了头,就会将前面位置上的牌子改为SIGNAL,因为每个人在离开队列前都会回到自己座位上看一眼,如果看到牌子上状态为SIGNAL,它就会去唤醒下一个人。只有保证前面位置上的牌子为SIGNAL,当前线程才会安心的睡去。
CONDITION状态表示该线程在条件队列中排队,PROPAGATE状态提醒后面来的线程可以直接获取锁,这个状态只在共享模式用到。
三、结点进入同步队列原理
入队操作使用一个死循环,只有成功将结点添加到同步队列尾部才会返回,返回结果是同步队列原先的尾结点。
private Node enq(final Node node) {//结点入队操作, 返回前一个结点 for (;;) { Node t = tail;//获取同步队列尾结点引用 if (t == null) { //如果尾结点为空说明同步队列还没有初始化 if (compareAndSetHead(new Node())) //初始化同步队列 tail = head; } else { node.prev = t;//1.指向当前尾结点 if (compareAndSetTail(t, node)) {//2.设置当前结点为尾结点 t.next = node;//3.将旧的尾结点的后继指向新的尾结点 return t;//for循环唯一的出口 } } } }
添加尾结点的顺序步骤:
(1)指向尾结点
(2)CAS更改尾结点
(3)将旧尾结点的后继指向当前结点
在并发环境中这三步操作不一定能保证完成,所以在清空同步队列所有已取消的结点这一操作中,为了寻找非取消状态的结点,不是从前向后遍历而是从后向前遍历的。还有就是每个结点进入队列中时它的等待状态是为0,只有后继结点的线程需要挂起时才会将前面结点的等待状态改为SIGNAL。
四、独占模式(exclusive mode)
1、获取锁分别提供三种获取方式
(1)不响应线程中断获取
public final void acquire(int arg) {//不响应中断方式获取(独占模式) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上面代码中,它按照顺序执行了下图所示的4个步骤
1)!tryAcquire(arg)
protected boolean tryAcquire(int arg) {//尝试去获取锁(独占模式) throw new UnsupportedOperationException(); }
这时候来了一个人,他首先尝试着去敲了敲门,如果发现门没锁(tryAcquire(arg)=true),那就直接进去了。如果发现门锁了(tryAcquire(arg)=false),就执行下一步。这个tryAcquire方法决定了什么时候锁是开着的,什么时候锁是关闭的。这个方法必须要让子类去覆盖,重写里面的判断逻辑。
2)addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {//将当前线程包装成结点并添加到同步队列尾部 Node node = new Node(Thread.currentThread(), mode);//指定持有锁的模式 // Try the fast path of enq; backup to full enq on failure Node pred = tail;//获取同步队列尾结点引用 if (pred != null) {//如果尾结点不为空, 表明同步队列已存在结点 node.prev = pred;//1.指向当前尾结点 if (compareAndSetTail(pred, node)) {//2.设置当前结点为尾结点 pred.next = node;//3.将旧的尾结点的后继指向新的尾结点 return node; } } enq(node);//否则表明同步队列还没有进行初始化 return node; }
private Node enq(final Node node) {//结点入队操作 for (;;) { Node t = tail;//获取同步队列尾结点引用 if (t == null) { //如果尾结点为空说明同步队列还没有初始化 if (compareAndSetHead(new Node()))//初始化同步队列 tail = head; } else { node.prev = t;//1.指向当前尾结点 if (compareAndSetTail(t, node)) {//2.设置当前结点为尾结点 t.next = node;//3.将旧的尾结点的后继指向新的尾结点 return t; } } } }
执行到这一步表明第一次获取锁失败,那么这个人就给自己领了块号码牌进入排队区去排队了,在领号码牌的时候会声明自己想要以什么样的方式来占用房间(独占模式or共享模式)。注意,这时候他并没有坐下来休息(将自己挂起)。
3)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
领完号码牌进入排队区后就会立马执行这个方法,当一个结点首次进入排队区后有两种情况,
一种是发现他前面的那个人已经离开座位进入房间了,那他就不坐下来休息了,会再次去敲一敲门看看那小子有没有完事。如果里面的人刚好完事出来了,都不用他叫自己就直接冲进去了。否则,就要考虑坐下来休息一会儿了,但是他还是不放心,如果他坐下来睡着后没人提醒他怎么办?他就在前面那人的座位上留一个小纸条,好让从里面出来的人看到纸条后能够唤醒他。
还有一种情况是,当他进入排队区后发现前面还有好几个人在座位上排队呢,那他就可以安心的坐下来咪一会儿了,但在此之前他还是会在前面那人(此时已经睡着了)的座位上留一个纸条,好让这个人在走之前能够去唤醒自己。当一切事情办妥了之后,他就安安心心的睡觉了,注意,我们看到整个for循环就只有一个出口,那就是等线程成功的获取到锁之后才能出去,在没有获取到锁之前就一直是挂在for循环的parkAndCheckInterrupt()方法里头。线程被唤醒后也是从这个地方继续执行for循环。
final boolean acquireQueued(final Node node, int arg) {//以不可中断方式获取锁(独占模式) boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取给定结点的前继结点的引用 if (p == head && tryAcquire(arg)) {//如果当前结点是同步队列的第一个结点, 就尝试去获取锁 setHead(node);//将给定结点设置为head结点 p.next = null; //为了帮助垃圾收集, 将上一个head结点的后继清空 failed = false;//设置获取成功状态 return interrupted;//返回中断的状态, 整个循环执行到这里才是出口 } //否则说明锁的状态还是不可获取, 这时判断是否可以挂起当前线程 //如果判断结果为真则挂起当前线程, 否则继续循环, //在这期间线程不响应中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed)//在最后确保如果获取失败就取消获取 cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//判断是否可以将当前结点挂起 int ws = pred.waitStatus;//获取前继结点的等待状态 if (ws == Node.SIGNAL) //如果前继结点状态为SIGNAL, 表明前继结点会唤醒当前结点, //所以当前结点可以安心的挂起了 return true; if (ws > 0) { //下面的操作是清理同步队列中所有已取消的前继结点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //到这里表示前继结点状态不是SIGNAL, 很可能还是等于0, //这样的话前继结点就不会去唤醒当前结点了 //所以当前结点必须要确保前继结点的状态为SIGNAL才能安心的挂起自己 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() {//挂起当前线程 LockSupport.park(this); return Thread.interrupted(); }
4)selfInterrupt()
由于上面整个线程一直是挂在for循环的parkAndCheckInterrupt()方法里头,没有成功获取到锁之前不响应任何形式的线程中断,只有当线程成功获取到锁并从for循环出来后,他才会查看在这期间是否有人要求中断线程,如果是的话再去调用selfInterrupt()方法将自己挂起。
static void selfInterrupt() {//当前线程将自己中断 Thread.currentThread().interrupt(); }
(2)响应线程中断获取
响应线程中断方式和不响应线程中断方式获取锁流程上大致上是相同的。唯一的一点区别就是线程从parkAndCheckInterrupt方法中醒来后会检查线程是否中断,如果是的话就抛出InterruptedException异常,而不响应线程中断获取锁是在收到中断请求后只是设置一下中断状态,并不会立马结束当前获取锁的方法,一直到结点成功获取到锁之后才会根据中断状态决定是否将自己挂起。
private void doAcquireInterruptibly(int arg) throws InterruptedException {//以可中断模式获取锁(独占模式) final Node node = addWaiter(Node.EXCLUSIVE);//将当前线程包装成结点添加到同步队列中 boolean failed = true; try { for (;;) { final Node p = node.predecessor();//获取当前结点的前继结点 if (p == head && tryAcquire(arg)) {//如果p是head结点, 那么当前线程就再次尝试获取锁 setHead(node); p.next = null; // help GC failed = false;//获取锁成功后返回 return; } //如果满足条件就挂起当前线程, 此时响应中断并抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //线程被唤醒后如果发现中断请求就抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
(3)设置超时时间获取
设置超时时间获取首先会去获取一下锁,第一次获取锁失败后会根据情况,如果传入的超时时间大于自旋时间那么就会将线程挂起一段时间,否则的话就会进行自旋,每次获取锁之后都会将超时时间减去获取一次锁所用的时间。一直到超时时间小于0也就说明超时时间用完了,那么这时就会结束获取锁的操作然后返回获取失败标志。注意在以超时时间获取锁的过程中是可以响应线程中断请求的。
private boolean doAcquireNanos(int arg, long nanosTimeout)//以限定超时时间获取锁(独占模式) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE);//将当前线程包装成结点添加到同步队列中 boolean failed = true; try { for (;;) { final Node p = node.predecessor();//获取当前结点的前继结点 if (p == head && tryAcquire(arg)) {//如果前继是head结点, 那么当前线程就再次尝试获取锁 setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); //超时时间用完了就直接退出循环 if (nanosTimeout <= 0L) return false; //如果超时时间大于自旋时间, 那么等判断可以挂起线程之后 //就会将线程挂起一段时间 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //将当前线程挂起一段时间, 之后再自己醒来 LockSupport.parkNanos(this, nanosTimeout); //在获取锁的期间收到中断请求就抛出异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
2、线程释放锁并离开同步队列
public final boolean release(int arg) {//释放锁的操作(独占模式) if (tryRelease(arg)) {//拨动密码锁, 看看是否能够开锁 Node h = head;//获取head结点 if (h != null && h.waitStatus != 0)//如果head结点不为空并且等待状态不等于0就去唤醒后继结点 unparkSuccessor(h);//唤醒后继结点 return true; } return false; }
private void unparkSuccessor(Node node) {//唤醒后继结点 int ws = node.waitStatus;//获取给定结点的等待状态 if (ws < 0)//将等待状态更新为0 compareAndSetWaitStatus(node, ws, 0); //获取给定结点的后继结点 Node s = node.next; if (s == null || s.waitStatus > 0) {//后继结点为空或者等待状态为取消状态 s = null; for (Node t = tail; t != null && t != node; t = t.prev)//从后向前遍历队列找到第一个不是取消状态的结点 if (t.waitStatus <= 0) s = t; } if (s != null)//唤醒给定结点后面首个不是取消状态的结点 LockSupport.unpark(s.thread); }
线程持有锁进入房间后就会去办自己的事情,等事情办完后它就会释放锁并离开房间。通过tryRelease方法可以拨动密码锁进行解锁,我们知道tryRelease方法是需要让子类去覆盖的,不同的子类实现的规则不一样,也就是说不同的子类设置的密码不一样。像在ReentrantLock当中,房间里面的人每调用tryRelease方法一次,state就减1,直到state减到0的时候密码锁就开了。
这个过程像不像我们在不停的转动密码锁的转轮,而每次转动转轮数字只是减少1。CountDownLatch和这个也有点类似,只不过它不是一个人在转,而是多个人每人都去转一下,集中大家的力量把锁给开了。线程出了房间后它会找到自己原先的座位,也就是找到head结点。看看座位上有没有人给它留了小纸条,如果有的话它就知道有人睡着了需要让它帮忙唤醒,那么它就会去唤醒那个线程。如果没有的话就表明同步队列中暂时还没有人在等待,也没有人需要它唤醒,所以它就可以安心的离去了。以上过程就是在独占模式下释放锁的过程。
五、共享模式
1、在共享模式下获取锁的方式也是和独占模式一致。
(1)不响应线程中断的获取
public final void acquireShared(int arg) {//以不可中断模式获取锁(共享模式) if (tryAcquireShared(arg) < 0)//1.尝试去获取锁 doAcquireShared(arg);//2.如果获取失败就进入这个方法 }
//尝试去获取锁(共享模式)
//负数:表示获取失败
//零值:表示当前结点获取成功, 但是后继结点不能再获取了
//正数:表示当前结点获取成功, 并且后继结点同样可以获取成功
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
调用acquireShared方法是不响应线程中断获取锁的方式。在该方法中,首先调用tryAcquireShared去尝试获取锁,tryAcquireShared方法返回一个获取锁的状态,这里AQS规定了返回状态若是负数代表当前结点获取锁失败,若是0代表当前结点获取锁成功,但后继结点不能再获取了,若是正数则代表当前结点获取锁成功,并且这个锁后续结点也同样可以获取成功。
子类在实现tryAcquireShared方法获取锁的逻辑时,返回值需要遵守这个约定。如果调用tryAcquireShared的返回值小于0,就代表这次尝试获取锁失败了,接下来就调用doAcquireShared方法将当前线程添加进同步队列。我们看到doAcquireShared方法。
private void doAcquireShared(int arg) {//在同步队列中获取(共享模式) final Node node = addWaiter(Node.SHARED);//添加到同步队列中 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取当前结点的前继结点 if (p == head) {//如果前继结点为head结点就再次尝试去获取锁 //再次尝试去获取锁并返回获取状态 //r < 0, 表示获取失败 //r = 0, 表示当前结点获取成功, 但是后继结点不能再获取了 //r > 0, 表示当前结点获取成功, 并且后继结点同样可以获取成功 int r = tryAcquireShared(arg); if (r >= 0) { //到这里说明当前结点已经获取锁成功了, 此时它会将锁的状态信息传播给后继结点 setHeadAndPropagate(node, r); p.next = null; // help GC //如果在线程阻塞期间收到中断请求, 就在这一步响应该请求 if (interrupted) selfInterrupt(); failed = false; return; } } //每次获取锁失败后都会判断是否可以将线程挂起, 如果可以的话就会在parkAndCheckInterrupt方法里将线程挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
进入doAcquireShared方法首先是调用addWaiter方法将当前线程包装成结点放到同步队列尾部。这个添加结点的过程我们在讲独占模式时讲过。
结点进入同步队列后,如果它发现在它前面的结点就是head结点,因为head结点的线程已经获取锁进入房间里面了,那么下一个获取锁的结点就轮到自己了,所以当前结点先不会将自己挂起,而是再一次去尝试获取锁,如果前面那人刚好释放锁离开了,那么当前结点就能成功获得锁,如果前面那人还没有释放锁,那么就会调用shouldParkAfterFailedAcquire方法,在这个方法里面会将head结点的状态改为SIGNAL,只有保证前面结点的状态为SIGNAL,当前结点才能放心的将自己挂起,所有线程都会在parkAndCheckInterrupt方法里面被挂起。
如果当前结点恰巧成功的获取了锁,那么接下来就会调用setHeadAndPropagate方法将自己设置为head结点,并且唤醒后面同样是共享模式的结点。下面看下setHeadAndPropagate方法具体的操作。
private void setHeadAndPropagate(Node node, int propagate) {//设置head结点并传播锁的状态(共享模式) Node h = head; // Record old head for check below setHead(node);//将给定结点设置为head结点 //如果propagate大于0表明锁可以获取了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next;//获取给定结点的后继结点 if (s == null || s.isShared())//如果给定结点的后继结点为空, 或者它的状态是共享状态 doReleaseShared();//唤醒后继结点 } }
private void doReleaseShared() {//释放锁的操作(共享模式) for (;;) { Node h = head;//获取同步队列的head结点 if (h != null && h != tail) { int ws = h.waitStatus;//获取head结点的等待状态 if (ws == Node.SIGNAL) {//如果head结点的状态为SIGNAL, 表明后面有人在排队 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//先把head结点的等待状态更新为0 continue; // loop to recheck cases unparkSuccessor(h);//再去唤醒后继结点 } //如果head结点的状态为0, 表明此时后面没人在排队, 就只是将head状态修改为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //只有保证期间head结点没被修改过才能跳出循环 if (h == head) // loop if head changed break; } }
调用setHeadAndPropagate方法首先将自己设置成head结点,然后再根据传入的tryAcquireShared方法的返回值来决定是否要去唤醒后继结点。前面已经讲到当返回值大于0就表明当前结点成功获取了锁,并且后面的结点也可以成功获取锁。
这时当前结点就需要去唤醒后面同样是共享模式的结点,注意,每次唤醒仅仅只是唤醒后一个结点,如果后一个结点不是共享模式的话,当前结点就直接进入房间而不会再去唤醒更后面的结点了。共享模式下唤醒后继结点的操作是在doReleaseShared方法进行的,共享模式和独占模式的唤醒操作基本也是相同的,都是去找到自己座位上的牌子(等待状态),如果牌子上为SIGNAL表明后面有人需要让它帮忙唤醒,如果牌子上为0则表明队列此时并没有人在排队。
在独占模式下是如果发现没人在排队就直接离开队列了,而在共享模式下如果发现队列后面没人在排队,当前结点在离开前仍然会留个小纸条(将等待状态设置为PROPAGATE)告诉后来的人这个锁的可获取状态。那么后面来的人在尝试获取锁的时候可以根据这个状态来判断是否直接获取锁。
(2)响应线程中断的获取
public final void acquireSharedInterruptibly(int arg)//以可中断模式获取锁(共享模式) throws InterruptedException { if (Thread.interrupted())//首先判断线程是否中断, 如果是则抛出异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//1.尝试去获取锁 doAcquireSharedInterruptibly(arg);//2. 如果获取失败则进人该方法 }
private void doAcquireSharedInterruptibly(int arg)//以可中断模式获取(共享模式) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//将当前结点插入同步队列尾部 boolean failed = true; try { for (;;) { final Node p = node.predecessor();//获取当前结点的前继结点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果线程在阻塞过程中收到过中断请求, 那么就会立马在这里抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
响应线程中断获取锁的方式和不响应线程中断获取锁的方式在流程上基本是相同的,唯一的区别就是在哪里响应线程的中断请求。在不响应线程中断获取锁时,线程从parkAndCheckInterrupt方法中被唤醒,唤醒后就立马返回是否收到中断请求,即使是收到了中断请求也会继续自旋直到获取锁后才响应中断请求将自己给挂起。而响应线程中断获取锁会才线程被唤醒后立马响应中断请求,如果在阻塞过程中收到了线程中断就会立马抛出InterruptedException异常。
(3)设置超时时间的获取
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)//以限定超时时间获取锁(共享模式) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //1.调用tryAcquireShared尝试去获取锁 //2.如果获取失败就调用doAcquireSharedNanos return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)//以限定超时时间获取锁(共享模式) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED);//获取当前结点的前继结点 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L)//如果超时时间用完了就结束获取, 并返回失败信息 return false; //1.检查是否满足将线程挂起要求(保证前继结点状态为SIGNAL) //2.检查超时时间是否大于自旋时间 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout);//若满足上面两个条件就将当前线程挂起一段时间 if (Thread.interrupted())//如果在阻塞时收到中断请求就立马抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
如果看懂了上面两种获取方式,再来看设置超时时间的获取方式就会很轻松,基本流程都是一样的,主要是理解超时的机制是怎样的。如果第一次获取锁失败会调用doAcquireSharedNanos方法并传入超时时间,进入方法后会根据情况再次去获取锁,如果再次获取失败就要考虑将线程挂起了。
这时会判断超时时间是否大于自旋时间,如果是的话就会将线程挂起一段时间,否则就继续尝试获取,每次获取锁之后都会将超时时间减去获取锁的时间,一直这样循环直到超时时间用尽,如果还没有获取到锁的话就会结束获取并返回获取失败标识。在整个期间线程是响应线程中断的。
2、共享模式下结点的出队操作
public final boolean releaseShared(int arg) {//释放锁的操作(共享模式) if (tryReleaseShared(arg)) {//1.尝试去释放锁 doReleaseShared();//2.如果释放成功就唤醒其他线程 return true; } return false; }
protected boolean tryReleaseShared(int arg) {//尝试去释放锁(共享模式) throw new UnsupportedOperationException(); }
private void doReleaseShared() {//释放锁的操作(共享模式) for (;;) { Node h = head;//获取同步队列的head结点 if (h != null && h != tail) { int ws = h.waitStatus;//获取head结点的等待状态 if (ws == Node.SIGNAL) {//如果head结点的状态为SIGNAL, 表明后面有人在排队 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//先把head结点的等待状态更新为0 continue; // loop to recheck cases unparkSuccessor(h);//再去唤醒后继结点 } //如果head结点的状态为0, 表明此时后面没人在排队, 就只是将head状态修改为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) //只有保证期间head结点没被修改过才能跳出循环 break; } }
线程在房间办完事之后就会调用releaseShared方法释放锁,首先调用tryReleaseShared方法尝试释放锁,该方法的判断逻辑由子类实现。如果释放成功就调用doReleaseShared方法去唤醒后继结点。走出房间后它会找到原先的座位(head结点),看看座位上是否有人留了小纸条(状态为SIGNAL),如果有就去唤醒后继结点。如果没有(状态为0)就代表队列没人在排队,那么在离开之前它还要做最后一件事情,就是在自己座位上留下小纸条(状态设置为PROPAGATE),告诉后面的人锁的获取状态,整个释放锁的过程和独占模式唯一的区别就是在这最后一步操作。