1. 前言
任意一個(gè)Java對(duì)象,都有一組監(jiān)視器方法(定義在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關(guān)鍵字配合,可以 實(shí)現(xiàn)等待/通知模式。Condition接口也提供了類似Object的監(jiān)視器方法,與Lock配合可以實(shí)現(xiàn)等 待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的。

2. Condition的基本使用
Condition 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件(condition),只有滿足條件時(shí),線程才會(huì)被喚醒。
2.1 ConditionWait
public class ConditionDemoWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2.2 ConditionSignal
public class ConditionDemoSignal implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock, Condition condition){
this.lock=lock;
this.condition=condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
} finally {
lock.unlock();
}
}
}
通過這個(gè)案例簡(jiǎn)單實(shí)現(xiàn)了 wait 和 notify 的功能,當(dāng)調(diào)用await 方法后,當(dāng)前線程會(huì)釋放鎖并等待,而其他線程調(diào)用condition 對(duì)象的 signal 或者 signalall 方法通知并被阻塞的線程,然后自己執(zhí)行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼續(xù)執(zhí)行,最后釋放鎖。所以,condition 中兩個(gè)最重要的方法,一個(gè)是 await,一個(gè)是 signal 方法。
await:把當(dāng)前線程阻塞掛起
signal:喚醒阻塞的線程
3. Condition源碼分析
Condition 與 Object 的 wait()/notify()作用是相似的:都是使得一個(gè)線程等待某個(gè)條件(Condition),只有當(dāng)該條件具備(signal 或者 signalAll 方法被調(diào)用)時(shí)等待線程才會(huì)被喚醒,從而重新爭(zhēng)奪鎖。
不同的是:Object 的 wait()/notify()由 JVM 底層的實(shí)現(xiàn),而 Condition 接口與實(shí)現(xiàn)類完全使用Java代碼實(shí)現(xiàn)。當(dāng)需要進(jìn)行線程間的通信時(shí),建議結(jié)合使用 ReetrantLock與Condition,通過 Condition 的 await()和 signal()方法進(jìn)行線程間的阻塞與喚醒。ConditionObject 類是實(shí)現(xiàn)條件隊(duì)列的關(guān)鍵,每個(gè) ConditionObject 對(duì)象都維護(hù)一個(gè)單獨(dú)的條件等待隊(duì)列。每個(gè) ConditionObject 對(duì)應(yīng)一個(gè)條件隊(duì)列,它記錄該隊(duì)列的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)。

一個(gè) Condition 對(duì)象是一個(gè)單條件的等待隊(duì)列,如圖所示:

在一個(gè)顯式鎖上,我們可以創(chuàng)建多個(gè)等待任務(wù)隊(duì)列,這點(diǎn)和內(nèi)置鎖不同,Java 內(nèi)置鎖上只有唯一的一個(gè)等待隊(duì)列。比如,我們可以使用 newCondition 創(chuàng)建兩個(gè)等待隊(duì)列,具體如下:
private Lock lock = new ReentrantLock();
//創(chuàng)建第一個(gè)等待隊(duì)列
private Condition firstCond = lock.newCondition();
//創(chuàng)建第二個(gè)等待隊(duì)列
private Condition secondCond = lock.newCondition();
Condition 條件隊(duì)列與 AQS 同步隊(duì)列的關(guān)系,如圖所示:

調(diào)用 Condition,需要獲得 Lock 鎖,所以意味著會(huì)存在一個(gè) AQS 同步隊(duì)列,在上面那個(gè)案例中,假如兩個(gè)線程同時(shí)運(yùn)行的話,那么 AQS 的隊(duì)列可能是下面這種情況:

