AQS:AbstractQueuedSynchronizer直譯"(抽象)隊列同步器"。AQS是java.util.concurrent的核心類。它是構(gòu)建同步器的模板。java.util.concurrent包中很多鎖和其它并發(fā)工具類都依靠AQS或其子類實現(xiàn)相關(guān)功能。看下圖:

AQS類圖如下:

2.1. AQS中的成員類
AQS中有兩個成員類 Node 和 ConditionObject。
2.1.1. ConditionObject
ConditionObject實現(xiàn)了Condition接口,是同步器的內(nèi)部類。一個ConditionObject對象對應(yīng)一個等待隊列,實現(xiàn)實現(xiàn)線程間的等待通知等關(guān)鍵功能。ConditionObject對wait/notify關(guān)鍵字的功能實現(xiàn)了覆蓋、擴展。

ConditionObject核心方法及屬性
signal():喚醒特定等待隊列中頭結(jié)點對應(yīng)的線程。將對應(yīng)節(jié)點從等待隊列移動到同步隊列。
signalAll():喚醒特定等待隊列中的所有線程。從等待隊列頭結(jié)點開始遍歷等待隊列,逐個signal。
await():從同步隊列中釋放,將當前線程包裝成新的節(jié)點加入到等待隊列中等待。
await衍生方法:awaitUninterruptibly()、awaitNanos(long nanosTimeout)、awaitUntil(Date deadline)、await(long time, TimeUnit unit)。
以上方法為Condition接口中定義方法的實現(xiàn)。
firstWaiter: 等待隊列的頭結(jié)點
lastWaiter: 等待隊列的尾節(jié)點
2.1.2. Node
Node是同步隊列、等待隊列中節(jié)點的類型。每個Node都保存有同步隊列前后序節(jié)點信息及等待隊列后序節(jié)點信息。還有一個比較重要的屬性是等待狀態(tài)waitStatus。詳見下面分析。

Node核心屬性
waitStatus:節(jié)點的等待狀態(tài)。
- SIGNAL:-1。當前節(jié)點的后續(xù)節(jié)點被阻塞或即將被阻塞。當前節(jié)點對應(yīng)的線程釋放或取消的時候,必須喚醒它的后續(xù)節(jié)點。為了避免競爭,acquire方法必須首先表明它們需要一個信號。然后重試CAS獲取同步狀態(tài)操作。如果失敗,則阻塞當前線程。
- CANCELLED:1。由于超時或者中斷節(jié)點被取消。節(jié)點被置為取消后不會有其它的狀態(tài)變化。
- CONDITION: -2。表示節(jié)點正在一個condition的等待隊列。此時節(jié)點不能被用作同步隊列的節(jié)點。直到節(jié)點被移動到同步隊列。這個時候節(jié)點的狀態(tài)被設(shè)置成0。
- PROPAGATE: -3。傳遞狀態(tài)。只有頭結(jié)點能被設(shè)置為傳遞狀態(tài)。在 releaseShared方法中設(shè)置保障傳遞的進行。
prev:同步隊列的前序節(jié)點
next:同步隊列的后序節(jié)點
thread:當前Node對應(yīng)的線程。線程進入同步隊列前會將自身包裝成一個Node。
nextWaiter:等待隊列的后序節(jié)點
2.2. AQS核心屬性
state:同步狀態(tài)。AQS中提供compareAndSetState方法保障狀態(tài)設(shè)置的原子性。 獨占模式下:一般0表示同步器未被占用,1、2、3...N表示同步器被占用。1、2、3...N代表重入的次數(shù)。這塊與可重入鎖相關(guān)。
exclusiveOwnerThread:同步器的獨占線程。顧名思義。繼承自AbstractOwnableSynchronizer。
head:同步隊列頭結(jié)點
tail:同步隊列尾節(jié)點
2.3. 同步隊列、等待隊列
AQS中有兩個關(guān)鍵的數(shù)據(jù)結(jié)構(gòu):同步隊列和等待隊列。同步隊列中的節(jié)點等待獲取同步狀態(tài)。等待隊列中的節(jié)點等待(條件成熟)被通知通知,然后移動到同步隊列等待獲取同步狀態(tài)。

2.3.1 同步隊列
同步隊列是一個非阻塞的 FIFO雙向隊列。通過自旋和 CAS操作保證節(jié)點插入的原子性。實現(xiàn)無鎖快速插入。每次移除的都是head節(jié)點,故移除操作不存在競爭。
同步隊列的head節(jié)點永遠是一個啞結(jié)點(dummy node), 它不關(guān)聯(lián)任何線程。

