Java 線程池講解——針對(duì) IO 密集型任務(wù)

針對(duì) IO 密集型的任務(wù),我們可以針對(duì)原本的線程池做一些改造,從而可以提高任務(wù)的處理效率。

基本

阿里巴巴泰山版java開(kāi)發(fā)手冊(cè)中有這么一條:

線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,
這樣的處理方式讓寫(xiě)的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。 

那么如果要使用 ThreadPoolExecutor ,那就先來(lái)看看構(gòu)造方法中的所有入?yún)ⅲ?/p>

corePoolSize : 核心線程數(shù),當(dāng)線程池中的線程數(shù)量為 corePoolSize 時(shí),即使這些線程處于空閑狀態(tài),也不會(huì)銷(xiāo)毀(除非設(shè)置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大線程數(shù),線程池中允許的線程數(shù)量的最大值。
keepAliveTime : 線程空閑時(shí)間,當(dāng)線程池中的線程數(shù)大于 corePoolSize 時(shí),多余的空閑線程將在銷(xiāo)毀之前等待新任務(wù)的最長(zhǎng)時(shí)間。
workQueue : 任務(wù)隊(duì)列
unit : 線程空閑時(shí)間的單位。
threadFactory : 線程工廠,線程池創(chuàng)建線程時(shí)使用的工廠。
handler : 拒絕策略,因達(dá)到線程邊界和任務(wù)隊(duì)列滿(mǎn)時(shí),針對(duì)新任務(wù)的處理方法。

這么說(shuō)可能有些難以理解,你可以結(jié)合下圖進(jìn)行參考:


那么由此我們可以知道,當(dāng)大量任務(wù)被放入線程池之后,先是被核心線程執(zhí)行,多余的會(huì)被放進(jìn)隊(duì)列里,當(dāng)隊(duì)列滿(mǎn)了之后才會(huì)創(chuàng)建額外的線程進(jìn)行處理,再多就會(huì)采取拒絕策略。

但這樣真的能滿(mǎn)足我們的所有需求嗎?

任務(wù)的分類(lèi)

正常來(lái)說(shuō),我們可以把需要處理的任務(wù)按照消耗資源的不同,分為兩種:CPU 密集型IO 密集型。

CPU 密集型

既然名字里帶有CPU了,說(shuō)明其消耗的主要資源就是 CPU 了。

具體是指那種包含大量運(yùn)算、在持有的 CPU 分配的時(shí)間片上一直在執(zhí)行任務(wù)、幾乎不需要依賴(lài)或等待其他任何東西。

這樣的任務(wù),在我的理解中,處理起來(lái)其實(shí)沒(méi)有多少優(yōu)化空間,因?yàn)樘幚頃r(shí)幾乎沒(méi)有等待時(shí)間,所以一直占有 CPU 進(jìn)行執(zhí)行,才是最好的方式。

唯一能想到優(yōu)化的地方,就是當(dāng)單個(gè)線程累計(jì)較多任務(wù)時(shí),其他線程能進(jìn)行分擔(dān),類(lèi)似fork/join框架的概念。

設(shè)置線程數(shù)時(shí),針對(duì)單臺(tái)機(jī)器,最好就是有幾個(gè) CPU ,就創(chuàng)建幾個(gè)線程,然后每個(gè)線程都在執(zhí)行這種任務(wù),永不停歇。

IO 密集型

和上面一樣,既然名字里帶有IO了,說(shuō)明其消耗的主要資源就是 IO 了。

我們所接觸到的 IO ,大致可以分成兩種:磁盤(pán) IO網(wǎng)絡(luò) IO。

磁盤(pán) IO ,大多都是一些針對(duì)磁盤(pán)的讀寫(xiě)操作,最常見(jiàn)的就是文件的讀寫(xiě),假如你的數(shù)據(jù)庫(kù)、 Redis 也是在本地的話,那么這個(gè)也屬于磁盤(pán) IO。

網(wǎng)絡(luò) IO ,這個(gè)應(yīng)該是大家更加熟悉的,我們會(huì)遇到各種網(wǎng)絡(luò)請(qǐng)求,比如 http 請(qǐng)求、遠(yuǎn)程數(shù)據(jù)庫(kù)讀寫(xiě)、遠(yuǎn)程 Redis 讀寫(xiě)等等。

IO 操作的特點(diǎn)就是需要等待,我們請(qǐng)求一些數(shù)據(jù),由對(duì)方將數(shù)據(jù)寫(xiě)入緩沖區(qū),在這段時(shí)間中,需要讀取數(shù)據(jù)的線程根本無(wú)事可做,因此可以把 CPU 時(shí)間片讓出去,直到緩沖區(qū)寫(xiě)滿(mǎn)。

