阻塞隊(duì)列--概述

什么是阻塞隊(duì)列

首先通過(guò)接口類BlockingQueue中的注釋來(lái)簡(jiǎn)單了解阻塞隊(duì)列。
阻塞隊(duì)列是一個(gè)支持附加操作的特殊隊(duì)列:在隊(duì)列為空時(shí)回收元素會(huì)阻塞等待直到隊(duì)列非空,或在隊(duì)列已滿時(shí)插入元素,會(huì)阻塞等待直到隊(duì)列不滿。

阻塞隊(duì)列的方法提供了四種不同的處理方式:拋異常、返回特殊值(null或false)、阻塞當(dāng)前線程直到操作成功以及阻塞一段時(shí)間,超時(shí)退出。這四種處理方式分別對(duì)應(yīng)不同的函數(shù)接口:

具體操作 拋異常 返回特殊值 無(wú)限阻塞 有時(shí)限阻塞
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable
  • 阻塞隊(duì)列存儲(chǔ)的元素不能為空,null值保留作為poll操作失敗的返回值。
  • 阻塞隊(duì)列可以是有界的,也可以是無(wú)界的。對(duì)于無(wú)界阻塞隊(duì)列,不可能會(huì)出現(xiàn)隊(duì)列滿的情況,所以使用put或offer方法永遠(yuǎn)不會(huì)被阻塞,且使用offer方法時(shí),永遠(yuǎn)返回true。
  • 阻塞隊(duì)列主要用于生產(chǎn)者消費(fèi)者場(chǎng)景,同時(shí)也附加的支持集合接口??赏ㄟ^(guò)調(diào)用remove(x)的方法來(lái)移除隊(duì)列中的任意元素,但這種方法并不高效,且并不常用,一般只用于取消排隊(duì)的消息。
  • 阻塞隊(duì)列是線程安全的,所有排隊(duì)方法通過(guò)使用內(nèi)部鎖或其他方式的同步控制來(lái)實(shí)現(xiàn)原子性。然而,批量集合操作如addAll、containsAll、retainAll和removeAll則是非原子性的,除非在實(shí)現(xiàn)中另行定義。所以,調(diào)用addAll(c),可能在添加部分c中元素后失敗拋異常。
  • 阻塞隊(duì)列本質(zhì)上不支持任何close或shutdown等操作來(lái)停止元素的添加。這種需求可通過(guò)由生產(chǎn)者插入特殊元素的方式實(shí)現(xiàn),消費(fèi)者拿到這個(gè)元素之后就獲取到了元素插入已終止的消息。
    對(duì)于無(wú)界阻塞隊(duì)列,隊(duì)列不可能會(huì)出現(xiàn)滿的情況,所以使用put或offer方法永遠(yuǎn)不會(huì)被阻塞,且使用offer方法永遠(yuǎn)返回true。
  • 內(nèi)存一致性,同其他并發(fā)容器一樣,線程對(duì)元素執(zhí)行的操作在元素插入隊(duì)列之前執(zhí)行。
  • 內(nèi)存可見(jiàn)性,遵循h(huán)appens-before,在另一個(gè)線程對(duì)阻塞隊(duì)列中某個(gè)元素訪問(wèn)或刪除之后操作。

JDK提供的7個(gè)阻塞隊(duì)列

ArrayBlockingQueue

是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,按先進(jìn)先出的原則對(duì)元素進(jìn)行排序。put和take方法分別為添加和刪除的阻塞方法。默認(rèn)情況下不保證線程公平。
ArrayBlockingQueue內(nèi)部使用一把重入鎖ReentrantLock來(lái)保證多個(gè)線程之間的插入刪除元素的同步;同時(shí)使用兩個(gè)條件對(duì)象Condition來(lái)實(shí)現(xiàn)阻塞邏輯,調(diào)用其await和signal方法來(lái)實(shí)現(xiàn)線程的等待和喚醒。
由于使用了ReentrantLock,所以ArrayBlockingQueue存在線程公平與不公平兩種選擇。
插入刪除元素的具體執(zhí)行邏輯:ArrayBlockingQueue
PS:這7個(gè)阻塞隊(duì)列本來(lái)想著一個(gè)一個(gè)解析的,但看了下源碼,邏輯其實(shí)沒(méi)有很復(fù)雜,所以后面幾個(gè)就只記一下內(nèi)部實(shí)現(xiàn)的主要點(diǎn)。

