Java中的線程池
- 一般我們說(shuō)起Java中的線程池,其實(shí)指的是java.util.concurrent包下的ThreadPoolExecutor。當(dāng)然java包下還有其他線程池的實(shí)現(xiàn)類(lèi),但主要也是最常用的就是這個(gè)類(lèi)。今天我們來(lái)好好說(shuō)說(shuō)這個(gè)類(lèi)。
- 這里我們結(jié)合了其他人的整理和自己的思考進(jìn)行了總結(jié)。
1. 工作原理
如圖所示:

ThreadPoolExecutor工作原理
當(dāng)主線程中調(diào)用execute接口提交執(zhí)行任務(wù)時(shí):
則執(zhí)行以下步驟:
注意:線程池初始時(shí),是空的。
- 如果當(dāng)前線程數(shù)<corePoolSize,如果是則創(chuàng)建新的線程執(zhí)行該任務(wù)
- 如果當(dāng)前線程數(shù)>=corePoolSize,則將任務(wù)存入BlockingQueue<Runnable>
- 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)<maximumPoolSize,則新建線程執(zhí)行該任務(wù)。
- 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)>=maximumPoolSize,則拋出異常RejectedExecutionException,告訴調(diào)用者無(wú)法再接受任務(wù)了。
注意點(diǎn):
- 線程池初始化時(shí),是空的。
- 如果阻塞隊(duì)列已滿,且當(dāng)前線程數(shù)<maximumPoolSize,則新建線程執(zhí)行該任務(wù)。而不是新建線程,從阻塞隊(duì)列里take任務(wù)來(lái)執(zhí)行,所以這里并不是先來(lái)先執(zhí)行的。
提問(wèn):這里的阻塞隊(duì)列是BlockingQueue<Runnale>,我們知道ThreadPoolExecutor是支持Callable任務(wù)提交的,那這里不會(huì)有問(wèn)題嗎?
-
答:其實(shí)我們這里說(shuō)的都是execute接口提交任務(wù)。execute接口只接受Runnable。其實(shí)我們看一下繼承關(guān)系圖:
ThreadPoolExecutor類(lèi)圖 可以看到其實(shí)ThreadPoolExecutor是繼承了AbstractExecutorService,而AbstractExecutorService實(shí)現(xiàn)了ExecutorService。
且我們可以看到ThreadPoolExecutor類(lèi)里只重寫(xiě)了execute方法。ExecutorService的其他方法都沒(méi)有實(shí)現(xiàn),而是在AbstractExecutorService里實(shí)現(xiàn)的。所以說(shuō)整個(gè)ThreadPoolExecutor里的策略都是只針對(duì)execute方法來(lái)說(shuō)的。所以說(shuō)上述的工作原理只針對(duì)execute接口。像其他的submit/invoke接口并不適用--因?yàn)檎{(diào)用這些接口其實(shí)調(diào)用的是AbstractExecutorService的實(shí)現(xiàn)。
2. 我們來(lái)看一下ThreadPoolExecutor的參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:就是上圖中的CorePoolSize,意為核心線程數(shù)。
- maximumPoolSize:就是上圖中maximumPoolSize,意為整個(gè)線程池的最大線程數(shù)。這個(gè)數(shù)字是包含orePoolSize的。
- keepAliveTime和unit:是idle線程最大存活時(shí)間。unit是前者的單位。即如果當(dāng)前線程數(shù)>corePoolSize,則會(huì)kill一些線程至corePoolSize大小。
- workQueue:就是上面說(shuō)的阻塞隊(duì)列,注意BlockingQueue<Runnable>只是個(gè)接口,下面我們會(huì)細(xì)說(shuō)一下它的常用實(shí)現(xiàn)類(lèi)。
- 下面兩個(gè)參數(shù)是可選項(xiàng):即Java重載了ThreadPoolExecutor的構(gòu)造方法,下面兩個(gè)參數(shù)可以不傳,也可以只傳一個(gè)。
- threadFactory:線程工廠。創(chuàng)建線程的接口,該接口只有一個(gè)方法:Thread newThread(Runnable r);我們可以實(shí)現(xiàn)這個(gè)接口進(jìn)而自定義創(chuàng)建線程,比如制定線程名稱(chēng),線程組等。
- handler:當(dāng)線程池已滿時(shí),再提交任務(wù)會(huì)觸發(fā)調(diào)用這個(gè)回調(diào)函數(shù)。RejectedExecutionHandler接口的唯一方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
3. 阻塞隊(duì)列的常用實(shí)現(xiàn)類(lèi)
- ArrayBlockingQueue: 有邊界的阻塞隊(duì)列。長(zhǎng)度大小初始化時(shí)制定,即內(nèi)部由數(shù)組實(shí)現(xiàn)。
- LinkedBlockingQueue: 有邊界的阻塞隊(duì)列。邊界是可選的,如果初始化的時(shí)候不指定則默認(rèn)是Interger.MAX_VALUE,內(nèi)部由鏈表實(shí)現(xiàn)。(最常用)
- PriorityBlockQueue: 帶有優(yōu)先級(jí)的阻塞隊(duì)列。沒(méi)有邊界。
- 這三個(gè)類(lèi)都實(shí)現(xiàn)了BlockingQueue接口,其中LinkedBlockingQueue最常用,特別注意一般一定要指定邊界大小,不然線程池失去了些意義,且造成內(nèi)存泄露。
這里就不對(duì)這些細(xì)說(shuō),以后我們?cè)偌?xì)說(shuō)。
4. RejectedExecutionHandler線程池飽和策略
- 默認(rèn)的 ThreadPoolExecutor.AbortPolicy:處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。
- ThreadPoolExecutor.CallerRunsPolicy:調(diào)用線程直接call該任務(wù)的execute 本身。
- ThreadPoolExecutor.DiscardPolicy:將任務(wù)刪除。
- ThreadPoolExecutor.DiscardOldestPolicy刪除工作隊(duì)列頭部任務(wù)。
5. ThreadPoolExecutor的擴(kuò)展
我們看ThreadPoolExecutor的源碼發(fā)現(xiàn)如下三個(gè)函數(shù)的實(shí)現(xiàn)為空且是protected。明顯用于子類(lèi)實(shí)現(xiàn)的。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
- 在執(zhí)行任務(wù)的線程中將調(diào)用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計(jì)時(shí)、監(jiān)視或者統(tǒng)計(jì)信息收集的功能。
- 無(wú)論任務(wù)是從run中正常返回,還是拋出一個(gè)異常而返回,afterExecute都會(huì)被調(diào)用。如果任務(wù)在完成后帶有一個(gè)Error,那么就不會(huì)調(diào)用afterExecute。
- 如果beforeExecute拋出一個(gè)RuntimeException,那么任務(wù)將不被執(zhí)行,并且afterExecute也不會(huì)被調(diào)用。
- 在線程池完成關(guān)閉時(shí)調(diào)用terminated,也就是在所有任務(wù)都已經(jīng)完成并且所有工作者線程也已經(jīng)關(guān)閉后,terminated可以用來(lái)釋放Executor在其生命周期里分配的各種資源,此外還可以執(zhí)行發(fā)送通知、記錄日志或者手機(jī)finalize統(tǒng)計(jì)等操作。
即我們可以子類(lèi)來(lái)繼承ThreadPoolExecutor來(lái)定制化一些功能。
