java并發(fā)編程之Condition

引言


在java中,對于任意一個java對象,它都擁有一組定義在java.lang.Object上監(jiān)視器方法,包括wait()wait(long timeout),notify()notifyAll(),這些方法配合synchronized關(guān)鍵字一起使用可以實現(xiàn)等待/通知模式。

同樣,Condition接口也提供了類似Object監(jiān)視器的方法,通過與Lock配合來實現(xiàn)等待/通知模式。

為了更好的了解Condition的特性,我們來對比一下兩者的使用方式以及功能特性:

對比項 Object監(jiān)視器 Condition
前置條件 獲取對象的鎖 調(diào)用Lock.lock獲取鎖,調(diào)用Lock.newCondition獲取Condition對象
調(diào)用方式 直接調(diào)用,比如object.notify() 直接調(diào)用,比如condition.await()
等待隊列的個數(shù) 一個 多個
當(dāng)前線程釋放鎖進(jìn)入等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖進(jìn)入等待狀態(tài),在等待狀態(tài)中不斷響中斷 不支持 支持
當(dāng)前線程釋放鎖并進(jìn)入超時等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖并進(jìn)入等待狀態(tài)直到將來的某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的全部線程 支持 支持

Condition使用示例


實現(xiàn)一個簡單的有界隊列,隊列為空時,隊列的刪除操作將會阻塞直到隊列中有新的元素,隊列已滿時,隊列的插入操作將會阻塞直到隊列出現(xiàn)空位。


簡單有界隊列實現(xiàn)

不難看出,Condition的使用方式是比較簡單的,需要注意的是使用Condition的等待/通知需要提前獲取到與Condition對象關(guān)聯(lián)的鎖,Condition對象由Lock對象創(chuàng)建。

以上述示例中的add(T object)為例,詳細(xì)描述一下Condition等待/通知的整個過程:

  • 獲取鎖,確保對數(shù)據(jù)數(shù)據(jù)修改的安全性;
  • 數(shù)組元素的個數(shù)等于數(shù)組的長度時,調(diào)用notFull.await(),插入線程釋放鎖進(jìn)入等待;
  • 數(shù)組未滿,添加元素到數(shù)組中,調(diào)用notEmpty.signal()通知等待在notEmpty上的線程,數(shù)組中有新的元素可以操作。

總的來說,Condition的等待/通知使用方式大體上跟經(jīng)典的Object監(jiān)視器上的等待/通知是非常類似的。

Condition實現(xiàn)分析


Condition api

Condition提供以下接口以供實現(xiàn):

  1. void await() throws InterruptedException
    當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知(signal)或者被中斷時,當(dāng)前線程進(jìn)入運行狀態(tài),從await()返回;

  2. void awaitUninterruptibly()
    當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知,對中斷不做響應(yīng);

  3. long awaitNanos(long nanosTimeout) throws InterruptedException
    在接口1的返回條件基礎(chǔ)上增加了超時響應(yīng),返回值表示當(dāng)前剩余的時間,如果在nanosTimeout之前被喚醒,返回值 = nanosTimeout - 實際消耗的時間,返回值 <= 0表示超時;

  4. boolean await(long time, TimeUnit unit) throws InterruptedException
    同樣是在接口1的返回條件基礎(chǔ)上增加了超時響應(yīng),與接口3不同的是:

  • 可以自定義超時時間單位;
  • 返回值返回true/false,在time之前被喚醒,返回true,超時返回false。
  1. boolean awaitUntil(Date deadline) throws InterruptedException
    當(dāng)前線程進(jìn)入等待狀態(tài)直到將來的指定時間被通知,如果沒有到指定時間被通知返回true,否則,到達(dá)指定時間,返回false;

  2. void signal()
    喚醒一個等待在Condition上的線程;

  3. void signalAll()
    喚醒等待在Condition上所有的線程。

Condition具體實現(xiàn)分析

ConditionObject是Condition在java并發(fā)中的具體的實現(xiàn),它是AQS的內(nèi)部類。因為Condition相關(guān)操作都需要獲取鎖,所以作為AQS的內(nèi)部類也比較合理。接下來就以ConditionObject的等待隊列、等待、通知為切入點分析ConditionObject的具體實現(xiàn)。

等待隊列

