Java如何讓線程池滿后再放隊(duì)列

背景

最近收到一道面試題:我們知道JDK的線程池在線程數(shù)達(dá)到corePoolSize之后,先判斷隊(duì)列,再判斷maximumPoolSize。如果想反過來,即先判斷maximumPoolSize再判斷隊(duì)列,怎么辦?

建議往下瀏覽之前先思考一下解決方案,如果自己面對這道面試題,該如何作答?


方案一

由于線程池的行為是定義在JDK相關(guān)代碼中,我們想改變其默認(rèn)行為,很自然的一種想法便是:繼承自JDK的線程池類java.util.concurrent.ThreadPoolExecutor,然后改寫其execute方法,將判斷隊(duì)列與maximumPoolSize的邏輯順序調(diào)整一下,以達(dá)到目的

原來的邏輯如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 創(chuàng)建新線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 代碼運(yùn)行到此處,說明線程池?cái)?shù)量達(dá)到了corePoolSize
    if (isRunning(c) && workQueue.offer(command)) {
        // 將任務(wù)成功入隊(duì)
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 代碼運(yùn)行到此處,說明入隊(duì)失敗
    else if (!addWorker(command, false))
        // 創(chuàng)建新線程失敗則執(zhí)行拒絕策略
        reject(command);
}

但是仔細(xì)閱讀代碼會(huì)發(fā)現(xiàn),execute中涉及到的一些關(guān)鍵方法如workerCountOf、addWorker等是私有的,關(guān)鍵變量如ctl、corePoolSize也是私有的,即無法通過簡單繼承ThreadPoolExecutor改寫其execute方法的核心邏輯達(dá)到目的。

那考慮的一個(gè)變種是,定義一個(gè)MyThreadPoolExecutor,把ThreadPoolExecutor的代碼照搬過來,只改寫其中execute方法,改寫后的邏輯如下:

public void execute(Runnable command) {
    if (command == null)
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 先判斷maximumPoolSize
    if (workerCountOf(c) < maximumPoolSize) {
        if (addWorker(command, false))
            return;
        c = ctl.get();
    }
    // 再判斷隊(duì)列
    else if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (isRunning(c) && !workQueue.offer(command))
        reject(command);
}

改寫之后,發(fā)現(xiàn)reject方法也得重寫,原因是RejectedExecutionHandler#rejectedExecution第二個(gè)入?yún)⑹荰hreadPoolExecutor,不能傳this

// java.util.concurrent.ThreadPoolExecutor#reject

final void reject(Runnable command) {
     handler.rejectedExecution(command, this);
}
// java.util.concurrent.RejectedExecutionHandler

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

這樣,連RejectedExecutionHandler也要改寫一下

由于RejectedExecutionHandler的改造并非面試題核心邏輯,所以此處省略,明白要表達(dá)的意思即可

但這樣做之后,與三方框架的兼容就很難了--->有不少三方框架入?yún)⑹切枰猅hreadPoolExecutor,而不是自定義的MyThreadPoolExecutor,后續(xù)的使用會(huì)是個(gè)問題

評價(jià):自定義MyThreadPoolExecutor需要代碼大篇幅的拷貝,麻煩不說,兼容性還是個(gè)問題,從實(shí)戰(zhàn)出發(fā)考慮,可行性很低

方案二

那有沒有什么方案能夠既省事,又能兼顧兼容性?

兩步走:

  1. 自定義Queue,改寫offer邏輯
  2. 自定義線程池類,繼承自ThreadPoolExecutor,改寫核心邏輯
自定義Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;
    
    // 自定義的線程池類,繼承自ThreadPoolExecutor
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    // offer方法的含義是:將任務(wù)提交到隊(duì)列中,返回值為true/false,分別代表提交成功/提交失敗
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        // 線程池的當(dāng)前線程數(shù)
        int currentPoolThreadSize = executor.getPoolSize();
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            // 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程,直接扔進(jìn)隊(duì)列里,讓線程去處理
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            // 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false,暗含入隊(duì)失敗,讓線程池去創(chuàng)建新的線程
            return false;
        }

        // 重點(diǎn): 代碼運(yùn)行到此處,說明當(dāng)前線程數(shù) >= 最大線程數(shù),需要真正的提交到隊(duì)列中
        return super.offer(runnable);
    }

    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
自定義線程池類
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 定義一個(gè)成員變量,用于記錄當(dāng)前線程池中已提交的任務(wù)數(shù)量
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor的勾子方法,在task執(zhí)行完后需要將池中已提交的任務(wù)數(shù) - 1
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 將池中已提交的任務(wù)數(shù) + 1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

核心邏輯:當(dāng)提交任務(wù)給EagerThreadPoolExecutor,執(zhí)行submittedTaskCount.incrementAndGet();將池中已提交的任務(wù)數(shù) + 1,然后就調(diào)用父類的execute方法

// 代碼運(yùn)行到此處,說明線程數(shù) >= corePoolSize, 此時(shí)workQueue為自定義的TaskQueue
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

核心邏輯:當(dāng)執(zhí)行workQueue.offer(command),走到自定義的TaskQueue#offer邏輯,而offer方法的返回值決定著是否創(chuàng)建更多的線程:返回true,代表入隊(duì)成功,不創(chuàng)建線程;返回false,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程

// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
    // 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程,直接扔進(jìn)隊(duì)列里,讓線程去處理
    return super.offer(runnable);
}

if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
    // 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false,暗含入隊(duì)失敗,讓線程池去創(chuàng)建新的線程
    return false;
}

// 重點(diǎn): 代碼運(yùn)行到此處,說明當(dāng)前線程數(shù) >= 最大線程數(shù),需要真正的提交到隊(duì)列中
return super.offer(runnable);

核心邏輯:當(dāng)前線程數(shù)小于最大線程數(shù)就返回false,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程

因此,總結(jié)起來就是:自定義的EagerThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來決定是否創(chuàng)建更多的線程,達(dá)到先判斷maximumPoolSize再判斷隊(duì)列的目的

評價(jià):該方案不需要修改JDK線程池的核心邏輯,盡最大可能避免因更改核心流程考慮不周而引入的BUG。另一方面,擴(kuò)展Queue的手段,也是JDK提供的一個(gè)能夠讓用戶在不干涉核心流程的情況下,達(dá)到安全擴(kuò)展線程池能力的方式

題外話

有朋友或許會(huì)有疑問,這道面試題是面試官天馬行空想像出來的嗎?是否有實(shí)際的場景跟需要呢?

可以從至少兩個(gè)開源框架上找到答案

Dubbo 2.6.2及以上

其實(shí)上邊的方案二,代碼來自于Dubbo源碼,
相關(guān)git issue在此: Extension: Eager Thread Pool

Tomcat

Tomcat自定義的線程池類名與JDK的相同,都叫ThreadPoolExecutor,只是包不同,且Tomcat的ThreadPoolExecutor繼承自JDK的ThreadPoolExecutor

Tomcat自定義的隊(duì)列也叫TaskQueue

Tomcat的ThreadPoolExecutor與TaskQueue核心邏輯、思想與方案二貼的代碼幾乎一致。實(shí)際上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鑒的Tomcat設(shè)計(jì),關(guān)于這一點(diǎn)Dubbo github issue上作者本人也有提及

JDK線程池與Tomcat線程池方案誰最好?

筆者認(rèn)為,沒有哪種方案最好,技術(shù)沒有銀彈,只是在不同視角進(jìn)行的trade off,在某種場景下最好的方案在另一個(gè)場景中可能卻導(dǎo)致糟糕的后果??梢詮牧硪粋€(gè)角度考慮:如果有一種放之四海皆準(zhǔn),從各個(gè)角度考慮都優(yōu)于其他技術(shù)的存在,那么它的出現(xiàn)必將完全取代它的競品。而從現(xiàn)實(shí)看,顯然, JDK線程線與Tomcat線程池都各有場景與發(fā)展,并沒有出現(xiàn)一方取代另一方的情況,因此不存在哪種方案最好的說法

如果線上環(huán)境要使用線程池,哪一種更合適?

線程數(shù)與CPU核數(shù)、任務(wù)類型的關(guān)系就不細(xì)說了。簡單而言,如果不能忍受延遲,期望應(yīng)用能盡快地為用戶提供服務(wù),那么Tomcat線程池可能更適合你;相反,如果你能容忍一些延遲來換取性能上的提升,那么JDK線程池可能會(huì)更合適一些

方案一的代碼乃筆者隨手而敲,未經(jīng)過任何生產(chǎn)環(huán)境的檢驗(yàn)跟錘煉,可能存在潛在的BUG,強(qiáng)烈不建議生產(chǎn)環(huán)境使用。如果確實(shí)有需要,請使用方案二,有知名框架背書,且實(shí)現(xiàn)更為安全與優(yōu)雅,乃首先之姿


最后,感謝這位朋友的面試題,也感謝孤獨(dú)煙(人稱煙哥)分享面試題讓大家參與討論,以及飛奔的普朗克(人稱何總)提供的思路,才有了本篇的內(nèi)容分享,希望大家都能有所收獲

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程池不僅在項(xiàng)目中是非常常用的一項(xiàng)技術(shù)而且在面試中基本上也是必問的知識點(diǎn),接下來跟著我一起來鞏固一下線程池的相關(guān)知...
    不學(xué)無數(shù)的程序員閱讀 3,281評論 0 21
  • 第6章介紹了任務(wù)執(zhí)行框架, 它不僅能簡化任務(wù)與線程的生命周期管理, 而且還提供一 種簡單靈活的方式將任務(wù)的提交與任...
    好好學(xué)習(xí)Sun閱讀 1,308評論 0 2
  • 昨晚手機(jī)被張小帥拿去查資料,我邊看書邊等他,打算等他用完之后就來簡書更新,誰知,居然一不小心睡著了。 等張小帥把手...
    葛芳閱讀 465評論 6 7
  • 王仙娥 古稀老人王仙娥 十五年來素生活 八十萬元救兒命 企盼母愛勝病魔 (古稀老人,為救兒子給兒子移腎用去八十萬元...
    旖旎i閱讀 410評論 3 9
  • 說起來,鐘書閣是離我們最近的書店,卻是第一次去。 據(jù)說重慶鐘書閣成了網(wǎng)紅打卡地,春節(jié)很多人排一兩個(gè)小時(shí)才能進(jìn)去,不...
    不舍札記閱讀 525評論 0 0

友情鏈接更多精彩內(nèi)容