帶你進入java中的ExecutorService

1.概覽

ExecutorService是一個由JDK提供的框架,它簡化了以異步模式運行task的工作。通常來說,ExecutorService會自動提供一個線程池以及常用的API。


2.實例化ExecutorService

?? 2.1 Executors類的工廠方法

??? 創(chuàng)建ExecutorService的最簡單的方法是使用Executors類的一個工廠方法。例如,下面的一行代碼將會創(chuàng)建一個擁有10個線程的線程池。

? ?? ExecutorService executor = Executors.newFixedThreadPool(10);

? 還有其他幾個工廠方法用于創(chuàng)建預(yù)定義的ExecutorService,它們會滿足特定的使用場景。尋找最合適你的方法,可以咨詢Oracle的官方文檔。

2.2 直接創(chuàng)建一個ExecutorService

由于ExecutorService是一個接口,所以可以使用它的任一實現(xiàn)來創(chuàng)建一個實例。在java.util.concurrent包中有好幾個實現(xiàn)可供選擇,或者你可以創(chuàng)建自己的實現(xiàn)。例如,ThreadPoolExecutor類就有好幾個構(gòu)造方法可用于配置一個executor service以及它內(nèi)部的線程池。

ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

你可能注意到了,上面的代碼和工廠方法newSingleThreadExecutor()的源代碼很像,在大多數(shù)情況下,詳細的手工配置都不是必需的。


3.分配task給ExecutorService

ExecutorService可以運行 Runnable 以及Callable任務(wù)。為了讓事情簡單一點,我們將使用倆個最基本的任務(wù)。注意:這里我們使用拉姆達表達式來取代匿名內(nèi)部類。

Runnable runnableTask = () -> {??

? ? try {????????

??????? TimeUnit.MILLISECONDS.sleep(300);??

? ? ? ? } catch (InterruptedException e) {??

??????e.printStackTrace();?

???}

};

?Callable callableTask = () -> {??

??TimeUnit.MILLISECONDS.sleep(300);????

?? return "Task's execution";

};

callableTasks.add(callableTask);

callableTasks.add(callableTask);

callableTasks.add(callableTask);

有好幾個方法可以用于把task分配給ExecutorService,包括: execute()、 submit()、 invokeAny()、 invokeAll()。其中,execute()方法是繼承自Executor接口。execute()方法的返回值是void,使用它無法獲取任務(wù)的運行結(jié)果或者檢查任務(wù)的狀態(tài)(例如,是正在運行還是已經(jīng)被執(zhí)行了)。

executorService.execute(runnableTask);

submit()可以提交一個Callable或Runnable任務(wù)給ExecutorService,并且返回一個Future類型的結(jié)果。

Future future =? executorService.submit(callableTask);

invokeAny()可以給ExecutorService分配一個任務(wù)集合。導(dǎo)致他們中的每一個都被執(zhí)行,并且返回一個任務(wù)成功運行的結(jié)果(如果存在一個成功運行的話)。

String result = executorService.invokeAny(callableTasks);

invokeAll()也是可以給ExecutorService分配一個任務(wù)集合。導(dǎo)致他們中的每一個都被執(zhí)行,并且以Future集合的形式返回所有任務(wù)的運行結(jié)果。

List> futures = executorService.invokeAll(callableTasks);

現(xiàn)在,在進行下一步之前,我們有兩件事必須要討論一下: 關(guān)閉ExecutorService并且處理Future返回類型。


4.關(guān)閉一個ExecutorService

在通常情況下,當(dāng)沒有待處理的任務(wù)(task)時,ExecutorService將不會自動銷毀。它會一直保持存活并且等待執(zhí)行新任務(wù)。

在某些情況下,這或許很有用;例如,有一個應(yīng)用需要處理一些不規(guī)則的任務(wù)或大量在編譯時不可知的任務(wù)。在另一方面,即使一個app在執(zhí)行到最后,它也不會被停止,因為一個處于等待中的ExecutorService 將導(dǎo)致JVM保持運行。

為了正確地關(guān)閉一個ExecutorService,我們有兩個API可供使用: shutdown() 以及shutdownNow()。

shutdown()方法不會導(dǎo)致ExecutorService立即銷毀。它將讓ExecutorService停止接收新任務(wù),并且在所有運行中的線程完成他們當(dāng)前工作之后,關(guān)閉ExecutorService。

executorService.shutdown();

而shutdownNow()方法會試圖立即銷毀ExecutorService,但是它并不保證在同一時刻下所有運行中的線程都能被終止。該方法會返回一個任務(wù)集合,這些任務(wù)都是等待被ExecutorService處理的。具體怎么處理這些任務(wù)則取決于開發(fā)者。

List notExecutedTasks = executorService.shutDownNow();

關(guān)閉ExecutorService的最好方式(同時也是Oracle所推薦的)是上面?zhèn)z種方法與awaitTermination()的組合使用。使用這種方法,ExecutorService將會先停止接收新任務(wù),然后等待一段時間好讓所有任務(wù)都被完成。如果時間過期了,執(zhí)行將被理解終止:

executorService.shutdown();

try {

? ? if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {

? ? ? ? executorService.shutdownNow();

? ? }

} catch (InterruptedException e) {

? ? executorService.shutdownNow();

}


5.Future接口

