什么是AQS?
AQS 即 AbstractQueuedSynchronizer 抽象隊列同步器,它是Java并發(fā)用來構(gòu)建鎖和其他同步工具的基礎框架,是一個抽象類。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現(xiàn)都依賴于它,如常用的ReentrantLock、Semaphore、CountDownLatch。
ReentrantLock是如何加鎖的?AQS是如何被使用的?
ReentrantLock 通過調(diào)用對象的lock()和unlock()方法來加鎖解鎖,下面分析一下源碼中加鎖的過程,在加鎖過程中使用了AQS類。
lock方法的內(nèi)部調(diào)用關系如圖:

首先看一下默認使用非公平鎖的時候的調(diào)用過程:
- ReentrantLock調(diào)用lock()方法的時候,它會調(diào)用Sync的acquire(1)方法;
- 因為 Sync 是 ReentrantLock 內(nèi)抽象的靜態(tài)內(nèi)部類,它繼承了AQS抽象類,Sync 內(nèi)又沒重寫 acquire(),所以直接調(diào)用的是AQS的 acquire(1);
- AQS的 acquire(1),會調(diào)AQS內(nèi)的 tryAcquire(1);
- AQS內(nèi)的 tryAcquire(1) 方法內(nèi)只是拋出了個異常,實際上調(diào)的是 ReentrantLock 中 NonfairSync 類重寫的 tryAcquire(1);如果采用的是公平鎖的話,這里會調(diào)用 FairSync類(繼承Sync) 重寫的 tryAcquire 方法
- NonfairSync 繼承了 Sync,tryAcquire(1) 調(diào)用的是父類 Sync 的nonfairTryAcquire(1)。(FairSync 直接實現(xiàn)了 tryAcquire 方法,無其它調(diào)用)
至此就是整個調(diào)用過程,下是源碼中的Sync和它的子類:

AQS原理及源碼解讀
上面讀到nonfairTryAcquire(acquires),就必須要了解實現(xiàn)了。下面先分析一下AQS的原理。
AQS隊列結(jié)構(gòu)淺析
AQS內(nèi)部維護著一個 volatile int state 和一個FIFO(雙向隊列)線程等待隊列(多線程爭用資源被阻塞時會進入此隊列),即CLH隊列。這兩個是AQS的核心。
注:CLH 是論文《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》 三位作者的縮寫,AQS就是根據(jù) CLH 鎖的變種實現(xiàn)的
AQS的同步機制就是依靠CLH隊列實現(xiàn)的。state所代表的意義由子類來定,它的值是根據(jù)子類不同的實現(xiàn)取不同的意義。拿ReentrantLock來說,state初始為0,當線程得到了這把鎖后state會變?yōu)?,重入一次會+1,釋放一次-1,減到0完全釋放,所以在ReentrantLock中state為0和1就代表了加鎖和解鎖。
雙向隊列由鏈表結(jié)構(gòu)實現(xiàn),AQS內(nèi)部定義了
/**
* Wait queue node class.
*/
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// Node中保存的是一個個線程
volatile Thread thread;
Node nextWaiter;
// 只截取了部分源碼,完整源碼請參照jdk11
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
CLH隊列是FIFO的雙端雙向隊列。線程通過AQS獲取鎖失敗,就會將線程封裝成一個Node節(jié)點,插入隊列尾。當有線程釋放鎖時,會嘗試把
head.next節(jié)點的線程喚醒(LockSupport的unpark方法)。被喚醒的線程獲得鎖后node節(jié)點隨之出隊。