如多個線程競爭同步狀態(tài),當前線程未能獲得同步狀態(tài)。當前線程會被包裝成節(jié)點加入到同步隊列隊尾(自旋中進行CAS操作,直到成功)。

當前線程獲得同步狀態(tài)時(如執(zhí)行ReentrantLock的lock方法成功),會釋放頭結(jié)點。同時將當前線程對應(yīng)的節(jié)點設(shè)為頭結(jié)點:

2.3.2. 等待隊列&節(jié)點在隊列間的移動
一個Condition對應(yīng)一個等待隊列。實現(xiàn)和擴展等待通知(wait/notify)模式。等待隊列是一個單向隊列。

當前線程執(zhí)行require方法時,如果成功獲取同步狀態(tài),頭結(jié)點會被釋放。而執(zhí)行conditionA.await方法會將當前線程包裝成一個新的,等待狀態(tài)為CONDITION的Node節(jié)點加入到conditionA等待隊列的尾部。實際上不存在移動。而是1.從同步隊列中移除 2.在等待隊列尾部加入一個新的等待節(jié)點 兩步操作。(第2.部操作后當前線程才會釋放同步狀態(tài)。避免競爭。)

調(diào)用conditionA.signal方法時,會把conditionA對應(yīng)等待隊列的頭結(jié)點從等待隊列移除(1.firstWaiter指向原有頭結(jié)點的下一個合法節(jié)點 2.原有頭結(jié)點的nextWaiter屬性設(shè)為null)。然后將這個節(jié)點等待狀態(tài)設(shè)置為0放入同步隊列。

2.4. AQS核心方法分析
2.4.1 需要子類覆蓋的方法
tryAcquire:獨占式獲取
tryRelease:獨占式釋放
tryAcquireShared:共享式獲取
tryReleaseShared:共享式釋放
isHeldExclusively:這個同步器是否被獨占式獲得。
AQS應(yīng)用模板方法設(shè)計模式。模板方法(Template Method)模式:定義一個操作中的算法骨架,而將算法的一些步驟延遲到子類中,使得子類可以不改變該算法結(jié)構(gòu)的情況下重定義該算法的某些特定步驟。它是一種類行為型模式。
核心方法的調(diào)用邏輯及部分基礎(chǔ)方法已經(jīng)寫好。子類僅需覆蓋實現(xiàn)上述方法。便可實現(xiàn)同步器的相關(guān)功能。
2.4.2 獲取鎖、釋放鎖相關(guān)的方法
以ReentrantLock為例,分析lock、unlock的流程。
ReentrantLock中包含Sync sync(同步器)成員屬性。Sync繼承AQS。Sync 有兩個子類FairSync和NonfairSync。直譯是“公平同步器”和“非公平同步器”。它們分別是公平鎖和非公平鎖的實現(xiàn)的核心。在ReentrantLock構(gòu)造時可傳入同步器使用FairSync或是NonfairSync。默認使用NonfairSync。
公平鎖、非公平鎖的概念見鎖的分類及相關(guān)概念 章節(jié)六。
ReentrantLock的lock、unlock核心方法均由同步器實現(xiàn)。
2.4.2.1 獲取鎖相關(guān)方法
流程圖如下:

