1. 阻塞隊(duì)列
隊(duì)列:是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作,和棧一樣,隊(duì)列是一種操作受限制的線性表。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。
在隊(duì)列中插入一個(gè)隊(duì)列元素稱為入隊(duì),從隊(duì)列中刪除一個(gè)隊(duì)列元素稱為出隊(duì)。因?yàn)殛?duì)列只允許在一端插入,在另一端刪除,所以只有最早進(jìn)入隊(duì)列的元素才能最先從隊(duì)列中刪除,故隊(duì)列又稱為先進(jìn)先出(FIFO—first in first out)線性表。-
阻塞隊(duì)列是支持阻塞插入和支持阻塞移除的隊(duì)列:
阻塞的插入:意思是當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。
阻塞的移除:意思是在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/p>
2. 阻塞隊(duì)列的用途
? 阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程。阻塞隊(duì)列就是生產(chǎn)者用來(lái)存放元素、消費(fèi)者用來(lái)獲取元素的容器。
2.1 為什么有生產(chǎn)者消費(fèi)者模式
? 在并發(fā)編程中,為了平衡生產(chǎn)線程和消費(fèi)線程的工作能力來(lái)提高程序整體處理數(shù)據(jù)的速度,因此生產(chǎn)者消費(fèi)者模式就有了用武之地。
? 在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。
? 因此就需要將生產(chǎn)者和消費(fèi)者隔離開(kāi),并且用一個(gè)容器將產(chǎn)品裝好。生產(chǎn)者生產(chǎn)好產(chǎn)品就把產(chǎn)品放入容器,消費(fèi)者需要的時(shí)候就從容器中拿。 而這個(gè)容器就可以用隊(duì)列來(lái)實(shí)現(xiàn)。然而有容器為空和滿的兩種情況導(dǎo)致生產(chǎn)者在容器滿的時(shí)候無(wú)法放入隊(duì)列,消費(fèi)者當(dāng)容器為空的時(shí)候無(wú)法從隊(duì)列獲取產(chǎn)品。所以阻塞隊(duì)列就應(yīng)運(yùn)而生。
2.2 阻塞隊(duì)列接口(BlockingQueue)
? 下面看看BlockingQueue中定義的方法區(qū)別
| 方法/處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
|---|---|---|---|---|
| 插入方法 | add(E) | offer(E) | put(E) | offer(E, time, unit) |
| 移除方法 | remove() | poll() | take() | poll(E, time, unit) |
| 檢查隊(duì)列方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:當(dāng)隊(duì)列滿時(shí),如果再往隊(duì)列里插入元素,會(huì)拋出IllegalStateException("Queuefull")異常。當(dāng)隊(duì)列空時(shí),從隊(duì)列里獲取元素會(huì)拋出NoSuchElementException異常。
返回特殊值:當(dāng)往隊(duì)列插入元素時(shí),會(huì)返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊(duì)列里取出一個(gè)元素,如果沒(méi)有則返回null。
一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到隊(duì)列可用或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),如果消費(fèi)者線程從隊(duì)列里take元素,隊(duì)列會(huì)阻塞住消費(fèi)者線程,直到隊(duì)列不為空。
超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里插入元素,隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)了指定的時(shí)間,生產(chǎn)者線程就會(huì)退出。
2.3 有哪些阻塞隊(duì)列
-
ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列。在操作隊(duì)列的時(shí)候使用同一把顯示鎖。
-
LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。在操作隊(duì)列的時(shí)候使用三把鎖實(shí)現(xiàn):lock全局鎖、放入時(shí)隊(duì)列不滿鎖notFull和取出時(shí)隊(duì)列不為空鎖notEmpty。
-
PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列。默認(rèn)情況下元素采取自然順序升序排列。也可以自定義類實(shí)現(xiàn)compareTo()方法來(lái)指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時(shí),指定構(gòu)造參數(shù)Comparator來(lái)對(duì)元素進(jìn)行排序。需要注意的是不能保證同優(yōu)先級(jí)元素的順序。
-
DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。
DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
-
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。主要可以解決生產(chǎn)者消費(fèi)者隔離問(wèn)題。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
-
(1) transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法時(shí)),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者。如果沒(méi)有消費(fèi)者在等待接收元素,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。 -
(2) tryTransfer方法
tryTransfer方法是用來(lái)試探生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒(méi)有消費(fèi)者等待接收元素,則返回false。和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回,而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
-
(1) transfer方法
-
LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的是可以從隊(duì)列的兩端插入和移出元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。
3.線程池
3.1 使用線程池的好處
降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。有時(shí)候任務(wù)執(zhí)行的時(shí)間還遠(yuǎn)小于線程創(chuàng)建和銷毀的時(shí)間。
第三:提高線程的可管理性。線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
3.2 線程池相關(guān)的類
Executor是一個(gè)接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開(kāi)來(lái)。
ExecutorService接口繼承了Executor,在其上做了一些shutdown()、submit()的擴(kuò)展,可以說(shuō)是真正的線程池接口;
AbstractExecutorService抽象類實(shí)現(xiàn)了ExecutorService接口中的大部分方法;
ThreadPoolExecutor是線程池的核心實(shí)現(xiàn)類,用來(lái)執(zhí)行被提交的任務(wù)。
ScheduledExecutorService接口繼承了ExecutorService接口,提供了帶"周期執(zhí)行"功能ExecutorService;
ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令。

