Java并發(fā)編程的藝術(shù)筆記
- 1.并發(fā)編程的挑戰(zhàn)
- 2.Java并發(fā)機(jī)制的底層實(shí)現(xiàn)原理
- 3.Java內(nèi)存模型
- 4.Java并發(fā)編程基礎(chǔ)
- 5.Java中的鎖的使用和實(shí)現(xiàn)介紹
- 6.Java并發(fā)容器和框架
- 7.Java中的12個原子操作類介紹
- 8.Java中的并發(fā)工具類
- 9.Java中的線程池
- 10.Executor框架
前言
在Java中,使用線程來異步執(zhí)行任務(wù)。
Java線程的創(chuàng)建與銷毀需要一定的開銷,如果我們?yōu)槊恳粋€任務(wù)創(chuàng)建一個新線程來執(zhí)行,這些線程的創(chuàng)建與銷毀將消耗大量的計(jì)算資源。
同時,為每一個任務(wù)創(chuàng)建一個新線程來執(zhí)行,這種策略可能會使處于高負(fù)荷狀態(tài)的應(yīng)用最終崩潰。
Java的線程既是工作單元,也是執(zhí)行機(jī)制。
從 JDK 5 開始,把 工作單元 與 執(zhí)行機(jī)制 分離開來。
-
工作單元 :
RunnableCallable
-
執(zhí)行機(jī)制:
Executor框架
Executor框架簡介
Executor框架的兩級調(diào)度模型
在HotSpot VM的線程模型中,Java線程(java.lang.Thread)被 一對一映射為本地操作系統(tǒng)線程。Java線程啟動時會創(chuàng)建一個本地操作系統(tǒng)線程;當(dāng)該Java線程終止時,這個操作系統(tǒng)線程也會被回收。
操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU。
在上層,Java多線程程序通常把應(yīng)用分解為若干個任務(wù),然后使用用戶級的調(diào)度器(Executor框架)將這些任務(wù)映射為固定數(shù)量的線程;在底層,操作系統(tǒng)內(nèi)核將這些線程映射到硬件處理器上。這種兩級調(diào)度模型的示意圖下面有介紹。
從下圖中可以看出,應(yīng)用程序通過Executor框架控制上層的調(diào)度;而下層的調(diào)度由操作系統(tǒng)內(nèi)核控制,下層的調(diào)度不受應(yīng)用程序的控制。
Executor框架的結(jié)構(gòu)與成員
Executor框架的結(jié)構(gòu)
任務(wù)。包括被執(zhí)行任務(wù)需要實(shí)現(xiàn)的接口:
Runnable接口或Callable接口。任務(wù)的執(zhí)行。包括任務(wù)執(zhí)行機(jī)制的核心接口
Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。異步計(jì)算的結(jié)果。包括接口
Future和實(shí)現(xiàn)Future接口的FutureTask類。-
Executor框架包含的主要的類與接口如下圖所示:
Executor框架的類與接口下面是這些類和接口的簡介:
-
Executor是一個接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開來。 -
ThreadPoolExecutor是線程池的核心實(shí)現(xiàn)類,用來執(zhí)行被提交的任務(wù)。 -
ScheduledThreadPoolExecutor是一個實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強(qiáng)大。 -
Future接口和實(shí)現(xiàn)Future接口的FutureTask類,代表異步計(jì)算的結(jié)果。 -
Runnable接口和Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行。
-
Executor`框架的使用示意圖如下:
主線程首先要創(chuàng)建實(shí)現(xiàn)Runnable或者Callable接口的任務(wù)對象。
工具類Executors可以通過以下兩個方法把一個Runnable對象封裝為一個Callable對象:
Executors.callable(Runnable task)-
Executors.callable(Runnable task, Object resule)。
然后可以把Runnable對象直接交給ExecutorService執(zhí)行ExecutorService.execute(Runnable command);或者也可以把Runnable對象或Callable對象提交給ExecutorService 執(zhí)行 ExecutorService.submit(Runnable task) 或 ExecutorService.submit(Callable<T>task)。
如果執(zhí)行ExecutorService.submit(…),ExecutorService 將返回一個實(shí)現(xiàn) Future 接口的對象(到目前為止的JDK中,返回的是FutureTask對象)。由于FutureTask實(shí)現(xiàn)了Runnable,程序員也可以創(chuàng)建FutureTask,然后直接交給ExecutorService執(zhí)行。
最后,主線程可以執(zhí)行 FutureTask.get() 方法來等待任務(wù)執(zhí)行完成。主線程也可以執(zhí)行
FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務(wù)的執(zhí)行。
Executor框架的成員
這里將介紹Executor框架的主要成員:
ThreadPoolExecutorScheduledThreadPoolExecutor-
Future接口 -
Runnable接口 -
Callable接口 Executors
ThreadPoolExecutor
通常使用工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建3種類型的ThreadPoolExecutor:
-
SingleThreadExecutor:適用于需要保證順序地執(zhí)行各個任務(wù);并且在任意時間點(diǎn),不會有多個線程是活動的應(yīng)用場景。
下面是Executors提供的,創(chuàng)建使用單個線程的SingleThreadExecutor的API。public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) ); } public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) { return new Executors.FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0) ); } -
FixedThreadPool:適用于為了滿足資源管理的需求,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場景,它適用于負(fù)載比較重的服務(wù)器。
下面是Executors提供的,創(chuàng)建使用固定線程數(shù)的FixedThreadPool的API:public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1); } -
CachedThreadPool:是大小無界的線程池,適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負(fù)載較輕的服務(wù)器。
下面是Executors提供的,創(chuàng)建一個會根據(jù)需要創(chuàng)建新線程的CachedThreadPool的API。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } public static ExecutorService newCachedThreadPool(ThreadFactory var0) { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0); } -
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。
Executors可以創(chuàng)建2種類型的ScheduledThreadPoolExecutor,如下:-
ScheduledThreadPoolExecutor。包含若干個線程的ScheduledThreadPoolExecutor。
適用于需要多個后臺線程執(zhí)行周期任務(wù),同時為了滿足資源管理的需求而需要限制后臺線程的數(shù)量的應(yīng)用場景。
下面是工廠類Executors提供的,創(chuàng)建 固定個數(shù) 線程的ScheduledThreadPoolExecutor的API。public static ScheduledExecutorService newScheduledThreadPool(int var0) { return new ScheduledThreadPoolExecutor(var0); } public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) { return new ScheduledThreadPoolExecutor(var0, var1); } -
SingleThreadScheduledExecutor。只包含一個線程的ScheduledThreadPoolExecutor。
適用于需要單個后臺線程執(zhí)行周期任務(wù),同時需要保證順序地執(zhí)行各個任務(wù)的應(yīng)用場景。
下面是Executors提供的,創(chuàng)建單個線程的SingleThreadScheduledExecutor的API。public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory var0) { return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, var0)); }
-
Future接口
Future接口和實(shí)現(xiàn)Future接口的FutureTask類用來表示異步計(jì)算的結(jié)果。
當(dāng)我們把Runnable接口或Callable接口的實(shí)現(xiàn)類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,ThreadPoolExecutor或 ScheduledThreadPoolExecutor會向我們返回一個FutureTask對象。
下面是對應(yīng)的API:
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
有一點(diǎn)需要讀者注意,到目前最新的JDK 8為止,Java通過上述API返回的是一個FutureTask對象。但從API可以看到,Java僅僅保證返回的是一個實(shí)現(xiàn)了Future接口的對象。在將來的JDK實(shí)現(xiàn)中,返回的可能不一定是FutureTask。
Runnable接口和 Callable接口
Runnable接口和Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行。
它們之間的區(qū)別是Runnable不會返回結(jié)果,而Callable可以返回結(jié)果。
除了可以自己創(chuàng)建實(shí)現(xiàn)Callable接口的對象外,還可以使用工廠類Executors來把一個Runnable包裝成一個Callable。
下面是Executors提供的,把一個Runnable包裝成一個Callable的API。
public static Callable<Object> callable(Runnable task) // 假設(shè)返回對象Callable1
下面是Executors提供的,把一個Runnable和一個待返回的結(jié)果包裝成一個Callable的API。
public static <T> Callable<T> callable(Runnable task, T result) // 假設(shè)返回對象Callable2
前面講過,當(dāng)我們把一個Callable對象(比如上面的Callable1或Callable2)提交給ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行時,submit(…)會向我們返回一個FutureTask對象。我們可以執(zhí)行FutureTask.get()方法來 等待任務(wù)執(zhí)行完成。當(dāng)任務(wù)成功完成后FutureTask.get()將返回該任務(wù)的結(jié)果。
例如,如果提交的是對象Callable1,FutureTask.get()方法將返回null;如果提交的是對象Callable2,FutureTask.get()方法將返回result對象。
ThreadPoolExecutor詳解
Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實(shí)現(xiàn)類,主要由下列4個組件構(gòu)成。
-
corePool:核心線程池的大小。 -
maximumPool:最大線程池的大小。 -
BlockingQueue:用來暫時保存任務(wù)的工作隊(duì)列。 -
RejectedExecutionHandler:當(dāng)ThreadPoolExecutor已經(jīng)關(guān)閉或ThreadPoolExecutor已經(jīng)飽和時(達(dá)到了最大線程池大小且工作隊(duì)列已滿),execute()方法將要調(diào)用的Handler。
通過Executor框架的工具類Executors,可以創(chuàng)建3種類型的ThreadPoolExecutor。
FixedThreadPoolSingleThreadExecutorCachedThreadPool
FixedThreadPool詳解
FixedThreadPool被稱為可重用固定線程數(shù)的線程池。下面是FixedThreadPool的源代碼實(shí)現(xiàn)。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的corePoolSize和maximumPoolSize都被設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads。
當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime為多余的空閑線程等待新任務(wù)的最長時間,超過這個時間后多余的線程將被終止。這里把keepAliveTime設(shè)置為0L,意味著多余的空閑線程會被立即終止。
FixedThreadPool的execute()方法的運(yùn)行示意圖如下圖所示。
-
圖中1:如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)。 -
圖中2:在線程池完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)等于corePoolSize),將任務(wù)加入LinkedBlockingQueue。 -
圖中3:線程執(zhí)行完1中的任務(wù)后,會在循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來執(zhí)行。
FixedThreadPool使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列的容量為Integer.MAX_VALUE)。
使用無界隊(duì)列作為工作隊(duì)列會對線程池帶來如下影響:
- 當(dāng)線程池中的線程數(shù)達(dá)到
corePoolSize后,新任務(wù)將在無界隊(duì)列中等待,因此線程池中的線程數(shù)不會超過corePoolSize。 - 由于上一點(diǎn),使用無界隊(duì)列時
maximumPoolSize將是一個無效參數(shù)。 - 由于前面兩點(diǎn),使用無界隊(duì)列時
keepAliveTime將是一個無效參數(shù)。 - 由于使用無界隊(duì)列,運(yùn)行中的
FixedThreadPool(未執(zhí)行方法shutdown()或shutdownNow())不會拒絕任務(wù)(不會調(diào)用RejectedExecutionHandler.rejectedExecution方法)。
SingleThreadExecutor詳解
SingleThreadExecutor是使用單個worker線程的Executor。
下面是SingleThreadExecutor的源代碼實(shí)現(xiàn):
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
);
}
SingleThreadExecutor的corePoolSize和maximumPoolSize被設(shè)置為1。其他參數(shù)與FixedThreadPool相同。SingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊(duì)列作為工作隊(duì)列對線程池帶來的影響與FixedThreadPool相同,這里就不贅述了。
SingleThreadExecutor的運(yùn)行示意圖如下圖所示:
-
上圖1:如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize(即線程池中無運(yùn)行的線程),則創(chuàng)建一個新線程來執(zhí)行任務(wù)。 -
上圖2:在線程池完成預(yù)熱之后(當(dāng)前線程池中有一個運(yùn)行的線程),將任務(wù)加入LinkedBlockingQueue。 -
上圖3:線程執(zhí)行完上圖1中的任務(wù)后,會在一個無限循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來執(zhí)行。
CachedThreadPool詳解
CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池。
下面是創(chuàng)建CachedThreadPool的源代碼。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被設(shè)置為0,即corePool為空;maximumPoolSize被設(shè)置為Integer.MAX_VALUE,即maximumPool是無界的。這里把keepAliveTime設(shè)置為60L,意味著CachedThreadPool中的空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止。
FixedThreadPool和SingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列。
CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,但CachedThreadPool的maximumPool是無界的。
這意味著,如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時,CachedThreadPool會不斷創(chuàng)建新線程。
極端情況下,CachedThreadPool會因?yàn)閯?chuàng)建過多線程而耗盡CPU和內(nèi)存資源。
CachedThreadPool的execute()方法的執(zhí)行示意圖如下圖所示:
-
上圖1:首先執(zhí)行SynchronousQueue.offer(Runnable task)。如果當(dāng)前maximumPool中有空閑線程正在執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行offer操作與空閑線程執(zhí)行的poll操作配對成功,主線程把任務(wù)交給空閑線程執(zhí)行,execute()方法執(zhí)行完成;否則執(zhí)行下面的步驟2。 -
上圖2:當(dāng)初始maximumPool為空,或者maximumPool中當(dāng)前沒有空閑線程時,將沒有線程執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1將失敗。此時CachedThreadPool會創(chuàng)建一個新線程執(zhí)行任務(wù),execute()方法執(zhí)行完成。 -
上圖3:.在步驟2中新創(chuàng)建的線程將任務(wù)執(zhí)行完后,會執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閑線程最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內(nèi)主線程提交了一個新任務(wù)(主線程執(zhí)行步驟1),那么這個空閑線程將執(zhí)行主線程提交的新任務(wù);否則,這個空閑線程將終止。由于空閑60秒的空閑線程會被終止,因此長時間保持空閑的CachedThreadPool不會使用任何資源。
前面提到過,SynchronousQueue是一個沒有容量的阻塞隊(duì)列。每個插入操作必須等待另一個線程的對應(yīng)移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的任務(wù)傳遞給空閑線程執(zhí)行。
CachedThreadPool中任務(wù)傳遞的示意圖如下圖所示:
ScheduledThreadPoolExecutor詳解
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強(qiáng)大、更靈活。Timer對應(yīng)的是單個后臺線程,而ScheduledThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程數(shù)。
-
ScheduledThreadPoolExecutor的運(yùn)行機(jī)制
ScheduledThreadPoolExecutor的執(zhí)行示意圖(基于JDK 6)如下圖所示:
ScheduledThreadPoolExecutor的任務(wù)傳遞示意圖DelayQueue是一個無界隊(duì)列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什么意義(設(shè)置maximumPoolSize的大小沒有什么效果)。
ScheduledThreadPoolExecutor的執(zhí)行主要分為兩大部分。
- 當(dāng)調(diào)用
ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實(shí)現(xiàn)了RunnableScheduledFutur接口的ScheduledFutureTask。 - 線程池中的線程從
DelayQueue中獲取ScheduledFutureTask,然后執(zhí)行任務(wù)。
ScheduledThreadPoolExecutor為了實(shí)現(xiàn)周期性的執(zhí)行任務(wù),對ThreadPoolExecutor做了如下的修改。
- 使用
DelayQueue作為任務(wù)隊(duì)列。 - 獲取任務(wù)的方式不同(后文會說明)。
- 執(zhí)行周期任務(wù)后,增加了額外的處理(后文會說明)。
ScheduledThreadPoolExecutor的實(shí)現(xiàn)
前面我們提到過,ScheduledThreadPoolExecutor會把待調(diào)度的任務(wù)(ScheduledFutureTask)放到一個DelayQueue中。
ScheduledFutureTask主要包含3個成員變量,如下。
-
long型成員變量time,表示這個任務(wù)將要被執(zhí)行的具體時間。 -
long型成員變量sequenceNumber,表示這個任務(wù)被添加到ScheduledThreadPoolExecutor中的序號。 -
long型成員變量period,表示任務(wù)執(zhí)行的間隔周期。
DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對隊(duì)列中的ScheduledFutureTask進(jìn)行排序。排序時,time小的排在前面(時間早的任務(wù)將被先執(zhí)行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務(wù)的執(zhí)行時間相同,那么先提交的任務(wù)將被先執(zhí)行)。
首先,讓我們看看ScheduledThreadPoolExecutor中的線程執(zhí)行周期任務(wù)的過程。下圖是ScheduledThreadPoolExecutor中的線程1執(zhí)行某個周期任務(wù)的4個步驟。
下面是對這4個步驟的說明:
-
上圖1:線程·1從DelayQueue中獲取已到期的ScheduledFutureTask的DelayQueue.take()。到期任務(wù)是指ScheduledFutureTask的time大于等于當(dāng)前時間。 -
上圖2:線程1執(zhí)行這個ScheduledFutureTask。 -
上圖3:線程1修改ScheduledFutureTask的time變量為下次將要被執(zhí)行的時間。 -
上圖4:線程1把這個修改time之后的ScheduledFutureTask放回DelayQueue中DelayQueue.add()。
接下來,讓我們看看上面的步驟1獲取任務(wù)的過程。下面是DelayQueue.take()方法的源代碼實(shí)現(xiàn):
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 1
try {
for (; ; ) {
E first = q.peek();
if (first == null) {
available.await(); // 2.1
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay); // 2.2
} else {
E x = q.poll(); // 2.3.1
assert x != null;
if (q.size() != 0)
available.signalAll(); // 2.3.2
return x;
}
}
}
} finally {
lock.unlock(); // 3
}
}
下圖是DelayQueue.take()的執(zhí)行示意圖:
如圖所示,獲取任務(wù)分為3大步驟。
- 獲取
Lock。 - 獲取周期任務(wù)。
- 如果
PriorityQueue為空,當(dāng)前線程到Condition中等待;否則執(zhí)行下面的2.2。 - 如果
PriorityQueue的頭元素的time時間比當(dāng)前時間大,到Condition中等待到time時間;否則執(zhí)行下面的2.3。 - 獲取
PriorityQueue的頭元素(2.3.1);如果PriorityQueue不為空,則喚醒在Condition中等待的所有線程(2.3.2)。
- 如果
- 釋放
Lock。
ScheduledThreadPoolExecutor在一個循環(huán)中執(zhí)行步驟2,直到線程從PriorityQueue獲取到一個元素之后(執(zhí)行2.3.1之后),才會退出無限循環(huán)(結(jié)束步驟2)。
最后,讓我們看看ScheduledThreadPoolExecutor中的線程執(zhí)行任務(wù)的步驟4,把ScheduledFutureTask放入DelayQueue中的過程。
下面是DelayQueue.add()的源代碼實(shí)現(xiàn):
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 1
try {
E first = q.peek();
q.offer(e); // 2.1
if (first == null || e.compareTo(first) < 0) {
available.signalAll(); // 2.2
}
return true;
} finally {
lock.unlock(); // 3
}
}
下圖是DelayQueue.add()的執(zhí)行示意圖:
如圖所示,添加任務(wù)分為3大步驟。
- 獲取
Lock。 - 添加任務(wù)。
- 向
PriorityQueue添加任務(wù)。 - 如果在上面
2.1中添加的任務(wù)是PriorityQueue的頭元素,喚醒在Condition中等待的所有線程。
- 向
- 釋放
Lock。
FutureTask詳解
Future接口和實(shí)現(xiàn)Future接口的FutureTask類,代表 異步計(jì)算的結(jié)果。
FutureTask簡介
FutureTask除了實(shí)現(xiàn)Future接口外,還實(shí)現(xiàn)了Runnable接口。因此,FutureTask可以交給Executor執(zhí)行,也可以由調(diào)用線程直接執(zhí)行FutureTask.run()。根據(jù)FutureTask.run()方法被執(zhí)行的時機(jī),FutureTask可以處于下面3種狀態(tài)。
-
未啟動。
FutureTask.run()方法還沒有被執(zhí)行之前,FutureTask處于未啟動狀態(tài)。當(dāng)創(chuàng)建一個FutureTask,且沒有執(zhí)行FutureTask.run()方法之前,這個FutureTask處于未啟動狀態(tài)。 -
已啟動。
FutureTask.run()方法被執(zhí)行的過程中,FutureTask處于已啟動狀態(tài)。 -
已完成。
FutureTask.run()方法執(zhí)行完后正常結(jié)束,或被取消FutureTask.cancel(…),或執(zhí)行FutureTask.run()方法時拋出異常而異常結(jié)束,FutureTask處于已完成狀態(tài)。
下圖是FutureTask的 狀態(tài)遷移 的示意圖。
當(dāng)FutureTask處于未啟動或已啟動狀態(tài)時,執(zhí)行FutureTask.get()方法將導(dǎo)致調(diào)用線程阻塞;當(dāng)FutureTask處于已完成狀態(tài)時,執(zhí)行FutureTask.get()方法將導(dǎo)致調(diào)用線程立即返回結(jié)果或拋出異常。
當(dāng)FutureTask處于未啟動狀態(tài)時,執(zhí)行FutureTask.cancel()方法將導(dǎo)致此任務(wù)永遠(yuǎn)不會被執(zhí)行;當(dāng)FutureTask處于已啟動狀態(tài)時,執(zhí)行FutureTask.cancel(true)方法將以中斷執(zhí)行此任務(wù)線程的方式來試圖停止任務(wù);當(dāng)FutureTask處于已啟動狀態(tài)時,執(zhí)行FutureTask.cancel(false)方法將不會對正在執(zhí)行此任務(wù)的線程產(chǎn)生影響(讓正在執(zhí)行的任務(wù)運(yùn)行完成);當(dāng)FutureTask處于已完成狀態(tài)時,執(zhí)行FutureTask.cancel(…)方法將返回false。
下圖是get方法和cancel方法的執(zhí)行示意圖:
FutureTask的使用
可以把FutureTask交給Executor執(zhí)行;也可以通過ExecutorService.submit(...)方法返回一個FutureTask,然后執(zhí)行FutureTask.get()方法或FutureTask.cancel(...)方法。除此以外,還可以單獨(dú)使用FutureTask。
當(dāng)一個線程需要等待另一個線程把某個任務(wù)執(zhí)行完后它才能繼續(xù)執(zhí)行,此時可以使用FutureTask。假設(shè)有多個線程執(zhí)行若干任務(wù),每個任務(wù)最多只能被執(zhí)行一次。當(dāng)多個線程試圖同時執(zhí)行同一個任務(wù)時,只允許一個線程執(zhí)行任務(wù),其他線程需要等待這個任務(wù)執(zhí)行完后才能繼續(xù)執(zhí)行。
下面是對應(yīng)的示例代碼:
private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();
private String executionTask(final String taskName)
throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); // 1.1, 2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
@Override
public String call() throws InterruptedException {
return taskName;
}
};
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4執(zhí)行任務(wù)
}
}
try {
return future.get(); // 1.5, 2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}
上述代碼的執(zhí)行示意圖如下圖所示:
當(dāng)兩個線程試圖同時執(zhí)行同一個任務(wù)時,如果線程1執(zhí)行1.3后線程2執(zhí)行2.1,那么接下來線程2將在2.2等待,直到線程1執(zhí)行完1.4后線程2才能2.2(FutureTask.get())返回。
FutureTask的實(shí)現(xiàn)
FutureTask的實(shí)現(xiàn)基于AbstractQueuedSynchronizer(以下簡稱為 AQS)。java.util.concurrent中的很多可阻塞類(比如ReentrantLock)都是基于AQS來實(shí)現(xiàn)的。AQS是一個同步框架,它提供通用機(jī)制來原子性管理同步狀態(tài)、阻塞和喚醒線程,以及維護(hù)被阻塞線程的隊(duì)列。
JDK 6 中AQS被廣泛使用,基于AQS實(shí)現(xiàn)的同步器包括:
ReentrantLockSemaphoreReentrantReadWriteLockCountDownLatch-
FutureTask。
每一個基于AQS實(shí)現(xiàn)的同步器都會包含兩種類型的操作,如下:
- 至少一個
acquire操作。這個操作阻塞調(diào)用線程,除非/直到AQS的狀態(tài)允許這個線程繼續(xù)執(zhí)行。FutureTask的acquire操作為get() / get(long timeout,TimeUnit unit)方法調(diào)用。 - 至少一個
release操作。這個操作改變AQS的狀態(tài),改變后的狀態(tài)可允許一個或多個阻塞線程被解除阻塞。FutureTask的release操作包括run()方法和cancel(...)方法。
基于“ 復(fù)合優(yōu)先于繼承 ”的原則,FutureTask聲明了一個內(nèi)部私有的繼承于AQS的子類Sync,對FutureTask所有公有方法的調(diào)用都會委托給這個內(nèi)部子類。
AQS被作為“模板方法模式”的基礎(chǔ)類提供給FutureTask的內(nèi)部子類Sync,這個內(nèi)部子類只需要實(shí)現(xiàn)狀態(tài)檢查和狀態(tài)更新的方法即可,這些方法將控制FutureTask的獲取和釋放操作。具體來說,Sync實(shí)現(xiàn)了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態(tài)。
FutureTask的設(shè)計(jì)示意圖如下圖所示:
如圖所示,Sync是FutureTask的內(nèi)部私有類,它繼承自AQS。創(chuàng)建FutureTask時會創(chuàng)建內(nèi)部私有的成員對象Sync,FutureTask所有的的公有方法都直接委托給了內(nèi)部私有的Sync。
FutureTask.get()方法會調(diào)用AQS.acquireSharedInterruptibly(int arg)方法,這個方法的執(zhí)行過程如下:
- 調(diào)用
AQS.acquireSharedInterruptibly(int arg)方法,這個方法首先會回調(diào)在子類Sync中實(shí)現(xiàn)的tryAcquireShared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為:state為執(zhí)行完成狀態(tài)RAN或已取消狀態(tài)CANCELLED,且runner不為null。 - 如果成功則
get()方法立即返回。如果失敗則到線程等待隊(duì)列中去等待其他線程執(zhí)行
release操作。 - 當(dāng)其他線程執(zhí)行
release操作(比如FutureTask.run()或FutureTask.cancel(…))喚醒當(dāng)前線程后,當(dāng)前線程再次執(zhí)行tryAcquireShared()將返回正值1,當(dāng)前線程將離開線程等待隊(duì)列并喚醒它的后繼線程(這里會產(chǎn)生級聯(lián)喚醒的效果,后面會介紹)。 - 最后返回計(jì)算的結(jié)果或拋出異常。
FutureTask.run()的執(zhí)行過程如下:
- 執(zhí)行在構(gòu)造函數(shù)中指定的任務(wù)(
Callable.call())。 - 以原子方式來更新同步狀態(tài)(調(diào)用
AQS.compareAndSetState(int expect,int update),設(shè)置state為執(zhí)行完成狀態(tài)RAN)。如果這個原子操作成功,就設(shè)置代表計(jì)算結(jié)果的變量result的值為Callable.call()的返回值,然后調(diào)用AQS.releaseShared(int arg)。 -
AQS.releaseShared(int arg)首先會回調(diào)在子類Sync中實(shí)現(xiàn)的tryReleaseShared(arg)來執(zhí)行release操作(設(shè)置運(yùn)行任務(wù)的線程runner為null,然會返回true);AQS.releaseShared(int arg),然后喚醒線程等待隊(duì)列中的第一個線程。 - 調(diào)用
FutureTask.done()。
當(dāng)執(zhí)行FutureTask.get()方法時,如果FutureTask不是處于執(zhí)行完成狀態(tài)RAN或已取消狀態(tài)CANCELLED,當(dāng)前執(zhí)行線程將到AQS的線程等待隊(duì)列中等待(見下圖的線程A、B、C和D)。當(dāng)某個線程執(zhí)行FutureTask.run()方法或FutureTask.cancel(...)方法時,會喚醒線程等待隊(duì)列的第一個線程(見下圖所示的線程E喚醒線程A)。
假設(shè)開始時FutureTask處于未啟動狀態(tài)或已啟動狀態(tài),等待隊(duì)列中已經(jīng)有3個線程(A、B和C)在等待。此時,線程D執(zhí)行get()方法將導(dǎo)致線程D也到等待隊(duì)列中去等待。
當(dāng)線程E執(zhí)行run()方法時,會喚醒隊(duì)列中的第一個線程A。線程A被喚醒后,首先把自己從隊(duì)列中刪除,然后喚醒它的后繼線程B,最后線程A從get()方法返回。線程B、C和D重復(fù)A線程的處理流程。最終,在隊(duì)列中等待的所有線程都被級聯(lián)喚醒并從get()方法返回。
小結(jié)
本文介紹了Executor框架的整體結(jié)構(gòu)和成員組件。
希望讀者閱讀本章之后,能夠?qū)?code>Executor框架有一個比較深入的理解,同時也希望本章內(nèi)容有助于讀者更熟練地使用Executor框架。
</article>