Condition接口作為wait/notify的替代品來(lái)實(shí)現(xiàn)等待/通知,很好的解決了過(guò)早喚醒的問(wèn)題,并且boolean await(long time, TimeUnit unit)解決了public final native void wait(long timeout)方法不能斷定是超時(shí)返回還是被喚醒的問(wèn)題。
方法摘要
public interface Condition {
//使當(dāng)前線程進(jìn)入等待直到被喚醒或者中斷
//調(diào)用await方法會(huì)釋放lock鎖,并且當(dāng)前線程進(jìn)入WAITING狀態(tài),直到
//1. 別的線程調(diào)用了signal并且當(dāng)前線程被選擇喚醒
//2. 別的線程調(diào)用了signalAll方法
//3. 別的線程中斷該等待線程,并且當(dāng)前線程能夠響應(yīng)線程中斷
//4. 發(fā)生虛假喚醒(所以await與Object.wait一樣需要放在while循環(huán))
//并且被喚醒之后在await方法返回前,當(dāng)前線程需要再次申請(qǐng)鎖
void await() throws InterruptedException;
//大體與await方法類似,只是不會(huì)被中斷
//所以進(jìn)入WAITING狀態(tài)后只有1,2,4會(huì)導(dǎo)致awaitUninterruptibly()結(jié)束
void awaitUninterruptibly();
//大體與await方法類似,只是等待會(huì)超時(shí)
//所以進(jìn)入WAITING狀態(tài)后除了1,2,3,4之外,還有
//5. 規(guī)定的等待時(shí)間結(jié)束
long awaitNanos(long nanosTimeout) throws InterruptedException;
//類似于awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
//同樣是有過(guò)期時(shí)間的等待,只是這個(gè)是設(shè)置了固定的deadline
boolean awaitUntil(Date deadline) throws InterruptedException;
//喚醒一個(gè)等待線程
//喚醒跟當(dāng)前Condition相關(guān)的眾多等待線程中的一個(gè),
//且被喚醒的線程在await方法結(jié)束前需要重新申請(qǐng)lock
void signal();
//喚醒所有的等待線程,每個(gè)線程都會(huì)去申請(qǐng)鎖
void signalAll();
}
Condition.await/signal的執(zhí)行線程需要持有創(chuàng)建相應(yīng)Condition變量的Lock顯式鎖,對(duì)保護(hù)條件的判斷以及Condition.await的調(diào)用也要放在循環(huán)語(yǔ)句中,并且該循環(huán)語(yǔ)句與目標(biāo)動(dòng)作的執(zhí)行都在一個(gè)顯式鎖引導(dǎo)的臨界區(qū)內(nèi)。這都是為了避免虛假喚醒,信號(hào)丟失等問(wèn)題。
Condition.wait與Object.wait一樣,使得當(dāng)前線程進(jìn)入等待并且釋放了相應(yīng)的lock,并且此時(shí)await()的調(diào)用并未返回。
被喚醒的線程繼續(xù)運(yùn)行的時(shí)候需要再次申請(qǐng)相應(yīng)的Lock,再次獲得顯式鎖成功之后,Condition.await方法才會(huì)返回。
代碼示例
class AwaitClass{
private static final Lock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
public void awaitMethod(){
lock.lock();
try{
while(保護(hù)條件不成立){
condition.await();
}
//在臨界區(qū)之內(nèi)執(zhí)行目標(biāo)動(dòng)作
doActionMethod();
}finally {
lock.unlock();
}
}
public void signalMethod(){
lock.lock();
try{
//更新共享變量使得保護(hù)條件成立
changeSharedState();
//喚醒等待線程,盡量在臨界區(qū)最邊上
condition.signal();
}finally {
lock.unlock();
}
}
}
解決過(guò)早喚醒的問(wèn)題

