引言
合理利用線程池能夠帶來(lái)三個(gè)好處。第一:降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗。第二:提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。第三:提高線程的可管理性。線程是稀缺資源,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。但是要做到合理的利用線程池,必須對(duì)其原理了如指掌。
線程池的使用
創(chuàng)建
我們可以通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建一個(gè)線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
創(chuàng)建一個(gè)線程池需要輸入幾個(gè)參數(shù):
corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
-
milliseconds:參數(shù)keepAliveTime的時(shí)間單位,有7種取值,在TimeUnit類(lèi)中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時(shí) TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒 runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列??梢赃x擇以下幾個(gè)阻塞隊(duì)列。
ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。
LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。
PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)得無(wú)限阻塞隊(duì)列。ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字,Debug和定位問(wèn)題時(shí)非常又幫助。
handler
RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常。以下是JDK1.5提供的四種策略。n AbortPolicy:直接拋出異常。
CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)。
DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:不處理,丟棄掉。
也可以根據(jù)應(yīng)用場(chǎng)景需要來(lái)實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
向線程池提交任務(wù)
我們可以使用execute提交的任務(wù),但是execute方法沒(méi)有返回值,所以無(wú)法判斷任務(wù)知否被線程池執(zhí)行成功。通過(guò)以下代碼可知execute方法輸入的任務(wù)是一個(gè)Runnable類(lèi)的實(shí)例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
我們也可以使用submit 方法來(lái)提交任務(wù),它會(huì)返回一個(gè)future,那么我們可以通過(guò)這個(gè)future來(lái)判斷任務(wù)是否執(zhí)行成功,通過(guò)future的get方法來(lái)獲取返回值,get方法會(huì)阻塞住直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會(huì)阻塞一段時(shí)間后立即返回,這時(shí)有可能任務(wù)沒(méi)有執(zhí)行完。
Future future = executor.submit(
new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無(wú)法執(zhí)行任務(wù)異常
} finally {
// 關(guān)閉線程池
executor.shutdown();
}
線程池的關(guān)閉
我們可以通過(guò)調(diào)用線程池的shutdown或shutdownNow方法來(lái)關(guān)閉線程池,但是它們的實(shí)現(xiàn)原理不同,shutdown的原理是只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來(lái)中斷線程,所以無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止。shutdownNow會(huì)首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
只要調(diào)用了這兩個(gè)關(guān)閉方法的其中一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于我們應(yīng)該調(diào)用哪一種方法來(lái)關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown來(lái)關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow。
線程池的工作流程

快速創(chuàng)建
不過(guò)在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類(lèi)中提供的幾個(gè)靜態(tài)方法來(lái)創(chuàng)建線程池:
Executors.newCachedThreadPool(); //創(chuàng)建一個(gè)緩沖池
Executors.newSingleThreadExecutor(); //創(chuàng)建容量為1的緩沖池
Executors.newFixedThreadPool(int); //創(chuàng)建固定容量大小的緩沖池
Executors.newWorkStealingPool(int) //java8新增,使用目前機(jī)器上可用的處理器作為它的并行級(jí)別
下面是這三個(gè)靜態(tài)方法的具體實(shí)現(xiàn);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
從它們的具體實(shí)現(xiàn)來(lái)看,它們實(shí)際上也是調(diào)用了ThreadPoolExecutor,只不過(guò)參數(shù)都已配置好了。
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說(shuō)來(lái)了任務(wù)就創(chuàng)建線程運(yùn)行,當(dāng)線程空閑超過(guò)60秒,就銷(xiāo)毀線程。
實(shí)際中,如果Executors提供的三個(gè)靜態(tài)方法能滿足要求,就盡量使用它提供的三個(gè)方法,因?yàn)樽约喝ナ謩?dòng)配置ThreadPoolExecutor的參數(shù)有點(diǎn)麻煩,要根據(jù)實(shí)際任務(wù)的類(lèi)型和數(shù)量來(lái)進(jìn)行配置。
另外,如果ThreadPoolExecutor達(dá)不到要求,可以自己繼承ThreadPoolExecutor類(lèi)進(jìn)行重寫(xiě)。
合理配置線程池的大小
首先,看一下理想的情況,也就是所有要處理的任務(wù)都是計(jì)算任務(wù),這時(shí),線程數(shù)應(yīng)該等于 CPU 核數(shù),讓每個(gè) CPU 運(yùn)行一個(gè)線程,不需要線程切換,效率是最高的,當(dāng)然這是理想情況。
這種情況下,如果要達(dá)到某個(gè)數(shù)量的 QPS,我們使用如下的計(jì)算公式。
設(shè)置的線程數(shù) = 目標(biāo) QPS/(1/任務(wù)實(shí)際處理時(shí)間)
舉例說(shuō)明,假設(shè)目標(biāo) QPS=100,任務(wù)實(shí)際處理時(shí)間 0.2s,100 * 0.2 = 20個(gè)線程,這里的20個(gè)線程必須對(duì)應(yīng)物理的20個(gè) CPU 核心,否則將不能達(dá)到預(yù)估的 QPS 指標(biāo)。
但實(shí)際上線上服務(wù)除了做內(nèi)存計(jì)算,更多的是訪問(wèn)數(shù)據(jù)庫(kù)、緩存和外部服務(wù),大部分的時(shí)間都是在等待 IO 任務(wù)。
如果 IO 任務(wù)較多,我們使用阿姆達(dá)爾定律來(lái)計(jì)算。
設(shè)置的線程數(shù) = CPU 核數(shù) * (1 + io/computing)
舉例說(shuō)明,假設(shè)4核 CPU,每個(gè)任務(wù)中的 IO 任務(wù)占總?cè)蝿?wù)的80%,4 * (1 + 4) = 20個(gè)線程,這里的20個(gè)線程對(duì)應(yīng)的是4核心的 CPU。
線程中除了線程數(shù)的設(shè)置,線程隊(duì)列大小的設(shè)置也很重要,這也是可以通過(guò)理論計(jì)算得出,規(guī)則為按照目標(biāo)響應(yīng)時(shí)間計(jì)算隊(duì)列大小。
隊(duì)列大小 = 線程數(shù) * (目標(biāo)相應(yīng)時(shí)間/任務(wù)實(shí)際處理時(shí)間)
舉例說(shuō)明,假設(shè)目標(biāo)相應(yīng)時(shí)間為0.4s,計(jì)算阻塞隊(duì)列的長(zhǎng)度為20 * (0.4 / 0.2) = 40。
另外,在設(shè)置線程池?cái)?shù)量的時(shí)候,有如下最佳實(shí)踐:
- 線程池的使用要考慮線程最大數(shù)量和最小數(shù)最小數(shù)量。
- 對(duì)于單部的服務(wù),線程的最大數(shù)量應(yīng)該等于線程的最小數(shù)量,而混布的服務(wù),適當(dāng)?shù)睦_(kāi)最大最小數(shù)量的差距,能夠整體調(diào)整 CPU 內(nèi)核的利用率。
- 列大小一定要設(shè)置有界隊(duì)列,否則壓力過(guò)大就會(huì)拖垮整個(gè)服務(wù)。
- 才使用線程池,須進(jìn)行設(shè)計(jì)性能評(píng)估和壓測(cè)。
- 線程池的失敗策略,失敗后的補(bǔ)償。
- 處理服務(wù)須與線上面向用戶的服務(wù)進(jìn)行分離。