1、先聊點(diǎn)別的
說(shuō)實(shí)話,關(guān)于AQS的設(shè)計(jì)理念、實(shí)現(xiàn)、使用,我有打算寫過(guò)一篇技術(shù)文章,但是在寫完初稿后,發(fā)現(xiàn)掌握的還是模模糊糊的,模棱兩可。
痛定思痛,腳踏實(shí)地重新再來(lái)一遍。這次以 Java 8源碼為基礎(chǔ)進(jìn)行解讀。
2、AQS簡(jiǎn)介
在java.util.concurrent.locks包下,有兩個(gè)這樣的類:
- AbstractQueuedSynchronizer
- AbstractQueuedLongSynchronizer
這兩個(gè)類的唯一區(qū)別就是:
- AbstractQueuedSynchronizer內(nèi)部維護(hù)的
state變量是int類型 - AbstractQueuedLongSynchronizer內(nèi)部維護(hù)的
state變量是long類型
我們常說(shuō)的AQS其實(shí)泛指的就是這兩個(gè)類,即抽象隊(duì)列同步器。
抽象隊(duì)列同步器AbstractQueuedSynchronizer (以下都簡(jiǎn)稱AQS),是用來(lái)構(gòu)建鎖或者其他同步組件的骨架類,減少了各功能組件實(shí)現(xiàn)的代碼量,也解決了在實(shí)現(xiàn)同步器時(shí)涉及的大量細(xì)節(jié)問題,例如等待線程采用FIFO隊(duì)列操作的順序。在不同的同步器中還可以定義一些靈活的標(biāo)準(zhǔn)來(lái)判斷某個(gè)線程是應(yīng)該通過(guò)還是等待。
AQS采用模板方法模式,在內(nèi)部維護(hù)了n多的模板的方法的基礎(chǔ)上,子類只需要實(shí)現(xiàn)特定的幾個(gè)方法(不是抽象方法!不是抽象方法!不是抽象方法?。?,就可以實(shí)現(xiàn)子類自己的需求。
基于AQS實(shí)現(xiàn)的組件,諸如:
- ReentrantLock 可重入鎖(支持公平和非公平的方式獲取鎖)
- Semaphore 計(jì)數(shù)信號(hào)量
- ReentrantReadWriteLock 讀寫鎖
- ...
AQS是Doug Lea的大作之一,在維基百科查關(guān)于他的資料時(shí),偶然發(fā)現(xiàn)老爺子喜歡紅色或淡粉色襯衫?
3、AQS設(shè)計(jì)思路
AQS內(nèi)部維護(hù)了一個(gè)int成員變量來(lái)表示同步狀態(tài),通過(guò)內(nèi)置的FIFO(first-in-first-out)同步隊(duì)列來(lái)控制獲取共享資源的線程。
我們可以猜測(cè)出,AQS其實(shí)主要做了這么幾件事情:
- 同步狀態(tài)(state)的維護(hù)管理
- 等待隊(duì)列的維護(hù)管理
- 線程的阻塞與喚醒
ps: 當(dāng)然了,其內(nèi)部還維護(hù)了一個(gè)
ConditionObject內(nèi)部類,主要用作線程的協(xié)作與通信,我們暫時(shí)先不講這個(gè)帥哥。
通過(guò)AQS內(nèi)部維護(hù)的int型的state,可以用于表示任意狀態(tài)!
- ReentrantLock用它來(lái)表示鎖的持有者線程已經(jīng)重復(fù)獲取該鎖的次數(shù),而對(duì)于非鎖的持有者線程來(lái)說(shuō),如果state大于0,意味著無(wú)法獲取該鎖,將該線程包裝為Node,加入到同步等待隊(duì)列里。
- Semaphore用它來(lái)表示剩余的許可數(shù)量,當(dāng)許可數(shù)量為0時(shí),對(duì)未獲取到許可但正在努力嘗試獲取許可的線程來(lái)說(shuō),會(huì)進(jìn)入同步等待隊(duì)列,阻塞,直到一些線程釋放掉持有的許可(state+1),然后爭(zhēng)用釋放掉的許可。
- FutureTask用它來(lái)表示任務(wù)的狀態(tài)(未開始、運(yùn)行中、完成、取消)。
- ReentrantReadWriteLock在使用時(shí),稍微有些不同,int型state用二進(jìn)制表示是32位,前16位(高位)表示為讀鎖,后面的16位(低位)表示為寫鎖。
- CountDownLatch使用state表示計(jì)數(shù)次數(shù),state大于0,表示需要加入到同步等待隊(duì)列并阻塞,直到state等于0,才會(huì)逐一喚醒等待隊(duì)列里的線程。
3.1 偽代碼之獲取鎖:
boolean acquire() throws InterruptedException {
while(當(dāng)前狀態(tài)不允許獲取操作) {
if(需要阻塞獲取請(qǐng)求) {
如果當(dāng)前線程不在隊(duì)列中,則將其插入隊(duì)列
阻塞當(dāng)前線程
}
else
返回失敗
}
可能更新同步器的狀態(tài)
如果線程位于隊(duì)列中,則將其移出隊(duì)列
返回成功
}
3.2 偽代碼之釋放鎖:
void release() {
更新同步器的狀態(tài)
if (新的狀態(tài)允許某個(gè)被阻塞的線程獲取成功)
解除隊(duì)列中一個(gè)或多個(gè)線程的阻塞狀態(tài)
}
大概就是闡述這么個(gè)思路。
3.3 提供的方法
3.3.1 共通方法
以下三個(gè)方法,均為protected final修飾,每個(gè)繼承AQS的類都可以調(diào)用這三個(gè)方法。
- protected final int getState() 獲取同步狀態(tài)
- protected final void setState(int newState) 設(shè)置同步狀態(tài)
- protected final boolean compareAndSetState(int expect, int update) 如果當(dāng)前狀態(tài)值等于預(yù)期值,原子性地將同步狀態(tài)設(shè)置為給定的更新值,并返回true;否則返回false
3.3.2 子類需要實(shí)現(xiàn)的方法
以下五個(gè)方法,在AQS內(nèi)部并未實(shí)現(xiàn),而是交由子類去實(shí)現(xiàn),然后AQS再調(diào)用子類的實(shí)現(xiàn)方法,完成邏輯處理。
- protected boolean tryAcquire(int) 嘗試以獨(dú)占模式獲取操作,應(yīng)查詢對(duì)象的狀態(tài)是否允許以獨(dú)占模式獲取它,如果允許則獲取它。
- protected boolean tryRelease(int) 嘗試釋放同步狀態(tài)
- protected int tryAcquireShared(int) 共享的方式嘗試獲取操作
- protected boolean tryReleaseShared(int) 共享的方式嘗試釋放
- protected boolean isHeldExclusively() 調(diào)用此方法的線程,是否是獨(dú)占鎖的持有者
子類無(wú)須實(shí)現(xiàn)上述的所有方法,可以選擇其中一部分進(jìn)行覆寫,但是要保持實(shí)現(xiàn)邏輯完整,不能穿插實(shí)現(xiàn)。根據(jù)實(shí)現(xiàn)方式不同,分為獨(dú)占鎖策略實(shí)現(xiàn)和共享鎖策略實(shí)現(xiàn)。
這也是為什么上述方法沒有定義為抽象方法的原因。如果定義為抽象方法,子類必須實(shí)現(xiàn)所有的五個(gè)方法,哪怕你壓根就用不到。
獨(dú)占鎖:
- ReentrantLock
- ReentrantReadWriteLock.WriteLock
實(shí)現(xiàn)策略: - tryAcquire(int)
- tryRelease(int)
- isHeldExclusively()
共享鎖:
- CountDownLatch
- ReentrantReadWriteLock.ReadLock
- Semaphore
實(shí)現(xiàn)策略: - tryAcquireShared(int)
- tryReleaseShared(int)
AQS還有很多內(nèi)部模板方法,就不一一舉例了,之后的源碼解讀,會(huì)展示一部分,并會(huì)配上騷氣的注釋。
4、AQS內(nèi)部屬性
4.1 CLH隊(duì)列
AQS通過(guò)內(nèi)置的FIFO(first-in-first-out)同步隊(duì)列來(lái)控制獲取共享資源的線程。CLH隊(duì)列是FIFO的雙端雙向隊(duì)列,AQS的同步機(jī)制就是依靠這個(gè)CLH隊(duì)列完成的。隊(duì)列的每個(gè)節(jié)點(diǎn),都有前驅(qū)節(jié)點(diǎn)指針和后繼節(jié)點(diǎn)指針。
頭結(jié)點(diǎn)并不在阻塞隊(duì)列內(nèi)!

Node源碼:
static final class Node {
// 共享模式下等待標(biāo)記
static final Node SHARED = new Node();
// 獨(dú)占模式下等待標(biāo)記
static final Node EXCLUSIVE = null;
// 表示當(dāng)前的線程被取消
static final int CANCELLED = 1;
// 表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark
static final int SIGNAL = -1;
// 表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中
static final int CONDITION = -2;
// 表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行
static final int PROPAGATE = -3;
/**
* CANCELLED = 1 // 當(dāng)前線程因?yàn)槌瑫r(shí)或者中斷被取消。這是一個(gè)終結(jié)態(tài),也就是狀態(tài)到此為止。
* SIGNAL = -1 // 表示當(dāng)前線程的后繼線程被阻塞或即將被阻塞,當(dāng)前線程釋放鎖或者取消后需要喚醒后繼線程。這個(gè)狀態(tài)一般都是后繼節(jié)點(diǎn)設(shè)置前驅(qū)節(jié)點(diǎn)的
* CONDITION = -2 // 表示當(dāng)前線程在Condition隊(duì)列中
* PROPAGATE = -3 // 用于將喚醒后繼線程傳遞下去,這個(gè)狀態(tài)的引入是為了完善和增強(qiáng)共享鎖的喚醒機(jī)制
* 0 // 表示無(wú)狀態(tài)或者終結(jié)狀態(tài)!
*/
volatile int waitStatus;
// 前驅(qū)節(jié)點(diǎn)
volatile Node prev;
// 后繼節(jié)點(diǎn)
volatile Node next;
// 當(dāng)前節(jié)點(diǎn)的線程,初始化使用,在使用后失效
volatile Thread thread;
// 存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)
Node nextWaiter;
// 如果該節(jié)點(diǎn)處于共享模式下等待,返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果為空,直接拋出空指針異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 指定線程和模式的構(gòu)造方法
Node(Thread thread, Node mode) { // Used by addWaiter
// SHARED和EXCLUSIVE 用于表示當(dāng)前節(jié)點(diǎn)是共享還是獨(dú)占
this.nextWaiter = mode;
this.thread = thread;
}
// 指定線程和節(jié)點(diǎn)狀態(tài)的構(gòu)造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
4.2 volatile state
最為重要的屬性,這個(gè)整數(shù)可以用于表示任意狀態(tài)!在上面有說(shuō)過(guò)。
4.2 volatile head & volatile tail
head 頭結(jié)點(diǎn),但是這個(gè)頭節(jié)點(diǎn)只是個(gè)虛節(jié)點(diǎn),只是邏輯上代表持有鎖的線程節(jié)點(diǎn),且head節(jié)點(diǎn)是不存儲(chǔ)thread線程信息和前驅(qū)節(jié)點(diǎn)信息的。
tail 尾節(jié)點(diǎn),每個(gè)新節(jié)點(diǎn)都會(huì)進(jìn)入隊(duì)尾。不存儲(chǔ)后繼節(jié)點(diǎn)信息。
- 這兩個(gè)屬性是延遲初始化的,在第一次且第一個(gè)線程持有鎖時(shí),第二個(gè)線程因?yàn)楂@取失敗,進(jìn)入同步隊(duì)列時(shí)會(huì)對(duì)head和tail進(jìn)行初始化,也就是說(shuō)在所有線程都能獲取到鎖時(shí),其內(nèi)部的head和tail都為null,一旦head 和 tail被初始化后,即使后來(lái)沒有線程持有鎖,其內(nèi)部的head 和 tail 依然保留最后一個(gè)持有鎖的線程節(jié)點(diǎn)?。╤ead 和 tail都指向一個(gè)內(nèi)存地址)
- 當(dāng)一個(gè)線程獲取鎖失敗而被加入到同步隊(duì)列時(shí),會(huì)用CAS來(lái)設(shè)置尾節(jié)點(diǎn)tail為當(dāng)前線程對(duì)應(yīng)的Node節(jié)點(diǎn)。
- AQS內(nèi)部的cas操作,都是依賴Unsafe類的,自Java9之后的版本,Unsafe類被移除,取而代之的是VarHandle類。
這兩個(gè)屬性均為volatile所修飾(保證了變量具有有序性和可見性)
4.3 spinForTimeoutThreshold
自旋超時(shí)閥值,在doAcquireSharedNanos()等方法中有使用到。
- 如果用戶定義的等待時(shí)間超過(guò)這個(gè)閥值,那么線程將阻塞,在阻塞期間如果能夠等到喚醒的機(jī)會(huì)并tryAcquireShared成功,則返回true,否則返回false,超時(shí)也返回false。
- 如果用戶定義的等待時(shí)間小于等于這個(gè)閥值,則會(huì)無(wú)限循環(huán),線程不阻塞,直到有線程釋放同步狀態(tài)或者超時(shí),然后返回對(duì)應(yīng)的結(jié)果。
4.4 exclusiveOwnerThread
這是AQS通過(guò)繼承AbstractOwnableSynchronizer類,獲得的屬性,表示獨(dú)占模式下的同步器持有者。
5、AQS具體實(shí)現(xiàn)
5.1 獨(dú)占鎖實(shí)現(xiàn)思路
5.1.1 獲取鎖 ReentrantLock.lock()
/**
* 獲取獨(dú)占鎖,忽略中斷。
* 首先嘗試獲取鎖,如果成功,則返回true;否則會(huì)把當(dāng)前線程包裝成Node插入到隊(duì)尾,在隊(duì)列中會(huì)檢測(cè)是否為head的直接后繼,并嘗試獲取鎖,
* 如果獲取失敗,則會(huì)通過(guò)LockSupport阻塞當(dāng)前線程,直至被釋放鎖的線程喚醒或者被中斷,隨后再次嘗試獲取鎖,如此反復(fù)。被喚醒后繼續(xù)之前的代碼執(zhí)行
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
---------------------------------------------------------------------------------------
其中tryAcquire()方法需要由子類實(shí)現(xiàn),ReentrantLock通過(guò)覆寫這個(gè)方法實(shí)現(xiàn)了公平鎖和非公平鎖
---------------------------------------------------------------------------------------
/**
* 在同步等待隊(duì)列中插入節(jié)點(diǎn)
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 判斷尾節(jié)點(diǎn)是否為null
if (pred != null) {
node.prev = pred;
// 通過(guò)CAS在隊(duì)尾插入當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// tail節(jié)點(diǎn)為null,則將新節(jié)點(diǎn)插入隊(duì)尾,必要時(shí)進(jìn)行初始化
enq(node);
return node;
}
/**
* 通過(guò)無(wú)限循環(huán)和CAS操作在隊(duì)列中插入一個(gè)節(jié)點(diǎn)成功后返回。
* 將節(jié)點(diǎn)插入隊(duì)列,必要時(shí)進(jìn)行初始化
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
/*
CAS設(shè)置tail為node
表面上看是把老tail的next連接到node。
如果同步隊(duì)列head節(jié)點(diǎn)和tail節(jié)點(diǎn)剛剛被這個(gè)線程初始化,實(shí)際上也把head的next也連接到了node,而老tail節(jié)點(diǎn)被node取締。
反之則是,把老tail的next連接到node,head并沒有與node產(chǎn)生連接,這樣就形成了鏈表 head <-> old_tail <-> tail
*/
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 在隊(duì)列中的節(jié)點(diǎn)通過(guò)此方法獲取鎖,忽略中斷。
* 這個(gè)方法很重要,如果上述沒有獲取到鎖,將線程包裝成Node節(jié)點(diǎn)加入到同步隊(duì)列的尾節(jié)點(diǎn),然后看代碼里的注釋
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 檢測(cè)當(dāng)前節(jié)點(diǎn)前驅(qū)是否head,這是試獲取鎖。
* 如果是的話,則調(diào)用tryAcquire嘗試獲取鎖,
* 成功,則將head置為當(dāng)前節(jié)點(diǎn)。原h(huán)ead節(jié)點(diǎn)的next被置為null等待GC垃圾回收
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果未成功獲取鎖則根據(jù)前驅(qū)節(jié)點(diǎn)判斷是否要阻塞。
* 如果阻塞過(guò)程中被中斷,則置interrupted標(biāo)志位為true。
* shouldParkAfterFailedAcquire方法在前驅(qū)狀態(tài)不為SIGNAL的情況下都會(huì)循環(huán)重試獲取鎖。
* 如果shouldParkAfterFailedAcquire返回true,則會(huì)將當(dāng)前線程阻塞并檢查是否被中斷
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 根據(jù)前驅(qū)節(jié)點(diǎn)中的waitStatus來(lái)判斷是否需要阻塞當(dāng)前線程。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL狀態(tài),在釋放鎖的時(shí)候會(huì)喚醒后繼節(jié)點(diǎn),
* 所以后繼節(jié)點(diǎn)(也就是當(dāng)前節(jié)點(diǎn))現(xiàn)在可以阻塞自己。
*/
return true;
if (ws > 0) {
/*
* 前驅(qū)節(jié)點(diǎn)狀態(tài)為取消,向前遍歷,更新當(dāng)前節(jié)點(diǎn)的前驅(qū)為往前第一個(gè)非取消節(jié)點(diǎn)。
* 當(dāng)前線程會(huì)之后會(huì)再次回到循環(huán)并嘗試獲取鎖。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 等待狀態(tài)為0或者PROPAGATE(-3),設(shè)置前驅(qū)的等待狀態(tài)為SIGNAL,
* 并且之后會(huì)回到循環(huán)再次重試獲取鎖。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 該方法實(shí)現(xiàn)某個(gè)node取消獲取鎖。
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 遍歷并更新節(jié)點(diǎn)前驅(qū),把node的prev指向前部第一個(gè)非取消節(jié)點(diǎn)。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 記錄pred節(jié)點(diǎn)的后繼為predNext,后續(xù)CAS會(huì)用到。
Node predNext = pred.next;
// 直接把當(dāng)前節(jié)點(diǎn)的等待狀態(tài)置為取消,后繼節(jié)點(diǎn)調(diào)用cancelAcquire方法時(shí),也可以跨過(guò)該節(jié)點(diǎn)
node.waitStatus = Node.CANCELLED;
// 如果當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn),則將尾節(jié)點(diǎn)置為當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果node還有后繼節(jié)點(diǎn),這種情況要做的是把pred和后繼非取消節(jié)點(diǎn)拼起來(lái)。
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
/*
* 如果node的后繼節(jié)點(diǎn)next非取消狀態(tài)的話,則用CAS嘗試把pred的后繼置為node的后繼節(jié)點(diǎn)
* 這里if條件為false或者CAS失敗都沒關(guān)系,這說(shuō)明可能有多個(gè)線程在取消,總歸會(huì)有一個(gè)能成功的。
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
/*
* 在GC層面,和設(shè)置為null具有相同的效果
*/
node.next = node;
}
}
獲取獨(dú)占鎖的執(zhí)行過(guò)程大致如下:
假設(shè)當(dāng)前鎖已經(jīng)被線程A持有,且持有鎖的時(shí)間足夠長(zhǎng)(方便我們講解,也防止抬杠),線程B、C獲取鎖失敗。
線程B:
- 1、將線程B包裝成Node節(jié)點(diǎn)(簡(jiǎn)稱BN),加入到同步等待隊(duì)列,此時(shí)BN的waitStatus=0
- 2、將tail節(jié)點(diǎn)設(shè)置為BN,且與head節(jié)點(diǎn)相連,形成鏈表
- 3、head節(jié)點(diǎn)是個(gè)虛擬節(jié)點(diǎn),也就是持有鎖的線程(但并不包含有線程信息),tail節(jié)點(diǎn)就是BN
- 4、線程B進(jìn)入"無(wú)限循環(huán)",判斷前驅(qū)節(jié)點(diǎn)是否為頭節(jié)點(diǎn)(true)并再次嘗試獲取鎖(false,獲取鎖失敗)
- 5、線程B將進(jìn)入shouldParkAfterFailedAcquire方法,在方法內(nèi)部,將BN的前驅(qū)節(jié)點(diǎn)(也就是頭結(jié)點(diǎn))的waitStatus設(shè)置為 -1,此方法返回false
- 6、因?yàn)槭菬o(wú)限循環(huán),所以線程B再次進(jìn)入shouldParkAfterFailedAcquire方法,由于BN的前驅(qū)節(jié)點(diǎn)(也就是頭結(jié)點(diǎn))的waitStatus為 -1,所以直接返回true
- 7、調(diào)用parkAndCheckInterrupt,當(dāng)前線程B被阻塞,等待喚醒。
線程C:
- 1、將線程C包裝成Node節(jié)點(diǎn)(簡(jiǎn)稱CN),加入到同步等待隊(duì)列,此時(shí)CN的waitStatus=0
- 2、將tail節(jié)點(diǎn)設(shè)置為CN,且與原tail節(jié)點(diǎn)(BN節(jié)點(diǎn))相連
- 3、線程C進(jìn)入"無(wú)限循環(huán)",判斷前驅(qū)節(jié)點(diǎn)是否為頭節(jié)點(diǎn)(false)
- 4、線程C將進(jìn)入shouldParkAfterFailedAcquire方法,在方法內(nèi)部,將CN的前驅(qū)節(jié)點(diǎn)(也就是BN結(jié)點(diǎn))的waitStatus設(shè)置為 -1,此方法返回false
- 5、因?yàn)槭菬o(wú)限循環(huán),所以線程C再次進(jìn)入shouldParkAfterFailedAcquire方法,由于CN的前驅(qū)節(jié)點(diǎn)(也就是BN結(jié)點(diǎn))的waitStatus為 -1,所以直接返回true
- 6、調(diào)用parkAndCheckInterrupt,線程C被阻塞,等待喚醒。
最終的隊(duì)列如下:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | BN | | tail |
| AN | ---> | | ---> | (CN) |
+------+ +------+ +------+
5.1.2 釋放鎖 ReentrantLock.unlock()
對(duì)于釋放獨(dú)占鎖,會(huì)調(diào)用tryRelaes(int)方法,該方法由子類實(shí)現(xiàn),在完全釋放掉鎖后,釋放掉鎖的線程會(huì)將后繼線程喚醒,后繼線程進(jìn)行鎖爭(zhēng)用(非公平鎖)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 頭結(jié)點(diǎn)不為null且后繼節(jié)點(diǎn)是需要被喚醒的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
釋放獨(dú)占鎖的執(zhí)行過(guò)程大致如下(假設(shè)有后繼節(jié)點(diǎn)需要喚醒):
- 將head節(jié)點(diǎn)的
waitStatus設(shè)置為0 - 喚醒后繼節(jié)點(diǎn)
- 后繼節(jié)點(diǎn)線程被喚醒后,會(huì)將后繼節(jié)點(diǎn)設(shè)置為head,并對(duì)后繼節(jié)點(diǎn)內(nèi)的prev和thread屬性設(shè)置為null
- 對(duì)原h(huán)ead節(jié)點(diǎn)的next指針設(shè)置為null,等待GC回收原h(huán)ead節(jié)點(diǎn)。
+------+ +------+ +------+
| old | <-X- | new | <--- | |
| head | | head | | tail |
| AN | -X-> | BN | ---> | (CN) |
+------+ +------+ +------+
如上所示,AN節(jié)點(diǎn)(原h(huán)ead節(jié)點(diǎn))等待被GC垃圾回收。
5.2 共享鎖實(shí)現(xiàn)思路
5.2.1 獲取鎖
與獲取獨(dú)占鎖不同,關(guān)鍵在于,共享鎖可以被多個(gè)線程持有。
如果需要AQS實(shí)現(xiàn)共享鎖,在實(shí)現(xiàn)tryAcquireShared()方法時(shí):
- 返回負(fù)數(shù),表示獲取失敗
- 返回0,表示獲取成功,但是后繼爭(zhēng)用線程不會(huì)成功
- 返回正數(shù),表示獲取成功,表示后繼爭(zhēng)用線程也可能成功
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 一旦共享獲取成功,設(shè)置新的頭結(jié)點(diǎn),并且喚醒后繼線程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 這個(gè)函數(shù)做的事情有兩件:
* 1. 在獲取共享鎖成功后,設(shè)置head節(jié)點(diǎn)
* 2. 根據(jù)調(diào)用tryAcquireShared返回的狀態(tài)以及節(jié)點(diǎn)本身的等待狀態(tài)來(lái)判斷是否要需要喚醒后繼線程
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 把當(dāng)前的head封閉在方法棧上,用以下面的條件檢查
Node h = head;
setHead(node);
/*
* propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據(jù)之一
* h.waitStatus為SIGNAL或者PROPAGATE時(shí)也根據(jù)node的下一個(gè)節(jié)點(diǎn)共享來(lái)決定是否傳播喚醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 這是共享鎖中的核心喚醒函數(shù),主要做的事情就是喚醒下一個(gè)線程或者設(shè)置傳播狀態(tài)。
* 后繼線程被喚醒后,會(huì)嘗試獲取共享鎖,如果成功之后,則又會(huì)調(diào)用setHeadAndPropagate,將喚醒傳播下去。
* 這個(gè)函數(shù)的作用是保障在acquire和release存在競(jìng)爭(zhēng)的情況下,保證隊(duì)列中處于等待狀態(tài)的節(jié)點(diǎn)能夠有辦法被喚醒。
*/
private void doReleaseShared() {
/*
* 以下的循環(huán)做的事情就是,在隊(duì)列存在后繼線程的情況下,喚醒后繼線程;
* 或者由于多線程同時(shí)釋放共享鎖由于處在中間過(guò)程,讀到head節(jié)點(diǎn)等待狀態(tài)為0的情況下,
* 雖然不能unparkSuccessor,但為了保證喚醒能夠正確穩(wěn)固傳遞下去,設(shè)置節(jié)點(diǎn)狀態(tài)為PROPAGATE。
* 這樣的話獲取鎖的線程在執(zhí)行setHeadAndPropagate時(shí)可以讀到PROPAGATE,從而由獲取鎖的線程去釋放后繼等待線程。
*/
for (;;) {
Node h = head;
// 如果隊(duì)列中存在后繼線程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h節(jié)點(diǎn)的狀態(tài)為0,需要設(shè)置為PROPAGATE用以保證喚醒的傳播。
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 檢查h是否仍然是head,如果不是的話需要再進(jìn)行循環(huán)。
if (h == head)
break;
}
}
5.2.1 釋放鎖
釋放共享鎖與獲取共享鎖的代碼都使用了doReleaseShared(int)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// doReleaseShared的實(shí)現(xiàn)上面獲取共享鎖已經(jīng)介紹
doReleaseShared();
return true;
}
return false;
}
我覺得大家應(yīng)該都能看懂,還是簡(jiǎn)單說(shuō)一下吧(手動(dòng)狗頭~):
同步等待隊(duì)列中,在喚醒因?yàn)楂@取共享鎖失敗而阻塞的后繼節(jié)點(diǎn)線程后,后繼節(jié)點(diǎn)線程會(huì)依次喚醒其后繼節(jié)點(diǎn)!依次類推。
再換種說(shuō)法?
這種情況有可能是:寫鎖導(dǎo)致獲取讀鎖的一些線程阻塞,而寫鎖釋放后,會(huì)喚醒后繼節(jié)點(diǎn)線程,如果該后繼節(jié)點(diǎn),恰好是因?yàn)楂@取讀鎖失敗而阻塞的線程,那么該后繼節(jié)點(diǎn)線程會(huì)喚醒其后繼節(jié)點(diǎn)...直到全部獲取讀鎖成功,或者某一節(jié)點(diǎn)獲取寫鎖成功。
6、拓展
6.1 不得不說(shuō)的PROPAGATE
有個(gè)關(guān)于AQS的bug,真的值得大家看一看
在共享鎖獲取與釋放的操作中,我覺得有個(gè)特別的重要的waitStatus狀態(tài)值,要和大家說(shuō)一說(shuō),就是PROPAGATE,這個(gè)屬性值的意思是,用于將喚醒后繼線程傳遞下去,這個(gè)狀態(tài)的引入是為了完善和增強(qiáng)共享鎖的喚醒機(jī)制。
之前翻閱了很多關(guān)于AQS的文章,講到這個(gè)狀態(tài)值的少之又少,哪怕是《Java并發(fā)編程實(shí)戰(zhàn)》這本書,也是沒有提及,最終我看到有一位博客園的作者非常詳實(shí)的闡述了這個(gè)PEOPAGATE狀態(tài),也是給了我很大的啟發(fā)。
沒錯(cuò),我第一次看AQS的源碼的時(shí)候,甚至直接把這個(gè)PROPAGATE狀態(tài)值忽略掉了。事實(shí)上,不僅僅閱讀源碼的人,容易把這個(gè)PROPAGATE狀態(tài)值忽略掉,哪怕是Doug Lea老爺子本人,在開發(fā)時(shí)也沒有意識(shí)到,如果沒有這個(gè)狀態(tài)值會(huì)導(dǎo)致什么樣的后果,直到上面鏈接的bug出現(xiàn)后,老爺子才加上了這個(gè)狀態(tài),徹底修復(fù)了這個(gè)bug。
復(fù)現(xiàn)該bug的代碼:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
程序執(zhí)行時(shí),會(huì)偶發(fā)線程hang住。
我們?cè)賮?lái)看看之前的setHeadAndPropagate方法是什么樣的。
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
然后Semaphore.release()調(diào)用的是AQS的releaseShared,看看當(dāng)時(shí)的releaseShared長(zhǎng)什么樣:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
再看看當(dāng)時(shí)的Node:
static final class Node {
// 忽略掉無(wú)關(guān)的代碼,只展示waitStatus的狀態(tài)值
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
}
setHeadAndPropagate方法和releaseShared方法,設(shè)計(jì)的也是很簡(jiǎn)單。
當(dāng)時(shí)源碼里,Node的waitStatus是沒有PROPAGATE=-3這個(gè)狀態(tài)值的。
為了方便大家對(duì)照,我把當(dāng)時(shí)unparkSuccessor方法的源碼,也一并展示出來(lái):
private void unparkSuccessor(Node node) {
// 將node的waitStatus設(shè)置為0
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
接下來(lái),我們慢慢聊~
ps: 說(shuō)真的,現(xiàn)在老板的位置離我的位置不遠(yuǎn),雖然我的工作已經(jīng)提前很多天完成了,但,還是有點(diǎn)慌~,冒著風(fēng)險(xiǎn)還要繼續(xù)寫!
在AQS獲取共享鎖的操作中,進(jìn)入同步等待的線程(被阻塞掉),有兩種途徑可以被喚醒:
- 其他線程釋放信號(hào)量后,調(diào)用unparkSuccessor(
releaseShared方法中) - 其他線程獲取共享鎖成功后,會(huì)通過(guò)傳播機(jī)制來(lái)喚醒后繼節(jié)點(diǎn)(也就是在
setHeadAndPropagate方法中)。
bug重現(xiàn)的例子,很簡(jiǎn)單,就是在循環(huán)中重復(fù)不斷的實(shí)例化4個(gè)線程,前兩個(gè)線程獲取信號(hào)量,兩個(gè)線程釋放信號(hào)量,主線程等待4個(gè)線程全都執(zhí)行完畢再執(zhí)行打印。
在后兩個(gè)線程沒有進(jìn)行釋放信號(hào)量的操作時(shí),AQS內(nèi)部的同步等待隊(duì)列是下面這種情況:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | t2 |
| | ---> | | ---> | |
+------+ +------+ +------+
- 1、t3釋放信號(hào)量,調(diào)用
releaseShared,喚醒后繼節(jié)點(diǎn)里的線程t1,同時(shí),head的waitStatus變?yōu)? - 2、t1被喚醒,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回0
- 3、t4釋放信號(hào)量,調(diào)用
releaseShared,在releaseShared方法中讀到的head還是原h(huán)ead,但是此時(shí)head的waitStatus已經(jīng)變?yōu)?,所以不會(huì)調(diào)用unparkSuccessor方法 - 4、t1被喚醒了,由于在步驟2里,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回的是0,所以它也不會(huì)調(diào)用
unparkSuccessor方法
至此,兩種途徑全部被封死,沒有任何線程去喚醒t2了,線程被hang住...
ps:Doug Lea 黑人問號(hào)臉,哈哈~
老爺子為了修復(fù)這個(gè)bug,做出了如下改進(jìn):
- 1、增加一個(gè)waitStatus的狀態(tài),即
PROPAGATE - 2、在
releaseShared方法中抽取提煉出了doReleaseShared()(上面有展示)在doReleaseShared方法中,如果head節(jié)點(diǎn)的狀態(tài)為0,需要設(shè)置為PROPAGATE用以保證喚醒的傳播。 - 3、在
setHeadAndPropagate方法中也多了一些判斷,其中就有head節(jié)點(diǎn)的waitStatus如果小于0,就喚醒后繼節(jié)點(diǎn)(PROPAGATE = -3)。
通過(guò)改進(jìn)之后的代碼,我們?cè)賮?lái)復(fù)盤一下:
- 1、t3釋放信號(hào)量,調(diào)用
releaseShared,喚醒后繼節(jié)點(diǎn)里的線程t1,同時(shí),head的waitStatus變?yōu)? - 2、t1被喚醒,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回0
- 3、此步驟和2和同一時(shí)刻發(fā)生,t4釋放信號(hào)量,調(diào)用
releaseShared,在doReleaseShared方法中讀到的head還是原h(huán)ead,但是此時(shí)head的waitStatus已經(jīng)變?yōu)?,將head的waitStatus設(shè)置為PROPAGATE(-3) - 4、t1被喚醒了,調(diào)用
setHeadAndPropagate方法,將t1設(shè)置為head,符合條件判斷,進(jìn)入分支語(yǔ)句,調(diào)用doReleaseShared方法,繼而喚醒t2節(jié)點(diǎn)線程。
6.2 unparkSuccessor的一點(diǎn)思考
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 通常情況下,要喚醒的線程都是當(dāng)前節(jié)點(diǎn)的后繼線程
* 但是,如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)被取消了,則從隊(duì)列尾部向前遍歷,直到找到未被取消的后繼節(jié)點(diǎn)
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor方法中,如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)被取消了,則從隊(duì)列尾部向前遍歷,直到找到未被取消的后繼節(jié)點(diǎn)。
這個(gè)問題,大家也可以自己思考一下,為什么要從tail節(jié)點(diǎn)開始向前遍歷?
假設(shè),CLH隊(duì)列如下圖所示:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | tail |
| | ---> | | ---> | |
+------+ +------+ +------+
t1.waitStatus = 1 且 tail.waitStatus = 1
head嘗試喚醒后繼節(jié)點(diǎn)t1,發(fā)現(xiàn)t1是被取消狀態(tài),遂找出t1的后繼節(jié)點(diǎn)tail,發(fā)現(xiàn)tail也是被取消狀態(tài),但是tail.next == null。
與此同時(shí),有個(gè)新節(jié)點(diǎn)加入到隊(duì)列尾部,但是還沒有將原tail.next指向新節(jié)點(diǎn)。
也就是說(shuō),tail.next 如果恰好處在步驟1和步驟2中間的話,遍歷就會(huì)中斷。
摘錄addWaiter部分代碼:
node.prev = pred;
// 通過(guò)CAS在隊(duì)尾插入當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) { // 步驟1
pred.next = node; // 步驟2
return node;
}
6.3 acquireQueued 方法里,為什么還要再tryAcquire?
以獨(dú)占模式來(lái)說(shuō),對(duì)于這個(gè)問題,我是這么想的:
時(shí)刻1:線程B嘗試獲取鎖,但是,由于鎖被線程A持有,所以,線程B準(zhǔn)備調(diào)用addWaiter,將自己入到隊(duì)列(但還沒有和head節(jié)點(diǎn)產(chǎn)生指針連接)
時(shí)刻1:同一時(shí)刻,線程A嘗試釋放鎖,進(jìn)入release方法,調(diào)用子類的tryRelease(),將代表鎖持有次數(shù)的state置為0(代表鎖沒有被任何線程持有),進(jìn)入unparkSuccessor方法,發(fā)現(xiàn)并沒有后繼節(jié)點(diǎn)(因?yàn)樾鹿?jié)點(diǎn)還未入隊(duì)),所以不會(huì)喚醒任何線程,到這里,線程A釋放鎖操作完成。
時(shí)刻2:線程B調(diào)用addWaiter方法完畢,已經(jīng)入隊(duì),并和head節(jié)點(diǎn)產(chǎn)生指針連接
時(shí)刻3:線程B調(diào)用acquireQueued方法(如下方代碼展示),如果在這個(gè)方法里面不調(diào)用tryAcquire,就會(huì)發(fā)生這樣的情況:明明可以獲取鎖,但是線程卻被休眠了,進(jìn)而導(dǎo)致整個(gè)同步隊(duì)列不可用
所以,再次調(diào)用tryAcquire是為了防止新節(jié)點(diǎn)還未入隊(duì),但是頭結(jié)點(diǎn)已經(jīng)釋放了鎖,導(dǎo)致整個(gè)同步隊(duì)列癱瘓的情況發(fā)生。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 防止新節(jié)點(diǎn)還未入隊(duì),但是頭結(jié)點(diǎn)已經(jīng)釋放了鎖,導(dǎo)致整個(gè)同步隊(duì)列中斷癱瘓的情況發(fā)生
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
結(jié)束
通過(guò)閱讀AQS的源碼,對(duì)于我們學(xué)習(xí)和掌握基于AQS實(shí)現(xiàn)的組件,是有很大幫助的。
尤其是它的設(shè)計(jì)理念和思想,更是我們學(xué)習(xí)的重點(diǎn)!