新穎的、優(yōu)雅的異步處理數(shù)據(jù)的方法
Java SE 8為Java平臺(tái)帶來了許多新東西,其中很多已經(jīng)在生產(chǎn)環(huán)境當(dāng)中得到了應(yīng)用。但是在異步編程方法,卻并不是每個(gè)程序員都能很好的使用,也并非所有應(yīng)用程序都使用java.util.concurrent包,即使此包中對(duì)于編寫正確的并發(fā)代碼提供的原語非常有用。
java.util.concurrent包在Java 8中增加了幾個(gè)非常好的補(bǔ)充接口和實(shí)現(xiàn)類。我們?cè)诒疚闹杏懻摰氖?em>CompletionStage接口和CompletableFuture實(shí)現(xiàn)類。 與Future接口一起,它們?yōu)闃?gòu)建異步系統(tǒng)提供了非常好的應(yīng)用模式。
Problem Statement
讓我們從下面的代碼開始。 我們不用關(guān)心是哪個(gè)API提供的方法,也不關(guān)心使用的類,甚至不用關(guān)心是不是正確的Java代碼。
queryEngine.select("select user from User user")
.forEach(user -> System.out.println(user));
我們這里有一個(gè)查詢引擎,它從數(shù)據(jù)庫上執(zhí)行Java Persistence Query Language (JPQL)類型的請(qǐng)求。 查詢的結(jié)果,然后打印結(jié)果。 查詢數(shù)據(jù)庫的速度可能很慢,因此我們希望在單獨(dú)的線程中執(zhí)行此代碼,并在獲取結(jié)果以后觸發(fā)打印。 一旦我們啟動(dòng)了這項(xiàng)任務(wù),我們真的不想再慢慢等待了。
我們?cè)贘ava 7中有哪些API工具來實(shí)現(xiàn)此需求? 眾所周知,Java 5中引入的Callable,我們可以將要執(zhí)行的任務(wù)包裝在Callable中,并將此對(duì)象提交給ExecutorService。 如下所示:
Callable<String> task = () -> "select user from User";
Future<String> future = executorService.submit(task);
從future對(duì)象獲取結(jié)果的唯一方法是在提交Callable任務(wù)的線程中調(diào)用其get() 方法。 此方法是阻塞的,因此此調(diào)用將阻止線程,直到結(jié)果查詢完畢。
這正是CompletionStage的使用場景。
第一個(gè)鏈模式
讓我們使用CompletionStage模式重寫任務(wù)提交。
原來:
executor.submit(() -> {
() -> "select user from User";
});
改變:
CompletableFuture<List<User>> completableFuture =
CompletableFuture.supplyAsync(() -> { () -> dbEngine.query("select user from User"); }, executor);
我們將參數(shù)Callable傳遞給CompletableFuture的靜態(tài)supplyAsync()方法,而不是將它傳遞給ExecutorService的submit()方法。 此方法還可以將Executor作為第二個(gè)參數(shù),使客戶端可以自定義選擇執(zhí)行Callable的線程池。
它返回一個(gè)CompletableFuture實(shí)例,這是一個(gè)Java 8的新類。在這個(gè)對(duì)象上,我們可以做以下操作:
completableFuture.thenAccept(System.out::println);
thenAccept()方法接收的參數(shù)是一個(gè)Consumer,在結(jié)果可用時(shí)自動(dòng)調(diào)用該方法,無需編寫額外的等待代碼。 避免像前一種情況那樣造成線程阻塞。
CompletionStage是什么?
簡而言之,CompletionStage是一個(gè)承載任務(wù)的模型。接下來我們將看到,任務(wù)可以是任意的Runnable,Consumer或Function的實(shí)例。 任務(wù)是鏈的一個(gè)要素。 CompletionStage以不同的方式鏈接在一起。 “上游”元素是之前執(zhí)行的CompletionStage。 因此,“下游”元素是在之后執(zhí)行的CompletionStage。
執(zhí)行完成一個(gè)或多個(gè)上游CompletionStageS后,將觸發(fā)當(dāng)前的CompletionStage執(zhí)行。 執(zhí)行完CompletionStageS可能會(huì)有返回值,返回值可以傳遞給當(dāng)前的CompletionStage。 當(dāng)前CompletionStage執(zhí)行完會(huì)觸發(fā)其他下游的CompletionStageS,并且將生成返回值傳遞下去。
因此,CompletionStage是鏈的一個(gè)元素。
CompletionStage接口的實(shí)現(xiàn)類是java.util.concurrent.CompletableFuture的實(shí)現(xiàn)。 請(qǐng)注意,CompletableFuture也實(shí)現(xiàn)了Future接口。但是 CompletionStage沒有繼承Future。
一個(gè)任務(wù)一般有如下三種狀態(tài):
1、正在執(zhí)行
2、執(zhí)行正常完成,并產(chǎn)生正確的結(jié)果
3、執(zhí)行異常完成,可能會(huì)產(chǎn)生異常
Future的方法介紹
Future定義了三種類型五個(gè)方法:
- cancel(), 試圖取消正在執(zhí)行的任務(wù)
- isCanceled() 和 isDone() 判斷任務(wù)的狀態(tài)(取消成功|完成)
- get(), 兩個(gè)方法,一個(gè)不帶參數(shù)的,和一個(gè)帶超時(shí)參數(shù)的。
CompletableFuture增加了六種類似Future的新方法。
前兩個(gè)方法是join()和getNow(value)。 第一個(gè),join(),阻塞直到CompletableFuture完成,和Future的get()方法一樣。 主要區(qū)別在于join() 方法不會(huì)拋出顯式的異常(get()方法拋出InterruptedException, ExecutionException異常),從而更簡單。getNow(value)類似,它會(huì)立即返回,如果完成,則返回執(zhí)行結(jié)果,如果沒有完成,則返回給定得默認(rèn)值value。 請(qǐng)注意,此調(diào)用不會(huì)阻塞,不會(huì)等待CompletableFuture完成。
其余四種方法強(qiáng)制CompletableFuture結(jié)束,無論是使用默認(rèn)值還是異常,如果CompletableFuture已經(jīng)完成,它們可以覆蓋此CompletableFuture得返回值。
- complete(value)方法完成CompletableFuture(如果CompletableFuture沒有結(jié)束),并返回value。 如果CompletableFuture已完成,則complete(value)方法不會(huì)執(zhí)行。 如果需要更改value,則需要調(diào)用obtrude(value) 方法。 此方法確實(shí)會(huì)更改CompletableFuture的值,即使它已經(jīng)完成。 但是使用的時(shí)候要小心,因?yàn)閏omplete已經(jīng)觸發(fā)了客戶端,有可能導(dǎo)致客戶端會(huì)得到不期望的結(jié)果。
- 另一對(duì)方法的工作方式相同,但它們強(qiáng)制CompletableFuture拋出異常并結(jié)束: completeExceptionally(throwable)和obtrudeExceptionally(throwable)。 如果CompletableFuture尚未完成,則第一個(gè)拋出RuntimeException,第二個(gè)強(qiáng)制CompletableFuture更改其狀態(tài),和*obtrude(value) *類似。
如何創(chuàng)建CompletableFuture
創(chuàng)建CompletableFutures有以下幾種方式。
創(chuàng)建一個(gè)已完成的CompletableFuture
首先介紹的第一種方式,創(chuàng)建了一個(gè)已完成的CompletableFuture。 創(chuàng)建這樣的Future可能看起來很奇怪,但它在測試環(huán)境中非常有用。
CompletableFuture<String> cf =
CompletableFuture.completedFuture("I'm done!");
cf.isDone(); // return true
cf.join(); // return "I'm done"
從任務(wù)創(chuàng)建一個(gè)CompletableFuture
CompletableFuture可以構(gòu)建在兩種任務(wù)上:一個(gè)不帶任何參數(shù)且沒有返回值的Runnable,另一個(gè)是不帶參數(shù),返回一個(gè)對(duì)象的Supplier。 在這兩種情況下,都可以傳遞Executor來設(shè)置執(zhí)行任務(wù)的線程池。如下所示:
CompletableFuture<Void> cf1 =
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture<T> cf2 =
CompletableFuture.supplyAsync(Supplier<T> supplier);
如果未提供ExecutorService,則任務(wù)將在 ForkJoinPool.commonPool()線程池
中執(zhí)行,該池與Stream并行執(zhí)行的線程池相同。
自定義線程池,如下所示:
Runnable runnable = () -> {
System.out.println("Executing in " +
Thread.currentThread().getName());
};
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> cf =
CompletableFuture.runAsync(runnable, executor);
cf.thenRun(() -> System.out.println("I'm done"));
executor.shutdown();
執(zhí)行上面的代碼,輸入如下:
Executing in pool-1-thread-1
I'm done
在這種情況下,Runnable在我們自定義的的SingleThreadExecutor線程池中執(zhí)行。
構(gòu)建CompletableFuture鏈
我們?cè)诒疚牡拈_始已經(jīng)介紹過,CompletableFuture是鏈的一個(gè)元素。 上一節(jié)中看到了如何從任務(wù)(Runnable或Supplier)創(chuàng)建此鏈的第一個(gè)元素。 現(xiàn)在讓我們看看如何將其他任務(wù)鏈接到這個(gè)任務(wù)。 事實(shí)上,根據(jù)前面的例子,我們已經(jīng)猜到了該怎么做了。
鏈的任務(wù)
第一個(gè)任務(wù)是由Runnable或Supplier構(gòu)建的,兩個(gè)功能接口(你可以看成是functions),它們不帶任何參數(shù),但是可能會(huì),也可能不會(huì)有返回值。
鏈的第二個(gè)元素和其他元素都可以獲取前一個(gè)元素的結(jié)果(如果有的話)。 所以我們需要不同的functions來構(gòu)建這些元素。 我們先嘗試簡單的理解一下。
鏈的前一個(gè)元素可能會(huì),也可能不會(huì)有返回值。 所以鏈的functions的入?yún)⒖梢杂幸粋€(gè)對(duì)象,或者沒有參數(shù)。 此鏈元素可能會(huì)有,可也能不會(huì)有返回值。 所以鏈的函數(shù)應(yīng)該有一個(gè)返回值,或者沒有返回值。 這有四種可能的情況。 在這四種可能的函數(shù)中,其中不帶結(jié)果參數(shù),并產(chǎn)生返回值的函數(shù)是鏈的起點(diǎn),我們已在上一節(jié)中看到過。
CompletableFuture API中使用的四種可能的functions的名稱
| Takes a Parameters? | Returns Void | Returns R |
|---|---|---|
| Takes T | Consumer<T> | Function<T, R> |
| Does not take anything | Runnable | Not an element of a chain |
鏈的類型
現(xiàn)在我們已經(jīng)對(duì)API支持的任務(wù)有所了解,讓我們來看看鏈的含義。 到目前為止,我們假設(shè)鏈?zhǔn)顷P(guān)于觸發(fā)另一個(gè)任務(wù)的任務(wù),將第一個(gè)的結(jié)果作為參數(shù)傳遞給第二個(gè)任務(wù)。 這是基本的一對(duì)一的鏈。
我們也可以組合元素而不是鏈接它們。 這僅適用于獲取前一任務(wù)結(jié)果并包裝成CompletableFuture對(duì)象提供給另一個(gè)任務(wù)。 這又是一對(duì)一的關(guān)系(不是鏈,因?yàn)檫@是組合)。
但我們也可以構(gòu)建一個(gè)樹狀結(jié)構(gòu),其中由兩個(gè)上游任務(wù)而不是一個(gè)上游任務(wù)觸發(fā)的任務(wù)。 我們可以想象成兩個(gè)提供組合結(jié)果,或者在第一個(gè)上游元素提供結(jié)果,并觸發(fā)當(dāng)前任務(wù)。 這兩種情況都有意義,我們將會(huì)說到它們的例子。
選擇一個(gè)執(zhí)行器
最后,我們希望能夠根據(jù)不同的情形來選擇ExecutorService(即線程池)執(zhí)行我們的任務(wù)。 這有很多種情況需要我們來判斷:
- 我們的任務(wù)之一可能是更新圖形用戶界面。 在這種情況下,我們希望它在人機(jī)界面(HMI)線程中運(yùn)行。 Swing,JavaFX和Android就屬于這種情況。
- 我們有些I/O任務(wù)或計(jì)算任務(wù)需要在專門的線程池中執(zhí)行。
- 我們的變量中可能存在可見性問題,需要在同一個(gè)線程中執(zhí)行任務(wù)。
- 我們希望在默認(rèn)的fork/join池中異步執(zhí)行任務(wù)。
所有的這些情況下,我們必須增加ExecutorService參數(shù)用來定制執(zhí)行器 。
Note: 調(diào)整線程池的大小
《Java并發(fā)編程實(shí)戰(zhàn)》一書中,Brian Goetz和合著者們?yōu)榫€程池大小 的優(yōu)化提供了不少中肯的建議。這非常重要,如果線程池中線程的數(shù)量過多,最終它們會(huì)競爭 稀缺的處理器和內(nèi)存資源,浪費(fèi)大量的時(shí)間在上下文切換上。反之,如果線程的數(shù)目過少,正 如你的應(yīng)用所面臨的情況,處理器的一些核可能就無法充分利用。Brian Goetz建議,線程池大 小與處理器的利用率之比可以使用下面的公式進(jìn)行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
1、NCPU是處理器的核的數(shù)目,可以通過Runtime.getRuntime().availableProce- ssors()得到
2、UCPU是期望的CPU利用率(該值應(yīng)該介于0和1之間)
3、W/C是等待時(shí)間與計(jì)算時(shí)間的比率
豐富的API實(shí)現(xiàn)
CompletableFuture類中有很多API方法! 三種類型的任務(wù),四種類型的鏈接和組合,三種方式指定ExecutorService。36種鏈接任務(wù)的方法。大量可用的方法使這個(gè)類變得很復(fù)雜。
逐一的學(xué)習(xí)API方法將是非常繁瑣的,所以讓我們看看該如何正確的選擇合適的API。
模式選擇
以下是一些可用模式的描述。
一對(duì)一的模式
在這種情況下,從第一個(gè)CompletableFuture開始,當(dāng)完成其任務(wù)執(zhí)行時(shí),我們創(chuàng)建的第二個(gè)CompletableFuture開始執(zhí)行。如下所示:
CompletableFuture<String> cf1 =
CompletableFuture.supplyAsync(() -> "Hello world");
CompletableFuture<String> cf2 =
cf1.thenApply(s -> s + " from the Future!");
有三種 "then-apply"的方法。 它們都有一個(gè)Function的參數(shù),T為上游元素的結(jié)果,并返回一個(gè)新的對(duì)象R。
我們?cè)贋榱魉€添加一個(gè)步驟。 這次,我們thenAccept()方法,參數(shù)為Consumer<String>,沒有返回值(Void)。
CompletableFuture<Void> cf3 =
cf2.thenAccept(System.out::println);
讓我們?yōu)檫@個(gè)流水線添加最后一步。 調(diào)用thenRun(),參數(shù)為Runnable(不帶參數(shù),并且沒有返回值) .
CompletableFuture<Void> cf4 =
cf3.thenRun(() -> System.out.println("Done processing this chain"));
這些方法的命名都很清晰:以then開頭,后面跟上函數(shù)接口的名稱(run的參數(shù)是Runnable,accept參數(shù)為Consumer,apply參數(shù)為Function)。所有這些方法都在與上游任務(wù)具有相同的執(zhí)行器(同一個(gè)線程池)。
然后,這些方法還可以進(jìn)一步的采用相同的后綴:async。 異步方法在默認(rèn)的fork/join池( ForkJoinPool.commonPool())中執(zhí)行其任務(wù),當(dāng)然你也可以指定任務(wù)執(zhí)行器Executor。
我們用異步的方式重寫cf4,如下所示:
CompletableFuture<Void> cf4 =
cf3.thenRunAsync(() -> System.out.println("Done processing this chain"));
在這種情況下,Runnable任務(wù)將在默認(rèn)的fork/join池中執(zhí)行。
二對(duì)一的組合模式
組合模式是下一步任務(wù)接收兩個(gè)上游任務(wù)的結(jié)果的模式。 在這種情況下可以使用兩個(gè)函數(shù):BiFunction和BiConsumer。 也可以在組合模式中執(zhí)行Runnable。 如下所示:
| Method | Description |
|---|---|
| <U,V> CompletableFuture<V> thenCombine(CompletionStage<U> other, BiFunction<T, U, R> action) |
當(dāng)前和另一個(gè)給定的階段都正常完成時(shí),兩個(gè)結(jié)果作為BiFunction函數(shù)的參數(shù)執(zhí)行。 |
| <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action) |
當(dāng)這個(gè)和另一個(gè)給定的階段都正常完成時(shí),兩個(gè)結(jié)果作為提供的BiConsumer操作的參數(shù)被執(zhí)行。 |
| CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) |
當(dāng)這個(gè)和另一個(gè)給定的階段都正常完成時(shí),執(zhí)行給定的Runnable動(dòng)作。 |
這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義。
二對(duì)一的選擇模式
最后一類模式還是二對(duì)一模式。 但是這次,不是完成兩個(gè)上游元素后再執(zhí)行下游元素,而是,并且當(dāng)兩個(gè)上游元素中其中一個(gè)完成時(shí),即可執(zhí)行下游元素。這非常有用, 例如,當(dāng)我們想要解析域名時(shí), 我們可能會(huì)發(fā)現(xiàn)查詢一組域名服務(wù)器的效率比只查詢一個(gè)域名服務(wù)器更高。 我們不想從不同的服務(wù)器獲得相同的結(jié)果,因此我們不只要其中一個(gè)服務(wù)器返回結(jié)果即可,所有其他查詢可以安全的取消。
該模式只需要在上游元素的一個(gè)結(jié)果,這些方法的名稱中都有關(guān)鍵字,因?yàn)橹粫?huì)選擇其中一個(gè),所以組合元素應(yīng)該產(chǎn)生相同類型的結(jié)果。
| Method | Description |
|---|---|
| <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) |
當(dāng)這個(gè)或另一個(gè)給定階段正常完成時(shí),執(zhí)行相應(yīng)的結(jié)果作為提供的Function函數(shù)的參數(shù)。。 |
| CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
當(dāng)這個(gè)或另一個(gè)給定階段正常完成時(shí),執(zhí)行相應(yīng)的結(jié)果作為提供的Consumer操作的參數(shù)。 |
| CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) |
當(dāng)這個(gè)或另一個(gè)給定階段正常完成時(shí),執(zhí)行給定的Runnable動(dòng)作。 |
這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義。
示例
我們先看看幾個(gè)例子。
在Jersey中測試一個(gè)耗時(shí)的請(qǐng)求
下面是 Jersey documentation中的一段代碼.
@Path("/resource")
public class AsyncResource {
@Inject
private Executor executor;
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
executor.execute(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
}
}
這是一段基本的REST服務(wù)代碼,它調(diào)用耗時(shí)的操作。 這種情況下,典型的處理方法是在另一個(gè)線程中的異步調(diào)用耗時(shí)操作。 此方法沒有返回值; 上面的代碼時(shí)Jersey的實(shí)現(xiàn)方式。
我們?cè)谶@里又遇到了這么一個(gè)問題:我們?nèi)绾螌?duì)該方法進(jìn)行單元測試? 測試 longOperation()不是問題:我們可以單獨(dú)對(duì)該方法進(jìn)行單元測試。 我們需要在這里測試的是如何將result對(duì)象正確地傳遞給asyncResponse對(duì)象的 resume()方法。 這可以通過測試框架輕松完成,例如Mockito。 但是我們又面臨的問題如下:
- 在 "main"主線程中執(zhí)行executor.execute() 。
- 但是asyncResponse.resume()是在另一個(gè)線程中異步調(diào)用的, 同時(shí)我們無法獲取到結(jié)果.
在測試中我們需要的是在asyncResponse.resume() 后執(zhí)行的某種回調(diào),以便我們可以模擬測試。如下所示:
Mockito.verify(mockAsyncResponse).resume(result);
我們運(yùn)行這段簡單的代碼:
- 調(diào)用resume()方法
- 假設(shè)和執(zhí)行resume()是同一個(gè)線程; 那么,確信我們的模擬測試中不會(huì)出現(xiàn)任何并發(fā)問題(特別是可見性)
這時(shí)候,CompletionStage框架終于排上用場了!我們依據(jù)Runnable創(chuàng)建一個(gè)CompletionStage對(duì)象,而不是將Runnable傳遞給executor.execute()方法。
原來:
executor.submit(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
使用CompletionStage重寫:
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(() -> {
String result = longOperation();
asyncResponse.resume(result);
}, executor);
因?yàn)?em>CompletionStage可以觸發(fā)其他任務(wù),我們使用下面的代碼進(jìn)行測試:
completableFuture
.thenRun(() -> {
Mockito.verify(mockAsyncResponse).resume(result);
}
);
這段代碼完全符合我們的要求:
- 它由前一個(gè)CompletionStage的Runnable完成觸發(fā)。
- 它在同一個(gè)線程中執(zhí)行。
要實(shí)現(xiàn)此該方案,我們需要在類中創(chuàng)建一個(gè)公共方法,該類返回CompletableFuture。 如果我們修改了Jersey方法的返回類型,那么Jersey將嘗試使用此返回類型構(gòu)建響應(yīng),將其轉(zhuǎn)換為XML或JSON。 對(duì)于CompletableFuture,可能會(huì)導(dǎo)致運(yùn)行失敗。
因此,完整的測試模式如下:
- 在mocks中創(chuàng)建模擬對(duì)象:
String result = Mockito.mock(String.class);
AsyncResponse response = Mockito.mock(AsyncResponse.class);
Runnable train = () -> {
Mockito.doReturn(result).when(response).longOperation();
}
Runnable verify = () -> Mockito.verify(response).resume(result);
2、創(chuàng)建調(diào)用和驗(yàn)證對(duì)象:
Runnable callAndVerify = () -> {
asyncResource.executeAsync(response).thenRun(verify); }
3、最后創(chuàng)建要測試的任務(wù):
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncResource asyncResource = new AsyncResource();
asyncResource.setExecutorService(executor);
CompletableFuture
.runAsync(train, executor)
.thenRun(callAndVerify);
因?yàn)檫@是一個(gè)單元測試,如果在給定的時(shí)間后沒有看到響應(yīng),我們可能希望失敗。 我們可以使用CompletableFuture中對(duì)于Future接口的get()方法來實(shí)現(xiàn)。
異步分析網(wǎng)頁的鏈接
讓我們編碼實(shí)現(xiàn)如下需求:在Swing面板中顯示自動(dòng)的分析網(wǎng)頁的鏈接(異步方式)
我們需要如下幾個(gè)步驟:
1、讀取網(wǎng)頁內(nèi)容
2、獲取網(wǎng)頁鏈接
3、Swing面板中顯示鏈接
當(dāng)然,修改Swing組件應(yīng)該從合適的線程完成,但是,我們不希望在此線程中運(yùn)行長任務(wù)。
使用CompletableFuture,很簡單就能實(shí)現(xiàn)了:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(page -> linkParser.getLinks(page))
.thenAcceptAsync(
links -> displayPanel.display(links),
executor
);
第一步是創(chuàng)建異步執(zhí)行的Supplier。 比如它以String形式返回網(wǎng)頁內(nèi)容。
第二步是將獲取到的頁面內(nèi)容傳遞給linkParser. 這是一個(gè)返回List<String> 的function函數(shù). 這前兩個(gè)任務(wù)在同一個(gè)線程中執(zhí)行.
最后一步只是獲取鏈接列表并顯示。 這個(gè)任務(wù)需要訪問Swing組件,所以它應(yīng)該在Swing線程中執(zhí)行。 我們通過傳遞正確的executor作為參數(shù)來做到這一點(diǎn)。
有一點(diǎn)比較好:Executor接口是一個(gè)functional interface。 我們可以用lambda實(shí)現(xiàn)它:
Executor executor = runnable -> SwingUtilities.invokeLater(runnable);
我們可以利用方法引用語法來編寫此模式的最終版本:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(Parser::getLinks)
.thenAcceptAsync(
DisplayPanel::display,
SwingUtilities::invokeLater
);
CompletableFutures結(jié)合lambdas和方法引用可以編寫非常優(yōu)雅的代碼。
異常處理
CompletionStage API還提供了異常處理模式。 讓我們看一個(gè)例子。
假設(shè)我們有如圖所示的處理鏈:

