- 任意一個(gè)Java對(duì)象,都擁有一組監(jiān)視器方法(定義在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知模式。
- Condition接口也提供了類似Object的監(jiān)視器方法,與Lock配合可以實(shí)現(xiàn)等待/通知模式
1.Condition接口
Condition接口意義在于提供一種針對(duì)使用條件狀態(tài)(隊(duì)列或變量)的線程進(jìn)行阻塞和喚醒的機(jī)制 :
阻塞: 阻塞線程進(jìn)入等待狀態(tài)同時(shí)釋放關(guān)聯(lián)鎖,直到被其他線程將條件狀態(tài)設(shè)置為true為止
喚醒: 喚醒等待在條件隊(duì)列中的線程,同時(shí)提供對(duì)喚醒全部的支持
鎖關(guān)聯(lián): 值得注意的是,由于多線程下該狀態(tài)是共享的,因此通常會(huì)通過于鎖關(guān)聯(lián)保證其原子性
解除等待: 當(dāng)線程解除等待狀態(tài)(被喚醒、中斷、超時(shí)等)后仍需重新競爭鎖,獲取鎖后才能夠從暫停位置開始繼續(xù)往后執(zhí)行
/*
* Conditions (also known as `condition queues` or `condition variables`)
* provide a means for one thread to suspend execution to wait
* until notified by another thread that some state condition may now be true.
*
* Conditions(如條件隊(duì)列或條件變量)的作用如下:
* 暫停線程并進(jìn)入等待狀態(tài),直到被其他線程將條件狀態(tài)設(shè)置為true為止
* 值得注意的是:由于多線程下該狀態(tài)是共享的,因此通常會(huì)通過于鎖關(guān)聯(lián)保證其原子性
* 如Lock#newCondition newCondition()方法
*/
public interface Condition {
/**
* Causes the current thread to wait until it is signalled or {Thread#interrupt interrupted}
* 線程等待直到被喚醒或被中斷
* 阻塞方法的實(shí)現(xiàn)有幾個(gè)準(zhǔn)則:
* 1.當(dāng)前線程持有的鎖(關(guān)聯(lián)該條件變量)將被釋放
* 2.當(dāng)線程解除等待狀態(tài)后仍需重新競爭鎖,獲取鎖后才能夠從當(dāng)前位置開始繼續(xù)往后執(zhí)行
*/
void await() throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled.
* 線程等待直到被喚醒
*/
void awaitUninterruptibly();
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified waiting time elapses.
* 線程等待直到被喚醒或被中斷或超時(shí)
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified waiting time elapses. This method is behaviorally
* equivalent to: {@code awaitNanos(unit.toNanos(time)) > 0}
* 線程等待直到被喚醒或被中斷或超時(shí)
* 該方法等同于 awaitNanos(unit.toNanos(time)) > 0
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified deadline elapses.
* 線程等待直到被喚醒或被中斷或超時(shí)
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* Wakes up one waiting thread.
* 喚醒在條件隊(duì)列中等待的一個(gè)線程
* 注意:線程被喚醒后仍需重新競爭鎖,獲取鎖后才能夠從當(dāng)前位置開始繼續(xù)往后執(zhí)行
*/
void signal();
/**
* Wakes up all waiting threads.
* 喚醒在條件隊(duì)列中等待的所有線程
*/
void signalAll();
}
2.ConditionObject綜述
ConditionObject包括如下方面:
定位: ConditionObject是AQS的內(nèi)部類,其通過實(shí)現(xiàn)Condition接口提供對(duì)管程形式的條件的支持
作用: ConditionObject在AQS中作為對(duì)Lock的實(shí)現(xiàn)支持
使用: 當(dāng)條件不滿足時(shí)線程入隊(duì),當(dāng)條件滿足時(shí)出隊(duì)并重新嘗試獲取鎖,獲取成功后從暫停位置開始繼續(xù)往后執(zhí)行
3.ConditionObject實(shí)現(xiàn)原理
ConditionObject的實(shí)現(xiàn)有幾個(gè)注意事項(xiàng):
1.內(nèi)部會(huì)維護(hù)一個(gè)條件等待同步隊(duì)列,根據(jù)FIFO原則執(zhí)行入隊(duì)出隊(duì)操作
2.通過firstWaiter維護(hù)頭節(jié)點(diǎn),lastWaiter維護(hù)尾節(jié)點(diǎn)
3.節(jié)點(diǎn)通過nextWaiter記錄后繼條件節(jié)點(diǎn)形成鏈表結(jié)構(gòu),遍歷時(shí)從頭節(jié)點(diǎn)開始沿著nextWaiter順序遍歷
4.條件隊(duì)列中節(jié)點(diǎn)狀態(tài)waitStatus只能為0或Node.CONDITION
5.使用條件隊(duì)列的前提是線程需要持有同步鎖,且只支持獨(dú)占模式
6.當(dāng)節(jié)點(diǎn)在條件隊(duì)列被喚醒(因signal|timeout|interrupt)后,需要進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移,即由條件節(jié)點(diǎn)轉(zhuǎn)變?yōu)橥焦?jié)點(diǎn)
4.ConditionObject組成
4.1 類定義
public class ConditionObject implements Condition, java.io.Serializable
4.2 構(gòu)造器
public ConditionObject() { }
4.3 重要變量
/** 條件等待隊(duì)列首節(jié)點(diǎn) */
private transient Node firstWaiter;
/** 條件等待隊(duì)列尾節(jié)點(diǎn) */
private transient Node lastWaiter;
/** 退出時(shí)重新中斷*/
private static final int REINTERRUPT = 1;
/** 退出時(shí)直接拋出異常 */
private static final int THROW_IE = -1;
/** 獨(dú)占模式時(shí)才能使用條件隊(duì)列 ,用于鏈接下一個(gè)等待節(jié)點(diǎn) */
Node nextWaiter;
5.條件節(jié)點(diǎn)阻塞
5.1 不響應(yīng)中斷阻塞
條件節(jié)點(diǎn)阻塞的一般流程:
1.入隊(duì): 根據(jù)FIFO原則,新節(jié)點(diǎn)會(huì)被封裝成Node并被加入到條件隊(duì)列隊(duì)尾
2.釋放: 為不影響其他線程,當(dāng)前已持有鎖的線程需要先釋放所有鎖,讓出鎖資源
3.阻塞: 在條件隊(duì)列中的節(jié)點(diǎn)線程需要被阻塞,直到條件滿足后重試獲取同步鎖
4.重試: 當(dāng)條件滿足后需要重試獲取同步鎖,獲取成功后才能繼續(xù)從暫停位置繼續(xù)向后執(zhí)行
補(bǔ)充:條件滿足指的是線程被其他線程喚醒、或超時(shí)、或中斷后且位于同步隊(duì)列中
public final void awaitUninterruptibly() {
//1.新節(jié)點(diǎn)入條件隊(duì)列
Node node = addConditionWaiter();
//2.當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放鎖
int savedState = fullyRelease(node);
//3.阻塞在條件隊(duì)列中的不滿足條件的節(jié)點(diǎn)線程
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//由于不響應(yīng)中斷,因此只是記錄是否中斷,此時(shí)會(huì)清除中斷標(biāo)識(shí)
if (Thread.interrupted())
interrupted = true;
}
/**
* 4.被喚醒需要重新嘗試獲取鎖,獲取鎖成功后才能繼續(xù)往后執(zhí)行
* 若期間又被中斷后,需要再次設(shè)置已被清除的中斷標(biāo)識(shí)
*/
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
小問:為神馬需要循環(huán)判斷isOnSyncQueue? 即while (!isOnSyncQueue(node)) ?
友情小提示:讀者可以回顧一下同步隊(duì)列獲取鎖的過程
小答:循環(huán)只有一個(gè)目的,就是確保節(jié)點(diǎn)已經(jīng)脫離條件隊(duì)列且進(jìn)入同步隊(duì)列,這樣才有資格重新獲取同步鎖
5.2 響應(yīng)中斷阻塞
public final void await() throws InterruptedException {
//響應(yīng)中斷,直接拋異常,沒必要往下走了
if (Thread.interrupted())
throw new InterruptedException();
//新節(jié)點(diǎn)入條件隊(duì)列
Node node = addConditionWaiter();
//當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放鎖
int savedState = fullyRelease(node);
//只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
/**
* 檢測中斷,一旦發(fā)生中斷
* 1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
* 2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
* 注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
* 這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/**
* 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//interruptMode != 0 說明是需要進(jìn)行中斷處理的
if (interruptMode != 0)
//執(zhí)行中斷處理
reportInterruptAfterWait(interruptMode);
}
/**
* 中斷處理:
* 1.中斷模式為THROW_IE:向上拋出異常
* 2.中斷模式為REINTERRUPT:重新設(shè)置中斷狀態(tài)
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
5.3 響應(yīng)中斷和超時(shí)阻塞-納秒可選
/**
* Implements timed condition wait.
* 響應(yīng)中斷和超時(shí)阻塞
* @return long 剩余超時(shí)時(shí)間 <0 說明已超時(shí)
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
//響應(yīng)中斷,直接拋異常,沒必要往下走了
if (Thread.interrupted())
throw new InterruptedException();
//新節(jié)點(diǎn)入條件隊(duì)列
Node node = addConditionWaiter();
//當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
int savedState = fullyRelease(node);
//截止時(shí)長
final long deadline = System.nanoTime() + nanosTimeout;
//只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
/**
* 將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
* 值得注意的是:剩余超時(shí)時(shí)間允許為0和負(fù)數(shù)
*/
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//為了提高效率,超過閾值才執(zhí)行阻塞,否則接著自旋
if (nanosTimeout >= spinForTimeoutThreshold)
//若超過nanosTimeout時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
LockSupport.parkNanos(this, nanosTimeout);
/**
* 檢測中斷,一旦發(fā)生中斷
* 1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
* 2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
* 注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
* 這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//剩余超時(shí)時(shí)間
nanosTimeout = deadline - System.nanoTime();
}
/**
* 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/**
* 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//interruptMode != 0 說明是需要進(jìn)行中斷處理的
if (interruptMode != 0)
//執(zhí)行中斷處理
reportInterruptAfterWait(interruptMode);
//返回已花費(fèi)時(shí)長
return deadline - System.nanoTime();
}
5.4 響應(yīng)中斷和超時(shí)阻塞-納秒日期單位可選
/**
* Implements timed condition wait.
* 響應(yīng)中斷和超時(shí)阻塞-納秒日期單位可選
* @return boolean 超時(shí)是否發(fā)生在被喚醒之前
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
//注意是納秒
long nanosTimeout = unit.toNanos(time);
//響應(yīng)中斷,直接拋異常,沒必要往下走了
if (Thread.interrupted())
throw new InterruptedException();
//新節(jié)點(diǎn)入條件隊(duì)列
Node node = addConditionWaiter();
//當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
int savedState = fullyRelease(node);
//截止時(shí)長
final long deadline = System.nanoTime() + nanosTimeout;
//只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
/**
* 將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
* 值得注意的是:剩余超時(shí)時(shí)間允許為0和負(fù)數(shù)
*/
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
//為了提高效率,超過閾值才執(zhí)行阻塞,否則接著自旋
if (nanosTimeout >= spinForTimeoutThreshold)
//若超過nanosTimeout時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
LockSupport.parkNanos(this, nanosTimeout);
/**
* 檢測中斷,一旦發(fā)生中斷
* 1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
* 2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
* 注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
* 這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//剩余超時(shí)時(shí)間
nanosTimeout = deadline - System.nanoTime();
}
/**
* 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/**
* 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//interruptMode != 0 說明是需要進(jìn)行中斷處理的
if (interruptMode != 0)
//執(zhí)行中斷處理
reportInterruptAfterWait(interruptMode);
return !timedout;
}
5.5 響應(yīng)中斷和超時(shí)阻塞-毫秒日期可選
/**
* Implements absolute timed condition wait.
* 響應(yīng)中斷和超時(shí)阻塞-毫秒日期可選
* @return boolean 超時(shí)是否發(fā)生在被喚醒之前
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
//注意:與awaitNano區(qū)別的是這里用的是毫秒
long abstime = deadline.getTime();
//響應(yīng)中斷,直接拋異常,沒必要往下走了
if (Thread.interrupted())
throw new InterruptedException();
//新節(jié)點(diǎn)入條件隊(duì)列
Node node = addConditionWaiter();
//當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
int savedState = fullyRelease(node);
//記錄節(jié)點(diǎn)是否超時(shí)
boolean timedout = false;
//只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//毫秒超時(shí),將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
if (System.currentTimeMillis() > abstime) {
/**
* 記錄節(jié)點(diǎn)是否超時(shí)
* 注意:當(dāng)取消發(fā)生在節(jié)點(diǎn)被喚醒之前才返回true
*/
timedout = transferAfterCancelledWait(node);
//轉(zhuǎn)換一定會(huì)成功,因此安心退出即可
break;
}
//若超過abstime時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
LockSupport.parkUntil(this, abstime);
/**
* 檢測中斷,一旦發(fā)生中斷
* 1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
* 2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
* 注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
* 這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/**
* 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
* 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
*/
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//interruptMode != 0 說明是需要進(jìn)行中斷處理的
if (interruptMode != 0)
//執(zhí)行中斷處理
reportInterruptAfterWait(interruptMode);
return !timedout;
}
6.條件節(jié)點(diǎn)喚醒
喚醒操作主要干了三個(gè)事情:
1.清除節(jié)點(diǎn): 從條件隊(duì)列中移除該節(jié)點(diǎn)
2.節(jié)點(diǎn)轉(zhuǎn)移: 將條件節(jié)點(diǎn)轉(zhuǎn)換為同步節(jié)點(diǎn),即從條件隊(duì)列轉(zhuǎn)移到同步隊(duì)列
3.喚醒節(jié)點(diǎn): 將轉(zhuǎn)移成功的節(jié)點(diǎn)重新喚醒
6.1 喚醒單個(gè)
喚醒單個(gè)的實(shí)質(zhì):根據(jù)FIFO原則喚醒first,即條件隊(duì)列頭節(jié)點(diǎn)
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* 從條件隊(duì)列中喚醒一個(gè)節(jié)點(diǎn)
* 原則:將條件隊(duì)列中已存在的等待時(shí)間最長的線程轉(zhuǎn)移到等待隊(duì)列中,即頭節(jié)點(diǎn)
*/
public final void signal() {
//條件隊(duì)列只適用于獨(dú)占模式且只能由持有鎖的線程執(zhí)行喚醒操作
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//先進(jìn)先出原則
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or null.
* @param first (non-null) the first node on condition queue 條件隊(duì)列非空頭節(jié)點(diǎn)
*/
private void doSignal(Node first) {
do {
//清空隊(duì)列
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
/**
* 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)換到同步隊(duì)列中并喚醒線程
*/
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
6.2 喚醒全部
喚醒全部的實(shí)質(zhì):沿著nextWaiter順序遍歷依次喚醒
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* 喚醒條件隊(duì)列中的全部節(jié)點(diǎn)
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//跟signal的區(qū)別就是多了個(gè)All...
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
*
* 全部喚醒的實(shí)質(zhì):
* 沿著nextWaiter順序遍歷依次轉(zhuǎn)移并喚醒
*/
private void doSignalAll(Node first) {
//1.注意要清空條件隊(duì)列
lastWaiter = firstWaiter = null;
//2.沿著nextWaiter順序遍歷依次喚醒
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
//3.其實(shí)質(zhì)就是沿著nextWaiter順序遍歷
first = next;
} while (first != null);
}
7.條件節(jié)點(diǎn)轉(zhuǎn)移
轉(zhuǎn)移操作指的是節(jié)點(diǎn)由條件節(jié)點(diǎn)轉(zhuǎn)換為同步節(jié)點(diǎn),主要進(jìn)行了如下兩步進(jìn)行轉(zhuǎn)移:
1.更新節(jié)點(diǎn)狀態(tài): 根據(jù)同步隊(duì)列入隊(duì)原則,新節(jié)點(diǎn)初始狀態(tài)必須為0
2.同步隊(duì)列入隊(duì): 當(dāng)前節(jié)點(diǎn)肯定能入隊(duì)成功,同時(shí)返回前驅(qū)節(jié)點(diǎn)
7.1 因正常喚醒轉(zhuǎn)移
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
*
* 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)換到同步隊(duì)列中并喚醒線程
* 1.在正常調(diào)用signalXXX()方法時(shí)才會(huì)調(diào)用該方法
* 2.若因?yàn)槌瑫r(shí)或中斷進(jìn)行轉(zhuǎn)移,不會(huì)調(diào)用該方法,但這兩種情況的轉(zhuǎn)移都會(huì)置為0
* 3.注意:在調(diào)用該方法之前都會(huì)執(zhí)行first.nextWaiter = null,即從等待隊(duì)列中移除當(dāng)前頭節(jié)點(diǎn)
*
* @param node the node 根據(jù)FIFO原則,通常為first節(jié)點(diǎn)
* @return true if successfully transferred (else the node was
* cancelled before signal) 轉(zhuǎn)換成功或節(jié)點(diǎn)在喚醒前被取消 才返回true
*/
final boolean transferForSignal(Node node) {
/**
* 1.CAS更新節(jié)點(diǎn)狀態(tài)為0,一旦失敗立即返回false
* 注意:必須將節(jié)點(diǎn)狀態(tài)更新為0,因?yàn)橥疥?duì)列入隊(duì)時(shí)新節(jié)點(diǎn)必須為0,
* 否則就不符合同步隊(duì)列入隊(duì)原則,因此一旦失敗立即返回
* 補(bǔ)充:CAS失敗的原因在于其他線程已經(jīng)執(zhí)行喚醒操作將該節(jié)點(diǎn)變更為0,因此其實(shí)無需再次enq了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//2.進(jìn)入同步隊(duì)列,注意enq方法返回的是前驅(qū)節(jié)點(diǎn)
Node p = enq(node);
/**
* 3.在因前驅(qū)節(jié)點(diǎn)被取消或狀態(tài)突變發(fā)生后需要喚醒節(jié)點(diǎn)線程
* ws > 0:
* 說明前驅(qū)節(jié)點(diǎn)被取消(CANCELLED),因此需要喚醒當(dāng)前節(jié)點(diǎn)線程
* !compareAndSetWaitStatus(p, ws, Node.SIGNAL):
* 此時(shí)發(fā)生狀態(tài)突變,比如ws剛好變成CANCELLED
* 補(bǔ)充:因?yàn)閑nq保證一定入隊(duì)成功,因此實(shí)質(zhì)是喚醒在同步隊(duì)列中的節(jié)點(diǎn)
* 結(jié)合awaitXXX(),線程會(huì)從park之后繼續(xù)往后執(zhí)行
*
* !!特別注意!!:正常情況下,根據(jù)流程可知
* ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
* 這種判斷是不會(huì)返回true的,此時(shí)是不會(huì)喚線程;
* 因此真正喚醒線程的地方在于調(diào)用signal()方法的線程發(fā)送完signal
* 信號(hào)后再調(diào)用release(1)方法(比如調(diào)用ReentrantLock的unlock()),
* 因?yàn)槠湟讶腙?duì),因此可以被喚醒
*/
int ws = p.waitStatus.
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
小問:讀者還記得enq方法都做了什么嗎?
小答:針對(duì)enq方法,筆者簡單回顧一下作用,幫助讀者理解一下:
1.初始化時(shí)對(duì)head和tail進(jìn)行賦值
2.當(dāng)前節(jié)點(diǎn)進(jìn)入同步對(duì)了并作為新的tail
3.入隊(duì)一定可以成功因?yàn)樽孕?/p>
4.注意enq返回的是前驅(qū)節(jié)點(diǎn)
7.2 因中斷或超時(shí)喚醒轉(zhuǎn)移
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* 將條件隊(duì)列中因中斷或超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移
* 取消發(fā)生在被喚醒之前返回true
* 兩種情況需要調(diào)用該方法:
* 1.中斷:checkInterruptWhileWaiting() -> 該方法會(huì)被awaitXXX()調(diào)用
* 2.超時(shí):帶超時(shí)的awaitXXX()方法
*/
final boolean transferAfterCancelledWait(Node node) {
//Node.CONDITION狀態(tài)節(jié)點(diǎn)只用于條件隊(duì)列,因此需要設(shè)置為0才能進(jìn)入同步隊(duì)列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//當(dāng)CAS變更waitStatus為0時(shí),說明節(jié)點(diǎn)已跟條件隊(duì)列無關(guān),隨后進(jìn)入同步隊(duì)列即可
enq(node);
//由于enq方法能夠保證進(jìn)入同步隊(duì)列成功,因此當(dāng)enq執(zhí)行完畢,可以放心的返回true
return true;
}
//若CAS失敗且節(jié)點(diǎn)還不在同步隊(duì)列中,線程需要先釋放資源,提高效率
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
小問:為什么需要執(zhí)行Thread.yield()?
小答:對(duì)于這個(gè)問題,筆者的考慮如下:
1.若CAS變更為0失敗,說明已被其他線程執(zhí)行喚醒轉(zhuǎn)換并變更為0
2.但由于enq操作可能還在進(jìn)行中,因此此時(shí)該節(jié)點(diǎn)可能還沒真正入隊(duì),因此需要循環(huán)檢測是否已入隊(duì)
3.由于入隊(duì)操作已經(jīng)在執(zhí)行中,因此無需重復(fù)執(zhí)行enq操作,可以先釋放資源,并通過isOnSyncQueue退出循環(huán)即可,優(yōu)化效率
8.條件節(jié)點(diǎn)隊(duì)列操作
8.1 條件節(jié)點(diǎn)入隊(duì)
新節(jié)點(diǎn)入隊(duì)遵循FIFO原則,主要會(huì)做如下操作:
1.清洗: 若尾節(jié)點(diǎn)非CONDITION狀態(tài),需要清除所有非CONDITION狀態(tài)節(jié)點(diǎn)并重設(shè)頭尾節(jié)點(diǎn)
2.封裝: 封裝當(dāng)前線程為Node,同時(shí)初始化節(jié)點(diǎn)狀態(tài)為CONDITION
3.入隊(duì): 隊(duì)列為空時(shí),頭尾節(jié)點(diǎn)指向同一個(gè)元素;非空時(shí)通過nextWaiter形成鏈表
/**
* Adds a new waiter to wait queue.
*
* 新增條件等待隊(duì)列隊(duì)尾節(jié)點(diǎn) - FIFO
* 補(bǔ)充:由于條件隊(duì)列只適用于獨(dú)占模式,因此該方法不會(huì)有并發(fā)問題
* @return its new wait node 返回新節(jié)點(diǎn)
*/
private Node addConditionWaiter() {
//FIFO原則:新入隊(duì)元素需隊(duì)尾插入
Node t = lastWaiter;
//1.若尾節(jié)點(diǎn)非Condition狀態(tài),需要清除所有非CONDITION狀態(tài)節(jié)點(diǎn)并重設(shè)頭尾節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
/**
* 刪除條件等待隊(duì)列中的所有非CONDITION狀態(tài)節(jié)點(diǎn)
* 補(bǔ)充:由于在條件隊(duì)列中節(jié)點(diǎn)狀態(tài)只能是CONDITION或0(signal|timeout|interrupt)
* 因此該方法實(shí)質(zhì)就清除空節(jié)點(diǎn)或狀態(tài)為0節(jié)點(diǎn)
*/
unlinkCancelledWaiters();
//上個(gè)方法執(zhí)行后會(huì)使得lastWaiter肯定為非取消狀態(tài)節(jié)點(diǎn)(只可能為空或CONDITION狀態(tài))
t = lastWaiter;
}
//2.封裝當(dāng)前線程為Node,同時(shí)初始化節(jié)點(diǎn)狀態(tài)為CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//3.隊(duì)尾元素為空,正說明條件隊(duì)列為空
if (t == null)
firstWaiter = node;
else
//4.新元素入隊(duì)需隊(duì)尾插入
t.nextWaiter = node;
//5.這里需要注意的是:隊(duì)列為空時(shí),頭尾節(jié)點(diǎn)是指向同一個(gè)元素的;非空時(shí)就會(huì)形成鏈表
lastWaiter = node;
return node;
}
小問:何時(shí)出現(xiàn)t != null && t.waitStatus != Node.CONDITION?
小答:尾節(jié)點(diǎn)的狀態(tài)不為Node.CONDITION,那只可能為0,這意味著節(jié)點(diǎn)需要轉(zhuǎn)移,那么節(jié)點(diǎn)狀態(tài)變更時(shí)機(jī)都有哪些呢?
1.當(dāng)因中斷或超時(shí)被喚醒后會(huì)通過調(diào)用transferAfterCancelledWait將節(jié)點(diǎn)狀態(tài)CAS為0
2.當(dāng)被正常喚醒后會(huì)通過調(diào)用transferForSignal將節(jié)點(diǎn)狀態(tài)CAS為0
分析:但問題是正常喚醒后會(huì)先執(zhí)行first.nextWaiter = null,因此此時(shí)尾節(jié)點(diǎn)應(yīng)為空
結(jié)論:排除這種情況后可知,只有第一種情況,即因中斷或超時(shí)被喚醒后才會(huì)出現(xiàn)這種情況
8.2 條件節(jié)點(diǎn)出隊(duì)
條件節(jié)點(diǎn)的出隊(duì)時(shí)機(jī):
1.新節(jié)點(diǎn)入隊(duì): 通過判斷t.waitStatus != Node.CONDITION為true時(shí)執(zhí)行unlinkCancelledWaiters()
2.節(jié)點(diǎn)重新嘗試獲取同步鎖后: 通過判斷node.nextWaiter != null為true時(shí)執(zhí)行unlinkCancelledWaiters()
3.節(jié)點(diǎn)被正常喚醒后: 通過調(diào)用signal()方法并執(zhí)行first.nextWaiter = null
補(bǔ)充:3與1,2的區(qū)別在于前者只是清除頭節(jié)點(diǎn),后者是遍歷清除所有非CONDITION狀態(tài)節(jié)點(diǎn)
注意:無論入隊(duì)還是出隊(duì),前提都是線程已經(jīng)持有同步鎖
/**
* Unlinks cancelled waiter nodes from condition queue.
*
* 移除條件隊(duì)列中的所有非CONDITION狀態(tài)節(jié)點(diǎn)
* 兩種情況會(huì)觸發(fā)該方法:
* 1.新節(jié)點(diǎn)加入條件隊(duì)列時(shí) -> addConditionWaiter()
* 2.節(jié)點(diǎn)被喚醒之后 -> awaitXXX()
* 補(bǔ)充一點(diǎn):注意不是在signal()方法中執(zhí)行的(因?yàn)楹笳咧挥修D(zhuǎn)移和喚醒操作,前者才有獲取鎖操作)
* 注意:條件隊(duì)列中的節(jié)點(diǎn)只有CONDITION和0兩種狀態(tài)
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
//臨時(shí)節(jié)點(diǎn),主要用于在遍歷時(shí)記錄上一個(gè)非CONDITION節(jié)點(diǎn)(因?yàn)橐^所有非CONDITION節(jié)點(diǎn))
Node trail = null;
//從前往后順序遍歷條件隊(duì)列,剔除全部非CONDITION狀態(tài)節(jié)點(diǎn)
while (t != null) {
//下一個(gè)節(jié)點(diǎn),注意next可能為空
Node next = t.nextWaiter;
/**
* 非CONDITION狀態(tài)(即0),需要執(zhí)行移除操作
* 強(qiáng)調(diào):在條件隊(duì)列中節(jié)點(diǎn)的waitStatus,只可能是CONDITION或是0(signal|timeout|interrupt)
*/
if (t.waitStatus != Node.CONDITION) {
/**
* 移除后繼節(jié)點(diǎn) 等同于 將當(dāng)前節(jié)點(diǎn)從隊(duì)列中移除
* 同時(shí)由于不再有引用,會(huì)help GC
* 注意:GC同時(shí)會(huì)考慮firstWaiter和lastWaiter的引用情況
* 即若當(dāng)前線程無用,上述變量最終也會(huì)移除對(duì)該節(jié)點(diǎn)的引用
*/
t.nextWaiter = null;
/**
* 若之前沒有非CONDITION狀態(tài)節(jié)點(diǎn),就先讓next當(dāng)一下頭頭
* 注意:該方法執(zhí)行完畢后,firstWaiter只能為空或CONDITION
*/
if (trail == null)
firstWaiter = next;
else
/**
* 一旦當(dāng)前節(jié)點(diǎn)非CONDITION狀態(tài),那么需要先將它的后繼節(jié)點(diǎn)與上一個(gè)非取消節(jié)點(diǎn)建立起聯(lián)系
* 即使后繼節(jié)點(diǎn)是非CONDITION狀態(tài)也沒事,因?yàn)樵谙麓伪闅v時(shí)會(huì)重新建立聯(lián)系的
* 說白了,其本質(zhì)就是刪除t
*/
trail.nextWaiter = next;
/**
* 最后一個(gè)非CONDITION狀態(tài)節(jié)點(diǎn)即是條件隊(duì)列的尾節(jié)點(diǎn)
* 注意:該方法執(zhí)行完畢后,lastWaiter最終只能為空或CONDITION
*/
if (next == null)
lastWaiter = trail;
}
else
//CONDITION狀態(tài),就記錄一下以作為下次遍歷時(shí)的上一個(gè)非CONDITION節(jié)點(diǎn)
trail = t;
//開啟下一次循環(huán)
t = next;
}
}
小問:為神馬調(diào)用unlinkCancelledWaiters后firstWaiter只能為空或CONDITION?
小答:頭節(jié)點(diǎn)非空時(shí),有且只有條件隊(duì)列中全部都是非CONDITION狀態(tài)的節(jié)點(diǎn)時(shí),新的頭節(jié)點(diǎn)才能為空,原因在于firstWaiter = next;的前提是無可用的CONDITION狀態(tài)節(jié)點(diǎn),而trail只有在有CONDITION狀態(tài)節(jié)點(diǎn)時(shí)才會(huì)被賦值更新
9.重要方法
9.1 釋放同步鎖
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
*
* 釋放同步狀態(tài)并返回原狀態(tài)
* 當(dāng)釋放失敗時(shí)節(jié)點(diǎn)會(huì)被取消同時(shí)拋出IllegalMonitorStateException異常
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
//獲取當(dāng)前線程持有鎖的同步狀態(tài) -- 支持可重入
int savedState = getState();
/**
* 由于該方法只用于獨(dú)占模式,因此使用的是獨(dú)占獨(dú)有的release方法
* 關(guān)于release方法請(qǐng)參見筆者的并發(fā)番@AbstractQueuedSynchronizer一文通
* 作用是更新state狀態(tài)(為0)同時(shí)喚醒后繼節(jié)點(diǎn)(如果存在的話)
* 釋放鎖的目的是為了讓其他線程能夠獲取鎖去執(zhí)行任務(wù),
* 并等到其他線程調(diào)用signal()和release()后能夠喚醒該線程
*/
if (release(savedState)) {
failed = false;
return savedState;
} else {
//一旦釋放失敗,直接拋出異常 -- 干脆的很
throw new IllegalMonitorStateException();
}
} finally {
/**
* 釋放失敗的補(bǔ)救措施:
* 由于實(shí)際上節(jié)點(diǎn)已完成使命,節(jié)點(diǎn)狀態(tài)需要變成取消狀態(tài)以用于跳過和回收
* 注意:此時(shí)節(jié)點(diǎn)還是同步節(jié)點(diǎn),因此需要設(shè)置為CANCELLED
*/
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
9.2 檢測中斷
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*
* 檢測線程的中斷情況,通過返回狀態(tài)碼告知線程下一步應(yīng)如何處理中斷,情況有如下三種:
* 1.中斷發(fā)生在被喚醒之前返回THROW_IE(需要拋出異常)
* 2.中斷發(fā)生在被喚醒之后返回REINTERRUPT(需要重新中斷)
* 3.無中斷返回0
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ?
THROW_IE : REINTERRUPT) : 0
}
小問:為神馬中斷發(fā)生在被喚醒之前返回THROW_IE?之后返回REINTERRUPT?
小答:喚醒前拋出異常主要是為了快速失敗,提高效率,而重新中斷是要處理被喚醒后一系列
喚醒前: 若被喚醒之前線程被中斷,說明線程此時(shí)沒有獲取到資源,盡快拋出異常就可以結(jié)束等待并解放生產(chǎn)力;
喚醒后: 若被喚醒后線程被中斷,說明線程基本已經(jīng)獲取鎖,這時(shí)可能要多執(zhí)行一些操作,如釋放鎖等
9.3 判斷節(jié)點(diǎn)是否位于同步隊(duì)列
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
*
* 判斷節(jié)點(diǎn)是否位于同步隊(duì)列
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
//CONDITION只用于條件隊(duì)列 || prev為空說明其一定不在同步隊(duì)列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果next非空,它肯定在同步隊(duì)列中
if (node.next != null)
return true;
//node.prev非空并不意味著在同步隊(duì)列中,因此需要從后往前遍歷以判斷是否在同步隊(duì)列中
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
*
* node.prev非空并不意味著在同步隊(duì)列中,原因是CAS(放入同步隊(duì)列)可能失敗
* 因此需要從后往前遍歷以判斷其是否在同步隊(duì)列中
* 由于該方法被調(diào)用時(shí),該節(jié)點(diǎn)總是靠近tail,因此除非CAS失敗(可能性很低),否則幾乎無須遍歷
*/
private boolean findNodeFromTail(Node node) {
//從同步隊(duì)列的隊(duì)尾節(jié)點(diǎn)開始沿著prev依次往前遍歷
Node t = tail;
for (;;) {
//存在,返回true
if (t == node)
return true;
//都遍歷到頭了還不存在,那只能返回false了
if (t == null)
return false;
//沿著prev依次往前遍歷吧
t = t.prev;
}
}
10.流程圖
10.1 無響應(yīng)中斷
注意:僅當(dāng)調(diào)用signal()的線程再調(diào)用release()方法之后才會(huì)真正解除被阻塞線程的阻塞狀態(tài),但release()之后該線程并不是被立即喚醒,而是重新競爭鎖直到變成head節(jié)點(diǎn)的后繼節(jié)點(diǎn)且head節(jié)點(diǎn)為SIGNAl狀態(tài)時(shí)才能被真正喚醒