代碼里我們需要注意的是保護(hù)條件與Condition之間的關(guān)系,讓使用不同的保護(hù)條件等待線程調(diào)用不同的Condition.await()方法來(lái)實(shí)現(xiàn)其等待;并讓通知線程在更新了共享變量之后,僅調(diào)用涉及了這些共享變量的保護(hù)條件對(duì)應(yīng)的Condition.signal/signalAll()方法來(lái)實(shí)現(xiàn)通知。
Condition接口有一個(gè)具體實(shí)現(xiàn)ConditionObject是在AbstractQueuedSynchronizer里面。
ConditionObject里面有兩個(gè)Node類型的變量
private transient Node firstWaiter;
private transient Node lastWaiter;
而Node是AQS的一個(gè)靜態(tài)內(nèi)部類,Node是一種CLH同步隊(duì)列(它是一個(gè)自旋鎖spinlocks,能確保無(wú)饑餓性,提供先來(lái)先服務(wù)的公平性。同時(shí)它也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請(qǐng)線程只在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。)
static final class Node {
//共享模式下的等待標(biāo)記
static final Node SHARED = new Node();
//獨(dú)占模式下的等待標(biāo)記
static final Node EXCLUSIVE = null;
//waitStatus值,標(biāo)識(shí)線程已經(jīng)被cancelled
static final int CANCELLED = 1;
//waitStatus值,標(biāo)識(shí)后繼Node的線程需要喚醒
static final int SIGNAL = -1;
//waitStatus值,標(biāo)識(shí)當(dāng)前Node的線程正在條件隊(duì)列里等待
static final int CONDITION = -2;
//waitStatus值,標(biāo)識(shí)下一個(gè)acquireShared應(yīng)該無(wú)條件傳播
static final int PROPAGATE = -3;
//SIGNAL:此節(jié)點(diǎn)的后繼節(jié)點(diǎn)已經(jīng)是(或者即將)被阻塞(通過(guò)park方法),
// 所以當(dāng)前節(jié)點(diǎn)在它要釋放鎖或者被取消Acquire的時(shí)候喚醒unpark它的后繼節(jié)點(diǎn),
// 為了避免競(jìng)爭(zhēng),申請(qǐng)鎖的方法必須先標(biāo)識(shí)他們需要signal,然后atomic acquire
//CANCELLED:一個(gè)節(jié)點(diǎn)由于timeout或者Interrupt而成為CANCELLED狀態(tài)。
// 這個(gè)狀態(tài)的節(jié)點(diǎn)永遠(yuǎn)不會(huì)變,特別是,一個(gè)線程在Cancelled的node里就再也不會(huì)阻塞。
//CONDITION:當(dāng)前節(jié)點(diǎn)正在條件等待隊(duì)列(condition queue),它不是用作同步節(jié)點(diǎn)(sync queue)
// 直到被transferred,這時(shí)候Status會(huì)被設(shè)置成0
//PROPAGATE:應(yīng)該將releaseShared傳播到其他節(jié)點(diǎn),僅僅在doReleaseShared里面對(duì)頭結(jié)點(diǎn)進(jìn)行設(shè)置
// 來(lái)確保傳播繼續(xù)進(jìn)行
//0:除了上面的集中Status
volatile int waitStatus;
/**
鏈接到當(dāng)前節(jié)點(diǎn)/線程所依賴的前繼節(jié)點(diǎn),用于檢查waitStatus。 在入隊(duì)期間分配,
并且(出于GC的考慮)僅在出隊(duì)時(shí)退出時(shí)設(shè)成null。
另外,取消前任節(jié)點(diǎn)的時(shí)候,我們會(huì)短路直到找到一個(gè)會(huì)一直存在的且不是cancelled的節(jié)點(diǎn),
因?yàn)轭^節(jié)點(diǎn)永遠(yuǎn)不會(huì)被取消:一個(gè)節(jié)點(diǎn)稱為頭結(jié)點(diǎn)只有成功acquire。
一個(gè)被取消的線程永遠(yuǎn)不會(huì)成功acquire,而且一個(gè)線程只能取消自身,而不取消其他任何節(jié)點(diǎn)。
*/
volatile Node prev;
/**
在當(dāng)前節(jié)點(diǎn)/線程釋放鎖被unpark,鏈接到當(dāng)前節(jié)點(diǎn)/線程的后繼節(jié)點(diǎn)。
入隊(duì)時(shí)分配,在繞過(guò)cancelled的那些前繼節(jié)點(diǎn)的時(shí)候調(diào)整,并且在出隊(duì)列的時(shí)候設(shè)置成null
*/
volatile Node next;
/**
*保存當(dāng)前入節(jié)點(diǎn)相關(guān)聯(lián)的線程
*/
volatile Thread thread;
/**
* 指向條件隊(duì)列里下一個(gè)等待的node,或者對(duì)于share mode會(huì)放一個(gè)特殊的值
*/
Node nextWaiter;
/**
* 如果是shared mode,則返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
代碼解析await()
1.If current thread is interrupted, throw InterruptedException.
2.Save lock state returned by getState.
3.Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
4.Block until signalled or interrupted.
5.Reacquire by invoking specialized version of acquire with saved state as argument.
6.If interrupted while blocked in step 4, throw InterruptedException.
Java doc上明確說(shuō)明await方法主要有6個(gè)步驟
public final void await() throws InterruptedException {
//先做Interrupt的檢查,如果當(dāng)前線程已經(jīng)被中斷,則拋出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//添加一個(gè)新的Node(CONDITION狀態(tài))到condition queue
Node node = addConditionWaiter();
//釋放當(dāng)前線程所持有的鎖,不管重入多少次,并喚醒該鎖上的其他等待線程
int savedState = fullyRelease(node);
int interruptMode = 0;
//檢查當(dāng)前node是不是在sync queue,如果不是就會(huì)一直掛起當(dāng)前線程
while (!isOnSyncQueue(node)) {
//不在sync queue,則會(huì)掛起當(dāng)前線程,當(dāng)前await方法并沒(méi)有返回
LockSupport.park(this);
//這一步線程被喚醒之后,需要檢查當(dāng)前線程是不是被中斷,如果被中斷則跳出循環(huán)
// 且被中斷的node會(huì)嘗試進(jìn)入sync queue
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued開(kāi)始嘗試獲取鎖(只有前繼節(jié)點(diǎn)是head的情況下),
// 如果失敗則阻塞當(dāng)前線程,直到獲取成功
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
//acquireQueued是不響應(yīng)中斷的,這里會(huì)響應(yīng)中斷
reportInterruptAfterWait(interruptMode);
}
/**
* 添加一個(gè)新的waiter到等待隊(duì)列,設(shè)置ConditionObject的firstWaiter和lastWaiter
*/
private Node addConditionWaiter() {
//拿到等待隊(duì)列里的最后一個(gè)node
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//將condition queue里面cancelled的node移除
unlinkCancelledWaiters();
t = lastWaiter;
}
//新建一個(gè)CONDITION的node并添加到condition queue中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//釋放當(dāng)前線程所持有的鎖,并喚醒其他阻塞的線程,釋放鎖異常時(shí)cancel這個(gè)node
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//判斷這個(gè)node是不是轉(zhuǎn)移到sync queue了
final boolean isOnSyncQueue(Node node) {
//如果node的狀態(tài)時(shí)Condition或者node的前繼節(jié)點(diǎn)是空的,則表明不在sync queue
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果這個(gè)節(jié)點(diǎn)的后繼節(jié)點(diǎn)不為空,說(shuō)明有線程爭(zhēng)搶鎖失敗被阻塞,則當(dāng)前節(jié)點(diǎn)實(shí)在sync queue
if (node.next != null) // If has successor, it must be on queue
return true;
//如果上面條件都不滿足,就要從sync queue的最后一個(gè)開(kāi)始check,是不是當(dāng)前node在queue里面
return findNodeFromTail(node);
}
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* 檢查當(dāng)前線程是否被中斷,
* 如果重新入sync queue成功,則表示是喚醒之前被中斷,則返回THROW_IE(-1)
* 如果喚醒之后被中斷則返回REINTERRUPT(1)
* 如果沒(méi)有被中斷則返回0
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//在喚醒之后轉(zhuǎn)移node到sync queue
final boolean transferAfterCancelledWait(Node node) {
//CAS操作,如果當(dāng)前node的狀態(tài)時(shí)CONDITION,則設(shè)置成0,并且enq()到sync queue,返回true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
*上面CAS設(shè)值失敗,這種情況很少見(jiàn),所以這邊就自旋一直到這個(gè)node加入sync queue
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
//對(duì)于已經(jīng)在sync queue的線程開(kāi)始嘗試獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//拿到當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)
final Node p = node.predecessor();
//只有前繼節(jié)點(diǎn)是head,才開(kāi)始tryAcquire(這里就涉及Fair和NonFair)嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//嘗試獲取鎖成功,則設(shè)置head = node,設(shè)置node的prev=null,thread=null
setHead(node);
//將原來(lái)的head節(jié)點(diǎn)踢出sync queue
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果當(dāng)前節(jié)點(diǎn)不是head,或者嘗試獲取鎖失敗,就看前繼節(jié)點(diǎn)的狀態(tài)是不是SIGNAL
// 如果是SIGNAL,那么就要掛起當(dāng)前線程,等待unlock釋放鎖喚醒
// 如果不是SIGNAL,那么就一直循環(huán)直到變成head并且獲取鎖成功,
// 或者前繼節(jié)點(diǎn)是SIGNAL,park掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
//如果shouldParkAfterFailedAcquire返回true,
// 則掛起當(dāng)前線程,等待head釋放喚醒或者被中斷喚醒
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//根據(jù)前繼節(jié)點(diǎn)的狀態(tài)判斷是不是要掛起當(dāng)前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果前繼節(jié)點(diǎn)已經(jīng)是signal,則node節(jié)點(diǎn)的線程可以阻塞,等待前繼節(jié)點(diǎn)釋放鎖并喚醒
*/
return true;
if (ws > 0) {
/*
* 如果前繼節(jié)點(diǎn)被cancelled,說(shuō)明前繼節(jié)點(diǎn)等待超時(shí)或者相應(yīng)中斷而取消了自己,
就需要繞過(guò)這些cancelled的節(jié)點(diǎn),找到waitStatus<=0的節(jié)點(diǎn),
并且繼續(xù)for循環(huán)嘗試獲取鎖,直到前繼節(jié)點(diǎn)是SIGNAL,然后park當(dāng)前線程
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 到了這說(shuō)明waitStatus要么是0要么是PROPAGARE,must be 0 or PROPAGATE,
CAS設(shè)置waitStatus為signal,并且繼續(xù)for循環(huán)嘗試獲取鎖,
直到前繼節(jié)點(diǎn)是SIGNAL,然后park當(dāng)前線程
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
await方法在釋放所持有的鎖之后就park掛起當(dāng)前線程,等待signal喚醒。
代碼解析signal()
public final void signal() {
//檢查當(dāng)前線程是不是獨(dú)占鎖的擁有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//拿到condition queue里面的第一個(gè)等待者
doSignal(first);
}
//自旋知道喚醒一個(gè)condition queue的node
private void doSignal(Node first) {
do {
//將firstWaiter往后移動(dòng),將nextWaiter設(shè)置成firstWaiter
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將first提出等待隊(duì)列
first.nextWaiter = null;
//transferForSignal喚醒first,如果失敗則first去往下一個(gè)節(jié)點(diǎn)進(jìn)行循環(huán)
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//將node從condition queue轉(zhuǎn)移到sync queue
final boolean transferForSignal(Node node) {
/*
* 如果這里CAS更新node的waitStatus失敗,說(shuō)明node已經(jīng)不是Condition狀態(tài)
-- 參考await方法里checkInterruptWhileWaiting,
如果線程被中斷這會(huì)更新Condition并enq到sync queue
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 這邊Condition狀態(tài)更新成功之后,開(kāi)始enq壓入sync queue。
--參考await方法里while(!isOnSyncQueue(node)),那邊會(huì)一直循環(huán)(如果沒(méi)有中斷的話)
直到這邊enq到sync queue,while循環(huán)才會(huì)退出
另外enq(node)返回的是node的prev繼節(jié)點(diǎn)
*/
Node p = enq(node);//返回的p是node的前繼節(jié)點(diǎn)
int ws = p.waitStatus;
//如果前繼節(jié)點(diǎn)是cancelled或者設(shè)置前繼節(jié)點(diǎn)的狀態(tài)到Signal失敗,則喚醒當(dāng)前線程(提升性能)
// 參考await方法while(!isOnSyncQueue(node))退出之后進(jìn)入acquireQueued()方法
// acquireQueued里面如果嘗試獲取鎖失敗會(huì)調(diào)用shouldParkAfterFailedAcquire()
// 這里面會(huì)看node的前繼節(jié)點(diǎn)如果是SIGNAL,則會(huì)park當(dāng)前線程;
// 如果不是SIGNAL,且不是cancelled,會(huì)嘗試設(shè)置前繼節(jié)點(diǎn)為SIGNAL,
// 并且不會(huì)park當(dāng)前線程,而是繼續(xù)for循環(huán)嘗試獲取鎖,直到前繼節(jié)點(diǎn)是SIGNAL,然后park當(dāng)前線程
//所以這里如果設(shè)置前繼節(jié)點(diǎn)狀態(tài)SIGNAL失敗,就喚醒node的線程,讓它也進(jìn)入for循環(huán)開(kāi)始嘗試獲取鎖
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//為什么要喚醒當(dāng)前線程?
// 前繼節(jié)點(diǎn)不是SIGNAL,就需要喚醒當(dāng)前線程去嘗試獲取鎖(提升性能)
LockSupport.unpark(node.thread);
return true;
}
以上分析完Condition里面的await()和signal()的主要方法。
在signal之后,喚醒線程會(huì)進(jìn)行unlock,unlock方法會(huì)拿到sync queue的head,并喚醒head的后繼節(jié)點(diǎn)(因?yàn)閍wait方法里面acquireQueued()在前繼節(jié)點(diǎn)是head,并且獲取鎖成功后,會(huì)將當(dāng)前node設(shè)為head,并繼續(xù)執(zhí)行await()的后續(xù)代碼,所以u(píng)nlock喚醒的是head的后繼節(jié)點(diǎn))。
后續(xù)將分析基于AQS的ReentrantLock的實(shí)現(xiàn)。
在分析完AQS之后,我們?cè)倏纯磗ync queue和condition queue是怎么合作來(lái)完成lock, await, unlock, lock ,signal, unlock的。