那么這個(gè)時(shí)候 ThreadA 調(diào)用了 condition.await 方法,它做了什么事情呢?
3.1 condition.await
調(diào)用 Condition 的 await()方法(或者以 await 開頭的方法),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。當(dāng)從 await()方法返回時(shí),當(dāng)前線程一定獲取了Condition 相關(guān)聯(lián)的鎖。
public final void await() throws InterruptedException {
//表示 await 允許被中斷
if (Thread.interrupted())
throw new InterruptedException();
//創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為 condition,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
Node node = addConditionWaiter();
//釋放當(dāng)前的鎖,得到鎖的狀態(tài),并喚醒 AQS 隊(duì)列中的一個(gè)線程
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上,即還沒有被 signal,則將當(dāng)前線程阻塞
//判斷這個(gè)節(jié)點(diǎn)是否在 AQS 隊(duì)列上,第一次判斷的是 false,因?yàn)榍懊嬉呀?jīng)釋放鎖了
while (!isOnSyncQueue(node)) {
//通過 park 掛起當(dāng)前線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued返回 false 就是拿到鎖了.
// interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì),但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了.
// 將這個(gè)變量設(shè)置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn).
// 如果是 null ,就沒有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果線程被中斷了,需要拋出異常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3.1.1 addConditionWaiter
這個(gè)方法的主要作用是把當(dāng)前線程封裝成 Node,添加到等待隊(duì)列。這里的隊(duì)列不再是雙向鏈表,而是單向鏈表。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果lastWaiter不等于空并且waitStatus 不等于 CONDITION 時(shí),把這個(gè)節(jié)點(diǎn)從鏈表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//構(gòu)建一個(gè) Node,waitStatus=CONDITION。這里的鏈表是一個(gè)單向的,所以相比 AQS 來說會(huì)簡(jiǎn)單很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3.1.2 圖解分析
執(zhí)行完 addConditionWaiter 這個(gè)方法之后,就會(huì)產(chǎn)生一個(gè)這樣的 condition 隊(duì)列

3.1.3 fullyRelease
fullRelease,就是徹底的釋放鎖,什么叫徹底呢,就是如果當(dāng)前鎖存在多次重入,那么在這個(gè)方法中只需要釋放一次就會(huì)把所有的重入次數(shù)歸零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
//獲得重入的次數(shù)
int savedState = getState();
// 釋放鎖并且喚醒下一個(gè)同步隊(duì)列中的線程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3.1.4 圖解分析
此時(shí),同步隊(duì)列會(huì)觸發(fā)鎖的釋放和重新競(jìng)爭(zhēng)。ThreadB 獲得了鎖。

3.1.5 isOnSyncQueue
判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中,返回 false 表示不在,返回 true 表示在,如果不在 AQS 同步隊(duì)列,說明當(dāng)前節(jié)點(diǎn)沒有喚醒去爭(zhēng)搶同步鎖,所以需要把當(dāng)前線程阻塞起來,直到其他的線程調(diào)用 signal 喚醒,如果在 AQS 同步隊(duì)列,意味著它需要去競(jìng)爭(zhēng)同步鎖去獲得執(zhí)行程序執(zhí)行權(quán)限。
為什么要做這個(gè)判斷呢?
原因是在 condition 隊(duì)列中的節(jié)點(diǎn)會(huì)重新加入到 AQS 隊(duì)列去競(jìng)爭(zhēng)鎖。也就是當(dāng)調(diào)用 signal的時(shí)候,會(huì)把當(dāng)前節(jié)點(diǎn)從 condition 隊(duì)列轉(zhuǎn)移到 AQS 隊(duì)列。
如何去判斷ThreadA 這個(gè)節(jié)點(diǎn)是否存在于 AQS 隊(duì)列中呢?
- 如果 ThreadA 的 waitStatus 的狀態(tài)為 CONDITION,說明它存在于 condition 隊(duì)列中,不在 AQS 隊(duì)列。因?yàn)锳QS 隊(duì)列的狀態(tài)一定不可能有 CONDITION
- 如果 node.prev 為空,說明也不存在于 AQS 隊(duì)列,原因是 prev=null 在 AQS 隊(duì)列中只有一種可能性,就是它是head 節(jié)點(diǎn),head 節(jié)點(diǎn)意味著它是獲得鎖的節(jié)點(diǎn)。
- 如果 node.next 不等于空,說明一定存在于 AQS 隊(duì)列中,因?yàn)橹挥?AQS 隊(duì)列才會(huì)存在 next 和 prev 的關(guān)系
- findNodeFromTail,表示從 tail 節(jié)點(diǎn)往前掃描 AQS 隊(duì)列,一旦發(fā)現(xiàn) AQS 隊(duì)列的節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)相等,說明節(jié)點(diǎn)一定存在于 AQS 隊(duì)列中。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
3.2 Condition.signal
await 方法會(huì)阻塞 ThreadA,然后 ThreadB 搶占到了鎖獲得了執(zhí)行權(quán)限,這個(gè)時(shí)候在 ThreadB 中調(diào)用了 Condition的 signal()方法,將會(huì)喚醒在等待隊(duì)列中節(jié)點(diǎn)。
public final void signal() {
//先判斷當(dāng)前線程是否獲得了鎖,這個(gè)判斷比較簡(jiǎn)單,直接用獲得鎖的線程和當(dāng)前線程相比即可
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到 Condition隊(duì)列上第一個(gè)節(jié)點(diǎn)
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
3.2.1 Condition.doSignal
對(duì) condition 隊(duì)列中從首部開始的第一個(gè) condition 狀態(tài)的節(jié)點(diǎn),執(zhí)行 transferForSignal 操作,將 node 從 condition隊(duì)列中轉(zhuǎn)換到 AQS 隊(duì)列中,同時(shí)修改 AQS 隊(duì)列中原先尾節(jié)點(diǎn)的狀態(tài)
private void doSignal(Node first) {
do {
//從 Condition 隊(duì)列中刪除 first 節(jié)點(diǎn)
if ( (firstWaiter = first.nextWaiter) == null)
// 將 next 節(jié)點(diǎn)設(shè)置成 null
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
3.2.2 AQS.transferForSignal
該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài),如果成功,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程。此時(shí),那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//更新節(jié)點(diǎn)的狀態(tài)為 0,如果更新失敗,只有一種可能就是節(jié)點(diǎn)被 CANCELLED 了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//調(diào)用 enq,把當(dāng)前節(jié)點(diǎn)添加到AQS 隊(duì)列。并且返回返回按當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),也就是原tail 節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了
// (SIGNAL表示: 他的 next節(jié)點(diǎn)需要停止阻塞),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒節(jié)點(diǎn)上的線程.
LockSupport.unpark(node.thread);
//如果 node 的 prev 節(jié)點(diǎn)已經(jīng)是signal 狀態(tài),那么被阻塞的 ThreadA 的喚醒工作由 AQS 隊(duì)列來完成
return true;
}
3.2.3 圖解分析
執(zhí)行完 doSignal 以后,會(huì)把 condition 隊(duì)列中的節(jié)點(diǎn)轉(zhuǎn)移到 aqs 隊(duì)列上,邏輯結(jié)構(gòu)圖如下

這個(gè)時(shí)候會(huì)判斷 ThreadA 的 prev 節(jié)點(diǎn)也就是 head 節(jié)點(diǎn)的 waitStatus,如果大于 0 或者設(shè)置 SIGNAL 失敗,表示節(jié)點(diǎn)被設(shè)置成了 CANCELLED 狀態(tài)。這個(gè)時(shí)候會(huì)喚醒ThreadA 這個(gè)線程。否則就基于 AQS 隊(duì)列的機(jī)制來喚醒,也就是等到 ThreadB 釋放鎖之后來喚醒 Thread。
3.3 被阻塞的線程喚醒后的邏輯
前面在分析 await 方法時(shí),線程會(huì)被阻塞。而通過 signal被喚醒之后又繼續(xù)回到上次執(zhí)行的邏輯中標(biāo)注為紅色部分的代碼
checkInterruptWhileWaiting 這個(gè)方法是干嘛呢?其實(shí)從名字就可以看出來,就是 ThreadA 在 condition 隊(duì)列被阻塞的過程中,有沒有被其他線程觸發(fā)過中斷請(qǐng)求。
3.3.1 checkInterruptWhileWaiting
如果當(dāng)前線程被中斷,則調(diào)用transferAfterCancelledWait 方法判斷后續(xù)的處理應(yīng)該是拋出 InterruptedException 還是重新中斷。
如果第一次 CAS 失敗了,則不能判斷當(dāng)前線程是先進(jìn)行了中斷還是先進(jìn)行了 signal 方法的調(diào)用,可能是先執(zhí)行了 signal 然后中斷,也可能是先執(zhí)行了中斷,后執(zhí)行了 signal,當(dāng)然,這兩個(gè)操作肯定是發(fā)生在 CAS 之前。這時(shí)需要做的就是等待當(dāng)前線程的 node被添加到 AQS 隊(duì)列后,也就是 enq 方法返回后,返回false 告訴 checkInterruptWhileWaiting 方法返回REINTERRUPT(1),后續(xù)進(jìn)行重新中斷。
簡(jiǎn)單來說,該方法的返回值代表當(dāng)前線程是否在 park 的時(shí)候被中斷喚醒,如果為 true 表示中斷在 signal 調(diào)用之前,signal 還未執(zhí)行,那么這個(gè)時(shí)候會(huì)根據(jù) await 的語義,在 await 時(shí)遇到中斷需要拋出interruptedException,返回 true 就是告訴checkInterruptWhileWaiting 返回 THROW_IE(-1)。如果返回 false,否則表示 signal 已經(jīng)執(zhí)行過了,只需要重新響應(yīng)中斷即可。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 使用 cas 修改節(jié)點(diǎn)狀態(tài),如果還能修改成功,說明線程被中斷時(shí),signal 還沒有被調(diào)用。
// 線程被喚醒,并不一定是在 java 層面執(zhí)行了locksupport.unpark,
// 也可能是調(diào)用了線程的 interrupt()方法,這個(gè)方法會(huì)更新一個(gè)中斷標(biāo)識(shí),并且會(huì)喚醒處于阻塞狀態(tài)下的線程
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// //如果 cas 成功,則把node 添加到 AQS 隊(duì)列
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果 cas 失敗,則判斷當(dāng)前 node 是否已經(jīng)在 AQS 隊(duì)列上,如果不在,則讓給其他線程執(zhí)行
// 當(dāng) node 被觸發(fā)了 signal 方法時(shí),node 就會(huì)被加到 aqs 隊(duì)列上
// 循環(huán)檢測(cè) node 是否已經(jīng)成功添加到 AQS 隊(duì)列中。如果沒有,則通過 yield,
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
3.3.2 acquireQueued
當(dāng)前被喚醒的節(jié)點(diǎn)ThreadA 去搶占同步鎖,并且要恢復(fù)到原本的重入次數(shù)狀態(tài),調(diào)用完這個(gè)方法之后,AQS 隊(duì)列的狀態(tài)如下:將 head 節(jié)點(diǎn)的 waitStatus 設(shè)置為-1,Signal 狀態(tài)。

3.3.3 reportInterruptAfterWait
根據(jù) checkInterruptWhileWaiting 方法返回的中斷標(biāo)識(shí)來進(jìn)行中斷上報(bào)。如果是 THROW_IE,則拋出中斷異常如果是 REINTERRUPT,則重新響應(yīng)中斷
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
4 Condition總結(jié)
await 和 signal 的總結(jié)
線程 awaitThread 先通過 lock.lock()方法獲取鎖成功后調(diào)用了 condition.await 方法進(jìn)入等待隊(duì)列,而另一個(gè)線程 signalThread 通過 lock.lock()方法獲取鎖成功后調(diào)用了 condition.signal 或者 signalAll 方法,使得線程awaitThread 能夠有機(jī)會(huì)移入到同步隊(duì)列中,當(dāng)其他線程釋放 lock 后使得線程 awaitThread 能夠有機(jī)會(huì)獲取lock,從而使得線程 awaitThread 能夠從 await 方法中退出執(zhí)行后續(xù)操作。如果 awaitThread 獲取 lock 失敗會(huì)直接進(jìn)入到同步隊(duì)列。

阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在 AQS 等待隊(duì)列,則阻塞當(dāng)前線程,如果在等待隊(duì)列,則自旋等待嘗試獲取鎖
釋放:signal()后,節(jié)點(diǎn)會(huì)從 condition 隊(duì)列移動(dòng)到 AQS等待隊(duì)列,則進(jìn)入正常鎖的獲取流程