ThreadPoolExecutor的用法

Java中的線程池

  • 一般我們說(shuō)起Java中的線程池,其實(shí)指的是java.util.concurrent包下的ThreadPoolExecutor。當(dāng)然java包下還有其他線程池的實(shí)現(xiàn)類(lèi),但主要也是最常用的就是這個(gè)類(lèi)。今天我們來(lái)好好說(shuō)說(shuō)這個(gè)類(lèi)。
  • 這里我們結(jié)合了其他人的整理和自己的思考進(jìn)行了總結(jié)。

1. 工作原理

如圖所示:


ThreadPoolExecutor工作原理

當(dāng)主線程中調(diào)用execute接口提交執(zhí)行任務(wù)時(shí):
則執(zhí)行以下步驟:
注意:線程池初始時(shí),是空的。

  1. 如果當(dāng)前線程數(shù)<corePoolSize,如果是則創(chuàng)建新的線程執(zhí)行該任務(wù)
  2. 如果當(dāng)前線程數(shù)>=corePoolSize,則將任務(wù)存入BlockingQueue<Runnable>
  3. 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)<maximumPoolSize,則新建線程執(zhí)行該任務(wù)。
  4. 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)>=maximumPoolSize,則拋出異常RejectedExecutionException,告訴調(diào)用者無(wú)法再接受任務(wù)了。
注意點(diǎn):
  • 線程池初始化時(shí),是空的。
  • 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)<maximumPoolSize,則新建線程執(zhí)行該任務(wù)。而不是新建線程,從阻塞隊(duì)列里take任務(wù)來(lái)執(zhí)行,所以這里并不是先來(lái)先執(zhí)行的。
提問(wèn):這里的阻塞隊(duì)列是BlockingQueue<Runnale>,我們知道ThreadPoolExecutor是支持Callable任務(wù)提交的,那這里不會(huì)有問(wèn)題嗎?
  • 答:其實(shí)我們這里說(shuō)的都是execute接口提交任務(wù)。execute接口只接受Runnable。其實(shí)我們看一下繼承關(guān)系圖:


    ThreadPoolExecutor類(lèi)圖
  • 可以看到其實(shí)ThreadPoolExecutor是繼承了AbstractExecutorService,而AbstractExecutorService實(shí)現(xiàn)了ExecutorService。

  • 且我們可以看到ThreadPoolExecutor類(lèi)里只重寫(xiě)了execute方法。ExecutorService的其他方法都沒(méi)有實(shí)現(xiàn),而是在AbstractExecutorService里實(shí)現(xiàn)的。所以說(shuō)整個(gè)ThreadPoolExecutor里的策略都是只針對(duì)execute方法來(lái)說(shuō)的。所以說(shuō)上述的工作原理只針對(duì)execute接口。像其他的submit/invoke接口并不適用--因?yàn)檎{(diào)用這些接口其實(shí)調(diào)用的是AbstractExecutorService的實(shí)現(xiàn)。

2. 我們來(lái)看一下ThreadPoolExecutor的參數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:就是上圖中的CorePoolSize,意為核心線程數(shù)。
  • maximumPoolSize:就是上圖中maximumPoolSize,意為整個(gè)線程池的最大線程數(shù)。這個(gè)數(shù)字是包含orePoolSize的。
  • keepAliveTime和unit:是idle線程最大存活時(shí)間。unit是前者的單位。即如果當(dāng)前線程數(shù)>corePoolSize,則會(huì)kill一些線程至corePoolSize大小。
  • workQueue:就是上面說(shuō)的阻塞隊(duì)列,注意BlockingQueue<Runnable>只是個(gè)接口,下面我們會(huì)細(xì)說(shuō)一下它的常用實(shí)現(xiàn)類(lèi)。
  • 下面兩個(gè)參數(shù)是可選項(xiàng):即Java重載了ThreadPoolExecutor的構(gòu)造方法,下面兩個(gè)參數(shù)可以不傳,也可以只傳一個(gè)。
  • threadFactory:線程工廠。創(chuàng)建線程的接口,該接口只有一個(gè)方法:Thread newThread(Runnable r);我們可以實(shí)現(xiàn)這個(gè)接口進(jìn)而自定義創(chuàng)建線程,比如制定線程名稱(chēng),線程組等。
  • handler:當(dāng)線程池已滿時(shí),再提交任務(wù)會(huì)觸發(fā)調(diào)用這個(gè)回調(diào)函數(shù)。RejectedExecutionHandler接口的唯一方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

3. 阻塞隊(duì)列的常用實(shí)現(xiàn)類(lèi)

  • ArrayBlockingQueue: 有邊界的阻塞隊(duì)列。長(zhǎng)度大小初始化時(shí)制定,即內(nèi)部由數(shù)組實(shí)現(xiàn)。
  • LinkedBlockingQueue: 有邊界的阻塞隊(duì)列。邊界是可選的,如果初始化的時(shí)候不指定則默認(rèn)是Interger.MAX_VALUE,內(nèi)部由鏈表實(shí)現(xiàn)。(最常用)
  • PriorityBlockQueue: 帶有優(yōu)先級(jí)的阻塞隊(duì)列。沒(méi)有邊界。
  • 這三個(gè)類(lèi)都實(shí)現(xiàn)了BlockingQueue接口,其中LinkedBlockingQueue最常用,特別注意一般一定要指定邊界大小,不然線程池失去了些意義,且造成內(nèi)存泄露。
    這里就不對(duì)這些細(xì)說(shuō),以后我們?cè)偌?xì)說(shuō)。

4. RejectedExecutionHandler線程池飽和策略

  • 默認(rèn)的 ThreadPoolExecutor.AbortPolicy:處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy:調(diào)用線程直接call該任務(wù)的execute 本身。
  • ThreadPoolExecutor.DiscardPolicy:將任務(wù)刪除。
  • ThreadPoolExecutor.DiscardOldestPolicy刪除工作隊(duì)列頭部任務(wù)。

5. ThreadPoolExecutor的擴(kuò)展

我們看ThreadPoolExecutor的源碼發(fā)現(xiàn)如下三個(gè)函數(shù)的實(shí)現(xiàn)為空且是protected。明顯用于子類(lèi)實(shí)現(xiàn)的。

protected void beforeExecute(Thread t, Runnable r) { }  
protected void afterExecute(Runnable r, Throwable t) { }  
protected void terminated() { } 
  • 在執(zhí)行任務(wù)的線程中將調(diào)用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計(jì)時(shí)、監(jiān)視或者統(tǒng)計(jì)信息收集的功能。
  • 無(wú)論任務(wù)是從run中正常返回,還是拋出一個(gè)異常而返回,afterExecute都會(huì)被調(diào)用。如果任務(wù)在完成后帶有一個(gè)Error,那么就不會(huì)調(diào)用afterExecute。
  • 如果beforeExecute拋出一個(gè)RuntimeException,那么任務(wù)將不被執(zhí)行,并且afterExecute也不會(huì)被調(diào)用。
  • 在線程池完成關(guān)閉時(shí)調(diào)用terminated,也就是在所有任務(wù)都已經(jīng)完成并且所有工作者線程也已經(jīng)關(guān)閉后,terminated可以用來(lái)釋放Executor在其生命周期里分配的各種資源,此外還可以執(zhí)行發(fā)送通知、記錄日志或者手機(jī)finalize統(tǒng)計(jì)等操作。
    即我們可以子類(lèi)來(lái)繼承ThreadPoolExecutor來(lái)定制化一些功能。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容