10.2 響應(yīng)中斷

10.3 響應(yīng)中斷與超時(shí)

1. 通過對(duì)比Object的監(jiān)視器方法和Condition接口

2 Condition接口與示例
Condition是依賴Lock對(duì)象的
使用示例
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
v般都會(huì)將Condition對(duì)象作為成員變量。當(dāng)調(diào)用await()方法后,當(dāng)前線程會(huì)釋放鎖并在此等待,而其他線程調(diào)用Condition對(duì)象的signal()方法,通知當(dāng)前線程后,當(dāng)前線程才從await()方法返回,并且在返回前已經(jīng)獲取了鎖。
Condition定義的方法


獲取一個(gè)Condition必須通過Lock的newCondition()方法。
Condition的使用方式
一個(gè)有界隊(duì)列的示例來深入了解Condition的使用方式。有界隊(duì)列是一種特殊的隊(duì)列,當(dāng)隊(duì)列為空時(shí),隊(duì)列的獲取操作將會(huì)阻塞獲取線程,直到隊(duì)列中有新增元素,當(dāng)隊(duì)列已滿時(shí),隊(duì)列的插入操作將會(huì)阻塞插入線程,直到隊(duì)列出現(xiàn)“空位”
public class BoundedQueue<T> {
private Object[] items;
// 添加的下標(biāo),刪除的下標(biāo)和數(shù)組當(dāng)前數(shù)量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一個(gè)元素,如果數(shù)組滿,則添加線程進(jìn)入等待狀態(tài),直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 由頭部刪除一個(gè)元素,如果數(shù)組空,則刪除線程進(jìn)入等待狀態(tài),直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
上述示例中,BoundedQueue通過add(T t)方法添加一個(gè)元素,通過remove()方法移出一個(gè)元素。以添加方法為例。
- 首先需要獲得鎖,目的是確保數(shù)組修改的可見性和排他性。當(dāng)數(shù)組數(shù)量等于數(shù)組長度時(shí),表示數(shù)組已滿,則調(diào)用notFull.await(),當(dāng)前線程隨之釋放鎖并進(jìn)入等待狀態(tài)。
- 如果數(shù)組數(shù)量不等于數(shù)組長度,表示數(shù)組未滿,則添加元素到數(shù)組中,同時(shí)通知等待在notEmpty上的線程,數(shù)組中已經(jīng)有新元素可以獲取。
- 在添加和刪除方法中使用while循環(huán)而非if判斷,目的是防止過早或意外的通知,只有條件符合才能夠退出循環(huán)。
3 Condition的實(shí)現(xiàn)分析
- ConditionObject是同步器AbstractQueuedSynchronizer的內(nèi)部類,因?yàn)镃ondition的操作需首先需要獲得鎖,
- 每個(gè)Condition對(duì)象都包含著一個(gè)隊(duì)列(以下稱為等待隊(duì)列),該隊(duì)列是Condition對(duì)象實(shí)現(xiàn)等待/通知功能的關(guān)鍵。
3.1 等待隊(duì)列
- 等待隊(duì)列是一個(gè)FIFO的隊(duì)列,在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,該線程就是在Condition對(duì)象上等待的線程,如果一個(gè)線程調(diào)用了Condition.await()方法,那么該線程將會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)加入等待隊(duì)列并進(jìn)入等待狀態(tài)。
- 節(jié)點(diǎn)的定義復(fù)用了同步器中節(jié)點(diǎn)的定義,也就是說,同步隊(duì)列和等待隊(duì)列中節(jié)點(diǎn)類型都是同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronizer.Node**。
- 一個(gè)Condition包含一個(gè)等待隊(duì)列,Condition擁有首節(jié)點(diǎn)(firstWaiter)和尾節(jié)點(diǎn)(lastWaiter)。
- 當(dāng)前線程調(diào)用Condition.await()方法,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊(duì)列
等待隊(duì)列的基本結(jié)構(gòu)圖