LinkedBlockingQueue

用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE,隊(duì)列按照先進(jìn)先出的原則對(duì)元素排序。
插入元素在表尾,刪除元素在表頭。
LinkedBlockingQueue內(nèi)部使用了兩把重入鎖ReentrantLock,分別用來(lái)保護(hù)插入操作和刪除操作。
同樣也是使用兩個(gè)條件對(duì)象來(lái)實(shí)現(xiàn)阻塞邏輯。

PriorityBlockingQueue

支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列,默認(rèn)按元素自然順序升序排列,可通過(guò)自定義類實(shí)現(xiàn)compareTo方法或指定構(gòu)造參數(shù)Comparator來(lái)指定元素排序規(guī)則,不保證同優(yōu)先級(jí)元素的順序。
PriorityBlockingQueue內(nèi)部使用的是數(shù)組對(duì)象來(lái)存儲(chǔ)元素,且數(shù)組容量初始化為11。
其內(nèi)部只使用了一把重入鎖ReentrantLock,和一個(gè)條件對(duì)象Condition,只用于阻塞和喚醒刪除元素操作的線程。
當(dāng)插入元素時(shí),若此時(shí)數(shù)組已滿,也不需要等待,它會(huì)嘗試擴(kuò)容,因此插入操作也不會(huì)有阻塞的可能。
PriorityBlockingQueue內(nèi)部還有一個(gè)allocationSpinLock自旋鎖,用于擴(kuò)容時(shí)的同步保護(hù),在執(zhí)行擴(kuò)容操作前,需先自旋嘗試將allocationSpinLock置為1,設(shè)置成功后才能繼續(xù)往下執(zhí)行。

DelayQueue
  • 支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列,內(nèi)部使用優(yōu)先級(jí)隊(duì)列PriorityQueue來(lái)存儲(chǔ)元素。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素,只有過(guò)了delay時(shí)間才能從隊(duì)列中提取元素。
    DelayQueue就是往優(yōu)先級(jí)隊(duì)列中添加元素,然后與元素的delay(過(guò)期值)作為排序因素,從而實(shí)現(xiàn)先過(guò)期的元素排在隊(duì)首,每次從隊(duì)列中取出的元素都是最先要過(guò)期的元素。

  • 應(yīng)用場(chǎng)景:
    1)訂單,下單一段時(shí)間后沒(méi)有付款就取消訂單;
    2)關(guān)閉空閑連接,服務(wù)器中,關(guān)閉有一段時(shí)間空閑的客戶端連接;
    3)緩存對(duì)象,超過(guò)了緩存時(shí)間,則清除;
    4)任務(wù)超時(shí)處理,處理超時(shí)未響應(yīng)的請(qǐng)求等。

  • DelayQueue內(nèi)部也是由一把重入鎖ReentrantLock實(shí)現(xiàn)線程間的同步。

  • DelayQueue內(nèi)部還有一個(gè)Thread類型的對(duì)象leader,用來(lái)記錄等待隊(duì)頭元素的線程。使用leader可以減少不必要的等待時(shí)間。
    當(dāng)多個(gè)線程調(diào)用take方法去取元素,如果當(dāng)前l(fā)eader非空,說(shuō)明有線程在取 ,則當(dāng)前線程等待;如果leader為空,則將當(dāng)前線程設(shè)置為leader。
    當(dāng)一個(gè)線程成為leader,它只需要等待下一個(gè)delay時(shí)間過(guò)去,但其他線程將會(huì)無(wú)限等待,leader線程必須從take()或poll(...)返回之前向其他線程發(fā)出信號(hào),除非在這個(gè)期間,某個(gè)線程成為了leader線程。
    當(dāng)隊(duì)頭元素被更早到期的元素替換時(shí),leader字段都會(huì)被重置為null,并向一些等待線程(不一定是當(dāng)前的leader)發(fā)出信號(hào)。所以,等待線程在等待過(guò)程中可能會(huì)得到或失去leadership。
    這整個(gè)過(guò)程的等待喚醒邏輯就是通過(guò)調(diào)用DelayQueue的類型為Condition的成員變量available的await和signal等方法實(shí)現(xiàn)的。