3.3 線程池的創(chuàng)建各個(gè)參數(shù)含義
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, /*核心線程數(shù),最大線程數(shù)*/
? long keepAliveTime, TimeUnit unit, /*空閑線程存活時(shí)間 */
? BlockingQueue<Runnable> workQueue, /*阻塞隊(duì)列 */
ThreadFactory threadFactory, /*線程工廠 */
RejectedExecutionHandler handler) /*線程池拒絕策略處理器 */
從構(gòu)造方法可以看出創(chuàng)建線程池有五類7個(gè)參數(shù)可以配置,下面來(lái)一一闡述一下
3.3.1 corePoolSize
? 線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;
? 如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;
? 如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。
3.3.2 maximumPoolSize
? 線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize
3.3.3 keepAliveTime
? 線程空閑時(shí)的存活時(shí)間,即當(dāng)線程沒(méi)有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時(shí)才有用
3.3.4 TimeUnit
keepAliveTime的時(shí)間單位
3.3.5 workQueue
? workQueue必須是BlockingQueue阻塞隊(duì)列。當(dāng)線程池中的線程數(shù)超過(guò)它的corePoolSize的時(shí)候,線程會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行阻塞等待。通過(guò)workQueue,線程池實(shí)現(xiàn)了阻塞功能。
? 一般來(lái)說(shuō),我們應(yīng)該盡量使用有界隊(duì)列,因?yàn)槭褂脽o(wú)界隊(duì)列作為工作隊(duì)列會(huì)對(duì)線程池帶來(lái)如下影響。
- (1)當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,新任務(wù)將在無(wú)界隊(duì)列中等待,因此線程池中的線程數(shù)不會(huì)超過(guò)corePoolSize。
- (2)由于1,使用無(wú)界隊(duì)列時(shí)maximumPoolSize將是一個(gè)無(wú)效參數(shù)。
- (3)由于1和2,使用無(wú)界隊(duì)列時(shí)keepAliveTime將是一個(gè)無(wú)效參數(shù)。
- (4)更重要的,使用無(wú)界queue可能會(huì)耗盡系統(tǒng)資源,有界隊(duì)列則有助于防止資源耗盡,同時(shí)即使使用有界隊(duì)列,也要盡量控制隊(duì)列的大小在一個(gè)合適的范圍。
3.3.6 threadFactory
? 創(chuàng)建線程的工廠,通過(guò)自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的線程名,當(dāng)然還可以更加自由的對(duì)線程做更多的設(shè)置,比如設(shè)置所有的線程為守護(hù)線程。
? Executors靜態(tài)工廠里默認(rèn)的threadFactory,線程的命名規(guī)則是“pool-數(shù)字-thread-數(shù)字”。
3.3.7 RejectedExecutionHandler
? 線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
- (1) AbortPolicy:直接拋出異常,默認(rèn)策略;
- (2) CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
- (3) DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- (4) DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。
3.4 線程池工作機(jī)制
- 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
- 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
- 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)。
- 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

3.5 提交任務(wù)
3.5.1 execute()
execute()方法用于提交不需要返回值的任務(wù),所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。
3.5.2 submit()
? submit()方法用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè)future類型的對(duì)象,通過(guò)這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò)future的get()方法來(lái)獲取返回值,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完。
3.6 關(guān)閉線程池
? 可以通過(guò)調(diào)用線程池的shutdown或shutdownNow方法來(lái)關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來(lái)中斷線程,所以無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。
? 只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于應(yīng)該調(diào)用哪一種方法來(lái)關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來(lái)關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 將線程池狀態(tài)設(shè)置成SHUTDOWN
interruptIdleWorkers(); // 中斷空閑線程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 將線程池狀態(tài)設(shè)置成STOP
interruptWorkers(); // 中斷所有線程
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
3.7 如何合理配置線程池
? 要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)分析。
- 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級(jí):高、中和低。
- 任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。
? 針對(duì)CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu。(可以通過(guò)Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù))
? 針對(duì)混合型的任務(wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解。
? 針對(duì)優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理。它可以讓優(yōu)先級(jí)高的任務(wù)先執(zhí)行
? 執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來(lái)處理,或者可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行