Executor框架的兩級(jí)調(diào)度模型
在HotSpot VM的線程模型中,Java線程(java.lang.Thread)被一對(duì)一映射為本地操作系統(tǒng)線程。Java線程啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)本地操作系統(tǒng)線程;當(dāng)該Java線程終止時(shí),這個(gè)操作系統(tǒng)線程也會(huì)被回收。操作系統(tǒng)會(huì)調(diào)度所有線程并將它們分配給可用的CPU。
在上層,Java多線程程序通常把應(yīng)用分解為若干個(gè)任務(wù),然后使用用戶級(jí)的調(diào)度器
應(yīng)用程序通過Executor框架控制上層的調(diào)度;而下層的調(diào)度由操作系統(tǒng)內(nèi)核控制,下層的調(diào)度不受應(yīng)用程序的控制。
1.Executor框架的結(jié)構(gòu)
Executor框架主要由3大部分組成如下:
1、任務(wù)。包括被執(zhí)行任務(wù)需要實(shí)現(xiàn)的接口:Runnable接口或Callable接口。
2、任務(wù)的執(zhí)行。包括任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的
ExecutorService接口。Executor框架有兩個(gè)關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口
(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
3、異步計(jì)算的結(jié)果。包括接口Future和實(shí)現(xiàn)Future接口的FutureTask類。
下面是這些類和接口的簡介。
·Executor是一個(gè)接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開來。
·ThreadPoolExecutor是線程池的核心實(shí)現(xiàn)類,用來執(zhí)行被提交的任務(wù)。
·ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強(qiáng)大。
Future接口和實(shí)現(xiàn)Future接口的FutureTask類,代表異步計(jì)算的結(jié)果。
·Runnable和Callable接口實(shí)現(xiàn)類都可被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor執(zhí)行。
下面來看下使用示意圖:
主線程首先要?jiǎng)?chuàng)建實(shí)現(xiàn)Runnable或者Callable接口的任務(wù)對(duì)象。工具類Executors可以把一
個(gè)Runnable對(duì)象封裝為一個(gè)Callable對(duì)象(Executors.callable(Runnable task)或
Executors.callable(Runnable task,Object resule))。
然后可以把Runnable對(duì)象直接交給ExecutorService執(zhí)行(ExecutorService.execute(Runnable
command));或者也可以把Runnable對(duì)象或Callable對(duì)象提交給ExecutorService執(zhí)行(Executor-
Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task))。
如果執(zhí)行ExecutorService.submit(…),ExecutorService將返回一個(gè)實(shí)現(xiàn)Future接口的對(duì)象
(到目前為止的JDK中,返回的是FutureTask對(duì)象)。由于FutureTask實(shí)現(xiàn)了Runnable,程序員也可
以創(chuàng)建FutureTask,然后直接交給ExecutorService執(zhí)行。
最后,主線程執(zhí)行FutureTask.get()方法來等待任務(wù)執(zhí)行完成。主線程也可以執(zhí)行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務(wù)的執(zhí)行。
Executor框架的成員
主要介紹Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、
Future接口、Runnable接口、Callable接口和Executors。
(1)ThreadPoolExecutor(想深入理解實(shí)現(xiàn)原理可以看這篇文章:https://juejin.im/post/5d67e5b4e51d4561f64a0849)
ThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建3種類型的
ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
下面分別介紹這3種ThreadPoolExecutor。
(1) ThreadPoolExecutor
(1)FixedThreadPool。
是Executors提供的,創(chuàng)建使用固定線程數(shù)的FixedThreadPool的API。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactorythreadFactory
FixedThreadPool適用于為了滿足資源管理的需求,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場景,它適用于負(fù)載比較重的服務(wù)器。
(2)SingleThreadExecutor
下面是Executors提供的,創(chuàng)建使用單個(gè)線程的SingleThread-Executor的API。
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
SingleThreadExecutor適用于需要保證順序地執(zhí)行各個(gè)任務(wù);并且在任意時(shí)間點(diǎn),不會(huì)有多個(gè)線程是活動(dòng)的應(yīng)用場景。
(3)CachedThreadPool。
下面是Executors提供的,創(chuàng)建一個(gè)會(huì)根據(jù)需要?jiǎng)?chuàng)建新線程的CachedThreadPool的API。
public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
CachedThreadPool是大小無界的線程池,適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負(fù)載較輕的服務(wù)器。
(2)ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建2種類型的ScheduledThreadPoolExecutor.
如下:
·ScheduledThreadPoolExecutor。包含若干個(gè)線程的ScheduledThreadPoolExecutor。
·SingleThreadScheduledExecutor。只包含一個(gè)線程的ScheduledThreadPoolExecutor。
下面是工廠類Executors提供的,創(chuàng)建固定個(gè)數(shù)線程的ScheduledThreadPoolExecutor的API。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory)
ScheduledThreadPoolExecutor適用于需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù),同時(shí)為了滿足資源管理的需求而需要限制后臺(tái)線程的數(shù)量的應(yīng)用場景。
下面是Executors提供的,創(chuàng)建單個(gè)線程的SingleThreadScheduledExecutor的API
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
SingleThreadScheduledExecutor適用于需要單個(gè)后臺(tái)線程執(zhí)行周期任務(wù),同時(shí)需要保證順序地執(zhí)行各個(gè)任務(wù)的應(yīng)用場景。
(3)Future接口
Future接口和實(shí)現(xiàn)Future接口的FutureTask類用來表示異步計(jì)算的結(jié)果。當(dāng)我們把Runnable
接口或Callable接口的實(shí)現(xiàn)類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時(shí),ThreadPoolExecutor或ScheduledThreadPoolExecutor會(huì)向我們返回一個(gè)FutureTask對(duì)象。
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
FutureTask是Future的子類。
有一點(diǎn)需要讀者注意,到目前最新的JDK8為止,Java通過上述API返回的是一個(gè)FutureTask對(duì)象。但從API可以看到,Java僅僅保證返回的是一個(gè)實(shí)現(xiàn)了Future接口的對(duì)象。在將來的JDK實(shí)現(xiàn)中,返回的可能不一定是FutureTask。
(4)Runnable接口和Callable接口
Runnable接口和Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutor或Scheduled-
ThreadPoolExecutor執(zhí)行。它們之間的區(qū)別是Runnable不會(huì)返回結(jié)果,而Callable可以返回結(jié)
果。
除了可以自己創(chuàng)建實(shí)現(xiàn)Callable接口的對(duì)象外,還可以使用工廠類Executors來把一個(gè)
Runnable包裝成一個(gè)Callable。
下面是Executors提供的,把一個(gè)Runnable包裝成一個(gè)Callable的API。
public static Callable<Object> callable(Runnable task)
下面是Executors提供的,把一個(gè)Runnable和一個(gè)待返回的結(jié)果包裝成一個(gè)Callable的API。
public static <T> Callable<T> callable(Runnable task, T result) // 假設(shè)返回對(duì)象Callable2
當(dāng)我們把一個(gè)Callable對(duì)象(比如上面的Callable1或Callable2)提交給
ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行時(shí),submit(…)會(huì)向我們返回一個(gè)
FutureTask對(duì)象。我們可以執(zhí)行FutureTask.get()方法來等待任務(wù)執(zhí)行完成。
當(dāng)任務(wù)成功完成后FutureTask.get()將返回該任務(wù)的結(jié)果。例如,如果提交的是對(duì)象Callable1,F(xiàn)utureTask.get()方法將返回null;如果提交的是對(duì)象Callable2,F(xiàn)utureTask.get()方法將返回result對(duì)象。
ThreadPoolExecutor詳解
可以看這兩篇文章:
從源碼來看JDK8線程池ThreadPoolExecutor的實(shí)現(xiàn)原理(一)
https://juejin.im/post/5d67e5b4e51d4561f64a0849
從源碼來看JDK8線程池ThreadPoolExecutor的實(shí)現(xiàn)原理(二)
https://juejin.im/post/5d688686e51d4561ce5a1c8b
·通過Executor框架的工具類Executors,可以創(chuàng)建3種類型的ThreadPoolExecutor。
·FixedThreadPool。
·SingleThreadExecutor。
·CachedThreadPool。
下面將分別介紹這3種ThreadPoolExecutor
FixedThreadPool詳解
FixedThreadPool被稱為可重用固定線程數(shù)的線程池。下面是FixedThreadPool的源代碼實(shí)現(xiàn)。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的corePoolSize和maximumPoolSize都被設(shè)置為創(chuàng)建FixedThreadPool時(shí)指定的參數(shù)nThreads。
FixedThreadPool的execute()方法的運(yùn)行示意圖:
1)如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)。
2)在線程池完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)等于corePoolSize),將任務(wù)加入LinkedBlockingQueue。
3)線程執(zhí)行完1中的任務(wù)后,會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來執(zhí)行。
FixedThreadPool使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列的容量為
Integer.MAX_VALUE)。
FixedThreadPool使用無界隊(duì)列作為工作隊(duì)列會(huì)對(duì)線程池帶來如下影響:
1)當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,新任務(wù)將在無界隊(duì)列中等待,因此線程池中
的線程數(shù)不會(huì)超過corePoolSize。
2)由于1,使用無界隊(duì)列時(shí)maximumPoolSize將是一個(gè)無效參數(shù)。
3)由于1和2,使用無界隊(duì)列時(shí)keepAliveTime將是一個(gè)無效參數(shù)。
4)由于使用無界隊(duì)列,運(yùn)行中的FixedThreadPool(未執(zhí)行方法shutdown()或
shutdownNow())不會(huì)拒絕任務(wù)(不會(huì)調(diào)用RejectedExecutionHandler.rejectedExecution方法)。
SingleThreadExecutor詳解
SingleThreadExecutor是使用單個(gè)worker線程的Executor。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor的corePoolSize和maximumPoolSize被設(shè)置為1。其他參數(shù)與
FixedThreadPool相同。SingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工
作隊(duì)列(隊(duì)列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊(duì)列作為工作隊(duì)列
對(duì)線程池帶來的影響與FixedThreadPool相同.
執(zhí)行流程:
1)如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize(即線程池中無運(yùn)行的線程),則創(chuàng)建一個(gè)新線
程來執(zhí)行任務(wù)。
2)在線程池完成預(yù)熱之后(當(dāng)前線程池中有一個(gè)運(yùn)行的線程),將任務(wù)加入Linked-
BlockingQueue。
3)線程執(zhí)行完1中的任務(wù)后,會(huì)在一個(gè)無限循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來
執(zhí)行。
3、CachedThreadPool詳解
CachedThreadPool是一個(gè)會(huì)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池。下面是創(chuàng)建CachedThread-Pool的源代碼。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被設(shè)置為0,即corePool為空;maximumPoolSize被設(shè)置為Integer.MAX_VALUE,即maximumPool是無界的。這里把keepAliveTime設(shè)置為60L,意味著CachedThreadPool中的空閑線程等待新任務(wù)的最長時(shí)間為60秒,空閑線程超過60秒后將會(huì)被終止。FixedThreadPool和SingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列。
CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,但CachedThreadPool的maximumPool是無界的。這意味著,如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時(shí),CachedThreadPool會(huì)不斷創(chuàng)建新線程。極端情況下,CachedThreadPool會(huì)因?yàn)閯?chuàng)建過多線程而耗盡CPU和內(nèi)存資源。