Java 并发编程之AbstractQueuedSynchronizer源码解析
上一篇中,我们讲了 AbstractQueuedSynchronizer 的使用,链接为 Java 并发编程之AbstractQueuedSynchronizer解析 ,这一节中,我们将会从源码的角度解读:
一、双向链表:
AbstractQueuedSynchronizer中使用了双向链表来作为同步器的队列,来保证FIFO。
双向链表,首先会有一个header与tail的指针,这个不是节点,而是指针,header指向第一个节点,即Node1,所以 header == node1 。
其次各个节点之间会有pre与next指针。
二、获取锁:
下面将以Thread1获取到锁,Thread2接着获取锁,然后Thread2被阻塞,Thread1执行完成后,调用release()方法,接着Thread2获取到锁的例子来讲。
首先看之前需要明确,compareAndSetState, compareAndSetHead(设置AbstractQueuedSynchronizer中的头结点)compareAndSetTail(设置AbstractQueuedSynchronizer中的尾结点),这些都是原子方法,多个线程同时调用可以保证线程安全,所以不必考虑多线程的影响。
另外,看源码可以使用IDEA的调试功能,否则只自己看看不出来的,一调试就出来了。
1、Thread1获取到锁
获取到锁,方法即为acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
注意这个方法虽然没几行,但是东西可多了去了,首先就是第一个方法,tryAcquire()方法,其会调用自己的获取锁方法,如果成功了,那么就直接返回。
因为Thread1调用acquire()方法的时候此时没有线程获取锁,然后Thread1获取到了锁,直接返回,此时 AbstractQueuedSynchronizer 内部是这样:
2、Thread2接着获取锁,然后Thread2被阻塞:
接着被阻塞的Thread2会生成一个第一个节点的,然后把自己置为最后一个节点:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
因为Thread1获取到了锁,并没有释放,所以此时Thread2调用 acquire() 方法中的第一个tryAcquire()方法肯定是失败的,所以开始走 addWaiter() 方法,然后发现 pred 为空(也就是tail为空),然后走 enq()方法,此时发现tail为空,那么设置调用 compareAndSetHead() 来设置header为一个空节点,然后继续循环,在将自己设置为尾节点,因为 compareAndSetHead() 与 compareAndSetTail() 都为原子方法,同一时间两个线程来调用只有一个能成功,而且这个失败了可以循环,所以两个线程来调用这个方法,是没有问题,可以自己想一下。
此时,开始走 acquireQueued() 方法了,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看,这个线程会一直阻塞在这个方法里面,也就是自旋,当它发现自己的前驱节点就是header的时候,会调用 tryAcquire() 方法来获取锁。所以第二个节点会一直去试着获取锁,第三个就不会,但当然第三个也会一直自旋。
3、Thread1调用release()方法,Thread2获取到锁:
Thread1调用release()方法后,并没有去做什么特殊的操作,这时候会调用tryRelease()方法来将Sync状态置为可获取锁状态。然后此时其它线程调用 tryAcquire() 方法是可以成功,所以Thread2 在acquireQueued()方法中获取成功后,会调用 setHead(node) 方法将header节点置为自己,然后将自己节点中的Thread置为空,然后把自己的前置指针置为null,然后把之前的node1的next置为null,这样node1就会被gc掉。