所有這些CompletableFutures都使用我們?cè)谏厦嬲f到的模式鏈接在一起。
現(xiàn)在假設(shè)CF21引發(fā)異常。 如果沒有對(duì)此異常做處理,則所有下游的CompletableFutures都會(huì)出錯(cuò)。 這意味著兩件事:
- CF21, CF31, 和 CF41的CompletableFutures調(diào)用isCompletedExceptionally()都返回 true .
- 這些對(duì)象調(diào)用get()方法都會(huì)拋出ExecutionException, 原因是因?yàn)镃F21引發(fā)的根異常.
我們可以使用下圖所示的模式處理CompletableFutures鏈中的異常。
cf30 = cf21.exceptionally();

此模式創(chuàng)建的CompletableFuture具有以下屬性:
- 如果CF21正常完成,則CF30將透明地返回與CF21相同的值。
- 如果CF21發(fā)生異常,則CF30能夠捕獲它,并且可以將正常值傳輸?shù)紺F31。
有好幾種方法可以做到這一點(diǎn),用不同方法的接受異常。
exceptionally(Function<Throwable, T> function)是最簡單的方法調(diào)用. 它返回一個(gè)CompletionStage,如果上游CompletionStage也正常完成,則返回的CompletionStage也會(huì)以相同的值正常完成。 否則,如果此上游CompletionStage發(fā)生異常,則將此異常傳遞給提供的函數(shù)。返回的CompletionStage正常完成,返回Function的結(jié)果。 此方法沒有異步版本。
handle(BiFunction<T, Throwable, R> bifunction) 具有相同的語義. 它返回一個(gè)CompletionStage,當(dāng)此階段正常或異常完成時(shí),將使用此階段的結(jié)果和異常作為所提供函數(shù)的參數(shù)執(zhí)行。 如果上游CompletionStage正常完成,則Throwable為null調(diào)用BiFunction,如果異常完成,則R為null調(diào)用BiFunction。 在這兩種情況下,都被能正常返回的CompletionStage。該方法有兩個(gè)姐妹方法handleAsync(BiFunction<? super T,Throwable,? extends U> fn)和handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor), 這兩種方法的工作方式相同,都是異步的,只是執(zhí)行器不同。執(zhí)行器可以作為參數(shù)提供。如果沒有提供,則使用公共的fork/join 線程池。
第三種處理異常的方式是whenComplete(BiConsumer<T, Throwable> biconsumer)。handle() 可以正常結(jié)束并返回CompletionStage,而whenComplete()則不盡然。 它遵循構(gòu)建的CompletionStage的流水線行為。 因此,如果上游CompletionStage發(fā)生異常,則whenComplete()返回的CompletionStage也會(huì)異常完成(結(jié)合exceptionally()理解)。使用上游CompletionStage的返回值及其此階段返回值調(diào)用BiConsumer. 與handle()情況一樣,將使用結(jié)果(或 null如果沒有))和此階段的異常(或 null如果沒有)調(diào)用BiConsumer;BiConsumer沒有返回值。 所以它只是一個(gè)不會(huì)影響CompletionStages流水線處理的回調(diào)。 與handle()方法類似,該方法也有兩個(gè)姐妹方法whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)和whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)。這兩種方法的工作方式都是異步的。執(zhí)行器可以作為參數(shù)提供。如果沒有提供,則使用公共的fork/join 線程池。.
結(jié)論
CompletionStage 接口和CompletableFuture類帶來了異步處理數(shù)據(jù)的新方式。這個(gè)API非常復(fù)雜,主要是由于這個(gè)接口和類暴露的方法數(shù)量較多,但是,豐富的API使得我們處理異步數(shù)據(jù)流水線方面有了更多的選擇,以便更好的滿足應(yīng)用程序的需求。
這些API基于lambda表達(dá)式構(gòu)建,從而創(chuàng)造非常干凈且優(yōu)雅的代碼。 它可以很好地控制哪個(gè)線程執(zhí)行每個(gè)任務(wù)。 它還允許以多種方式構(gòu)建流水線和組合任務(wù),并且在處理異常方面也提供對(duì)應(yīng)的方式方法。