Condition擁有首尾節(jié)點(diǎn)的引用,而新增節(jié)點(diǎn)只需要將原有的尾節(jié)點(diǎn)nextWaiter指向它,并且更新尾節(jié)點(diǎn)即可。
沒有使用CAS保證,原因在于調(diào)用await()方法的線程必定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。
在Object的監(jiān)視器模型上,一個(gè)對(duì)象擁有一個(gè)同步隊(duì)列和等待隊(duì)列,
并發(fā)包中的Lock(更確切地說是同步器)擁有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列

Condition的實(shí)現(xiàn)是同步器的內(nèi)部類,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法,相當(dāng)于每個(gè)Condition都擁有所屬同步器的引用。
3.2 等待
- 調(diào)用Condition的await()方法(或者以await開頭的方法),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。
- 當(dāng)從await()方法返回時(shí),當(dāng)前線程一定獲取了Condition相關(guān)聯(lián)的鎖。 如果從隊(duì)列(同步隊(duì)列和等待隊(duì)列)的角度看await()方法,當(dāng)調(diào)用await()方法時(shí),相當(dāng)于同步隊(duì)列的首節(jié)點(diǎn)(獲取了鎖的節(jié)點(diǎn))移動(dòng)到Condition的等待隊(duì)列中。
Condition的await()方法
public final void await() throws Interrupt
edException {
if (Thread.interrupted())
throw new InterruptedException();
// 當(dāng)前線程加入等待隊(duì)列
Node node = addConditionWaiter();
// 釋放同步狀態(tài),也就是釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 調(diào)用該方法的線程成功獲取了鎖的線程,也就是同步隊(duì)列中的首節(jié)點(diǎn),該方法會(huì)將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)并加入等待隊(duì)列中,然后釋放同步狀態(tài),喚醒同步隊(duì)列中的后繼節(jié)點(diǎn),然后當(dāng)前線程會(huì)進(jìn)入等待狀態(tài)。
- 當(dāng)?shù)却?duì)列中的節(jié)點(diǎn)被喚醒,則喚醒節(jié)點(diǎn)的線程開始嘗試獲取同步狀態(tài)。如果不是通過其他線程調(diào)用Condition.signal()方法喚醒,而是對(duì)等待線程進(jìn)行中斷,則會(huì)拋出InterruptedException。
- 如果從隊(duì)列的角度去看,當(dāng)前線程加入Condition的等待隊(duì)列,該過程如圖5-11示。

