上一篇文章:JUC系列 - AQS簡介 介紹了AQS,本篇將結(jié)合AbstractQueuedSynchronizer源碼來分析。
CLH同步隊(duì)列
CLH同步隊(duì)列是一個FIFO雙向隊(duì)列,AQS依賴它來完成同步狀態(tài)的管理,當(dāng)前線程如果獲取同步狀態(tài)失敗時,AQS則會將當(dāng)前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個節(jié)點(diǎn)(Node)并將其加入到CLH同步隊(duì)列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,會把首節(jié)點(diǎn)喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。
在CLH同步隊(duì)列中,一個節(jié)點(diǎn)表示一個線程,它保存著線程的引用(thread)、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(diǎn)(prev)、后繼節(jié)點(diǎn)(next),其定義如下:
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 獨(dú)占 */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
/** 前驅(qū)節(jié)點(diǎn) */
volatile Node prev;
/** 后繼節(jié)點(diǎn) */
volatile Node next;
/** 獲取同步狀態(tài)的線程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

入隊(duì)(enqueue)
熟悉數(shù)據(jù)結(jié)構(gòu)的同學(xué),CLH隊(duì)列入隊(duì)是再簡單不過了,無非就是tail指向新節(jié)點(diǎn)、新節(jié)點(diǎn)的prev指向當(dāng)前最后的節(jié)點(diǎn),當(dāng)前最后一個節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 入隊(duì)(enqueue)
*/
private Node enq(final Node node) {
for (;;) { // 自旋過程
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter(Node node)先通過快速嘗試設(shè)置尾節(jié)點(diǎn),如果失敗,則調(diào)用enq(Node node)方法設(shè)置尾節(jié)點(diǎn)。

出隊(duì)(dequeue)
CLH同步隊(duì)列遵循FIFO,首節(jié)點(diǎn)的線程釋放同步狀態(tài)后,將會喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功時將自己設(shè)置為首節(jié)點(diǎn),這個過程非常簡單,head執(zhí)行該節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的next和當(dāng)前節(jié)點(diǎn)的prev即可,注意在這個過程是不需要使用CAS來保證的,因?yàn)橹挥幸粋€線程能夠成功獲取到同步狀態(tài)。
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
獨(dú)占式同步狀態(tài)獲取
acquire(int arg)方法為AQS提供的模板方法,該方法為獨(dú)占式獲取同步狀態(tài),但是該方法對中斷不敏感,也就是說由于線程獲取同步狀態(tài)失敗加入到CLH同步隊(duì)列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊(duì)列中移除。代碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}