當線程acquire(1)以后,如果state的值是0,那就直接拿到鎖,默認是非公平的上來就搶,搶不著就進隊列里acquireQueued()。
來看一下源碼:
// AQS類方法
public final void acquire(int arg) {
// 先tryAcquire嘗試獲取鎖,若返回true表示獲取成功,
// 由于&&符號邏輯短路,符號后部分不會執(zhí)行
if (!tryAcquire(arg) &&
// 若獲取失敗,線程排隊等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire失敗會調(diào)用 acquireQueued() 方法:
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 節(jié)點進入等待隊列,執(zhí)行死循環(huán)自旋
for (;;) {
// 拿到前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是head,并且當前線程成功獲得了鎖
if (p == head && tryAcquire(arg)) {
// 把當前節(jié)點設置為頭節(jié)點(head指針指向當前節(jié)點)
// 并把thread和prev置空(因為現(xiàn)在已經(jīng)獲取到鎖,不再需要記錄這個節(jié)點所對應的線程了)
setHead(node);
// 把舊的head的next置空,使舊的head出隊
p.next = null; // help GC
return interrupted;
}
// 判定前一個節(jié)點的狀態(tài)是否為 Node.SIGNAL
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
這是一個死循環(huán),除非進入if(p == head && tryAcquire(arg))判定條件,獲取node節(jié)點的前一個節(jié)點p。當p為頭節(jié)點并且當前線程tryAcquire成功時,用setHead(node)把當前節(jié)點設置為頭節(jié)點,同時把節(jié)點屬性thread、prev置空。然后再把p的next節(jié)點置空使使舊的head出隊。
總結(jié)起來就是把舊的頭部斷開,head指針指向一個新的頭部,這個新head內(nèi)thread屬性也是空的,因為現(xiàn)在已經(jīng)獲取到鎖,不再需要記錄這個節(jié)點所對應的線程了。

- 如果上面判定條件失敗
會首先判定:shouldParkAfterFailedAcquire(p , node),這個方法內(nèi)部會判定前一個節(jié)點的狀態(tài)是否為:Node.SIGNAL,若是則返回true,不是返回false,不過會再做一些操作:判定節(jié)點的狀態(tài)是否大于0,若大于0則認為被 CANCELLED 掉了(大于0的只可能被CANCELLED的狀態(tài)),因此會從前一個節(jié)點開始逐步循環(huán)找到一個沒有被 CANCELLED 節(jié)點,然后與這個節(jié)點的next、prev的引用相互指向;如果前一個節(jié)點的狀態(tài)不是大于0的,則通過CAS嘗試將狀態(tài)修改為“Node.SIGNAL”,自然的如果下一輪循環(huán)的時候會返回值應該會返回true。
如果這個方法返回了true,則會執(zhí)行:parkAndCheckInterrupt()方法,它是通過LockSupport.park(this)將當前線程掛起到WATING狀態(tài),它需要等待一個中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實現(xiàn)了Lock的操作。
AQS隊列初始化
程序第一個線程直接搶到鎖后并不會入隊,那隊列是什么時候初始化的呢?

public final void acquire(int arg) {
// 先tryAcquire嘗試獲取鎖,若返回true表示獲取成功,
// 由于&&符號邏輯短路,符號后部分不會執(zhí)行
if (!tryAcquire(arg) &&
// 若獲取失敗,線程排隊等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看到在第二個線程獲取鎖失敗入隊時會調(diào)用 addWaiter(Node.EXCLUSIVE),默認EXCLUSIVE排它鎖,下面再來看一下這個方法:
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 死循環(huán),直到執(zhí)行完畢
for (;;) {
Node oldTail = tail;
// 第一次入隊未初始化時tail為空
if (oldTail != null) {
// 把新節(jié)點的前置節(jié)點設置在等待隊列的末端(prev指針指向oldTail)
node.setPrevRelaxed(oldTail);
// cas操作,把新節(jié)點設置為tail末端
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// tail為空則初始化隊列
initializeSyncQueue();
}
}
}
可以看到在addWaiter方法中進行了初始化。
如何使用AQS獲取鎖?tryAcquire()
ReentrantLock 可以實現(xiàn)公平鎖和非公平鎖,在NonfairSync、FairSync靜態(tài)內(nèi)部類中分別重寫了tryAcquire()
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- 看一下非公平時如何獲取鎖:
NonfairSync 中 tryAcquire() 調(diào)用了父類Sync 的 nonfairTryAcquire(acquires)
// jdk11 ReentrantLock 部分源碼
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取state狀態(tài)
int c = getState();
// 如果狀態(tài)值為0
if (c == 0) {
// 嘗試用cas操作將0變成1,獲取鎖
if (compareAndSetState(0, acquires)) {
// cas操作成功說明搶到鎖,把這個線程設置為獨占這把鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 如果這個線程是 獨占同步鎖的線程
else if (current == getExclusiveOwnerThread()) {
// 改變狀態(tài)值,指鎖重入機制
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
先得到當前線程,然后獲取state的值,如果state的值等于0,用compareAndSetState(0,acquire)方法嘗試把state的值改為1,假如改成了setExclusiveOwnerThread()把當前線程設置為獨占statie這個把鎖的狀態(tài),說明我已經(jīng)得到這把鎖,而且這把鎖是互斥的,我得到以后,別人是得不到的,因為別人再來的時候這個state的值已經(jīng)變成1了,如果說當前線程已經(jīng)是獨占state這把鎖了,就往后加個1就表示可重入了。
- 公平時如何獲取鎖:
FairSync自己直接重寫了tryAcquire。源碼跟nonfairTryAcquire差不多,
區(qū)別是它會先查詢是否已經(jīng)有線程在隊列中等待
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判斷隊列中是否有等待線程
if (!hasQueuedPredecessors() &&
// 沒有等待線程的話進行cas操作嘗試加鎖,改變state
compareAndSetState(0, acquires)) {
// 設置當前線程獨占這把鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 下面代碼跟nonfairTryAcquire的一樣
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如何釋放鎖,并喚醒隊列中其它線程?
ReentrantLock 的 unlock() 被調(diào)用時會釋放鎖
public void unlock() {
sync.release(1);
}
它實際調(diào)用的AQS的release:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒head的next節(jié)點
// 調(diào)用的LockSupport.unpark()
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 改變狀態(tài)
int c = getState() - releases;
// 當前線程不是獨占鎖的線程時,拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 定義是否被釋放的標記
boolean free = false;
// 判斷state狀態(tài)是否為0
if (c == 0) {
// 狀態(tài)為0表示鎖被釋放,標記設為true
free = true;
// 把AQS中獨占鎖的線程設置為空,表示當前沒有任何線程占用鎖
setExclusiveOwnerThread(null);
}
// 更新狀態(tài)信息
setState(c);
return free;
}
AQS中的細節(jié):VarHandle存在的意義
上面講AQS隊列初始化時看的 addWaiter 源碼中調(diào)用了 node.setPrevRelaxed(oldTail) ,點進去看一下:
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
有這個調(diào)用:PREV.set(this, p),這個PREV是這樣定義的:
// AQS的Node類 部分源碼 jdk11
private static final VarHandle PREV;
static {
try {
// Lookup 對象是一個創(chuàng)建handles的工廠
MethodHandles.Lookup l = MethodHandles.lookup();
// 返回某個類的Var Handle對象
PREV = l.findVarHandle(Node.class, "prev", Node.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
它是VarHandle類型,VarHandle是在JDK1.9開始有的??此拿?,Var 變量,Handle 句柄,意思是方便操縱變量的工具。
舉個例子,Object o = new Object(); 引用o指向內(nèi)存中Object對象,用VarHandle的話它也是指的這個引用。
既然兩個引用都指向同一個對象,為什么還需要VarHandle?
下面寫個小程序來說明:
public class T01_VarHandle {
int var = 10;
private static VarHandle varHandle;
static {
// 初始化一個handle工廠
MethodHandles.Lookup l = MethodHandles.lookup();
try {
// 初始化var的handle
varHandle = l.findVarHandle(T01_VarHandle.class, "var", int.class);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
T01_VarHandle instance = new T01_VarHandle();
// 通過handle讀取變量(讀)
System.out.println(varHandle.get(instance));
// 通過handle改變變量(寫)
varHandle.set(instance, 20);
System.out.println(instance.var);
//進行線程安全的累加操作
varHandle.getAndAdd(instance, 10);
System.out.println(instance.var);
// 通過handle進行CAS操作
varHandle.compareAndSet(instance, 30, 40);
System.out.println(instance.var);
}
}
輸出結(jié)果為10 20 30 40。
也就是說可以用VarHandle對變量進行讀寫,這點跟普通引用沒太大區(qū)別。重要的是VarHandle可以做一些compareAndSet操作(原子操作),還可以handle.getAndAdd()操作這也是原子操作,這些是線程安全的。
所以VarHandle除了可以完成普通屬性的原子操作,還可以完成原子性的線程安全的操作,這也是VarHandle的含義。
AQS效率高的原因
AQS中線程進入等待隊列,節(jié)點插入隊尾時需要保證線程安全,如果對整個隊列加鎖效率就太低了,所以AQS用VarHandle的特性,采用CAS的方法插入節(jié)點,這樣效率就會很高。
所以也可以說volatile和CAS是AQS的核心。
AQS隊列為什么是雙向鏈表?
要添加一個線程節(jié)點的時候,需要看一下前面這個節(jié)點的狀態(tài),如果前面的節(jié)點是持有線程的過程中,這個時候就得在后面等著,如果說前面這個節(jié)點已經(jīng)取消掉了,那就應該越過這個節(jié)點,不去考慮它的狀態(tài),所以有需要看前面節(jié)點狀態(tài)的時候,必須是雙向的。
比如說前面看過的acquireQueued()方法的for死循環(huán)中,等待隊列中的節(jié)點需要關注前面節(jié)點的狀態(tài),如果前面成head了,就嘗試獲取鎖。這個過程就要求必須是雙向鏈表。
后記
關于AQS其實還有很多可分析的地方,比如讀寫鎖中的排他鎖和共享鎖是如何實現(xiàn)的,ReentrantLock的newCondition()等。本文暫時就寫這些了,其它源碼有空再讀......