submit()和invokeAll()方法會返回一個對象或者一個Future類型的集合,這使得我們可以獲取任務(wù)的執(zhí)行結(jié)果或者檢查任務(wù)的狀態(tài)(是正在運行還是已經(jīng)執(zhí)行完成)。

Future接口提供了一個特殊的額阻塞方法get(),get()方法會返回一個Callable任務(wù)實際的執(zhí)行結(jié)果,或者null(如果是Runnable任務(wù)的話)。在任務(wù)仍處于運行期間的時候,調(diào)用get()方法將會導(dǎo)致阻塞,直到任務(wù)被恰當(dāng)?shù)貓?zhí)行并且結(jié)果是可用時。

Future future = executorService.submit(callableTask);

String result = null;

try {

? ? result = future.get();

} catch (InterruptedException | ExecutionException e) {

? ? e.printStackTrace();

}

如果get()方法導(dǎo)致長久的阻塞的話,應(yīng)用的執(zhí)行性能就會下降。如果結(jié)果數(shù)據(jù)不重要的話,可以使用超時時間來避免這個問題:

String result = future.get(200, TimeUnit.MILLISECONDS);

如果執(zhí)行時間比指定時間長的話(此時,即:200毫秒),就會拋出TimeoutException異常。isDone()方法可以用來檢查分配的任務(wù)是否已經(jīng)被處理了。Future接口也提供了cancel()方法用于取消任務(wù)的執(zhí)行,并且可以使用isCancelled()方法來檢查該任務(wù)是否已經(jīng)被取消。

boolean canceled = future.cancel(true);

boolean isCancelled = future.isCancelled();


6.ScheduledExecutorService接口

ScheduledExecutorService會在預(yù)定義的延遲之后,運行任務(wù)。再次說明,實例化ScheduledExecutorService的最好方式是使用Executor類的工廠方法。在本章節(jié)中,我們將使用單線程的ScheduledExecutorService:

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

在一個固定延遲之后,調(diào)度一個單任務(wù)執(zhí)行,可以使用ScheduledExecutorService的scheduled()方法。這里有兩個scheduled()方法供你運行一個Runnable或Callable任務(wù):

Future resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

scheduleAtFixedRate()方法可以讓我們在固定的延遲時間之后,周期性地執(zhí)行一個任務(wù)。上面的代碼會在執(zhí)行callableTask之前,延遲一秒。

下面的代碼塊則會在100毫秒的初始延遲之后,執(zhí)行一個任務(wù),并且在那之后,它會每隔450毫秒執(zhí)行同一個任務(wù)。如果處理器在處理一個任務(wù)時需要的時間比scheduledAtFixedRate()方法的period參數(shù)更長的話,ScheduledExecutorService將會在開始下一個任務(wù)之前一直等待,直到當(dāng)前任務(wù)完成。

Future resultFuture = service .scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

如果必需在兩個任務(wù)迭代之間有一個固定長度的延遲的話,應(yīng)當(dāng)使用scheduleWithFixedDelay()。例如,下面的代碼將會保證在結(jié)束當(dāng)前運行和開始另一個任務(wù)之間有150毫秒的停滯。

service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

根據(jù)scheduledAtFixedRate()以及scheduleWithFixedDelay()方法的約定,如果ExecutorService終止或者在任務(wù)執(zhí)行過程中有異常拋出的時,任務(wù)的周期執(zhí)行將會結(jié)束。


7.ExecutorService VS Fork/Join

在java7發(fā)布之后,許多開發(fā)者任務(wù),應(yīng)該使用fork/join框架取代ExecutorService框架。但這并不總是合適的選擇。雖然fork/join可以帶來使用的簡潔性以及運行的流暢性,但是它也讓降低了我們對并發(fā)執(zhí)行的控制。ExecutorService讓開發(fā)者可以控制一定數(shù)量的線程,以及任務(wù)執(zhí)行的粒度。

ExecutorService最適用于處理那些相互獨立的任務(wù)。例如,事務(wù) 或 符合“單任務(wù)單線程”的請求等。

相比之下,根據(jù)Oracle的官方文檔,fork/join在設(shè)計之初的目的就是加速那些可以被分解成小塊的工作。


8.總結(jié)

盡管ExecutorService比較簡單,但還是有很多坑需要我們注意。我們來總結(jié)一下:

? 1.保持一個未使用的ExecutorService存活:? 我們在本篇文章的章節(jié)4中講解了如何關(guān)閉一個ExecutorService。

? 2.在使用固定長度的線程池時,錯誤的線程池容量: 決定一個應(yīng)用到底需要多少線程才可以有效地執(zhí)行任務(wù)時非常重要的,線程池太大會導(dǎo)致 不 必要的花銷,因為大多數(shù)線程將處于等待模式waiting mode。線程池太小的話,又會因為隊列中任務(wù)的長時間地等待而降低應(yīng)用的響應(yīng)性。

? 3.在一個task取消之后,調(diào)用Future的get()方法:試圖獲取一個已經(jīng)取消的任務(wù)的執(zhí)行結(jié)果將會觸發(fā)一個CancellationException。

? 4.Future.get()方法的長期阻塞:超時時間應(yīng)該用于避免無謂的等待。

文章中的代碼可以在github上找到,地址: sourceCode

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容