既然這樣,IO 密集型任務(wù)其實(shí)就有很大的優(yōu)化空間了(畢竟存在等待),那現(xiàn)有的線程池可以很好的滿(mǎn)足我們的需求嗎?

線程池的優(yōu)化

還記得上面說(shuō)的, ThreadPoolExecutor 針對(duì)多余任務(wù)的處理,是先放到等待隊(duì)列中,當(dāng)隊(duì)列塞滿(mǎn)后,再創(chuàng)建額外的線程進(jìn)行處理。

假設(shè)我們的任務(wù)基本都是 IO 密集型,我們希望程序可以有更高的吞吐量,可以在更短的時(shí)間內(nèi)處理更多的任務(wù),那么上面的 ThreadPoolExecutor 明顯是不滿(mǎn)足我們的需求,那該如何解決呢?

也許再來(lái)看看 ThreadPoolExecutor 的 execute 方法,會(huì)讓我們有一些思路:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果當(dāng)前活躍線程數(shù),小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            // 則優(yōu)先創(chuàng)建線程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果任務(wù)可以成功放入隊(duì)列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果不可以成功放入隊(duì)列,則創(chuàng)建線程
        else if (!addWorker(command, false))
            // 如果無(wú)法繼續(xù)創(chuàng)建線程,則拒絕任務(wù)
            reject(command);
    }

針對(duì)放入隊(duì)列的操作,如果隊(duì)列放入失敗,線程池就會(huì)選擇去創(chuàng)建線程了。因此,我們或許可以嘗試自定義線程池,針對(duì) offer 操作,做一些自定義處理。

也就是將任務(wù)放入隊(duì)列時(shí),先檢查線程池的線程數(shù)是否小于最大線程數(shù),如果是,則拒絕放入隊(duì)列,否則,再?lài)L試放入隊(duì)列中。

如果你有看過(guò) dubbo 或者 tomcat 的線程池,你會(huì)發(fā)現(xiàn)他們就有這樣的實(shí)現(xiàn)方法。

比如 dubbo 中的 TaskQueue,我們來(lái)看看它的 offer 方法:

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有空閑等待的線程,則將任務(wù)放入隊(duì)列中,讓線程去處理任務(wù)
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // 如果當(dāng)前線程數(shù)小于最大線程數(shù),則返回 false ,讓線程池去創(chuàng)建新的線程
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 否則,就將任務(wù)放入隊(duì)列中
        return super.offer(runnable);
    }

這樣就可以讓線程池優(yōu)先新建線程了。需要注意的時(shí),此時(shí)的隊(duì)列因?yàn)樾枰鶕?jù)線程池中的線程數(shù)決定是否放入任務(wù)成功,所以需要持有executor對(duì)象,這點(diǎn)不要忘記奧。

總結(jié)

通過(guò)本篇文章,主要是讓大家重新了解了一下 ThreadPoolExecutor ,并針對(duì)高吞吐場(chǎng)景下如何進(jìn)行局部?jī)?yōu)化。

有興趣的話可以訪問(wèn)我的博客或者關(guān)注我的公眾號(hào)、頭條號(hào),說(shuō)不定會(huì)有意外的驚喜。

https://death00.github.io/

公眾號(hào):健程之道

image
image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線...
    石家志遠(yuǎn)閱讀 1,428評(píng)論 0 6
  • 線程池介紹 在web開(kāi)發(fā)中,服務(wù)器需要接受并處理請(qǐng)求,所以會(huì)為一個(gè)請(qǐng)求來(lái)分配一個(gè)線程來(lái)進(jìn)行處理。如果每次請(qǐng)求都新創(chuàng)...
    愛(ài)情小傻蛋閱讀 309評(píng)論 1 2
  • 以下是本文的目錄大綱: 一.Java中的ThreadPoolExecutor類(lèi) 二.深入剖析線程池實(shí)現(xiàn)原理 三.使...
    士力架1020閱讀 117評(píng)論 0 0
  • 它們都是某種線程池,可以控制線程創(chuàng)建,釋放,并通過(guò)某種策略嘗試復(fù)用線程去執(zhí)行任務(wù)的一個(gè)管理框架在Java8中,按照...
    Darren的徒弟閱讀 2,619評(píng)論 0 1
  • 久違的晴天,家長(zhǎng)會(huì)。 家長(zhǎng)大會(huì)開(kāi)好到教室時(shí),離放學(xué)已經(jīng)沒(méi)多少時(shí)間了。班主任說(shuō)已經(jīng)安排了三個(gè)家長(zhǎng)分享經(jīng)驗(yàn)。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,825評(píng)論 16 22

友情鏈接更多精彩內(nèi)容