多線程并發(fā)總結(jié)(六)--阻塞隊(duì)列和線程池

1. 阻塞隊(duì)列

  • 隊(duì)列:是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作,和棧一樣,隊(duì)列是一種操作受限制的線性表。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。
    在隊(duì)列中插入一個(gè)隊(duì)列元素稱為入隊(duì),從隊(duì)列中刪除一個(gè)隊(duì)列元素稱為出隊(duì)。因?yàn)殛?duì)列只允許在一端插入,在另一端刪除,所以只有最早進(jìn)入隊(duì)列的元素才能最先從隊(duì)列中刪除,故隊(duì)列又稱為先進(jìn)先出(FIFO—first in first out)線性表。

  • 阻塞隊(duì)列是支持阻塞插入和支持阻塞移除的隊(duì)列:

    阻塞的插入:意思是當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。
    阻塞的移除:意思是在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/p>

2. 阻塞隊(duì)列的用途

? 阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程。阻塞隊(duì)列就是生產(chǎn)者用來(lái)存放元素、消費(fèi)者用來(lái)獲取元素的容器。

2.1 為什么有生產(chǎn)者消費(fèi)者模式

? 在并發(fā)編程中,為了平衡生產(chǎn)線程和消費(fèi)線程的工作能力來(lái)提高程序整體處理數(shù)據(jù)的速度,因此生產(chǎn)者消費(fèi)者模式就有了用武之地。

? 在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。

? 因此就需要將生產(chǎn)者和消費(fèi)者隔離開(kāi),并且用一個(gè)容器將產(chǎn)品裝好。生產(chǎn)者生產(chǎn)好產(chǎn)品就把產(chǎn)品放入容器,消費(fèi)者需要的時(shí)候就從容器中拿。 而這個(gè)容器就可以用隊(duì)列來(lái)實(shí)現(xiàn)。然而有容器為空和滿的兩種情況導(dǎo)致生產(chǎn)者在容器滿的時(shí)候無(wú)法放入隊(duì)列,消費(fèi)者當(dāng)容器為空的時(shí)候無(wú)法從隊(duì)列獲取產(chǎn)品。所以阻塞隊(duì)列就應(yīng)運(yùn)而生。

2.2 阻塞隊(duì)列接口(BlockingQueue)

? 下面看看BlockingQueue中定義的方法區(qū)別

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時(shí)退出
插入方法 add(E) offer(E) put(E) offer(E, time, unit)
移除方法 remove() poll() take() poll(E, time, unit)
檢查隊(duì)列方法 element() peek() 不可用 不可用
  • 拋出異常:當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列里插入元素,會(huì)拋出IllegalStateException("Queuefull")異常。當(dāng)隊(duì)列空時(shí),從隊(duì)列里獲取元素會(huì)拋出NoSuchElementException異常。

  • 返回特殊值:當(dāng)往隊(duì)列插入元素時(shí),會(huì)返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊(duì)列里取出一個(gè)元素,如果沒(méi)有則返回null。

  • 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到隊(duì)列可用或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),如果消費(fèi)者線程從隊(duì)列里take元素,隊(duì)列會(huì)阻塞住消費(fèi)者線程,直到隊(duì)列不為空。

  • 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里插入元素,隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)了指定的時(shí)間,生產(chǎn)者線程就會(huì)退出。

2.3 有哪些阻塞隊(duì)列

  • ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。

    是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列。在操作隊(duì)列的時(shí)候使用同一把顯示鎖。

  • LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。

    是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。在操作隊(duì)列的時(shí)候使用三把鎖實(shí)現(xiàn):lock全局鎖、放入時(shí)隊(duì)列不滿鎖notFull和取出時(shí)隊(duì)列不為空鎖notEmpty。

  • PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。

    PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列。默認(rèn)情況下元素采取自然順序升序排列。也可以自定義類實(shí)現(xiàn)compareTo()方法來(lái)指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時(shí),指定構(gòu)造參數(shù)Comparator來(lái)對(duì)元素進(jìn)行排序。需要注意的是不能保證同優(yōu)先級(jí)元素的順序。

  • DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。

    DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。

  • SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。

    SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。主要可以解決生產(chǎn)者消費(fèi)者隔離問(wèn)題。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

  • LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
    • (1) transfer方法
      如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法時(shí)),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者。如果沒(méi)有消費(fèi)者在等待接收元素,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。
    • (2) tryTransfer方法
      tryTransfer方法是用來(lái)試探生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒(méi)有消費(fèi)者等待接收元素,則返回false。和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回,而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
  • LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

    LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的是可以從隊(duì)列的兩端插入和移出元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。

3.線程池

3.1 使用線程池的好處

  • 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。有時(shí)候任務(wù)執(zhí)行的時(shí)間還遠(yuǎn)小于線程創(chuàng)建和銷毀的時(shí)間。

  • 第三:提高線程的可管理性。線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