ConditionObject的等待隊列是一個FIFO隊列,隊列的每個節(jié)點都是等待在Condition對象上的線程的引用,在調(diào)用Condition的await()方法之后,線程釋放鎖,構(gòu)造成相應(yīng)的節(jié)點進(jìn)入等待隊列等待。其中節(jié)點的定義復(fù)用AQS的Node定義。

等待隊列相關(guān)操作實現(xiàn)

ConditionObject等待隊列相關(guān)操作實現(xiàn)

  • ConditionObject包含等待隊列的首節(jié)點firstWaiter和尾節(jié)點lastWaiter;
  • 線程調(diào)用await()方法時,調(diào)用addConditionWaiter()方法入隊:
    • step1:將線程構(gòu)造成Node;
    • step2:將Node加入到等待隊列中。

從隊列相關(guān)操作的具體實現(xiàn)可以知道等待隊列的基本結(jié)構(gòu)如下圖所示:


等待隊列基本結(jié)構(gòu)

插入節(jié)點只需要將原有尾節(jié)點的nextWaiter指向當(dāng)前節(jié)點,并且更新尾節(jié)點。更新節(jié)點并沒有像AQS更新同步隊列使用CAS是因為調(diào)用await()方法的線程必定是獲取了鎖的線程,鎖保證了操作的線程安全。

注:AQS實質(zhì)上擁有一個同步隊列和多個等待隊列,具體對應(yīng)關(guān)系如下圖所示:

AQS同步隊列與等待隊列

等待

調(diào)用Condition的await開頭的系列方法,當(dāng)前線程進(jìn)入等待隊列等待,那么Condition的等待實質(zhì)是await系列方法的具體實現(xiàn)。

await實現(xiàn)

await實現(xiàn)

具體執(zhí)行流程如下:

  • 調(diào)用addConditionWaiter將當(dāng)前線程加入等待隊列;
  • 調(diào)用fullRelease釋放當(dāng)前線程節(jié)點的同步狀態(tài),喚醒后繼節(jié)點;
  • 線程進(jìn)入等待狀態(tài);
  • 線程被喚醒后,從while循環(huán)中退出,調(diào)用acquireQueued嘗試獲取同步狀態(tài);
  • 同步狀態(tài)獲取成功后,線程從await方法返回。

其他以await開頭的方法具體實現(xiàn)與await基本一致,只是在它的基礎(chǔ)上增加了超時限制,不管有沒有被喚醒,到達(dá)指定時間,等待結(jié)束,從await返回。整個await系列方法將線程加入等待隊列的流程可以總結(jié)為下圖:


線程加入等待隊列.png

喚醒

調(diào)用Condition的signal()方法將會喚醒再等待隊列中的首節(jié)點,該節(jié)點也是到目前為止等待時間最長的節(jié)點。

signal實現(xiàn)

signal實現(xiàn)

  • step1:前置檢查,判斷當(dāng)前線程是否是獲取了鎖的線程,如果不是拋出異常IllegalMonitorStateException,否則,執(zhí)行step2;
  • step2:取得等待隊列的頭結(jié)點,頭結(jié)點不為空執(zhí)行doSignal,否則,signal結(jié)束。

可以看出,doSignal方法是整個signal方法實現(xiàn)的核心,它完成了將線程從喚醒的所有操作。

doSignal實現(xiàn)

doSignal實現(xiàn)

整個doSignal完成了這兩個操作:調(diào)用transferForSignal將節(jié)點從等待隊列移動到同步隊列,并且,將該節(jié)點從等待隊列刪除。

transferForSignal實現(xiàn)

transferForSignal實現(xiàn)

  • step1:將節(jié)點waitStatus設(shè)置為0,設(shè)置成功執(zhí)行step2,否則返回false;
  • step2:調(diào)用enq方法將該節(jié)點加入同步隊列;
  • step3:使用LockSuppor.unpark()方法喚醒該節(jié)點的線程。

Condition的signalAll()方法,將等待隊列中的所有節(jié)點全部喚醒,相當(dāng)于將等待隊列中的每一個節(jié)點都執(zhí)行一次signal()。整個signal系列方法將線程從等待隊列移動到同步隊列可以總結(jié)為下圖:


線程從等待隊列移動到同步隊列
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容