針對(duì) IO 密集型的任務(wù),我們可以針對(duì)原本的線程池做一些改造,從而可以提高任務(wù)的處理效率。
基本
在阿里巴巴泰山版java開(kāi)發(fā)手冊(cè)中有這么一條:
線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,
這樣的處理方式讓寫(xiě)的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
那么如果要使用 ThreadPoolExecutor ,那就先來(lái)看看構(gòu)造方法中的所有入?yún)ⅲ?/p>
corePoolSize : 核心線程數(shù),當(dāng)線程池中的線程數(shù)量為 corePoolSize 時(shí),即使這些線程處于空閑狀態(tài),也不會(huì)銷(xiāo)毀(除非設(shè)置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大線程數(shù),線程池中允許的線程數(shù)量的最大值。
keepAliveTime : 線程空閑時(shí)間,當(dāng)線程池中的線程數(shù)大于 corePoolSize 時(shí),多余的空閑線程將在銷(xiāo)毀之前等待新任務(wù)的最長(zhǎng)時(shí)間。
workQueue : 任務(wù)隊(duì)列
unit : 線程空閑時(shí)間的單位。
threadFactory : 線程工廠,線程池創(chuàng)建線程時(shí)使用的工廠。
handler : 拒絕策略,因達(dá)到線程邊界和任務(wù)隊(duì)列滿(mǎn)時(shí),針對(duì)新任務(wù)的處理方法。
這么說(shuō)可能有些難以理解,你可以結(jié)合下圖進(jìn)行參考:

那么由此我們可以知道,當(dāng)大量任務(wù)被放入線程池之后,先是被核心線程執(zhí)行,多余的會(huì)被放進(jìn)隊(duì)列里,當(dāng)隊(duì)列滿(mǎn)了之后才會(huì)創(chuàng)建額外的線程進(jìn)行處理,再多就會(huì)采取拒絕策略。
但這樣真的能滿(mǎn)足我們的所有需求嗎?
任務(wù)的分類(lèi)
正常來(lái)說(shuō),我們可以把需要處理的任務(wù)按照消耗資源的不同,分為兩種:CPU 密集型和IO 密集型。
CPU 密集型
既然名字里帶有CPU了,說(shuō)明其消耗的主要資源就是 CPU 了。
具體是指那種包含大量運(yùn)算、在持有的 CPU 分配的時(shí)間片上一直在執(zhí)行任務(wù)、幾乎不需要依賴(lài)或等待其他任何東西。
這樣的任務(wù),在我的理解中,處理起來(lái)其實(shí)沒(méi)有多少優(yōu)化空間,因?yàn)樘幚頃r(shí)幾乎沒(méi)有等待時(shí)間,所以一直占有 CPU 進(jìn)行執(zhí)行,才是最好的方式。
唯一能想到優(yōu)化的地方,就是當(dāng)單個(gè)線程累計(jì)較多任務(wù)時(shí),其他線程能進(jìn)行分擔(dān),類(lèi)似fork/join框架的概念。
設(shè)置線程數(shù)時(shí),針對(duì)單臺(tái)機(jī)器,最好就是有幾個(gè) CPU ,就創(chuàng)建幾個(gè)線程,然后每個(gè)線程都在執(zhí)行這種任務(wù),永不停歇。
IO 密集型
和上面一樣,既然名字里帶有IO了,說(shuō)明其消耗的主要資源就是 IO 了。
我們所接觸到的 IO ,大致可以分成兩種:磁盤(pán) IO和網(wǎng)絡(luò) IO。
磁盤(pán) IO ,大多都是一些針對(duì)磁盤(pán)的讀寫(xiě)操作,最常見(jiàn)的就是文件的讀寫(xiě),假如你的數(shù)據(jù)庫(kù)、 Redis 也是在本地的話,那么這個(gè)也屬于磁盤(pán) IO。
網(wǎng)絡(luò) IO ,這個(gè)應(yīng)該是大家更加熟悉的,我們會(huì)遇到各種網(wǎng)絡(luò)請(qǐng)求,比如 http 請(qǐng)求、遠(yuǎn)程數(shù)據(jù)庫(kù)讀寫(xiě)、遠(yuǎn)程 Redis 讀寫(xiě)等等。
IO 操作的特點(diǎn)就是需要等待,我們請(qǐng)求一些數(shù)據(jù),由對(duì)方將數(shù)據(jù)寫(xiě)入緩沖區(qū),在這段時(shí)間中,需要讀取數(shù)據(jù)的線程根本無(wú)事可做,因此可以把 CPU 時(shí)間片讓出去,直到緩沖區(qū)寫(xiě)滿(mǎn)。
既然這樣,IO 密集型任務(wù)其實(shí)就有很大的優(yōu)化空間了(畢竟存在等待),那現(xiàn)有的線程池可以很好的滿(mǎn)足我們的需求嗎?
線程池的優(yōu)化
還記得上面說(shuō)的, ThreadPoolExecutor 針對(duì)多余任務(wù)的處理,是先放到等待隊(duì)列中,當(dāng)隊(duì)列塞滿(mǎn)后,再創(chuàng)建額外的線程進(jìn)行處理。
假設(shè)我們的任務(wù)基本都是 IO 密集型,我們希望程序可以有更高的吞吐量,可以在更短的時(shí)間內(nèi)處理更多的任務(wù),那么上面的 ThreadPoolExecutor 明顯是不滿(mǎn)足我們的需求,那該如何解決呢?
也許再來(lái)看看 ThreadPoolExecutor 的 execute 方法,會(huì)讓我們有一些思路:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果當(dāng)前活躍線程數(shù),小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
// 則優(yōu)先創(chuàng)建線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果任務(wù)可以成功放入隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不可以成功放入隊(duì)列,則創(chuàng)建線程
else if (!addWorker(command, false))
// 如果無(wú)法繼續(xù)創(chuàng)建線程,則拒絕任務(wù)
reject(command);
}
針對(duì)放入隊(duì)列的操作,如果隊(duì)列放入失敗,線程池就會(huì)選擇去創(chuàng)建線程了。因此,我們或許可以嘗試自定義線程池,針對(duì) offer 操作,做一些自定義處理。
也就是將任務(wù)放入隊(duì)列時(shí),先檢查線程池的線程數(shù)是否小于最大線程數(shù),如果是,則拒絕放入隊(duì)列,否則,再?lài)L試放入隊(duì)列中。
如果你有看過(guò) dubbo 或者 tomcat 的線程池,你會(huì)發(fā)現(xiàn)他們就有這樣的實(shí)現(xiàn)方法。
比如 dubbo 中的 TaskQueue,我們來(lái)看看它的 offer 方法:
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// 如果有空閑等待的線程,則將任務(wù)放入隊(duì)列中,讓線程去處理任務(wù)
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 如果當(dāng)前線程數(shù)小于最大線程數(shù),則返回 false ,讓線程池去創(chuàng)建新的線程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 否則,就將任務(wù)放入隊(duì)列中
return super.offer(runnable);
}
這樣就可以讓線程池優(yōu)先新建線程了。需要注意的時(shí),此時(shí)的隊(duì)列因?yàn)樾枰鶕?jù)線程池中的線程數(shù)決定是否放入任務(wù)成功,所以需要持有executor對(duì)象,這點(diǎn)不要忘記奧。
總結(jié)
通過(guò)本篇文章,主要是讓大家重新了解了一下 ThreadPoolExecutor ,并針對(duì)高吞吐場(chǎng)景下如何進(jìn)行局部?jī)?yōu)化。
有興趣的話可以訪問(wèn)我的博客或者關(guān)注我的公眾號(hào)、頭條號(hào),說(shuō)不定會(huì)有意外的驚喜。
公眾號(hào):健程之道