
Executor的主要作用是解耦任務(wù)提交和任務(wù)執(zhí)行(包括如何使用線程,如何調(diào)度)
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
Executor本身并不表示使用線程
ExecutorService提供了關(guān)閉機(jī)制以及提交任務(wù)返回Future對(duì)象用于追蹤任務(wù)執(zhí)行進(jìn)度或取消任務(wù)。

先分析一下AbstractExecutorService實(shí)現(xiàn)中用到的Future的實(shí)現(xiàn)類FutureTask的實(shí)現(xiàn)機(jī)制

如果當(dāng)前的任務(wù)是Runnable,通過(guò)RunnableAdapter轉(zhuǎn)為Callable


FutureTask自身實(shí)現(xiàn)了Runnable,包裝內(nèi)部的Callable或Runnable



Future#get實(shí)現(xiàn)機(jī)制:如果任務(wù)還沒(méi)開始,調(diào)用線程加入任務(wù)的等待隊(duì)列,等待任務(wù)完成或取消時(shí)被喚醒,否則等待任務(wù)到達(dá)最終狀態(tài),正常執(zhí)行返回結(jié)果或異常時(shí)拋出ExecutionException異常


Future#cancle實(shí)現(xiàn)機(jī)制:如果任務(wù)還沒(méi)開始,狀態(tài)改為INTERRUPTING或CANCELLED,如果支持中斷,打斷當(dāng)前線程(參考run方法,先設(shè)置runner線程,再修改狀態(tài),所以可能當(dāng)前狀態(tài)是NEW,但是runner已經(jīng)設(shè)置了),然后喚醒所有之前等待的線程。
AbstractExecutorService的實(shí)現(xiàn)機(jī)制:任務(wù)的具體執(zhí)行都委托給從Executor繼承的execute方法,主要實(shí)現(xiàn)了submit和invokeAll,invokeAny方法。


任務(wù)的執(zhí)行都委托給Executor,所有提交的任務(wù)都用QueueingFuture包裝,任務(wù)執(zhí)行完加入內(nèi)部的BlockingQueue。

invokeAny:先提交一個(gè)任務(wù),然后循環(huán)檢查ExecutorCompletionService的阻塞隊(duì)列是否有已完成的任務(wù),有就返回,沒(méi)有就再提交一個(gè)新任務(wù),直到任務(wù)都提交完,然后阻塞。第一個(gè)任務(wù)完成后,cancel所有可以cancel的任務(wù)。
AbstractExecutorService有兩個(gè)具體的子類:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又繼承了ThreadPoolExecutor
ThreadPoolExecutor:


workQueue表示任務(wù)隊(duì)列,workers表示當(dāng)前執(zhí)行任務(wù)的線程集合。

Worker繼承了AbstractQueuedSynchronizer,自身就是一個(gè)簡(jiǎn)單的互斥鎖,實(shí)現(xiàn)了Runnable,Worker在構(gòu)造時(shí)內(nèi)部會(huì)利用ThreadFactory產(chǎn)生一個(gè)線程,線程啟動(dòng)時(shí),執(zhí)行Worker自身的run方法。

Worker執(zhí)行過(guò)程中,會(huì)通過(guò)getTask獲取任務(wù),每次執(zhí)行任務(wù)之前都會(huì)獲取worker自身的互斥鎖

getTask通過(guò)返回null(線程池stop,或shutdown之后任務(wù)隊(duì)列為空,或者動(dòng)態(tài)調(diào)整參數(shù)之后線程太多,或者獲取任務(wù)超時(shí)(說(shuō)明任務(wù)太少了,不需要那么多線程)),控制Worker結(jié)束循環(huán)

Worker循環(huán)結(jié)束有兩種原因:執(zhí)行的任務(wù)拋出異常,getTask返回null。
如果是后者,再次檢查以確保目前的線程數(shù)不低于最低要求,線程數(shù)不夠時(shí)添加worker線程。因異常而結(jié)束任務(wù)循環(huán)也會(huì)添加新的worker線程。


添加worker失敗的原因有三:線程池stop;shutdown之后任務(wù)隊(duì)列為空;當(dāng)前線程數(shù)超過(guò)最大線程數(shù)。worker添加成功之后,啟動(dòng)內(nèi)部的線程,開始循環(huán)處理任務(wù)。

關(guān)鍵點(diǎn)在于,核心線程全部啟動(dòng)之后,任務(wù)會(huì)先加入任務(wù)隊(duì)列,只有任務(wù)隊(duì)列是有界隊(duì)列,且隊(duì)列滿了才會(huì)啟動(dòng)非核心線程?。?!



shutdown之后,修改狀態(tài)為SHUTDOWN,然后打斷所有idle線程,所謂idle,就是可以獲取worker的互斥鎖,說(shuō)明worker當(dāng)前在等待任務(wù)而不是執(zhí)行任務(wù),參考runWorker方法。如果當(dāng)前所有worker正巧都在等待任務(wù),所有worker都會(huì)被打斷(processWorkerExit方法會(huì)在worker退出循環(huán)時(shí)調(diào)用,根據(jù)情況再添加worker)。tryTerminate中會(huì)先檢查如果當(dāng)前狀態(tài)是SHUTDOWN但是任務(wù)隊(duì)列不為空,不能進(jìn)入terminal狀態(tài),如果當(dāng)前是shutdown且任務(wù)隊(duì)列為空且線程數(shù)為空,修改狀態(tài)為過(guò)渡狀態(tài)TIDYING,然后修改為最終狀態(tài)TERMINATED。

