在上一篇文章中我們對Lock和AbstractQueuedSynchronizer(AQS)有了初步的認(rèn)識。在同步組件的實現(xiàn)中,AQS是核心部分,同步組件的實現(xiàn)者通過使用AQS提供的模板方法實現(xiàn)同步組件語義,AQS則實現(xiàn)了對同步狀態(tài)的管理,以及對阻塞線程進(jìn)行排隊,等待通知等等一些底層的實現(xiàn)處理。AQS的核心也包括了這些方面:同步隊列,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現(xiàn),而這些實際上則是AQS提供出來的模板方法,歸納整理如下:
獨占式鎖:
void acquire(int arg):獨占式獲取同步狀態(tài),如果獲取失敗則插入同步隊列進(jìn)行等待;
void acquireInterruptibly(int arg):與acquire方法相同,但在同步隊列中進(jìn)行等待的時候可以檢測中斷;
boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基礎(chǔ)上增加了超時等待功能,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false;
boolean release(int arg):釋放同步狀態(tài),該方法會喚醒在同步隊列中的下一個節(jié)點
共享式鎖:
void acquireShared(int arg):共享式獲取同步狀態(tài),與獨占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài);
void acquireSharedInterruptibly(int arg):在acquireShared方法基礎(chǔ)上增加了能響應(yīng)中斷的功能;
boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎(chǔ)上增加了超時等待的功能;
boolean releaseShared(int arg):共享式釋放同步狀態(tài)
要想掌握AQS的底層實現(xiàn),其實也就是對這些模板方法的邏輯進(jìn)行學(xué)習(xí)。在學(xué)習(xí)這些模板方法之前,我們得首先了解下AQS中的同步隊列是一種什么樣的數(shù)據(jù)結(jié)構(gòu),因為同步隊列是AQS對同步狀態(tài)的管理的基石。
2. 同步隊列
AQS提供了一個基于FIFO隊列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態(tài),期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實現(xiàn)它的方法來管理其狀態(tài),管理的方式就是通過類似acquire和release的方式來操縱狀態(tài)。然而多線程環(huán)境中對狀態(tài)的操縱必須確保原子性,因此子類對于狀態(tài)的把握,需要使用這個同步器提供的以下三個方法對狀態(tài)進(jìn)行操作:
- java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
- java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
- java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)
子類推薦被定義為自定義同步裝置的內(nèi)部類,同步器自身沒有實現(xiàn)任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當(dāng)它被定義為一個排他模式時,其他線程對其的獲取就被阻止,而共享模式對于多個線程獲取都可以成功。
同步器是實現(xiàn)鎖的關(guān)鍵,利用同步器將鎖的語義實現(xiàn),然后在鎖的實現(xiàn)中聚合同步器??梢赃@樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個線程進(jìn)行加鎖,排除兩個以上的線程),但是實現(xiàn)是依托給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對資源是否能夠獲取以及線程的排隊等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域,嚴(yán)格意義上講,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。
同步器的開始提到了其實現(xiàn)依賴于一個FIFO隊列,那么隊列中的元素Node就是保存著線程引用和線程狀態(tài)的容器,每個線程對同步器的訪問,都可以看做是隊列中的一個節(jié)點。Node的主要包含以下成員變量:
Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
以上五個成員變量主要負(fù)責(zé)保存該節(jié)點的線程引用,同步等待隊列(以下簡稱sync隊列)的前驅(qū)和后繼節(jié)點,同時也包括了同步狀態(tài)。
| 屬性名稱 | 描述 |
|---|---|
| int waitStatus | 表示節(jié)點的狀態(tài)。其中包含的狀態(tài)有:1. CANCELLED,值為1,表示當(dāng)前的線程被取消; 2. SIGNAL,值為-1,表示當(dāng)前節(jié)點的后繼節(jié)點包含的線程需要運行,也就是unpark; 3. CONDITION,值為-2,表示當(dāng)前節(jié)點在等待condition,也就是在condition隊列中; 4. PROPAGATE,值為-3,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行; 5. 值為0,表示當(dāng)前節(jié)點在sync隊列中,等待著獲取鎖。
|
| Node prev | 前驅(qū)節(jié)點,比如當(dāng)前節(jié)點被取消,那就需要前驅(qū)節(jié)點和后繼節(jié)點來完成連接。 |
| Node next | 后繼節(jié)點。 |
| Node nextWaiter | 存儲condition隊列中的后繼節(jié)點。 |
| Thread thread | 入隊列時的當(dāng)前線程。 |
節(jié)點成為sync隊列和condition隊列構(gòu)建的基礎(chǔ),在同步器中就包含了sync隊列。同步器擁有三個成員變量:sync隊列的頭結(jié)點head、sync隊列的尾節(jié)點tail和狀態(tài)state。對于鎖的獲取,請求形成節(jié)點,將其掛載在尾部,而鎖資源的轉(zhuǎn)移(釋放再獲?。┦菑念^部開始向后進(jìn)行。對于同步器維護(hù)的狀態(tài)state,多個線程對其的獲取將會產(chǎn)生一個鏈?zhǔn)降慕Y(jié)構(gòu)。

