背景
最近收到一道面試題:我們知道JDK的線程池在線程數(shù)達(dá)到corePoolSize之后,先判斷隊(duì)列,再判斷maximumPoolSize。如果想反過來,即先判斷maximumPoolSize再判斷隊(duì)列,怎么辦?
建議往下瀏覽之前先思考一下解決方案,如果自己面對這道面試題,該如何作答?
方案一
由于線程池的行為是定義在JDK相關(guān)代碼中,我們想改變其默認(rèn)行為,很自然的一種想法便是:繼承自JDK的線程池類java.util.concurrent.ThreadPoolExecutor,然后改寫其execute方法,將判斷隊(duì)列與maximumPoolSize的邏輯順序調(diào)整一下,以達(dá)到目的
原來的邏輯如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建新線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 代碼運(yùn)行到此處,說明線程池?cái)?shù)量達(dá)到了corePoolSize
if (isRunning(c) && workQueue.offer(command)) {
// 將任務(wù)成功入隊(duì)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 代碼運(yùn)行到此處,說明入隊(duì)失敗
else if (!addWorker(command, false))
// 創(chuàng)建新線程失敗則執(zhí)行拒絕策略
reject(command);
}
但是仔細(xì)閱讀代碼會(huì)發(fā)現(xiàn),execute中涉及到的一些關(guān)鍵方法如workerCountOf、addWorker等是私有的,關(guān)鍵變量如ctl、corePoolSize也是私有的,即無法通過簡單繼承ThreadPoolExecutor改寫其execute方法的核心邏輯達(dá)到目的。
那考慮的一個(gè)變種是,定義一個(gè)MyThreadPoolExecutor,把ThreadPoolExecutor的代碼照搬過來,只改寫其中execute方法,改寫后的邏輯如下:
public void execute(Runnable command) {
if (command == null)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 先判斷maximumPoolSize
if (workerCountOf(c) < maximumPoolSize) {
if (addWorker(command, false))
return;
c = ctl.get();
}
// 再判斷隊(duì)列
else if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (isRunning(c) && !workQueue.offer(command))
reject(command);
}
改寫之后,發(fā)現(xiàn)reject方法也得重寫,原因是RejectedExecutionHandler#rejectedExecution第二個(gè)入?yún)⑹荰hreadPoolExecutor,不能傳this
// java.util.concurrent.ThreadPoolExecutor#reject
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
// java.util.concurrent.RejectedExecutionHandler
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
這樣,連RejectedExecutionHandler也要改寫一下
由于RejectedExecutionHandler的改造并非面試題核心邏輯,所以此處省略,明白要表達(dá)的意思即可
但這樣做之后,與三方框架的兼容就很難了--->有不少三方框架入?yún)⑹切枰猅hreadPoolExecutor,而不是自定義的MyThreadPoolExecutor,后續(xù)的使用會(huì)是個(gè)問題
評價(jià):自定義MyThreadPoolExecutor需要代碼大篇幅的拷貝,麻煩不說,兼容性還是個(gè)問題,從實(shí)戰(zhàn)出發(fā)考慮,可行性很低
方案二
那有沒有什么方案能夠既省事,又能兼顧兼容性?
兩步走:
- 自定義Queue,改寫offer邏輯
- 自定義線程池類,繼承自ThreadPoolExecutor,改寫核心邏輯
自定義Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
// 自定義的線程池類,繼承自ThreadPoolExecutor
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
// offer方法的含義是:將任務(wù)提交到隊(duì)列中,返回值為true/false,分別代表提交成功/提交失敗
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程,直接扔進(jìn)隊(duì)列里,讓線程去處理
return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false,暗含入隊(duì)失敗,讓線程池去創(chuàng)建新的線程
return false;
}
// 重點(diǎn): 代碼運(yùn)行到此處,說明當(dāng)前線程數(shù) >= 最大線程數(shù),需要真正的提交到隊(duì)列中
return super.offer(runnable);
}
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
自定義線程池類
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 定義一個(gè)成員變量,用于記錄當(dāng)前線程池中已提交的任務(wù)數(shù)量
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// ThreadPoolExecutor的勾子方法,在task執(zhí)行完后需要將池中已提交的任務(wù)數(shù) - 1
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// 將池中已提交的任務(wù)數(shù) + 1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
核心邏輯:當(dāng)提交任務(wù)給EagerThreadPoolExecutor,執(zhí)行submittedTaskCount.incrementAndGet();將池中已提交的任務(wù)數(shù) + 1,然后就調(diào)用父類的execute方法
// 代碼運(yùn)行到此處,說明線程數(shù) >= corePoolSize, 此時(shí)workQueue為自定義的TaskQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
核心邏輯:當(dāng)執(zhí)行workQueue.offer(command),走到自定義的TaskQueue#offer邏輯,而offer方法的返回值決定著是否創(chuàng)建更多的線程:返回true,代表入隊(duì)成功,不創(chuàng)建線程;返回false,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程
// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程,直接扔進(jìn)隊(duì)列里,讓線程去處理
return super.offer(runnable);
}
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false,暗含入隊(duì)失敗,讓線程池去創(chuàng)建新的線程
return false;
}
// 重點(diǎn): 代碼運(yùn)行到此處,說明當(dāng)前線程數(shù) >= 最大線程數(shù),需要真正的提交到隊(duì)列中
return super.offer(runnable);
核心邏輯:當(dāng)前線程數(shù)小于最大線程數(shù)就返回false,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程
因此,總結(jié)起來就是:自定義的EagerThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來決定是否創(chuàng)建更多的線程,達(dá)到先判斷maximumPoolSize再判斷隊(duì)列的目的
評價(jià):該方案不需要修改JDK線程池的核心邏輯,盡最大可能避免因更改核心流程考慮不周而引入的BUG。另一方面,擴(kuò)展Queue的手段,也是JDK提供的一個(gè)能夠讓用戶在不干涉核心流程的情況下,達(dá)到安全擴(kuò)展線程池能力的方式
題外話
有朋友或許會(huì)有疑問,這道面試題是面試官天馬行空想像出來的嗎?是否有實(shí)際的場景跟需要呢?
可以從至少兩個(gè)開源框架上找到答案
Dubbo 2.6.2及以上
其實(shí)上邊的方案二,代碼來自于Dubbo源碼,
相關(guān)git issue在此: Extension: Eager Thread Pool
Tomcat
Tomcat自定義的線程池類名與JDK的相同,都叫ThreadPoolExecutor,只是包不同,且Tomcat的ThreadPoolExecutor繼承自JDK的ThreadPoolExecutor
Tomcat自定義的隊(duì)列也叫TaskQueue
Tomcat的ThreadPoolExecutor與TaskQueue核心邏輯、思想與方案二貼的代碼幾乎一致。實(shí)際上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鑒的Tomcat設(shè)計(jì),關(guān)于這一點(diǎn)Dubbo github issue上作者本人也有提及
JDK線程池與Tomcat線程池方案誰最好?
筆者認(rèn)為,沒有哪種方案最好,技術(shù)沒有銀彈,只是在不同視角進(jìn)行的trade off,在某種場景下最好的方案在另一個(gè)場景中可能卻導(dǎo)致糟糕的后果??梢詮牧硪粋€(gè)角度考慮:如果有一種放之四海皆準(zhǔn),從各個(gè)角度考慮都優(yōu)于其他技術(shù)的存在,那么它的出現(xiàn)必將完全取代它的競品。而從現(xiàn)實(shí)看,顯然, JDK線程線與Tomcat線程池都各有場景與發(fā)展,并沒有出現(xiàn)一方取代另一方的情況,因此不存在哪種方案最好的說法
如果線上環(huán)境要使用線程池,哪一種更合適?
線程數(shù)與CPU核數(shù)、任務(wù)類型的關(guān)系就不細(xì)說了。簡單而言,如果不能忍受延遲,期望應(yīng)用能盡快地為用戶提供服務(wù),那么Tomcat線程池可能更適合你;相反,如果你能容忍一些延遲來換取性能上的提升,那么JDK線程池可能會(huì)更合適一些
注
方案一的代碼乃筆者隨手而敲,未經(jīng)過任何生產(chǎn)環(huán)境的檢驗(yàn)跟錘煉,可能存在潛在的BUG,強(qiáng)烈不建議生產(chǎn)環(huán)境使用。如果確實(shí)有需要,請使用方案二,有知名框架背書,且實(shí)現(xiàn)更為安全與優(yōu)雅,乃首先之姿
最后,感謝這位朋友的面試題,也感謝孤獨(dú)煙(人稱煙哥)分享面試題讓大家參與討論,以及飛奔的普朗克(人稱何總)提供的思路,才有了本篇的內(nèi)容分享,希望大家都能有所收獲