- 如圖所示,同步隊(duì)列的首節(jié)點(diǎn)并不會(huì)直接加入等待隊(duì)列,而是通過addConditionWaiter()方法把當(dāng)前線程構(gòu)造成一個(gè)新的節(jié)點(diǎn)并將其加入等待隊(duì)列中。
3.3 通知
調(diào)用Condition的signal()方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中。
ConditionObject的signal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
調(diào)用該方法的前置條件是當(dāng)前線程必須獲取了鎖,可以看到signal()方法進(jìn)行了isHeldExclusively()檢查,也就是當(dāng)前線程必須是獲取了鎖的線程。接著獲取等待隊(duì)列的首節(jié)點(diǎn),將其移動(dòng)到同步隊(duì)列并使用LockSupport喚醒節(jié)點(diǎn)中的線程。
節(jié)點(diǎn)從等待隊(duì)列移動(dòng)到同步隊(duì)列的過程

- 通過調(diào)用同步器的enq(Node node)方法,等待隊(duì)列中的頭節(jié)點(diǎn)線程安全地移動(dòng)到同步隊(duì)列。
- 當(dāng)節(jié)點(diǎn)移動(dòng)到同步隊(duì)列后,當(dāng)前線程再使用LockSupport喚醒該節(jié)點(diǎn)的線程。 被喚醒后的線程,將從await()方法中的while循環(huán)中退出(isOnSyncQueue(Node node)方法返回true,節(jié)點(diǎn)已經(jīng)在同步隊(duì)列中),進(jìn)而調(diào)用同步器的acquireQueued()方法加入到獲取同步狀態(tài)的競爭中。
- 成功獲取同步狀態(tài)(或者說鎖)之后,被喚醒的線程將從先前調(diào)用的await()方法返回,此時(shí)該線程已經(jīng)成功地獲取了鎖。
- Condition的signalAll()方法,相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal()方法,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中,并喚醒每個(gè)節(jié)點(diǎn)的線程。
參考
《java并發(fā)編程的藝術(shù)》
并發(fā)番@ConditionObject一文通