SynchronousQueue

一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每一個(gè)put的線程會(huì)阻塞到直到有一個(gè)take線程取走元素為止,每一個(gè)take的線程會(huì)阻塞到直到有一個(gè)put的線程放入元素為止。
由于SynchronousQueue不存儲(chǔ)元素,所以類似peek操作或者迭代器操作都是無(wú)效的。
支持公平訪問(wèn)隊(duì)列,默認(rèn)情況下線程采用非公平性策略訪問(wèn)隊(duì)列。
SynchronousQueue只是一個(gè)對(duì)外的封裝層,其真正的實(shí)現(xiàn)邏輯在其類型為Transferer的成員變量transferer的transfer方法中;抽象類Transferer有兩個(gè)具體的實(shí)現(xiàn)類:TransferStack和TransferQueue,分別在非公平和公平的模式下使用。
Transferer類內(nèi)部是通過(guò)自旋鎖及CAS操作實(shí)現(xiàn)多個(gè)線程間的同步。
SynchronousQueue可當(dāng)做一個(gè)傳遞中介,負(fù)責(zé)將生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程,適用于傳遞性場(chǎng)景,吞吐量高。
其內(nèi)部的具體實(shí)現(xiàn)邏輯可參考:SynchronousQueue

LinkedTransferQueue
  • 由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列。相比其他阻塞隊(duì)列,多了tryTransfer和transfer方法。
  • LinkedTransferQueue可以認(rèn)為是LinkedBlockingQueue和SynchronousQueue的結(jié)合體。LinkedBlockingQueue內(nèi)部使用了ReentrantLock,性能不高,而SynchronousQueue則無(wú)法存儲(chǔ)元素,
  • LinkedTransferQueue實(shí)現(xiàn)了TransferQueue接口,該接口包含的方法如下:
public interface TransferQueue<E> extends BlockingQueue<E> {
    //如果當(dāng)前有消費(fèi)者正在等待接收元素,則將生產(chǎn)者傳入的元素立即transfer給消費(fèi)者,否則返回false
    boolean tryTransfer(E e);
    //如果當(dāng)前有消費(fèi)者正在等待接收元素,transfer方法可以把生產(chǎn)者傳入的元素立即transfer給消費(fèi)者;
    //如果沒(méi)有消費(fèi)者等待接收,transfer方法則將元素存放在隊(duì)列的tail結(jié)點(diǎn),等該元素被消費(fèi)了才返回
    void transfer(E e) throws InterruptedException;
    //tryTransfer方法增加了超時(shí)時(shí)間
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    //若當(dāng)前至少有一位消費(fèi)者在等待,則返回true
    boolean hasWaitingConsumer();
    //返回當(dāng)前等待的消費(fèi)者線程數(shù)
    int getWaitingConsumerCount();

LinkedTransferQueue內(nèi)部是通過(guò)自旋以及CAS操作來(lái)實(shí)現(xiàn)線程間的同步。

LinkedBlockingDeque

有鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,即可以從隊(duì)列的兩端插入或移除元素。
其內(nèi)部同樣擁有一把重入鎖ReentrantLock,兩個(gè)條件對(duì)象notEmpty和notFull。整體邏輯同LinkedBlockingQueue相似。
在初始化LinkedBlockingDeque時(shí)可以設(shè)置容量防止其過(guò)度膨脹,雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容