我們來(lái)看lock-await這個(gè)過(guò)程,假設(shè)lock2-thread線程里面,會(huì)判斷共享?xiàng)l件不成立,然后調(diào)用await():
- 首先lock2-thread對(duì)應(yīng)的Node2節(jié)點(diǎn),在unlock被喚醒之后,會(huì)重新進(jìn)入acquireQueued的for循環(huán),判斷前繼節(jié)點(diǎn)Node1是不是head,現(xiàn)在是,那么就開(kāi)始tryAcquire嘗試獲取鎖,獲取鎖成功之后,將Node2設(shè)置為head節(jié)點(diǎn)。
- 獲取鎖并設(shè)置head節(jié)點(diǎn)之后,開(kāi)始進(jìn)入await方法,會(huì)新增一個(gè)Node2,waitStatus是CONDITION,然后釋放鎖,并park當(dāng)前線程,等待signal。
-
上一步釋放鎖之后,會(huì)喚醒lock3-thread來(lái)嘗試獲取鎖。假設(shè)有l(wèi)ock4-thead提供condition.signal方法, 因?yàn)槭欠枪芥i,lock4-thread和lock3-thread會(huì)爭(zhēng)搶鎖,如果lock4-thread爭(zhēng)搶成功,那么lock3-thread還是繼續(xù)在sync queue中等待喚醒;lock4-thread獲取到鎖之后會(huì)進(jìn)行signal,拿到condition queue中firstWaiter,即Node2,將它的狀態(tài)變成0,并加入sync queue的末端,并設(shè)置它的前繼節(jié)點(diǎn)狀態(tài)到SIGNAL,此時(shí)firstWaiter=lastWaiter=null;
lock-signal
