JDK剖析之AQS—— AbstractQueuedSynchronizer
一、概述
谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronized(AQS)!
类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
请尊重作者劳动成果,转载请标明原文链接:http://www.cnblogs.com/waterystone/p/4920797.html
二、框架
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
三、源码解析
3.1acquire
3.1.1.acquire(int)
1.tryAcquire()尝试获取资源若成功直接返回
2.addWaiter()将该线程添加到队列尾部,并且表为独占模式。
3.acquireQueued()使线程在等待队列中获取,一直到获取资源后才返回。如果整个过程中被中断过则返回true,否则返回false
4.如果线程在等待中被中断过,并不会响应。只是会在获取资源后调用自身的selfInterrupt。
3.1.2.tryAcquire(int)
此方法尝试获取资源如果获取成功返回true,否则返回false。
可是源码中直接返回异常。为什么?
前面提到过AQS只是一个框架具体的实现应该交由自定义同步器去实现
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
3.1.3.addWaiter(Node)
将当前线程放入队列尾,若放入失败则调用enq(Node)将node放入队尾,并返回当前线程所在的节点。
3.1.4.enq(Node)
4
3.1.5.acquireQueued()
1.failed = true //是否拿到前驱标志
2.获得队列前驱(node.predecessor())
3.如果前驱是head则说明该节点为队列中的老二,有资格获取资源。
4.拿到资源后将head节点设为该节点,返回interrupt
5.如果该节点可以休息了,就进入waiting状态,直到被unpark()
6.如果等待过程被中断过,interrupet被设为true
3.1.6.shouldParkAfterFailedAcquire()
步骤:
1.如果已经告诉前驱拿到资源后通知自己一下,那么久可以进入休息状态
2.如果前驱已经放弃就往前找,直到找到一个正常等待的节点,并排到他后面
3.如果前驱正常,就把前驱设为SIGNAL,告诉他拿完资源后通知自己。
3.1.7.arkAndCheckInterrupt()
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。
3.1.8acquire()总结
1.使用自定义同步器的tryAcquire去获取资源,如果得到了直接返回
2.addWaiter()加入等待队列尾部,并标记为独占模式
3.acquireQueued()使得线程加入等待队列中
4.如果线程在等待中被中断过则返回true。
该方法流程:
3.2release
3.2.1release(int)
1.如果释放成功(tryRelease()),找到头结点
2.唤醒等待队列里的下一个线程
3.2.2tryRelease(int)
与tryAcquire一样需要自定义同步器去实现
3.2.3unparkSuccessor(Node)
本方法用于唤醒等待队列中的下一个线程
用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了
3.2.3小结
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。