關(guān)于線程池,無論是在實際的項目開發(fā)還是面試,它都是并發(fā)編程中當之無愧的重中之重。因此,掌握線程池是每個Java開發(fā)者的必備技能。
本文將從線程池的應用場景和設(shè)計原理出發(fā),先帶大家手擼一個線程池,在理解線程池的內(nèi)部構(gòu)造后,再深入剖析Java中的線程池。全文大約2.5萬字,篇幅較長,在閱讀時建議先看目錄再看內(nèi)容。
一、為什么要使用線程池
在前面系列文章的學習中,你已然知道多線程可以加速任務的處理、提高系統(tǒng)的吞吐量。那么,是否我們因此就可以頻繁地創(chuàng)建新的線程呢?答案是否定的。頻繁地繁創(chuàng)建和啟用新的線程不僅代價昂貴,而且無限增加的線程勢必也會造成管理成本的急劇上升。因此,為了平衡多線程的收益和成本,線程池誕生了。
1. 線程池的使用場景
生產(chǎn)者與消費者問題是線程池的典型應用場景。當你有源源不斷的任務需要處理時,為了提高任務的處理速度,你需要創(chuàng)建多個線程。那么,問題來了,如何管理這些任務和多線程呢?答案是:線程池。
線程池的池化(Pooling)原理的應用并不局限于Java中,在MySQL和諸多的分布式中間件系統(tǒng)中都有著廣泛的應用。當我們鏈接數(shù)據(jù)庫的時候,對鏈接的管理用的是線程池;當我們使用Tomcat時,對請求鏈接的管理用的也是線程池。所以,當你有批量的任務需要多線程處理時,那么基本上你就需要使用線程池。
2. 線程池的使用好處
線程池的好處主要體現(xiàn)在三個方面:系統(tǒng)資源、任務處理速度和相關(guān)的復雜度管理,主要表現(xiàn)在:
- 降低系統(tǒng)的資源開銷:通過復用線程池中的工作線程,避免頻繁創(chuàng)建新的線程,可以有效降低系統(tǒng)資源的開銷;
- 提高任務的執(zhí)行速度:新任務達到時,無需創(chuàng)建新的線程,直接將任務交由已經(jīng)存在的線程進行處理,可以有效提高任務的執(zhí)行速度;
- 有效管理任務和工作線程:線程池內(nèi)提供了任務管理和工作線程管理的機制。
為什么說創(chuàng)建線程是昂貴的
現(xiàn)在你已經(jīng)知道,頻繁地創(chuàng)建新線程需要付出額外的代價,所以我們使用了線程池。那么,創(chuàng)建一個新的線程的代價究竟是怎樣的呢?可以參考以下幾點:
- 創(chuàng)建線程時,JVM必須為線程堆棧分配和初始化一大塊內(nèi)存。每個線程方法的調(diào)用棧幀都會存儲到這里,包括局部變量、返回值和常量池等;
- 在創(chuàng)建和注冊本機線程時,需要和宿主機發(fā)生系統(tǒng)調(diào)用;
- 需要創(chuàng)建、初始化描述符,并將其添加到 JVM 內(nèi)部數(shù)據(jù)結(jié)構(gòu)中。
另外,從某種意義上說,只要線程還活著,它就會占用資源,這不僅昂貴,而且浪費。 例如 ,線程堆棧、訪問堆棧的可達對象、JVM 線程描述符、操作系統(tǒng)本機線程描述符等等,在線程活著的時候,這些資源都會持續(xù)占據(jù)。
雖然不同的Java平臺在創(chuàng)建線程時的代價可能有所差異,但總體來說,都不便宜。
3. 線程池的核心組成
一個完整的線程池,應該包含以下幾個核心部分:
- 任務提交:提供接口接收任務的提交;
- 任務管理:選擇合適的隊列對提交的任務進行管理,包括對拒絕策略的設(shè)置;
- 任務執(zhí)行:由工作線程來執(zhí)行提交的任務;
- 線程池管理:包括基本參數(shù)設(shè)置、任務監(jiān)控、工作線程管理等。

