1.概覽
ExecutorService是一個由JDK提供的框架,它簡化了以異步模式運行task的工作。通常來說,ExecutorService會自動提供一個線程池以及常用的API。
2.實例化ExecutorService
?? 2.1 Executors類的工廠方法
??? 創(chuàng)建ExecutorService的最簡單的方法是使用Executors類的一個工廠方法。例如,下面的一行代碼將會創(chuàng)建一個擁有10個線程的線程池。
? ?? ExecutorService executor = Executors.newFixedThreadPool(10);
? 還有其他幾個工廠方法用于創(chuàng)建預(yù)定義的ExecutorService,它們會滿足特定的使用場景。尋找最合適你的方法,可以咨詢Oracle的官方文檔。
2.2 直接創(chuàng)建一個ExecutorService
由于ExecutorService是一個接口,所以可以使用它的任一實現(xiàn)來創(chuàng)建一個實例。在java.util.concurrent包中有好幾個實現(xiàn)可供選擇,或者你可以創(chuàng)建自己的實現(xiàn)。例如,ThreadPoolExecutor類就有好幾個構(gòu)造方法可用于配置一個executor service以及它內(nèi)部的線程池。
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
你可能注意到了,上面的代碼和工廠方法newSingleThreadExecutor()的源代碼很像,在大多數(shù)情況下,詳細的手工配置都不是必需的。
3.分配task給ExecutorService
ExecutorService可以運行 Runnable 以及Callable任務(wù)。為了讓事情簡單一點,我們將使用倆個最基本的任務(wù)。注意:這里我們使用拉姆達表達式來取代匿名內(nèi)部類。
Runnable runnableTask = () -> {??
? ? try {????????
??????? TimeUnit.MILLISECONDS.sleep(300);??
? ? ? ? } catch (InterruptedException e) {??
??????e.printStackTrace();?
???}
};
?Callable callableTask = () -> {??
??TimeUnit.MILLISECONDS.sleep(300);????
?? return "Task's execution";
};
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
有好幾個方法可以用于把task分配給ExecutorService,包括: execute()、 submit()、 invokeAny()、 invokeAll()。其中,execute()方法是繼承自Executor接口。execute()方法的返回值是void,使用它無法獲取任務(wù)的運行結(jié)果或者檢查任務(wù)的狀態(tài)(例如,是正在運行還是已經(jīng)被執(zhí)行了)。
executorService.execute(runnableTask);
submit()可以提交一個Callable或Runnable任務(wù)給ExecutorService,并且返回一個Future類型的結(jié)果。
Future future =? executorService.submit(callableTask);
invokeAny()可以給ExecutorService分配一個任務(wù)集合。導(dǎo)致他們中的每一個都被執(zhí)行,并且返回一個任務(wù)成功運行的結(jié)果(如果存在一個成功運行的話)。
String result = executorService.invokeAny(callableTasks);
invokeAll()也是可以給ExecutorService分配一個任務(wù)集合。導(dǎo)致他們中的每一個都被執(zhí)行,并且以Future集合的形式返回所有任務(wù)的運行結(jié)果。
List> futures = executorService.invokeAll(callableTasks);
現(xiàn)在,在進行下一步之前,我們有兩件事必須要討論一下: 關(guān)閉ExecutorService并且處理Future返回類型。
4.關(guān)閉一個ExecutorService
在通常情況下,當(dāng)沒有待處理的任務(wù)(task)時,ExecutorService將不會自動銷毀。它會一直保持存活并且等待執(zhí)行新任務(wù)。
在某些情況下,這或許很有用;例如,有一個應(yīng)用需要處理一些不規(guī)則的任務(wù)或大量在編譯時不可知的任務(wù)。在另一方面,即使一個app在執(zhí)行到最后,它也不會被停止,因為一個處于等待中的ExecutorService 將導(dǎo)致JVM保持運行。
為了正確地關(guān)閉一個ExecutorService,我們有兩個API可供使用: shutdown() 以及shutdownNow()。
shutdown()方法不會導(dǎo)致ExecutorService立即銷毀。它將讓ExecutorService停止接收新任務(wù),并且在所有運行中的線程完成他們當(dāng)前工作之后,關(guān)閉ExecutorService。
executorService.shutdown();
而shutdownNow()方法會試圖立即銷毀ExecutorService,但是它并不保證在同一時刻下所有運行中的線程都能被終止。該方法會返回一個任務(wù)集合,這些任務(wù)都是等待被ExecutorService處理的。具體怎么處理這些任務(wù)則取決于開發(fā)者。
List notExecutedTasks = executorService.shutDownNow();
關(guān)閉ExecutorService的最好方式(同時也是Oracle所推薦的)是上面?zhèn)z種方法與awaitTermination()的組合使用。使用這種方法,ExecutorService將會先停止接收新任務(wù),然后等待一段時間好讓所有任務(wù)都被完成。如果時間過期了,執(zhí)行將被理解終止:
executorService.shutdown();
try {
? ? if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
? ? ? ? executorService.shutdownNow();
? ? }
} catch (InterruptedException e) {
? ? executorService.shutdownNow();
}
5.Future接口
submit()和invokeAll()方法會返回一個對象或者一個Future類型的集合,這使得我們可以獲取任務(wù)的執(zhí)行結(jié)果或者檢查任務(wù)的狀態(tài)(是正在運行還是已經(jīng)執(zhí)行完成)。
Future接口提供了一個特殊的額阻塞方法get(),get()方法會返回一個Callable任務(wù)實際的執(zhí)行結(jié)果,或者null(如果是Runnable任務(wù)的話)。在任務(wù)仍處于運行期間的時候,調(diào)用get()方法將會導(dǎo)致阻塞,直到任務(wù)被恰當(dāng)?shù)貓?zhí)行并且結(jié)果是可用時。
Future future = executorService.submit(callableTask);
String result = null;
try {
? ? result = future.get();
} catch (InterruptedException | ExecutionException e) {
? ? e.printStackTrace();
}
如果get()方法導(dǎo)致長久的阻塞的話,應(yīng)用的執(zhí)行性能就會下降。如果結(jié)果數(shù)據(jù)不重要的話,可以使用超時時間來避免這個問題:
String result = future.get(200, TimeUnit.MILLISECONDS);
如果執(zhí)行時間比指定時間長的話(此時,即:200毫秒),就會拋出TimeoutException異常。isDone()方法可以用來檢查分配的任務(wù)是否已經(jīng)被處理了。Future接口也提供了cancel()方法用于取消任務(wù)的執(zhí)行,并且可以使用isCancelled()方法來檢查該任務(wù)是否已經(jīng)被取消。
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6.ScheduledExecutorService接口
ScheduledExecutorService會在預(yù)定義的延遲之后,運行任務(wù)。再次說明,實例化ScheduledExecutorService的最好方式是使用Executor類的工廠方法。在本章節(jié)中,我們將使用單線程的ScheduledExecutorService:
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
在一個固定延遲之后,調(diào)度一個單任務(wù)執(zhí)行,可以使用ScheduledExecutorService的scheduled()方法。這里有兩個scheduled()方法供你運行一個Runnable或Callable任務(wù):
Future resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
scheduleAtFixedRate()方法可以讓我們在固定的延遲時間之后,周期性地執(zhí)行一個任務(wù)。上面的代碼會在執(zhí)行callableTask之前,延遲一秒。
下面的代碼塊則會在100毫秒的初始延遲之后,執(zhí)行一個任務(wù),并且在那之后,它會每隔450毫秒執(zhí)行同一個任務(wù)。如果處理器在處理一個任務(wù)時需要的時間比scheduledAtFixedRate()方法的period參數(shù)更長的話,ScheduledExecutorService將會在開始下一個任務(wù)之前一直等待,直到當(dāng)前任務(wù)完成。
Future resultFuture = service .scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
如果必需在兩個任務(wù)迭代之間有一個固定長度的延遲的話,應(yīng)當(dāng)使用scheduleWithFixedDelay()。例如,下面的代碼將會保證在結(jié)束當(dāng)前運行和開始另一個任務(wù)之間有150毫秒的停滯。
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根據(jù)scheduledAtFixedRate()以及scheduleWithFixedDelay()方法的約定,如果ExecutorService終止或者在任務(wù)執(zhí)行過程中有異常拋出的時,任務(wù)的周期執(zhí)行將會結(jié)束。
7.ExecutorService VS Fork/Join
在java7發(fā)布之后,許多開發(fā)者任務(wù),應(yīng)該使用fork/join框架取代ExecutorService框架。但這并不總是合適的選擇。雖然fork/join可以帶來使用的簡潔性以及運行的流暢性,但是它也讓降低了我們對并發(fā)執(zhí)行的控制。ExecutorService讓開發(fā)者可以控制一定數(shù)量的線程,以及任務(wù)執(zhí)行的粒度。
ExecutorService最適用于處理那些相互獨立的任務(wù)。例如,事務(wù) 或 符合“單任務(wù)單線程”的請求等。
相比之下,根據(jù)Oracle的官方文檔,fork/join在設(shè)計之初的目的就是加速那些可以被分解成小塊的工作。
8.總結(jié)
盡管ExecutorService比較簡單,但還是有很多坑需要我們注意。我們來總結(jié)一下:
? 1.保持一個未使用的ExecutorService存活:? 我們在本篇文章的章節(jié)4中講解了如何關(guān)閉一個ExecutorService。
? 2.在使用固定長度的線程池時,錯誤的線程池容量: 決定一個應(yīng)用到底需要多少線程才可以有效地執(zhí)行任務(wù)時非常重要的,線程池太大會導(dǎo)致 不 必要的花銷,因為大多數(shù)線程將處于等待模式waiting mode。線程池太小的話,又會因為隊列中任務(wù)的長時間地等待而降低應(yīng)用的響應(yīng)性。
? 3.在一個task取消之后,調(diào)用Future的get()方法:試圖獲取一個已經(jīng)取消的任務(wù)的執(zhí)行結(jié)果將會觸發(fā)一個CancellationException。
? 4.Future.get()方法的長期阻塞:超時時間應(yīng)該用于避免無謂的等待。
文章中的代碼可以在github上找到,地址: sourceCode