
Executor
是一個接口,僅要求實現(xiàn)void execute(Runnable command);,要求是在各種場合(新開的線程、線程池內(nèi)等等),同步(在execute方法里立刻執(zhí)行)或異步得完成某一任務(wù)。
ExecutorService
接口,繼承自Executor,擴展了Executor并添加了一些生命周期管理的方法。
一個ExecutorService的生命周期有三種狀態(tài):運行、關(guān)閉、中止。
當調(diào)用ExecutorService.shutdown()后,處于關(guān)閉狀態(tài),isShutdown()方法返回true。此時無法新加任務(wù)。
所有已添加的任務(wù)關(guān)閉后,Executor處于終止狀態(tài),isTerminated()返回true。這要求之前肯定調(diào)用過shutdown或shutdownNow??梢杂?code>awaitTermination阻塞式得等待所有任務(wù)都已經(jīng)關(guān)閉。
此外,調(diào)用多了submit和invoke:
submit和executor的區(qū)別在于它的入?yún)⒖梢允?code>Runnable也可以是Callable,而且有返回值Future<?>
invoke系(invokeAll、invokeAny)的入?yún)⒍加?code>Collection<? extends Callable<T>> tasks,返回T或T的集合。
Future FutureTask
Future<V>是一個接口,規(guī)定了異步執(zhí)行的操作,通過get()方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則get()會使當前線程阻塞,可以指定等待時間。cancel方法取消操作。
FutureTask<V>是一個類,實現(xiàn)了RunnableFuture<V>,該接口繼承自Runnable和Future<V>
內(nèi)部持有一個Callable<V>,構(gòu)造函數(shù)需要一個Callable<V>但也可以接受一個Runnable
(轉(zhuǎn)化this.callable = Executors.callable(runnable, result);)
異步的原理:
- 持有一個
volatile int state,表面目前的狀態(tài),是新建還是完成還是取消之類的。 - 內(nèi)部類
WaitNode,是一個存儲thread的鏈表,當前節(jié)點指向當前線程,next指向下一個可用線程。通過CAS操作更改運行線程。 - 通過
LockSupport.park在運行完成前阻塞獲取,LockSupport.unpark取消阻塞。
ThreadPoolExecutor
ThreadPoolExecutor 繼承自AbstractExecutorService,后者實現(xiàn)了ExecutorService
首先實現(xiàn)了ExecutorService的各方法
構(gòu)造函數(shù)和各參數(shù):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心線程數(shù)
- maximumPoolSize:最大線程數(shù)
- keepAliveTime:線程空閑時間
- TimeUnit 時間尺度
- workQueue 阻塞隊列,用于存放等待著的線程
- threadFactory 線程工廠,給內(nèi)部類worker提供線程
- rejectedExecutionHandler:任務(wù)拒絕處理器
提供了四種方式來處理任務(wù)拒絕策略- 直接丟棄(DiscardPolicy)
- 丟棄隊列中最老的任務(wù)(DiscardOldestPolicy)。
- 拋異常(AbortPolicy)
- 將任務(wù)分給調(diào)用線程來執(zhí)行(CallerRunsPolicy)。
內(nèi)部有一個worker類作為任務(wù)執(zhí)行者,HashSet<Worker> workers作為線程池,workQueue作為等待的隊列
狀態(tài)通過final AtomicInteger ctl保存
五個狀態(tài):
- RUNNING 接受新任務(wù)和處理隊列里的任務(wù)
- SHUTDOWN 不接受新任務(wù)但是處理隊列里的任務(wù)
- STOP 不接受新任務(wù),不處理隊列里的任務(wù),中斷處理中的任務(wù)
- TIDYING 所有任務(wù)都被關(guān)閉(terminated),workerCount為0,將調(diào)用
terminated - TERMINATED
terminated已被調(diào)用
狀態(tài)轉(zhuǎn)換:
- RUNNING -> SHUTDOWN 調(diào)用
shutdown(),可能隱式地在finalize()時調(diào)用 - (RUNNING or SHUTDOWN) -> STOP 調(diào)用
shutdownNow()時 - SHUTDOWN -> TIDYING 當隊列和池都空的時候
- STOP -> TIDYING 當池空的時候
- TIDYING -> TERMINATED 當
terminated()調(diào)用結(jié)束時
execute策略:
Proceed in 3 steps:
- 少于corePoolSize的線程在運行時,嘗試新建一個線程處理這個任務(wù)。addWorker方法檢查運行狀態(tài)和workerCount。
- 如果一個線程可以被移出隊列,檢查狀態(tài)判斷是否要新加一個線程還是取消這個任務(wù)
- 如果不能從隊列里拿出一個線程,就新加一個線程。失敗了就取消這個任務(wù)。
線程池的工作過程如下(轉(zhuǎn)個別人的總結(jié)):
- 線程池剛創(chuàng)建時,里面沒有一個線程。任務(wù)隊列是作為參數(shù)傳進來的。不過,就算隊列里面有任務(wù),線程池也不會馬上執(zhí)行它們。
- 當調(diào)用 execute() 方法添加一個任務(wù)時,線程池會做如下判斷:
- 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù);
- 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務(wù)放入隊列。
- 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建線程運行這個任務(wù);
- 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”。
- 當一個線程完成任務(wù)時,它會從隊列中取下一個任務(wù)來執(zhí)行。
- 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務(wù)完成后,它最終會收縮到 corePoolSize 的大小。
Executors
工具類
創(chuàng)建線程池,本質(zhì)是生成ThreadPoolExecutor,舉例:
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
- newCachedThreadPool
可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。線程池為無限大,當執(zhí)行第二個任務(wù)時第一個任務(wù)已經(jīng)完成,會復(fù)用執(zhí)行第一個任務(wù)的線程,而不用每次新建線程。 - newFixedThreadPool
定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。定長線程池的大小最好根據(jù)系統(tǒng)資源進行設(shè)置,如Runtime.getRuntime().availableProcessors()。 - newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。 - newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
Executors還提供默認的線程工廠defaultThreadFactory()