二、如何手工制作線程池
通過第一部分的閱讀,現(xiàn)在你已經(jīng)了解了線程池的作用及它的核心組成。為了更深刻地理解線程池的組成,在這一部分我們通過簡單的四步來手工制作一個簡單的線程池。當然,麻雀雖小,五臟俱全。如果你能手工自制線程池之后,那么在理解后續(xù)的Java中的線程池時,將會易如反掌。
1. 線程池設(shè)計和制作
第一步:定義一個王者線程池:TheKingThreadPool,它是這次手工制作中名副其實的主角兒。在這個線程池中,包含了任務隊列管理、工作線程管理,并提供了可以指定隊列類型的構(gòu)造參數(shù),以及任務提交入口和線程池關(guān)閉接口。你看,雖然它看起來似乎很迷你,但是線程池的核心組件都已經(jīng)具備了,甚至在它的基礎(chǔ)上,你完全可以把它擴展成更為成熟的線程池。
/** * 王者線程池 */public class TheKingThreadPool { private final BlockingQueue<Task> taskQueue; private final List<Worker> workers = new ArrayList<>(); private ThreadPoolStatus status; /** * 初始化構(gòu)建線程池 * * @param worksNumber 線程池中的工作線程數(shù)量 * @param taskQueue 任務隊列 */ public TheKingThreadPool(int worksNumber, BlockingQueue<Task> taskQueue) { this.taskQueue = taskQueue; status = ThreadPoolStatus.RUNNING; for (int i = 0; i < worksNumber; i++) { workers.add(new Worker("Worker" + i, taskQueue)); } for (Worker worker : workers) { Thread workThread = new Thread(worker); workThread.setName(worker.getName()); workThread.start(); } } /** * 提交任務 * * @param task 待執(zhí)行的任務 */ public synchronized void execute(Task task) { if (!this.status.isRunning()) { throw new IllegalStateException("線程池非運行狀態(tài),停止接單啦~"); } this.taskQueue.offer(task); } /** * 等待所有任務執(zhí)行結(jié)束 */ public synchronized void waitUntilAllTasksFinished() { while (this.taskQueue.size() > 0) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 關(guān)閉線程池 */ public synchronized void shutdown() { this.status = ThreadPoolStatus.SHUTDOWN; } /** * 停止線程池 */ public synchronized void stop() { this.status = ThreadPoolStatus.SHUTDOWN; for (Worker worker : workers) { worker.doStop(); } }}
第二步:設(shè)計并制作工作線程。工作線程是干活的線程,將負責處理提交到線程池中的任務,我們把它叫做Worker。其實,這里的Worker的定義和Java線程池中的Worker已經(jīng)很像了,它繼承了Runnable接口并封裝了Thread. 在構(gòu)造Worker時,可以設(shè)定它的名字,并傳入任務隊列。當Worker啟動后,它將會從任務隊列中獲取任務并執(zhí)行。此外,它還提供了Stop方法,用以響應線程池的狀態(tài)變化。
/** * 線程池中用于執(zhí)行任務的線程 */public class Worker implements Runnable { private final String name; private Thread thread = null; private final BlockingQueue<Task> taskQueue; private boolean isStopped = false; private AtomicInteger counter = new AtomicInteger(); public Worker(String name, BlockingQueue<Task> queue) { this.name = name; taskQueue = queue; } public void run() { this.thread = Thread.currentThread(); while (!isStopped()) { try { Task task = taskQueue.poll(5L, TimeUnit.SECONDS); if (task != null) { note(this.thread.getName(), ":獲取到新的任務->", task.getTaskDesc()); task.run(); counter.getAndIncrement(); } } catch (Exception ignored) { } } note(this.thread.getName(), ":已結(jié)束工作,執(zhí)行任務數(shù)量:" + counter.get()); } public synchronized void doStop() { isStopped = true; if (thread != null) { this.thread.interrupt(); } } public synchronized boolean isStopped() { return isStopped; } public String getName() { return name; }}
第三步:設(shè)計并制作任務。任務是可以可執(zhí)行的對象,因此我們直接繼承Runnable接口就行。其實,直接使用Runnable接口也是可以的,只不過為了讓示例更加清楚,我們給Task加了任務描述的方法。
/** * 任務 */public interface Task extends Runnable { String getTaskDesc();}
第四步:設(shè)計線程池的狀態(tài)。線程池作為一個運行框架,它必然會有一系列的狀態(tài),比如運行中、停止、關(guān)閉等。
public enum ThreadPoolStatus { RUNNING(), SHUTDOWN(), STOP(), TIDYING(), TERMINATED(); ThreadPoolStatus() { } public boolean isRunning() { return ThreadPoolStatus.RUNNING.equals(this); }}
以上四個步驟完成后,一個簡易的線程池就已經(jīng)制作完畢。你看,如果你從以上幾點入手來理解線程池的源碼的話,是不是要簡單多了?Java中的線程池的核心組成也是如此,只不過在細節(jié)處理等方面更多全面且豐富。
2. 運行線程池
現(xiàn)在,我們的王者線程池已經(jīng)制作好。接下來,我們通過一個場景來運行它,看看它的效果如何。
試驗場景:峽谷森林中,鎧、蘭陵王和典韋等負責打野,而安其拉、貂蟬和大喬等美女負責對狩獵到的野怪進行燒烤,一場歡快的峽谷燒烤節(jié)正在進行中。
在這個場景中,鎧和蘭陵王他們負責提交任務,而貂蟬和大喬她們則負責處理任務。

在下面的實現(xiàn)代碼中,我們通過上述設(shè)計的TheKingThreadPool來定義個線程池,wildMonsters中的野怪表示待提交的任務,并安排3個工作線程來執(zhí)行任務。在示例代碼的末尾,當所有任務執(zhí)行結(jié)束后,關(guān)閉線程池。
public static void main(String[] args) { TheKingThreadPool theKingThreadPool = new TheKingThreadPool(3, new ArrayBlockingQueue<>(10)); String[] wildMonsters = {"棕熊", "野雞", "灰狼", "野兔", "狐貍", "小鹿", "小花豹", "野豬"}; for (String wildMonsterName : wildMonsters) { theKingThreadPool.execute(new Task() { public String getTaskDesc() { return wildMonsterName; } public void run() { System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已經(jīng)烤好"); } }); } theKingThreadPool.waitUntilAllTasksFinished(); theKingThreadPool.stop(); }
王者線程池運行結(jié)果如下:
Worker0:獲取到新的任務->灰狼Worker1:獲取到新的任務->野雞Worker1:野雞已經(jīng)烤好Worker2:獲取到新的任務->棕熊Worker2:棕熊已經(jīng)烤好Worker1:獲取到新的任務->野兔Worker1:野兔已經(jīng)烤好Worker0:灰狼已經(jīng)烤好Worker1:獲取到新的任務->小鹿Worker1:小鹿已經(jīng)烤好Worker2:獲取到新的任務->狐貍Worker2:狐貍已經(jīng)烤好Worker1:獲取到新的任務->野豬Worker1:野豬已經(jīng)烤好Worker0:獲取到新的任務->小花豹Worker0:小花豹已經(jīng)烤好Worker0:已結(jié)束工作,執(zhí)行任務數(shù)量:2Worker2:已結(jié)束工作,執(zhí)行任務數(shù)量:2Worker1:已結(jié)束工作,執(zhí)行任務數(shù)量:4Process finished with exit code 0
從結(jié)果中可以看到,效果完全符合預期。所有的任務都已經(jīng)提交完畢,并且都被正確執(zhí)行。此外,通過線程池的任務統(tǒng)計,可以看到任務并不是均勻分配,Worker1執(zhí)行了4個任務,而Worker0和Worker2均只執(zhí)行了2個任務,這也是線程池中的正?,F(xiàn)象。
三、透徹理解Java中的線程池
在手工制作線程線程池之后,再來理解Java中的線程池就相對要容易很多。當然,相比于王者線程池,Java中的線程池(ThreadPoolExecutor)的實現(xiàn)要復雜很多。所以,理解時應當遵循一定的結(jié)構(gòu)和脈絡,把握住線程池的核心要點,眉毛胡子一把抓、理不清層次會導致你無法有效理解它的設(shè)計內(nèi)涵,進而導致你無法正確掌握它。
總體來說,Java中的線程池的設(shè)計核心都是圍繞“任務”進行,可以通過一個框架、兩大核心、三大過程概括。理解了這三個重要概念,基本上你已經(jīng)能從相對抽象的層面理解了線程池。
- 一個框架:即線程池的整體設(shè)計存在一個框架,而不是雜亂無章的組成。所以,在學習線程池時,首先要能從立體上感知到這個框架的存在,而不要陷于凌亂的細節(jié)中;
- 兩大核心:在線程池的整個框架中,圍繞任務執(zhí)行這件事,存在兩大核心:任務的管理和任務的執(zhí)行,對應的也就是任務隊列和用于執(zhí)行任務的工作線程。任務隊列和工作線程是框架得以有效運轉(zhuǎn)的關(guān)鍵部件;
- 三大過程:前面說過,線程池的整體設(shè)計都是圍繞任務展開,所以框架內(nèi)可以分為任務提交、任務管理和任務執(zhí)行三大過程。
從類比的角度講,你可以把框架看作是一個生產(chǎn)車間。在這個車間里,有一條流水線,任務隊列和工作線程是這條流水線的兩大關(guān)鍵組成。而在流水線運作的過程中,就會涉及任務提交、任務管理和任務執(zhí)行等不同的過程。
下面這幅圖,將幫助你立體地感知線程池的整體設(shè)計,建議你收藏。在這幅圖中,清楚地展示了線程池整個框架的工作流程和核心部件,接下來的文章也將圍繞這幅圖展開。

1. 線程池框架設(shè)計概覽
從源碼層面看,理解Java中的線程池,要從下面這四兄弟的概念和關(guān)系入手,這四個概念務必了然于心。

- Executor:作為線程池的最頂層接口,Executor的接口在設(shè)計上,實現(xiàn)了任務提交與任務執(zhí)行之間的解耦,這是它存在的意義。在Executor中,只定義了一個方法void execute(Runnable command),用于執(zhí)行提交的可運行的任務。注意,你看它這個方法的參數(shù)干脆就叫command,也就是“命令”,意在表明所提交的不是一個靜止的對象,而是可運行的命令。并且,這個命令將在未來的某一時刻執(zhí)行,具體由哪個線程來執(zhí)行也是不確定的;
- ExecutorService:繼承了Executor的接口,并在此基礎(chǔ)上提供可以管理服務和執(zhí)行結(jié)果(Futrue) 的能力。ExecutorService所提供的submit方法可以返回任務的執(zhí)行結(jié)果,而shutdown方法則可以用于關(guān)閉服務。相比起來,Executor只具備單一的執(zhí)行能力,而ExecutorService則不僅具有執(zhí)行能力,還提供了簡單的服務管理能力;
- AbstractExecutorService:作為ExecutorService的簡單實現(xiàn),該類通過RunnableFuture和newTaskFor實現(xiàn)了submit、invokeAny和invokeAll等方法;
- ThreadPoolExecutor:該類是線程池的最終實現(xiàn)類,實現(xiàn)了Executor和ExecutorService中定義的能力,并豐富了AbstractExecutorService中的實現(xiàn)。在ThreadPoolExecutor中,定義了任務管理策略和線程池管理能力,相關(guān)能力的實現(xiàn)細節(jié)將是我們下文所要講解的核心所在。
如果你覺得還是不太能直觀地感受四兄弟的差異,那么你可以放大查看下面這幅高清圖示。看的時候,要格外注意它們各自方法的不同,方法的不同意味著它們的能力不同。

而對于線程池總體的執(zhí)行過程,下面這幅圖也建議你收藏。這幅圖雖然簡明,但完整展示了從任務提交到任務執(zhí)行的整個過程。這個執(zhí)行過程往往也是面試中的高頻面試題,務必掌握。

(1)線程池的核心屬性
線程池中的一些核心屬性選取如下,對于其中個別屬性會做特別說明。
// 線程池控制相關(guān)的主要變量// 這個變量很神奇,下文后專門陳述,請?zhí)貏e留意private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 待處理的任務隊列private final BlockingQueue < Runnable > workQueue;// 工作線程集合private final HashSet < Worker > workers = new HashSet < Worker > ();// 創(chuàng)建線程所用到的線程工廠private volatile ThreadFactory threadFactory;// 拒絕策略private volatile RejectedExecutionHandler handler;// 核心線程數(shù)private volatile int corePoolSize;// 最大線程數(shù)private volatile int maximumPoolSize;// 空閑線程的保活時長private volatile long keepAliveTime;// 線程池變更的主要控制鎖,在工作線程數(shù)、變更線程池狀態(tài)等場景下都會用到private final ReentrantLock mainLock = new ReentrantLock();
關(guān)于ctl字段的特別說明
在ThreadPoolExecutor的多個核心字段中,其他字段可能都比較好理解,但是ctl要單獨拎出來做些解釋。
顧名思義,ctl這個字段用于對線程池的控制。它的設(shè)計比較有趣,用一個字段卻表示了兩層含義,也就是這個字段實際是兩個字段的合體:
- runState:線程池的運行狀態(tài)(高3位);
- workerCount:工作線程數(shù)量(第29位)。
這兩個字段的值相互獨立,互不影響。那為何要用這種設(shè)計呢?這是因為,在線程池中這兩個字段幾乎總是如影相隨,如果不用一個字段來表示的話,那么就需要通過鎖的機制來控制兩個字段的一致性。不得不說,這個字段設(shè)計上還是比較巧妙的。
在線程池中,也提供了一些方法可以方便地獲取線程池的狀態(tài)和工作線程數(shù)量,它們都是通過對ctl進行位運算得來。
/** 計算當前線程池的狀態(tài)*/private static int runStateOf(int c) { return c & ~CAPACITY;}/** 計算當前工作線程數(shù)*/private static int workerCountOf(int c) { return c & CAPACITY;}/** 初始化ctl變量*/private static int ctlOf(int rs, int wc) { return rs | wc;}
關(guān)于位運算,這里補充一點說明,如果你對位運算有點迷糊的話可以看看,如果你對它比較熟悉則可以直接跳過。
假設(shè)A=15,二進制是1111;B=6,二進制是110.
運算符名稱描述示例&按位與如果相對應位都是1,則結(jié)果為1,否則為0(A&B),得到6,即110~按位非按位取反運算符翻轉(zhuǎn)操作數(shù)的每一位,即0變成1,1變成0。(?A)得到-16,即
11111111111111111111111111110000|按位或如果相對應位都是 0,則結(jié)果為 0,否則為 1(A | B)得到15,即 1111
(2)線程池的核心構(gòu)造器
ThreadPoolExecutor有四個構(gòu)造器,其中一個是核心構(gòu)造器。你可以根據(jù)需要,按需使用這些構(gòu)造器。
- 核心構(gòu)造器之一:相對較為常用的一個構(gòu)造器,你可以指定核心線程數(shù)、最大線程數(shù)、線程保活時間和任務隊列類型。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}
- 核心構(gòu)造器之二:相比于第一個構(gòu)造器,你可以在這個構(gòu)造器中指定ThreadFactory. 通過ThreadFactory,你可以指定線程名稱、分組等個性化信息。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}
- 核心構(gòu)造器之三:這個構(gòu)造器的要點在于,你可以指定拒絕策略。關(guān)于任務隊列的拒絕策略,下文有詳細介紹。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}
- 核心構(gòu)造器之四:這個構(gòu)造器是ThreadPoolExecutor的核心構(gòu)造器,提供了較為全面的參數(shù)設(shè)置,上述的三個構(gòu)造器都是基于它實現(xiàn)。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
(3)線程池中的核心方法
/*** 提交Runnable類型的任務并執(zhí)行,但不返回結(jié)果*/public void execute(Runnable command){...}/*** 提交Runnable類型的任務,并返回結(jié)果*/public Future<?> submit(Runnable task){...}/*** 提交Runnable類型的任務,并返回結(jié)果,支持指定默認結(jié)果*/public <T> Future<T> submit(Runnable task, T result){...}/*** 提交Callable類型的任務并執(zhí)行*/public <T> Future<T> submit(Callable<T> task) {...}/*** 關(guān)閉線程池,繼續(xù)執(zhí)行隊列中未完成的任務,但不會接收新的任務*/public void shutdown() {...}/*** 立即關(guān)閉線程池,同時放棄未執(zhí)行的任務,并不再接收新的任務*/public List<Runnable> shutdownNow(){...}
(4)線程池的狀態(tài)與生命周期管理
前文說過,線程池恰似一個生產(chǎn)車間,而從生產(chǎn)車間的角度看,生產(chǎn)車間有運行、停產(chǎn)等不同狀態(tài),所以線程池也是有一定的狀態(tài)和使用周期的。

- Running:運行中,該狀態(tài)下可以繼續(xù)向線程池中增加任務,并正常處理隊列中的任務;
- Shutdown:關(guān)閉中,該狀態(tài)下線程池不會立即停止,但不能繼續(xù)向線程池中增加任務,直到任務執(zhí)行結(jié)束;
- Stop:停止,該狀態(tài)下將不再接收新的任務,同時不再處理隊列中的任務,并中斷工作中的線程;
- Tidying:相對短暫的中間狀態(tài),所有任務都已經(jīng)結(jié)束,并且所有的工作線程都不再存在(workerCount==0),并運行terminated()鉤子方法;
- Terminated:terminated()運行結(jié)束。
2. 如何向線程池中提交任務
向線程池提交任務有兩種比較常見的方式,一種是需要返回執(zhí)行結(jié)果的,一種則是不需要返回結(jié)果的。
(1)不關(guān)注任務執(zhí)行結(jié)果:execute
通過execute()提交任務到線程池后,任務將在未來某個時刻執(zhí)行,執(zhí)行的任務的線程可能是當前線程池中的線程,也可能是新創(chuàng)建的線程。當然,如果此時線程池應關(guān)閉,或者任務隊列已滿,那么該任務將交由RejectedExecutionHandler處理。
(2)關(guān)注任務執(zhí)行結(jié)果:submit
通過submit()提交任務到線程池后,運行機制和execute類似,其核心不同在于,由submit()提交任務時將等待任務執(zhí)行結(jié)束并返回結(jié)果。
3. 如何管理提交的任務
(1)任務隊列選型策略
- SynchronousQueue:無縫傳遞(Direct handoffs)。當新的任務到達時,將直接交由線程處理,而不是放入緩存隊列。因此,如果任務達到時卻沒有可用線程,那么將會創(chuàng)建新的線程。所以,為了避免任務丟失,在使用SynchronousQueue時,將會需要創(chuàng)建無數(shù)的線程,在使用時需要謹慎評估。
- LinkedBlockingQueue:無界隊列,新提交的任務都會緩存到該隊列中。使用無界隊列時,只有corePoolSize中的線程來處理隊列中的任務,這時候和maximumPoolSize是沒有關(guān)系的,它不會創(chuàng)建新的線程。當然,你需要注意的是,如果任務的處理速度遠低于任務的產(chǎn)生速度,那么LinkedBlockingQueue的無限增長可能會導致內(nèi)存容量等問題。
- ArrayBlockingQueue:有界隊列,可能會觸發(fā)創(chuàng)建新的工作線程,maximumPoolSize參數(shù)設(shè)置在有界隊列中將發(fā)揮作用。在使用有界隊列時,要特別注意任務隊列大小和工作線程數(shù)量之間的權(quán)衡。如果任務隊列大但是線程數(shù)量少,那么結(jié)果會是系統(tǒng)資源(主要是CPU)占用率較低,但同時系統(tǒng)的吞吐量也會降低。反之,如果縮小任務隊列并擴大工作線程數(shù)量,那么結(jié)果則是系統(tǒng)吞吐量增大,但同時系統(tǒng)資源占用也會增加。所以,使用有界隊列時,要考慮到平衡的藝術(shù),并配置相應的拒絕策略。
(2)如何選擇合適的拒絕策略
在使用線程池時,拒絕策略是必須要確認的地方,因為它可能會造成任務丟失。
當線程池已經(jīng)關(guān)閉或任務隊列已滿且無法再創(chuàng)建新的工作線程時,那么新提交的任務將會被拒絕,拒絕時將調(diào)用RejectedExecutionHandler中的rejectedExecution(Runnable r, ThreadPoolExecutor executor)來執(zhí)行具體的拒絕動作。
final void reject(Runnable command) { handler.rejectedExecution(command, this);}
以execute方法為例,當線程池狀態(tài)異?;驘o法新增工作線程時,將會執(zhí)行任務拒絕策略。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
ThreadPoolExecutor的默認拒絕策略是AbortPolicy,這一點在屬性定義中已經(jīng)確定。在大部分場景中,直接拒絕任務都是不合適的。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
- AbortPolicy:默認策略,直接拋出RejectedExecutionException異常;
- CallerRunsPolicy:交由當前線程自己來執(zhí)行。這種策略這提供了一個簡單的反饋控制機制,可以減慢提交新任務的速度;
- DiscardPolicy:直接丟棄任務,不會拋出異常;
- DiscardOldestPolicy:如果此時線程池沒有關(guān)閉,將從隊列的頭部取出第一個任務并丟棄,并再次嘗試執(zhí)行。如果執(zhí)行失敗,那么將重復這個過程。
如果上述四種策略均不滿足,你也可以通過RejectedExecutionHandler接口定制個性化的拒絕策略。事實上,為了兼顧任務不丟失和系統(tǒng)負載,建議你自己實現(xiàn)拒絕策略。
(3)隊列維護
對于任務隊列的維護,線程池也提供了一些方法。
- 獲取當前任務隊列
public BlockingQueue<Runnable> getQueue() { return workQueue;}
- 從隊列中移除任務
public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed;}
4. 如何管理執(zhí)行任務的工作線程
(1)核心工作線程
核心線程(corePoolSize)是指最小數(shù)量的工作線程,此類線程不允許超時回收。當然,如果你設(shè)置了allowCoreThreadTimeOut,那么核心線程也是會超時的,這可能會導致核心線程數(shù)為零。核心線程的數(shù)量可以通過線程池的構(gòu)造參數(shù)指定。
(2)最大工作線程
最大工作線程指的是線程池為了處理現(xiàn)有任務,所能創(chuàng)建的最大工作線程數(shù)量。
最大工作線程可以通過構(gòu)造函數(shù)的maximumPoolSize變量設(shè)定。當然,如果你所使用的任務隊列是無界隊列,那么這個參數(shù)將形同虛設(shè)。
(3)如何創(chuàng)建新的工作線程
在線程池中,新線程的創(chuàng)建是通過ThreadFactory完成。你可以通過線程池的構(gòu)造函數(shù)指定特定的ThreadFactory,如未指定將使用默認的Executors.defaultThreadFactory(),該工廠所創(chuàng)建的線程具有相同的ThreadGroup和優(yōu)先級(NORM_PRIORITY),并且都不是守護( Non-Daemon)線程。
通過設(shè)定ThreadFactory,你可以自定義線程的名字、線程組以及守護狀態(tài)等。
在Java的線程池ThreadPoolExecutor中,addWorker方法負責新線程的具體創(chuàng)建工作。
private boolean addWorker(Runnable firstTask, boolean core) {...}
(4)保活時間
?;顣r間指的是非核心線程在空閑時所能存活的時間。
如果線程池中的線程數(shù)量超過了corePoolSize中的設(shè)定,那么空閑線程的空閑時間在超過keepAliveTime中設(shè)定的時間后,線程將被回收終止。在線程被回收后,如果需要新的線程時,將繼續(xù)創(chuàng)建新的線程。
需要注意的是,keepAliveTime僅對非核心線程有效,如果需要設(shè)置核心線程的?;顣r間,需要使用allowCoreThreadTimeOut參數(shù)。
(5)鉤子方法
- 設(shè)定任務執(zhí)行前動作:beforeExecute
如果你希望提交的任務在執(zhí)行前執(zhí)行特定的動作,比如寫入日志或設(shè)定ThreadLocal等。那么,你可以通過重寫beforeExecute來實現(xiàn)這一目的。
protected void beforeExecute(Thread t, Runnable r) { }
- 設(shè)定任務執(zhí)行后動作:beforeExecute 如果你希望提交的任務在執(zhí)行后執(zhí)行特定的動作,比如寫入日志或捕獲異常等。那么,你可以通過重寫afterExecute來實現(xiàn)這一目的。
protected void afterExecute(Runnable r, Throwable t) { }
- 設(shè)定線程池終止動作:terminated
protected void terminated() { }
(6)線程池的預熱
默認情況下,在設(shè)置核心線程數(shù)之后,也不會立即創(chuàng)建相關(guān)線程,而是任務到達后再創(chuàng)建。
如果你需要預先就啟動核心線程,那么你可以通過調(diào)用prestartCoreThread或prestartAllCoreThreads來提前啟動,以達到線程池預熱目的,并且可以通過ensurePrestart方法來驗證效果。
(7)線程回收機制
當線程池中的工作線程數(shù)量大于corePoolSize設(shè)置的數(shù)量時,并且存在空閑線程,并且這個空閑線程的空閑時長超過了keepAliveTime所設(shè)置的時長,那么這樣的空閑線程將會被回收,以降低不必要的資源浪費。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { ... } finally { processWorkerExit(w, completedAbruptly); // 主動回收自己 } }
(8)線程數(shù)調(diào)整策略
線程池的工作線程的設(shè)置是否合理,關(guān)系到系統(tǒng)負載和任務處理速度之間的平衡。這里要明確的是,如何設(shè)置核心線程并沒有放之四海而皆準的公式。每個業(yè)務場景都有著它獨特的地方,CPU密集型和IO密集型任務存在較大差異。因此,在使用線程池的時候,要具體問題具體分析,但是你可以運行結(jié)果持續(xù)調(diào)整來優(yōu)化線程池。
5. 線程池使用示例
我們?nèi)砸允止ぶ谱骶€程池部分的場景為例,通過ThreadPoolExecutor實現(xiàn)來展示線程池的使用示例。從代碼中看,ThreadPoolExecutor的使用和王者線程池TheKingThreadPool的用法基本一致。
public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < > (10)); String[] wildMonsters = {"棕熊", "野雞", "灰狼", "野兔", "狐貍", "小鹿", "小花豹", "野豬"}; for (String wildMonsterName: wildMonsters) { threadPoolExecutor.execute(new RunnableTask() { public String getTaskDesc() { return wildMonsterName; } public void run() { System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已經(jīng)烤好"); } }); } threadPoolExecutor.shutdown();}
6. Executors類
Executors是JUC中一個針對ThreadPoolExecutor和ThreadFactory等設(shè)計的一個工具類。通過Executors,可以方便地創(chuàng)建不同類型的線程池。當然,其內(nèi)部主要是通過給ThreadPoolExecutor的構(gòu)造傳遞特定的參數(shù)實現(xiàn),并無玄機可言。常用的幾個工具如下所示:
- 創(chuàng)建固定線程數(shù)的線程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 創(chuàng)建只有1個線程的線程池
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
- 創(chuàng)建緩存線程池:這種線程池不設(shè)定核心線程數(shù),根據(jù)任務的數(shù)據(jù)動態(tài)創(chuàng)建線程。當任務執(zhí)行結(jié)束后,線程會被逐步回收,也就是所有的線程都是臨時的。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
7. 線程池監(jiān)控
作為一個運行框架,ThreadPoolExecutor既簡單也復雜。因此,對其內(nèi)部的監(jiān)控和管理是十分必要的。ThreadPoolExecutor也提供了一些方法,通過這些方法,我們可以獲取到線程池的一些重要狀態(tài)和數(shù)據(jù)。
- 獲取線程池大小
public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Remove rare and surprising possibility of // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } }
- 獲取活躍工作線程數(shù)量
public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w: workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } }
- 獲取最大線程池
public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } }
- 獲取線程池中的任務總數(shù)
public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w: workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } }
- 獲取線程池中已完成的任務總數(shù)
public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w: workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); }}
四、如何養(yǎng)成正確使用線程池的良好習慣
1. 線程池的使用風險提示
雖然線程池的使用有諸多的好處,然而天下沒有免費的午餐,線程池在給我們帶來便利的同時,也有一些避免踩坑的注意事項:
- 線程池設(shè)置過大或過小都不合適。如果線程池的線程數(shù)量過多,雖然局部處理速度增加,但將會影響應用系統(tǒng)的整體性能。而如果線程池的線程數(shù)量過少,線程池可能無法帶來預期的性能的提升;
- 和其他多線程類似,線程池中也可能會發(fā)生死鎖。比如,某個任務等待另外一個任務結(jié)束,但卻沒有線程來執(zhí)行等待的那個任務,這也是為什么要避免任務間存在依賴;
- 添加任務到隊列時耗時過長。如果任務隊列已滿,外部線程向隊列添加任務將會受阻。所以,為了避免外部線程阻塞時間過長,你可以設(shè)定最大等待時間;
為了降低這些風險的發(fā)生,你在設(shè)置線程池的類型和參數(shù)時,應當格外小心。在正式上線前,最好能做一次壓力測試。
2. 創(chuàng)建線程池的推薦姿勢
雖然通過Executors創(chuàng)建線程比較方便,但是Executors的封裝屏蔽了一些重要的參數(shù)細節(jié),而這些參數(shù)對于線程池至關(guān)重要,所以為了避免因?qū)xecutors不了解而錯誤地使用線程池,建議還是通過ThreadPoolExecutor的構(gòu)造參數(shù)直接創(chuàng)建。
3. 盡量避免使用無界隊列
如果再認真點說的話,你應該在任何時候都避免使用無界隊列來管理任務。注意,Executors的newFixedThreadPool所使用的是LinkedBlockingQueue,上文有它的源碼。
小結(jié)
以上就是關(guān)于Java線程池的全部內(nèi)容。在這篇文章中,我們講解了線程池的應用場景、核心組成及原理,并手工制作了一個線程池,而且在此基礎(chǔ)上深入講解了Java中的線程池ThreadPoolExecutor的實現(xiàn)。雖然文章整體篇幅較大,但是由于線程池涉及的內(nèi)容十分廣泛,難以在一篇文章中全部提及,仍有部分重要內(nèi)容未能覆蓋,比如如何處理線程池中的異常、如何優(yōu)雅關(guān)閉線程池等。
熟練掌握線程池并不是一件容易的事,建議按照本文開篇的建議,先理解其要解決的問題,再理解其核心組成原理,最后再深入到Java中的源碼中。如此一來,帶著已知的概念去看源碼,會更容易理解源碼的設(shè)計之道。
轉(zhuǎn)載于:
https://www.cnblogs.com/time-as-a-friend/p/15060244.html