請結(jié)合流程圖理解下面的方法分析。
1.ReentrantLock的lock方法
/**
* 調(diào)用成員屬性Sync sync的lock方法實現(xiàn)功能
*/
public void lock() {
sync.lock();
}
沒啥好說的,直接調(diào)用了同步器的lock方法。默認情況下:ReentrantLock默認同步器的類型為NonfairSync。
2.NonfairSync的lock方法
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
CAS操作設(shè)定同步狀態(tài)。如果成功則將當前線程設(shè)定為同步器的獨占線程。如果失敗則調(diào)用AQS實現(xiàn)的獲取同步狀態(tài)的方法acquire。
3.AQS的acquire方法
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
調(diào)用Sync的tryAcquire方法。如果成功,則lock過程結(jié)束。如果失敗則調(diào)用AQS中實現(xiàn)的addWaiter、acquireQueued方法。
addWaiter:將當前線程包裝成節(jié)點,加入同步隊列的尾部。
acquireQueued:自旋判斷:當前線程對應(yīng)的節(jié)點是否滿足執(zhí)行條件/阻塞條件。如果滿足則做對應(yīng)的操作。
addWaiter、acquireQueued詳細分析見下文。
4.NonfairSync的tryAcquire方法
/**
*
*【NonfairSync實現(xiàn)】嘗試獲取同步狀態(tài)。不管成功與否立即返回
* @param acquires
* @return
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
*【Sync實現(xiàn)】非公平嘗試獲取同步狀態(tài)。所謂非公平即:非先到先得
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果同步狀態(tài)status為0(0代表當前同步器未被線程占用),則進行一次CAS操作設(shè)定同步狀態(tài)。如果設(shè)定同步狀態(tài)成功。則繼續(xù)操作,將當前線程設(shè)定為同步器的獨占線程。
如果同步狀態(tài)不為0且當前線程是同步器的獨占線程(說明當前線程正持有鎖)。則將同步狀態(tài)累加(可重入鎖的邏輯)。由于當前線程是同步器的獨占線程,不存在競爭,設(shè)定同步狀態(tài)使用普通賦值操作setState即可。
5.AQS的addWaiter方法
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
將當前線程包裝成一個節(jié)點Node。
如果同步隊列不為空,則進行一次CAS操作將當前節(jié)點設(shè)置成尾節(jié)點。否則不執(zhí)行。
如果設(shè)置成功,則方法返回:將Node對象作為參數(shù)傳遞給acquireQueued方法。
執(zhí)行enq方法進行入隊(同步隊列)自旋。
6.AQS的enq方法
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
主要是一個自旋CAS入隊操作,直到成功。自旋中的邏輯:
如果隊列為空則new一個Node對象設(shè)置成頭結(jié)點。(這里有一個知識點要注意。同步隊列的頭結(jié)點不關(guān)聯(lián)任何線程,是一個啞結(jié)點Dummy Node。所以當隊列為空時,必須先new一個Node放到頭部)。
如果隊列非空。則將當前節(jié)點的前序節(jié)點設(shè)為尾節(jié)點tail。然后執(zhí)行CAS操作將當前節(jié)點設(shè)為尾節(jié)點。如果成功,則將原有尾節(jié)點的next元素設(shè)為當前節(jié)點,將尾節(jié)點(即當前節(jié)點)作為參數(shù)返回給acquireQueued方法。如果失敗則繼續(xù)自旋直到成功。
7.AQS的acquireQueued方法
/**
*【AQS實現(xiàn)】阻塞節(jié)點或者頭結(jié)點出隊
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
自旋獲取同步狀態(tài)或在自旋中阻塞。方法主體也是一個自旋操作,返回是否在等待時中斷。
自旋操作中:
1.判斷 node.predecessor() == head && tryAcquire(arg)。如果為true,則原有頭結(jié)點出隊,將當前節(jié)點設(shè)置成頭結(jié)點(setHead方法中會將節(jié)點關(guān)聯(lián)的線程設(shè)置為null),返回中斷標志位。
2.當前節(jié)點是否應(yīng)該阻塞(分析見下文)。如果是,則調(diào)用parkAndCheckInterrupt阻塞當前線程,并將當前線程的中斷標志位返回給acquire方法,同時復(fù)位線程的中斷狀態(tài)。這里保存了線程被阻塞前的中斷狀態(tài)。
3.如果沒有返回/阻塞,則繼續(xù)自旋。
8.AQS的shouldParkAfterFailedAcquire方法
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
判斷前序節(jié)點的等待狀態(tài):
如果等待狀態(tài)是SIGNAL(等待喚醒;release操作會喚醒同步隊列中頭結(jié)點的下一個節(jié)點對應(yīng)的線程)。是返回true。
如果等待狀態(tài)大于0(即CANCELLED狀態(tài))則追溯前序節(jié)點,將CANCELLED狀態(tài)的前序節(jié)點移出同步隊列。直到前序節(jié)點狀態(tài)小于等于0。返回false。(在acquireQueued中繼續(xù)自旋)
如果是其它情況則進行CAS操作將前序節(jié)點的狀態(tài)設(shè)為SIGNAL。返回false。(在acquireQueued中繼續(xù)自旋)
9.AQS的parkAndCheckInterrupt方法
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
阻塞當前線程。返回中斷標志位,并將中斷標志位復(fù)位(設(shè)為false)。
2.4.2.2 釋放鎖相關(guān)方法
流程圖如下:

請結(jié)合流程圖理解下面的方法分析。
1.ReentrantLock的unlock方法
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
2.AQS的release方法
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
調(diào)用tryRelease嘗試釋放同步狀態(tài):
如果成功,且隊列不為空&&頭結(jié)點的等待狀態(tài)不為0,則喚醒后續(xù)節(jié)點。
...這里可以和lock流程關(guān)聯(lián)起來看下。被喚醒的節(jié)點繼續(xù)執(zhí)行acquireQueued自旋:判斷前序節(jié)點是否為頭結(jié)點。如果是則執(zhí)行tryAcquire方法(方法分析見上面)。如果成功,則釋放頭結(jié)點,將當前線程對應(yīng)的節(jié)點設(shè)為頭結(jié)點...
2.Sync的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果當前線程不是同步器的獨占線程,則拋異常。如果當前線程是同步器的獨占線程(持有鎖)則繼續(xù)執(zhí)行下面的邏輯:
1.判斷狀態(tài)同步狀態(tài)是否為0(...可重入鎖..)。如果是,則將釋放標志位置為true,同時將同步器的獨占線程設(shè)為null。如果不是,則不作操作。
2.設(shè)置同步狀態(tài)為getState() - releases。(由于當前線程為同步器的獨占線程,不存在競爭。故僅需用普通的賦值操作setState設(shè)定同步狀態(tài))。
3.AQS的unparkSuccessor方法
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要功能:喚醒后續(xù)節(jié)點
1.如果當前節(jié)點等待狀態(tài)waitStatus小于0(即CONDITION或SIGNAL或PROPAGATE),則進行CAS操作將當前節(jié)點的等待狀態(tài)設(shè)為0。
2.如果后續(xù)節(jié)點為空或者后續(xù)節(jié)點等待狀態(tài)為CANCELLED。則從尾節(jié)點開始向前追溯,直到當前節(jié)點的前一個節(jié)點。取離當前節(jié)點最近且等待狀態(tài)不為CANCELLED的節(jié)點對應(yīng)的線程作為被喚醒的線程。
- acquireQueued方法是線程阻塞和被喚醒的地方。
2.4.3. 等待、通知相關(guān)的方法
不展開分析,方法邏輯及調(diào)用流程見下圖。
2.4.3.1 等待相關(guān)方法
ConditionObject.await方法流程如下

2.4.3.2 通知相關(guān)方法
Condition.signal方法流程如下

await/signal過程中節(jié)點在等待隊列、同步隊列之間的移動見 章節(jié)2.3.2。
關(guān)于await/signal,這里引用JAVA并發(fā)編程(六)顯示鎖的一段話
需要注意的是:一般情況下,實現(xiàn)線程等待通知使用wait()和notifyAll()方法,而不用notify()方法?;蛘逤onditionObject實現(xiàn)的await()、signal()、signalAll()方法。
這是因為:
1.使用原生關(guān)鍵字synchronized,代碼中無從得知有多少種類型的線程。不同類型的線程獲取對象的鎖之后,判定是否可執(zhí)行的條件并不相同。如接機線程、擺渡車線程,一個是判斷城市、一個判斷行距。如果僅通知一個:極端情況下,notify的都是不符合執(zhí)行條件的線程,而這些線程又馬上進入阻塞狀態(tài)。符合執(zhí)行條件的線程永遠不會被喚醒。故需要通知所有在這個對象資源上等待的線程。
2.Condition是代碼可控的條件。如我們可以聲明一個接機的Condition、一個擺渡車的Condition。兩個Condition下分別對應(yīng)一個等待隊列。當位置變化時,我們分別通知接機Condition和擺渡車Condition等待隊列中第一個線程。這樣就能夠保障符合執(zhí)行條件的線程能夠被喚醒。優(yōu)雅地實現(xiàn)基于多個條件的等待與通知操作。
lock、unlock、await、signal的流程圖整理足足用了兩周的業(yè)余時間。有些代碼寫的真的反人類:一行之中涉及幾層函數(shù)調(diào)用、能省"{}"就省、傳遞賦值等等,令人費解。整理完再次過這些流程時有種豁然開朗、受益匪淺的感覺。真正理解AQS的源碼之美(自旋中阻塞、喚醒,節(jié)點在隊列間的移動、狀態(tài)變化,CAS操作自旋入隊...),要實際走一遍代碼。
最后安利一下:源碼分析是程序猿自我修養(yǎng)的組成部分。源碼分析不僅僅能幫助我們理解通用組件/框架的工作原理,從而讓我們能更好地應(yīng)用這些組件/框架。還可以讓我們學習到其中的編碼思想及控制流轉(zhuǎn)邏輯,狀態(tài)變換、傳遞機制,數(shù)據(jù)結(jié)構(gòu)等等,在實際項目中應(yīng)用。