ReentrantLock源码分析
AQS简要
reentrantLock内部最重要的实现是基于这个同步容器做的
官方解释
AQS(AbstractQueuedSynchronizer):为java中管理锁的抽象类。该类为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同
步器(信号量、事件,等等)提供一个框架。该类提供了一个非常重要的机制,在JDK API中是这样描述的:为实现依赖于先进先出
(FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状
态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假
定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使
用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。
这么长的话用一句话概括就是:维护锁的当前状态和线程等待列表。
AQS同步队列结构
获取同步状态的流程图
reentrantLock结构图
reentrantLock默认是非公平锁,此处先讲了公平锁
公平锁(FairSync)
lock()
当调用ReentrantLock的lock方法时,会直接调用他的内部对象Sync的方法
public void lock() { sync.lock(); } final void lock() { acquire(1); } |
由上可知,最终会调用AQS的acquire方法
public final void acquire(int arg) { // 尝试获取锁,如果获取锁失败,那么将当前线程执行信息,放入到AQS的内部队列中 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 中断当前线程 selfInterrupt(); } |
tryAcquire方法是在FairySync中实现的,具体源码如下
protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取变量state, 这个变量非常重要,全程主要是围绕这个变量来做事 int c = getState(); // 当变量state==0 的时候,表示当前锁是空闲的,可以获取 if (c == 0) { // 判断当前线程是不是等待最久的线程,就是说判断当前线程是不是在等待队列的第二个元素 // 因为队列head是当前(之前)拥有锁的线程 // 如果是,则表示等待时间最久 , 返回false , 表示没有线程比它等的更久 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 设置当前执行线程(一个内部变量,为了后面的可重入功能)为 “当前线程” setExclusiveOwnerThread(current); return true; } } // state !=0 , 并且当前线程==当前执行器线程(一个内部变量) else if (current == getExclusiveOwnerThread()) { // 对state赋值, 由此可以看出,ReentrantLock是一个可重入锁, // 但是有一点,就是重入多少次,就必须要unlock多少次,以保证最终state==0 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } |
公平锁的格外处理,保证等待最久的线程被优先获取锁
public final boolean hasQueuedPredecessors() { Node t = tail; // 尾部元素 Node h = head; // 头部元素 // 队列为空的情况下也是返回false Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } |
addWaiter方法在AbstractQueuedSynchronizer中实现
private Node addWaiter(Node mode) { // 构建一个节点 Node node = new Node(Thread.currentThread(), mode); // 放在队列尾端,设置当前的尾节点,为新建节点的上级节点 Node pred = tail; if (pred != null) { node.prev = pred; //如果尾部节点存在,那么新加入的节点的上级节点为之前的尾部节点 if (compareAndSetTail(pred, node)) { //通过CAS操作设置尾部节点 pred.next = node; //之前的尾部节点,把他的下级节点设置为新加入的那个 return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 说明队列为null,必须初始化 //compareAndSetHead 初始化一个空的头部节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
以上代码,通过compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全的添加到同步队列的尾部。
在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在死循环中,只有通过CAS将节点设置为尾
节点后,当前线程才能从该方法返回,否则,当前线程不断得尝试设置
然后还有一个方法,acquireQueued, 这个方法的作用是阻塞线程,重试的去获取锁
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取上级节点 if (p == head && tryAcquire(arg)) {//成功获取同步状态 setHead(node);//将获取到锁节点设置为头节点 p.next = null; // help GC failed = false; return interrupted; } // 此处应该是检查上级节点的waitStatue , 判断是否要阻塞当前节点线程 // parkAndCheckInterrupt() 就是调用线程阻塞 if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt() interrupted = true; } } finally { if (failed) cancelAcquire(node); } } |
通过上面可以得知,锁的竞争主要是获取state的状态,0表示没有线程占有他,大于0表示已经被占用了。
公平锁的实现在于判断当前线程是否是head的下一个元素,如果是,那么就OK,就是你了。 因为head一般是
上一个占有锁的线程所有。具体获取同步状态的流程图,可以看上面AQS的简要,第三点
unlock()
public void unlock() { sync.release(1); } |
public final boolean release(int arg) { // 修改state值,减一 if (tryRelease(arg)) { Node h = head; // 获取他头结点信息 (头部结点,就是当前拥有锁的线程所拥有) if (h != null && h.waitStatus != 0) // 主要作用是唤醒头结点的下一个结点 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取它的next结点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒 LockSupport.unpark(s.thread); } |
protected final boolean tryRelease(int releases) { // 减法 int c = getState() - releases; // 判断是否是执行线程去调用的unLock() if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 当state=0,释放成功(锁已经空闲) free = true; setExclusiveOwnerThread(null); } setState(c); return free; } |
非公平锁(NonfairSync)
lock()
final void lock() { // 当线程进来之后,会直接判断当前state的值。如果是0 ,他能够直接通过 // cas操作,设置state的值为1 的话,那么竞争锁成功 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 上面设置失败的话,那么直接调用acquire方法 acquire(1); } |
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } |
调用nofairSync的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 大致逻辑和公平锁一致,唯一不同的是,这里并不会去判断当前线程是否是等待最久的线程。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } |
通过公平锁acquire的分析,我们可以看到,在acquire方法中,会调用nonfairTryAcquire() 方法,再次竞争一次锁,
不会去判断是否是等待最久的线程,因此失去了公平性
unlock()
跟公平锁一致
总结:
通过对公平锁的分析,在unlock的时候,会对head节点的next节点做唤醒操作, 也就是唤醒下一个节点来竞争锁,
那么体现非公平锁的特性来了, 当唤醒下一个节点来竞争锁的时候, 又有几个其他线程调用了lock方法,这个时候
另外几个线程和next节点所代表的线程都会去竞争锁,并不保证next节点能够一定获取到锁。