打斷所有已經(jīng)啟動(dòng)的worker,返回所有還未執(zhí)行的任務(wù)。

shutdown之后線程池并不一定關(guān)閉!?。∷哉_的做法是shutdown之后調(diào)用awaitTermination等待所有任務(wù)執(zhí)行完后所有線程被打斷。

ThreadPoolExecutor可控制參數(shù):
corePoolSize:核心線程數(shù),worker數(shù)量小于corePoolSize時(shí)每次提交任務(wù)都啟動(dòng)一個(gè)core線程,可以使用set方法在運(yùn)行時(shí)調(diào)整。
maximumPoolSize:最大線程數(shù),包括core和非core線程,從上面的源碼分析可以直到只有任務(wù)隊(duì)列為有界隊(duì)列時(shí)才會(huì)啟動(dòng)非core線程。
workQueue:任務(wù)隊(duì)列,只有任務(wù)隊(duì)列為有界隊(duì)列時(shí)才會(huì)啟動(dòng)非core線程。
keepAliveTime:worker在指定時(shí)間內(nèi)獲取不到任務(wù),說(shuō)明此時(shí)人浮于事,需要裁員,getTask會(huì)返回null,結(jié)束獲取任務(wù)超時(shí)的worker。
threadFactory:定義如何產(chǎn)生線程,默認(rèn)直接new Thread。
handler:提交任務(wù)時(shí)任務(wù)隊(duì)列滿了或線程池shutdown之后的行為,默認(rèn)拋出RejectedExecutionException異常,可選策略包括忽略(DiscardPolicy),在提交任務(wù)的線程中執(zhí)行(CallerRunsPolicy),移除任務(wù)隊(duì)列里最前面的任務(wù)(DiscardOldestPolicy)。
keepAliveTime:如果通過(guò)set設(shè)置了值,如果一個(gè)worker超過(guò)指定時(shí)間未獲得任務(wù)就會(huì)timeout而結(jié)束循環(huán),如果當(dāng)前線程數(shù)超過(guò)了corePoolSize,不會(huì)再添加新的worker,默認(rèn)不支持timeout。
allowCoreThreadTimeOut:默認(rèn)線程數(shù)小于corePoolSize,timeout之后就會(huì)添加新的worker,如果設(shè)置了allowCoreThreadTimeOut,只有當(dāng)前線程為0時(shí)才會(huì)添加新的worker。
下面分析一下ThreadPoolExecutor的子類ScheduledThreadPoolExecutor的實(shí)現(xiàn)機(jī)制:

從構(gòu)造上看,主要是任務(wù)隊(duì)列使用了DelayedWorkQueue,DelayedWorkQueue是一個(gè)簡(jiǎn)單的基于二叉堆實(shí)現(xiàn)的優(yōu)先級(jí)阻塞無(wú)界隊(duì)列,所有任務(wù)按觸發(fā)時(shí)刻排序,keepAliveTime為0,不支持worker超時(shí)。從上文的分析可知,使用無(wú)界隊(duì)列時(shí)是不會(huì)啟動(dòng)非core線程的,maximumPoolSize設(shè)置成了Integer.MAX_VALUE而不是corePoolSize,避免運(yùn)行時(shí)修改corePoolSize時(shí)還要修改maximumPoolSize。

所有提交的任務(wù)都會(huì)用ScheduledFutureTask包裝

任務(wù)先按觸發(fā)時(shí)刻排序,同時(shí)觸發(fā)的任務(wù)按提交順序排序



如果是重復(fù)任務(wù),任務(wù)執(zhí)行完,計(jì)算下次觸發(fā)時(shí)刻,重新加入任務(wù)隊(duì)列。此處有一個(gè)細(xì)節(jié):就算是fixed-rate的任務(wù),也是上次執(zhí)行完之后才會(huì)再次加入任務(wù)隊(duì)列。

shutdown之后不允許提交新任務(wù),如果是之前提交的延遲任務(wù)還沒(méi)到時(shí)間或者是周期性任務(wù),根據(jù)參數(shù)決定是否還能繼續(xù)執(zhí)行,默認(rèn)運(yùn)行繼續(xù)等待執(zhí)行延遲任務(wù),不允許執(zhí)行周期任務(wù)。
ForkJoinPool:日后補(bǔ)充?。?!
下面來(lái)分析一下Executors里的靜態(tài)方法構(gòu)造的都是什么線程:

無(wú)界隊(duì)列,不支持timeout,固定線程數(shù)。

newSingleThreadExecutor = newFixedThreadPool(1)

使用特殊的隊(duì)列SynchronousQueue,相當(dāng)于容量為1的阻塞隊(duì)列,只有這樣,如果已經(jīng)有任務(wù)在等待執(zhí)行了,再次提交任務(wù)時(shí)才會(huì)啟動(dòng)非core線程。
