Executor框架總結(jié)

Executor框架圖(轉(zhuǎn))

Executor

是一個接口,僅要求實現(xiàn)void execute(Runnable command);,要求是在各種場合(新開的線程、線程池內(nèi)等等),同步(在execute方法里立刻執(zhí)行)或異步得完成某一任務(wù)。

ExecutorService

接口,繼承自Executor,擴展了Executor并添加了一些生命周期管理的方法。
一個ExecutorService的生命周期有三種狀態(tài):運行、關(guān)閉、中止。
當調(diào)用ExecutorService.shutdown()后,處于關(guān)閉狀態(tài),isShutdown()方法返回true。此時無法新加任務(wù)。
所有已添加的任務(wù)關(guān)閉后,Executor處于終止狀態(tài),isTerminated()返回true。這要求之前肯定調(diào)用過shutdownshutdownNow??梢杂?code>awaitTermination阻塞式得等待所有任務(wù)都已經(jīng)關(guān)閉。
此外,調(diào)用多了submit和invoke:
submitexecutor的區(qū)別在于它的入?yún)⒖梢允?code>Runnable也可以是Callable,而且有返回值Future<?>
invoke系(invokeAll、invokeAny)的入?yún)⒍加?code>Collection<? extends Callable<T>> tasks,返回T或T的集合。

Future FutureTask

Future<V>是一個接口,規(guī)定了異步執(zhí)行的操作,通過get()方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則get()會使當前線程阻塞,可以指定等待時間。cancel方法取消操作。

FutureTask<V>是一個類,實現(xiàn)了RunnableFuture<V>,該接口繼承自RunnableFuture<V>
內(nèi)部持有一個Callable<V>,構(gòu)造函數(shù)需要一個Callable<V>但也可以接受一個Runnable
(轉(zhuǎn)化this.callable = Executors.callable(runnable, result);
異步的原理:

  • 持有一個volatile int state,表面目前的狀態(tài),是新建還是完成還是取消之類的。
  • 內(nèi)部類WaitNode,是一個存儲thread的鏈表,當前節(jié)點指向當前線程,next指向下一個可用線程。通過CAS操作更改運行線程。
  • 通過LockSupport.park在運行完成前阻塞獲取,LockSupport.unpark取消阻塞。

ThreadPoolExecutor

ThreadPoolExecutor 繼承自AbstractExecutorService,后者實現(xiàn)了ExecutorService
首先實現(xiàn)了ExecutorService的各方法
構(gòu)造函數(shù)和各參數(shù):

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:核心線程數(shù)
  • maximumPoolSize:最大線程數(shù)
  • keepAliveTime:線程空閑時間
  • TimeUnit 時間尺度
  • workQueue 阻塞隊列,用于存放等待著的線程
  • threadFactory 線程工廠,給內(nèi)部類worker提供線程
  • rejectedExecutionHandler:任務(wù)拒絕處理器
    提供了四種方式來處理任務(wù)拒絕策略
    1. 直接丟棄(DiscardPolicy)
    2. 丟棄隊列中最老的任務(wù)(DiscardOldestPolicy)。
    3. 拋異常(AbortPolicy)
    4. 將任務(wù)分給調(diào)用線程來執(zhí)行(CallerRunsPolicy)。

內(nèi)部有一個worker類作為任務(wù)執(zhí)行者,HashSet<Worker> workers作為線程池,workQueue作為等待的隊列

狀態(tài)通過final AtomicInteger ctl保存
五個狀態(tài):

  • RUNNING 接受新任務(wù)和處理隊列里的任務(wù)
  • SHUTDOWN 不接受新任務(wù)但是處理隊列里的任務(wù)
  • STOP 不接受新任務(wù),不處理隊列里的任務(wù),中斷處理中的任務(wù)
  • TIDYING 所有任務(wù)都被關(guān)閉(terminated),workerCount為0,將調(diào)用terminated
  • TERMINATED terminated已被調(diào)用

狀態(tài)轉(zhuǎn)換:

  • RUNNING -> SHUTDOWN 調(diào)用shutdown(),可能隱式地在finalize()時調(diào)用
  • (RUNNING or SHUTDOWN) -> STOP 調(diào)用shutdownNow()
  • SHUTDOWN -> TIDYING 當隊列和池都空的時候
  • STOP -> TIDYING 當池空的時候
  • TIDYING -> TERMINATED 當terminated()調(diào)用結(jié)束時

execute策略:
Proceed in 3 steps:

  • 少于corePoolSize的線程在運行時,嘗試新建一個線程處理這個任務(wù)。addWorker方法檢查運行狀態(tài)和workerCount。
  • 如果一個線程可以被移出隊列,檢查狀態(tài)判斷是否要新加一個線程還是取消這個任務(wù)
  • 如果不能從隊列里拿出一個線程,就新加一個線程。失敗了就取消這個任務(wù)。

線程池的工作過程如下(轉(zhuǎn)個別人的總結(jié)):

  1. 線程池剛創(chuàng)建時,里面沒有一個線程。任務(wù)隊列是作為參數(shù)傳進來的。不過,就算隊列里面有任務(wù),線程池也不會馬上執(zhí)行它們。
  2. 當調(diào)用 execute() 方法添加一個任務(wù)時,線程池會做如下判斷:
    1. 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù);
    2. 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務(wù)放入隊列。
    3. 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建線程運行這個任務(wù);
    4. 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”。
  3. 當一個線程完成任務(wù)時,它會從隊列中取下一個任務(wù)來執(zhí)行。
  4. 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務(wù)完成后,它最終會收縮到 corePoolSize 的大小。

Executors

工具類

創(chuàng)建線程池,本質(zhì)是生成ThreadPoolExecutor,舉例:

  • newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
  • newCachedThreadPool
    可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。線程池為無限大,當執(zhí)行第二個任務(wù)時第一個任務(wù)已經(jīng)完成,會復(fù)用執(zhí)行第一個任務(wù)的線程,而不用每次新建線程。
  • newFixedThreadPool
    定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。定長線程池的大小最好根據(jù)系統(tǒng)資源進行設(shè)置,如Runtime.getRuntime().availableProcessors()。
  • newScheduledThreadPool
    創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
  • newSingleThreadExecutor
    創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。

Executors還提供默認的線程工廠defaultThreadFactory()

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容