現(xiàn)在我們知道了節(jié)點的數(shù)據(jù)結(jié)構(gòu)類型,并且每個節(jié)點擁有其前驅(qū)和后繼節(jié)點,很顯然這是一個雙向隊列。同樣的我們可以用一段demo看一下。
public class LockDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread.start();
}
}
}
實例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實際上這里讓線程睡眠是想模擬出當(dāng)線程無法獲取鎖時進(jìn)入同步隊列的情況。通過debug,當(dāng)Thread-4(在本例中最后一個線程)獲取鎖失敗后進(jìn)入同步時,AQS時現(xiàn)在的同步隊列如圖所示:
Thread-0先獲得鎖后進(jìn)行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進(jìn)入同步隊列,同時也可以很清楚的看出來每個節(jié)點有兩個域:prev(前驅(qū))和next(后繼),并且每個節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個重要的成員變量:
private transient volatile Node head;
private transient volatile Node tail;
也就是說AQS實際上通過頭尾指針來管理同步隊列,同時實現(xiàn)包括獲取鎖失敗的線程進(jìn)行入隊,釋放鎖時對同步隊列中的線程進(jìn)行通知等核心方法。其示意圖如下:
通過對源碼的理解以及做實驗的方式,現(xiàn)在我們可以清楚的知道這樣幾點:
-
節(jié)點的數(shù)據(jù)結(jié)構(gòu),即
AQS的靜態(tài)內(nèi)部類Node,節(jié)點的等待狀態(tài)等信息; -
同步隊列是一個雙向隊列,
AQS通過持有頭尾指針管理同步隊列;
那么,節(jié)點如何進(jìn)行入隊和出隊是怎樣做的了?實際上這對應(yīng)著鎖的獲取和釋放兩個操作:獲取鎖失敗進(jìn)行入隊操作,獲取鎖成功進(jìn)行出隊操作。
3. 獨占鎖
3.1 獨占鎖的獲?。╝cquire方法)
我們繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨占式鎖,獲取失敗就將當(dāng)前線程加入同步隊列,成功則線程執(zhí)行。而lock()方法實際上會調(diào)用AQS的acquire()方法,源碼如下
public final void acquire(int arg) {
//先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回
//若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述邏輯主要包括:
- 嘗試獲?。ㄕ{(diào)用tryAcquire更改狀態(tài),需要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個線程能夠?qū)顟B(tài)進(jìn)行成功修改,而沒有成功修改的線程將進(jìn)入sync隊列排隊。 - 如果獲取不到,將當(dāng)前線程構(gòu)造成節(jié)點Node并加入sync隊列;
進(jìn)入隊列的每個線程都是一個節(jié)點Node,從而形成了一個雙向隊列,類似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規(guī)模(也就是兩個節(jié)點左右)。 - 再次嘗試獲取,如果沒有獲取到那么將當(dāng)前線程從線程調(diào)度器上摘下,進(jìn)入等待狀態(tài)。
總結(jié): 使用LockSupport將當(dāng)前線程unpark,關(guān)于LockSupport后續(xù)會詳細(xì)介紹。關(guān)鍵信息請看注釋,acquire根據(jù)當(dāng)前獲得同步狀態(tài)成功與否做了兩件事情:1. 成功,則方法結(jié)束返回,2. 失敗,則先調(diào)用addWaiter()然后在調(diào)用acquireQueued()方法。
獲取同步狀態(tài)失敗,入隊操作
當(dāng)線程獲取獨占式鎖失敗后就會將當(dāng)前線程加入同步隊列,那么加入隊列的方式是怎樣的了?我們接下來就應(yīng)該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼如下:
private Node addWaiter(Node mode) {
// 1\. 將當(dāng)前線程構(gòu)建成Node類型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2\. 當(dāng)前尾節(jié)點是否為null?
Node pred = tail;
if (pred != null) {
// 2.2 將當(dāng)前節(jié)點尾插入的方式插入同步隊列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1\. 當(dāng)前同步隊列尾節(jié)點為null,說明當(dāng)前線程是第一個加入同步隊列進(jìn)行等待的線程
enq(node);
return node;
}
上述邏輯主要包括:
- 使用當(dāng)前線程構(gòu)造Node;
對于一個節(jié)點需要做的是將當(dāng)節(jié)點前驅(qū)節(jié)點指向尾節(jié)點(current.prev = tail),尾節(jié)點指向它(tail = current),原有的尾節(jié)點的后繼節(jié)點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點的設(shè)置來保證的,也就是compareAndSetTail來完成的。 - 先行嘗試在隊尾添加;
如果尾節(jié)點已經(jīng)有了,然后做如下操作:
(1)分配引用T指向尾節(jié)點;
(2)將節(jié)點的前驅(qū)節(jié)點更新為尾節(jié)點(current.prev = tail);
(3)如果尾節(jié)點是T,那么將當(dāng)尾節(jié)點設(shè)置為該節(jié)點(tail = current,原子更新);
(4)T的后繼節(jié)點指向當(dāng)前節(jié)點(T.next = current)。
注意第3點是要求原子的。
這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式。 - 如果隊尾添加失敗或者是第一個入隊的節(jié)點。
如果是第1個節(jié)點,也就是sync隊列沒有初始化,那么會進(jìn)入到enq這個方法,進(jìn)入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進(jìn)入enq這個方法。
可以看到enq的邏輯是確保進(jìn)入的Node都會有機會順序的添加到sync隊列中,而加入的步驟如下:
(1)如果尾節(jié)點為空,那么原子化的分配一個頭節(jié)點,并將尾節(jié)點指向頭節(jié)點,這一步是初始化;
(2)然后是重復(fù)在addWaiter中做的工作,但是在一個while(true)的循環(huán)中,直到當(dāng)前節(jié)點入隊為止。
進(jìn)入sync隊列之后,接下來就是要進(jìn)行鎖的獲取,或者說是訪問控制了,只有一個線程能夠在同一時刻繼續(xù)的運行,而其他的進(jìn)入等待狀態(tài)。而每個線程都是一個獨立的個體,它們自省的觀察,當(dāng)條件滿足的時候(自己的前驅(qū)是頭結(jié)點并且原子性的獲取了狀態(tài)),那么這個線程能夠繼續(xù)運行。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1\. 構(gòu)造頭結(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2\. 尾插入,CAS操作失敗自旋嘗試
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點,說明同步隊列是帶頭結(jié)點的鏈?zhǔn)酱鎯Y(jié)構(gòu)。帶頭結(jié)點與不帶頭結(jié)點相比,會在入隊和出隊的操作中獲得更大的便捷性,因此同步隊列選擇了帶頭結(jié)點的鏈?zhǔn)酱鎯Y(jié)構(gòu)。那么帶頭節(jié)點的隊列初始化時機是什么?自然而然是在tail為null時,即當(dāng)前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操作設(shè)置尾節(jié)點,如果CAS操作失敗會在for (;;)for死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結(jié):
- 在當(dāng)前線程是第一個加入同步隊列時,調(diào)用compareAndSetHead(new Node())方法,完成鏈?zhǔn)疥犃械念^結(jié)點的初始化;
- 自旋不斷嘗試CAS尾插入節(jié)點直至成功為止。
現(xiàn)在我們已經(jīng)很清楚獲取獨占式鎖失敗的線程包裝成Node然后插入同步隊列的過程了?那么緊接著會有下一個問題?在同步隊列中的節(jié)點(線程)會做什么事情了來保證自己能夠有機會獲得獨占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊獲取鎖的過程,源碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1\. 獲得當(dāng)前節(jié)點的先驅(qū)節(jié)點
final Node p = node.predecessor();
// 2\. 當(dāng)前節(jié)點能否獲取獨占式鎖
// 2.1 如果當(dāng)前節(jié)點的先驅(qū)節(jié)點是頭結(jié)點并且成功獲取同步狀態(tài),即可以獲得獨占式鎖
if (p == head && tryAcquire(arg)) {
//隊列頭指針用指向當(dāng)前節(jié)點
setHead(node);
//釋放前驅(qū)節(jié)點
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 獲取鎖失敗,線程進(jìn)入等待狀態(tài)等待獲取獨占式鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上述邏輯主要包括:
- 獲取當(dāng)前節(jié)點的前驅(qū)節(jié)點;
需要獲取當(dāng)前節(jié)點的前驅(qū)節(jié)點,而頭結(jié)點所對應(yīng)的含義是當(dāng)前站有鎖且正在運行。 - 當(dāng)前驅(qū)節(jié)點是頭結(jié)點并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點占有鎖;
如果滿足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點對鎖占有的含義,設(shè)置頭結(jié)點為當(dāng)前節(jié)點。 - 否則進(jìn)入等待狀態(tài)。
如果沒有輪到當(dāng)前節(jié)點運行,那么將當(dāng)前線程從線程調(diào)度器上摘下,也就是進(jìn)入等待狀態(tài)。
這里針對acquire做一下總結(jié):
- 狀態(tài)的維護(hù);
需要在鎖定時,需要維護(hù)一個狀態(tài)(int類型),而對狀態(tài)的操作是原子和非阻塞的,通過同步器提供的對狀態(tài)訪問的方法對狀態(tài)進(jìn)行操縱,并且利用compareAndSet來確保原子性的修改。 - 狀態(tài)的獲??;
一旦成功的修改了狀態(tài),當(dāng)前線程或者說節(jié)點,就被設(shè)置為頭節(jié)點。 - sync隊列的維護(hù)。
在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點不是頭節(jié)點或者沒有獲取到資源)進(jìn)入睡眠狀態(tài),停止線程調(diào)度器對當(dāng)前節(jié)點線程的調(diào)度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點的通知,而這一個過程就是釋放,釋放會通知它的后繼節(jié)點從睡眠中返回準(zhǔn)備運行。整體示意圖為下圖:
獲取鎖成功,出隊操作
獲取鎖的節(jié)點出隊的邏輯是:
//隊列頭結(jié)點引用指向當(dāng)前節(jié)點
setHead(node);
//釋放前驅(qū)節(jié)點
p.next = null; // help GC
failed = false;
return interrupted;
setHead()方法為:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
將當(dāng)前節(jié)點通過setHead()方法設(shè)置為隊列的頭結(jié)點,然后將之前的頭結(jié)點的next域設(shè)置為null并且pre域也為null,即與隊列斷開,無任何引用方便GC時能夠?qū)?nèi)存進(jìn)行回收。示意圖如下:
那么當(dāng)獲取鎖失敗的時候會調(diào)用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將節(jié)點狀態(tài)由INITIAL設(shè)置成SIGNAL,表示當(dāng)前線程阻塞。當(dāng)compareAndSetWaitStatus設(shè)置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會在acquireQueued()方法中for (;;)死循環(huán)中會繼續(xù)重試,直至compareAndSetWaitStatus設(shè)置節(jié)點狀態(tài)位為SIGNAL時shouldParkAfterFailedAcquire返回true時才會執(zhí)行方法parkAndCheckInterrupt()方法,該方法的源碼為:
private final boolean parkAndCheckInterrupt() {
//使得該線程阻塞
LockSupport.park(this);
return Thread.interrupted();
}
該方法的關(guān)鍵是會調(diào)用LookSupport.park()方法(關(guān)于LookSupport會在以后的文章進(jìn)行討論),該方法是用來阻塞當(dāng)前線程的。因此到這里就應(yīng)該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:
- 如果當(dāng)前節(jié)點的前驅(qū)節(jié)點是頭節(jié)點,并且能夠獲得同步狀態(tài)的話,當(dāng)前線程能夠獲得鎖該方法執(zhí)行結(jié)束退出;
- 獲取鎖失敗的話,先將節(jié)點狀態(tài)設(shè)置成SIGNAL,然后調(diào)用LookSupport.park方法使得當(dāng)前線程阻塞。
經(jīng)過上面的分析,獨占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:
3.2 獨占鎖的釋放(release()方法)
獨占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當(dāng)head指向的頭結(jié)點不為null,并且該節(jié)點的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//頭節(jié)點的后繼節(jié)點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//后繼節(jié)點不為null時喚醒該線程
LockSupport.unpark(s.thread);
}
源碼的關(guān)鍵信息請看注釋,首先獲取頭節(jié)點的后繼節(jié)點,當(dāng)后繼節(jié)點的時候會調(diào)用LookSupport.unpark()方法,該方法會喚醒該節(jié)點的后繼節(jié)點所包裝的線程。因此,每一次鎖釋放后就會喚醒隊列中該節(jié)點的后繼節(jié)點所引用的線程,從而進(jìn)一步可以佐證獲得鎖的過程是一個FIFO(先進(jìn)先出)的過程。
到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學(xué)習(xí)源碼的方式非常深刻的學(xué)習(xí)到了獨占式鎖的獲取和釋放的過程以及同步隊列??梢宰鲆幌驴偨Y(jié):
- 線程獲取鎖失敗,線程被封裝成Node進(jìn)行入隊操作,核心方法在于addWaiter()和enq(),同時enq()完成對同步隊列的頭結(jié)點初始化工作以及CAS操作失敗的重試;
- 線程獲取鎖是一個自旋的過程,當(dāng)且僅當(dāng) 當(dāng)前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且成功獲得同步狀態(tài)時,節(jié)點出隊即該節(jié)點引用的線程獲得鎖,否則,當(dāng)不滿足條件時就會調(diào)用LookSupport.park()方法使得線程阻塞;
- 釋放鎖的時候會喚醒后繼節(jié)點;
總體來說:在獲取同步狀態(tài)時,AQS維護(hù)一個同步隊列,獲取同步狀態(tài)失敗的線程會加入到隊列中進(jìn)行自旋;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點是頭結(jié)點并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時,同步器會調(diào)用unparkSuccessor()方法喚醒后繼節(jié)點。
獨占鎖特性學(xué)習(xí)
3.3 可中斷式獲取鎖(acquireInterruptibly方法)
我們知道lock相較于synchronized有一些更方便的特性,比如能響應(yīng)中斷以及超時等待等特性,現(xiàn)在我們依舊采用通過學(xué)習(xí)源碼的方式來看看能夠響應(yīng)中斷是怎么實現(xiàn)的。可響應(yīng)中斷式鎖可調(diào)用方法lock.lockInterruptibly();而該方法其底層會調(diào)用AQS的acquireInterruptibly方法,源碼為:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
//線程獲取鎖失敗
doAcquireInterruptibly(arg);
}
在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//將節(jié)點插入到同步隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//獲取鎖出隊
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//線程中斷拋異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關(guān)鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧:),與acquire方法邏輯幾乎一致,唯一的區(qū)別是當(dāng)parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。
3.4 超時等待式獲取鎖(tryAcquireNanos()方法)
通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達(dá)到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:
- 在超時時間內(nèi),當(dāng)前線程成功獲取了鎖;
- 當(dāng)前線程在超時時間內(nèi)被中斷;
- 超時時間結(jié)束,仍未獲得鎖返回false。
我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學(xué)習(xí)底層具體是怎么實現(xiàn)的,該方法會調(diào)用AQS的方法tryAcquireNanos(),源碼為:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
//實現(xiàn)超時等待的效果
doAcquireNanos(arg, nanosTimeout);
}
很顯然這段源碼最終是靠doAcquireNanos方法實現(xiàn)超時等待的效果,該方法源碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1\. 根據(jù)超時時間和當(dāng)前時間計算出截止時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//2\. 當(dāng)前線程獲得鎖出隊列
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 3.1 重新計算超時時間
nanosTimeout = deadline - System.nanoTime();
// 3.2 已經(jīng)超時返回false
if (nanosTimeout <= 0L)
return false;
// 3.3 線程阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 3.4 線程被中斷拋出被中斷異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
程序邏輯如圖所示:
程序邏輯同獨占鎖可響應(yīng)中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計算出按照現(xiàn)在時間和超時時間計算出理論上的截止時間,比如當(dāng)前時間是8h10min,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計算出剛好達(dá)到超時時間時的系統(tǒng)時間就是8h 10min+10min = 8h 20min。然后根據(jù)deadline - System.nanoTime()就可以判斷是否已經(jīng)超時了,比如,當(dāng)前系統(tǒng)時間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時間8h 20min,deadline - System.nanoTime()計算出來就是一個負(fù)數(shù),自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當(dāng)前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。
4. 共享鎖
4.1 執(zhí)行過程概述
獲取鎖的過程:
- 當(dāng)線程調(diào)用
acquireShared()申請獲取鎖資源時,如果成功,則進(jìn)入臨界區(qū)。 - 當(dāng)獲取鎖失敗時,則創(chuàng)建一個共享類型的節(jié)點并進(jìn)入一個
FIFO等待隊列,然后被掛起等待喚醒。 - 當(dāng)隊列中的等待線程被喚醒以后就重新嘗試獲取鎖資源,如果成功則喚醒后面還在等待的共享節(jié)點并把該喚醒事件傳遞下去,即會依次喚醒在該節(jié)點后面的所有共享節(jié)點,然后進(jìn)入臨界區(qū),否則繼續(xù)掛起等待。
釋放鎖過程:
- 當(dāng)線程調(diào)用
releaseShared()進(jìn)行鎖資源釋放時,如果釋放成功,則喚醒隊列中等待的節(jié)點,如果有的話。
4.2 源碼深入分析
基于上面所說的共享鎖執(zhí)行流程,我們接下來看下源碼實現(xiàn)邏輯:
首先來看下獲取鎖的方法acquireShared(),如下
public final void acquireShared(int arg) {
//嘗試獲取共享鎖,返回值小于0表示獲取失敗
if (tryAcquireShared(arg) < 0)
//執(zhí)行獲取鎖失敗以后的方法
doAcquireShared(arg);
}
這里tryAcquireShared()方法是留給用戶去實現(xiàn)具體的獲取鎖邏輯的。關(guān)于該方法的實現(xiàn)有兩點需要特別說明:
該方法必須自己檢查當(dāng)前上下文是否支持獲取共享鎖,如果支持再進(jìn)行獲取。
該方法返回值是個重點。其一、由上面的源碼片段可以看出返回值小于0表示獲取鎖失敗,需要進(jìn)入等待隊列。其二、如果返回值等于0表示當(dāng)前線程獲取共享鎖成功,但它后續(xù)的線程是無法繼續(xù)獲取的,也就是不需要把它后面等待的節(jié)點喚醒。最后、如果返回值大于0,表示當(dāng)前線程獲取共享鎖成功且它后續(xù)等待的節(jié)點也有可能繼續(xù)獲取共享鎖成功,也就是說此時需要把后續(xù)節(jié)點喚醒讓它們?nèi)L試獲取共享鎖。
有了上面的約定,我們再來看下doAcquireShared方法的實現(xiàn):
//參數(shù)不多說,就是傳給acquireShared()的參數(shù)
private void doAcquireShared(int arg) {
//添加等待節(jié)點的方法跟獨占鎖一樣,唯一區(qū)別就是節(jié)點類型變?yōu)榱斯蚕硇?,不再贅? final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//表示前面的節(jié)點已經(jīng)獲取到鎖,自己會嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
//注意上面說的, 等于0表示不用喚醒后繼節(jié)點,大于0需要
if (r >= 0) {
//這里是重點,獲取到鎖以后的喚醒操作,后面詳細(xì)說
setHeadAndPropagate(node, r);
p.next = null;
//如果是因為中斷醒來則設(shè)置中斷標(biāo)記位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//掛起邏輯跟獨占鎖一樣,不再贅述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//獲取失敗的取消邏輯跟獨占鎖一樣,不再贅述
if (failed)
cancelAcquire(node);
}
}
獨占鎖模式獲取成功以后設(shè)置頭結(jié)點然后返回中斷狀態(tài),結(jié)束流程。而共享鎖模式獲取成功以后,調(diào)用了setHeadAndPropagate方法,從方法名就可以看出除了設(shè)置新的頭結(jié)點以外還有一個傳遞動作,一起看下代碼:
//兩個入?yún)?,一個是當(dāng)前成功獲取共享鎖的節(jié)點,一個就是tryAcquireShared方法的返回值,注意上面說的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //記錄當(dāng)前頭節(jié)點
//設(shè)置新的頭節(jié)點,即把當(dāng)前獲取到鎖的節(jié)點設(shè)置為頭節(jié)點
//注:這里是獲取到鎖之后的操作,不需要并發(fā)控制
setHead(node);
//這里意思有兩種情況是需要執(zhí)行喚醒操作
//1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點需要被喚醒
//2.頭節(jié)點后面的節(jié)點需要被喚醒(waitStatus<0),不論是老的頭結(jié)點還是新的頭結(jié)點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果當(dāng)前節(jié)點的后繼節(jié)點是共享類型或者沒有后繼節(jié)點,則進(jìn)行喚醒
//這里可以理解為除非明確指明不需要喚醒(后繼等待節(jié)點是獨占類型),否則都要喚醒
if (s == null || s.isShared())
//后面詳細(xì)說
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
最終的喚醒操作也很復(fù)雜,專門拿出來分析一下:
注:這個喚醒操作在releaseShare()方法里也會調(diào)用。
private void doReleaseShared() {
for (;;) {
//喚醒操作由頭結(jié)點開始,注意這里的頭節(jié)點已經(jīng)是上面新設(shè)置的頭結(jié)點了
//其實就是喚醒上面新獲取到共享鎖的節(jié)點的后繼節(jié)點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后繼節(jié)點需要被喚醒
if (ws == Node.SIGNAL) {
//這里需要控制并發(fā),因為入口有setHeadAndPropagate跟release兩個,避免兩次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//執(zhí)行喚醒操作
unparkSuccessor(h);
}
//如果后繼節(jié)點暫時不需要喚醒,則把當(dāng)前節(jié)點狀態(tài)設(shè)置為PROPAGATE確保以后可以傳遞下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果頭結(jié)點沒有發(fā)生變化,表示設(shè)置完成,退出循環(huán)
//如果頭結(jié)點發(fā)生變化,比如說其他線程獲取到了鎖,為了使自己的喚醒動作可以傳遞,必須進(jìn)行重試
if (h == head)
break;
}
}
接下來看下釋放共享鎖的過程:
public final boolean releaseShared(int arg) {
//嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒過程,詳情見上面分析
doReleaseShared();
return true;
}
return false;
}
注:上面的setHeadAndPropagate()方法表示等待隊列中的線程成功獲取到共享鎖,這時候它需要喚醒它后面的共享節(jié)點(如果有),但是當(dāng)通過releaseShared()方法去釋放一個共享鎖的時候,接下來等待獨占鎖跟共享鎖的線程都可以被喚醒進(jìn)行嘗試獲取。
4.3 總結(jié)
跟獨占鎖相比,共享鎖的主要特征在于當(dāng)一個在等待隊列中的共享節(jié)點成功獲取到鎖以后(它獲取到的是共享鎖),既然是共享,那它必須要依次喚醒后面所有可以跟它一起共享當(dāng)前鎖資源的節(jié)點,毫無疑問,這些節(jié)點必須也是在等待共享鎖(這是大前提,如果等待的是獨占鎖,那前面已經(jīng)有一個共享節(jié)點獲取鎖了,它肯定是獲取不到的)。當(dāng)共享鎖被釋放的時候,可以用讀寫鎖為例進(jìn)行思考,當(dāng)一個讀鎖被釋放,此時不論是讀鎖還是寫鎖都是可以競爭資源的。
5. 總結(jié):
如果獲取共享鎖失敗后,將請求共享鎖的線程封裝成Node對象放入AQS的隊列中,并掛起Node對象對應(yīng)的線程,實現(xiàn)請求鎖線程的等待操作。待共享鎖可以被獲取后,從頭節(jié)點開始,依次喚醒頭節(jié)點及其以后的所有共享類型的節(jié)點。實現(xiàn)共享狀態(tài)的傳播。這里有幾點值得注意:
1. 與AQS的獨占功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實現(xiàn)。
2. 與AQS的獨占功能不同,當(dāng)鎖被頭節(jié)點獲取后,獨占功能是只有頭節(jié)點獲取鎖,其余節(jié)點的線程繼續(xù)沉睡,等待鎖被釋放后,才會喚醒下一個節(jié)點的線程,而共享功能是只要頭節(jié)點獲取鎖成功,就在喚醒自身節(jié)點對應(yīng)的線程的同時,繼續(xù)喚醒AQS隊列中的下一個節(jié)點的線程,每個節(jié)點在喚醒自身的同時還會喚醒下一個節(jié)點對應(yīng)的線程,以實現(xiàn)共享狀態(tài)的“向后傳播”,從而實現(xiàn)共享功能。
以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而如果直接看待AQS,或許可以這么去解讀:
首先,AQS并不關(guān)心“是什么鎖”,對于AQS來說它只是實現(xiàn)了一系列的用于判斷“資源”是否可以訪問的API,并且封裝了在“訪問資源”受限時將請求訪問的線程的加入隊列、掛起、喚醒等操作,AQS只關(guān)心“資源不可以訪問時,怎么處理?”、“資源是可以被同時訪問,還是在同一時間只能被一個線程訪問?”、“如果有線程等不及資源了,怎么從AQS的隊列中退出?”等一系列圍繞資源訪問的問題,而至于“資源是否可以被訪問?”這個問題則交給AQS的子類去實現(xiàn)。
當(dāng)AQS的子類是實現(xiàn)獨占功能時,例如ReentrantLock,“資源是否可以被訪問”被定義為只要AQS的state變量不為0,并且持有鎖的線程不是當(dāng)前線程,則代表資源不能訪問。
當(dāng)AQS的子類是實現(xiàn)共享功能時,例如:CountDownLatch,“資源是否可以被訪問”被定義為只要AQS的state變量不為0,說明資源不能訪問。這是典型的將規(guī)則和操作分開的設(shè)計思路:規(guī)則子類定義,操作邏輯因為具有公用性,放在父類中去封裝。當(dāng)然,正式因為AQS只是關(guān)心“資源在什么條件下可被訪問”,所以子類還可以同時使用AQS的共享功能和獨占功能的API以實現(xiàn)更為復(fù)雜的功能。
比如:ReentrantReadWriteLock,我們知道ReentrantReadWriteLock的中也有一個叫Sync的內(nèi)部類繼承了AQS,而AQS的隊列可以同時存放共享鎖和獨占鎖,對于ReentrantReadWriteLock來說分別代表讀鎖和寫鎖,當(dāng)隊列中的頭節(jié)點為讀鎖時,代表讀操作可以執(zhí)行,而寫操作不能執(zhí)行,因此請求寫操作的線程會被掛起,當(dāng)讀操作依次推出后,寫鎖成為頭節(jié)點,請求寫操作的線程被喚醒,可以執(zhí)行寫操作,而此時的讀請求將被封裝成Node放入AQS的隊列中。如此往復(fù),實現(xiàn)讀寫鎖的讀寫交替進(jìn)行。
參考文獻(xiàn)
《java并發(fā)編程的藝術(shù)》