3.2 線程池相關(guān)的類

  • Executor是一個(gè)接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開(kāi)來(lái)。

  • ExecutorService接口繼承了Executor,在其上做了一些shutdown()、submit()的擴(kuò)展,可以說(shuō)是真正的線程池接口;

  • AbstractExecutorService抽象類實(shí)現(xiàn)了ExecutorService接口中的大部分方法;

  • ThreadPoolExecutor是線程池的核心實(shí)現(xiàn)類,用來(lái)執(zhí)行被提交的任務(wù)。

  • ScheduledExecutorService接口繼承了ExecutorService接口,提供了帶"周期執(zhí)行"功能ExecutorService;

  • ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令。

線程池類關(guān)系圖.png

3.3 線程池的創(chuàng)建各個(gè)參數(shù)含義

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, /*核心線程數(shù),最大線程數(shù)*/

? long keepAliveTime, TimeUnit unit, /*空閑線程存活時(shí)間 */

? BlockingQueue<Runnable> workQueue, /*阻塞隊(duì)列 */

ThreadFactory threadFactory, /*線程工廠 */

RejectedExecutionHandler handler) /*線程池拒絕策略處理器 */

從構(gòu)造方法可以看出創(chuàng)建線程池有五類7個(gè)參數(shù)可以配置,下面來(lái)一一闡述一下

3.3.1 corePoolSize

? 線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;

? 如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;

? 如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。

3.3.2 maximumPoolSize

? 線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize

3.3.3 keepAliveTime

? 線程空閑時(shí)的存活時(shí)間,即當(dāng)線程沒(méi)有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時(shí)才有用

3.3.4 TimeUnit

keepAliveTime的時(shí)間單位

3.3.5 workQueue

? workQueue必須是BlockingQueue阻塞隊(duì)列。當(dāng)線程池中的線程數(shù)超過(guò)它的corePoolSize的時(shí)候,線程會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行阻塞等待。通過(guò)workQueue,線程池實(shí)現(xiàn)了阻塞功能。

? 一般來(lái)說(shuō),我們應(yīng)該盡量使用有界隊(duì)列,因?yàn)槭褂脽o(wú)界隊(duì)列作為工作隊(duì)列會(huì)對(duì)線程池帶來(lái)如下影響。

  • (1)當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,新任務(wù)將在無(wú)界隊(duì)列中等待,因此線程池中的線程數(shù)不會(huì)超過(guò)corePoolSize。
  • (2)由于1,使用無(wú)界隊(duì)列時(shí)maximumPoolSize將是一個(gè)無(wú)效參數(shù)。
  • (3)由于1和2,使用無(wú)界隊(duì)列時(shí)keepAliveTime將是一個(gè)無(wú)效參數(shù)。
  • (4)更重要的,使用無(wú)界queue可能會(huì)耗盡系統(tǒng)資源,有界隊(duì)列則有助于防止資源耗盡,同時(shí)即使使用有界隊(duì)列,也要盡量控制隊(duì)列的大小在一個(gè)合適的范圍。
3.3.6 threadFactory

? 創(chuàng)建線程的工廠,通過(guò)自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的線程名,當(dāng)然還可以更加自由的對(duì)線程做更多的設(shè)置,比如設(shè)置所有的線程為守護(hù)線程。

? Executors靜態(tài)工廠里默認(rèn)的threadFactory,線程的命名規(guī)則是“pool-數(shù)字-thread-數(shù)字”。

3.3.7 RejectedExecutionHandler

? 線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:

  • (1) AbortPolicy:直接拋出異常,默認(rèn)策略;
  • (2) CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
  • (3) DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
  • (4) DiscardPolicy:直接丟棄任務(wù);

當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

3.4 線程池工作機(jī)制

    1. 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
    1. 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
    1. 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)。
    1. 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
線程池工作機(jī)制.png

3.5 提交任務(wù)

3.5.1 execute()
execute()方法用于提交不需要返回值的任務(wù),所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。
3.5.2 submit()

? submit()方法用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè)future類型的對(duì)象,通過(guò)這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò)future的get()方法來(lái)獲取返回值,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完。

3.6 關(guān)閉線程池

? 可以通過(guò)調(diào)用線程池的shutdown或shutdownNow方法來(lái)關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來(lái)中斷線程,所以無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。

? 只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于應(yīng)該調(diào)用哪一種方法來(lái)關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來(lái)關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);  // 將線程池狀態(tài)設(shè)置成SHUTDOWN
        interruptIdleWorkers();    // 中斷空閑線程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);  // 將線程池狀態(tài)設(shè)置成STOP
            interruptWorkers();     // 中斷所有線程
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

3.7 如何合理配置線程池

? 要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)分析。

  • 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
  • 任務(wù)的優(yōu)先級(jí):高、中和低。
  • 任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。
  • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。

? 針對(duì)CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu。(可以通過(guò)Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù))

? 針對(duì)混合型的任務(wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解。

? 針對(duì)優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理。它可以讓優(yōu)先級(jí)高的任務(wù)先執(zhí)行

? 執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來(lái)處理,或者可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行

測(cè)試用例代碼見(jiàn): git@github.com:oujie123/UnderstandingOfThread.git

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容