??這篇文章介紹了CompletableFuture 類的功能和一些使用實例。在我們介紹開始之前,先來了解一下這個類的背景。在JAVA中,一個異步任務(wù)的調(diào)用可以使用Threads。然而,為了獲得最佳性能,需要仔細規(guī)劃業(yè)務(wù)流程中的各個步驟的編排,這對于不了解JAVA整個并發(fā)體系的人來說,非常容易出錯。如果JAVA提供了一個即用的容器來連接一系列任務(wù),并且能為任務(wù)的運行提供并發(fā)性但是卻不用編寫復(fù)雜的多線程代碼呢?CompletableFuture就是這樣一個別致的小東西。
創(chuàng)建CompletableFuture對象。
??我們可以直接new一個對象出來,也可以使用CompletableFuture為我們提供的靜態(tài)方法。
??注意:這種方法直接new出來的CompletableFuture對象是無法運行的,因為他并沒有處于一個“完成”狀態(tài),也就是說你調(diào)用get()方法是會被阻塞的。
CompletableFuture futrue = new CompletableFuture();
??推薦下面這種方法,使用CompletableFuture的靜態(tài)方法completedFuture(U value)。直接會拿到一個“完成”狀態(tài)的對象,當(dāng)用get()方法拿值時,你拿到的值也就是value。
String expectedValue = "the expected value";
CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);
assertThat(alreadyCompleted.get(), is(expectedValue));
開始運行我們的第一個Task
- runAsync(Runnable)
- runAsync(Runnable,Executor)
- supplyAsync(Supplier<U>)
- supplyAsync(Supplier<U>,Executor)
這里有2種方法來初始化我們的異步任務(wù)——使用runAsync()或者supplyAsnync()。先上一段代碼:
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.printf("[%s] I am Cool\n", Thread.currentThread().getName());
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.printf("[%s] Am Awesome\n", Thread.currentThread().getName());
return null;
});
打印結(jié)果:
[ForkJoinPool.commonPool-worker-3] I am Cool
[ForkJoinPool.commonPool-worker-3] Am Awesome
??從上面的例子中可以看出,有2種初始化CompletableFuture對象并運行我們的異步任務(wù)的方法。使用runAsync()和supplyAsync()??梢院苋菀椎目闯鰜韮烧咧g的差別,supplyAsync()有返回值,這個返回值可以用于被下一個任務(wù)鏈結(jié)點所消費,后面我們會講到。除此之外,上述方法還提供了重載方法,當(dāng)我們傳入Executor時,該Task會使用傳入的Executor去執(zhí)行,否則默認去執(zhí)行任務(wù)的線程池就是fork-join thread pool,關(guān)于該線程池,暫不贅述。
PS:我個人覺得supplyAsync方法中傳入Callable比傳入Supplier更合適。它們倆都是函數(shù)式接口,但是Callable和異步任務(wù)的聯(lián)系更緊,并且可以拋出非運行時異常。
構(gòu)造任務(wù)鏈
??上述方法中我們只是異步的去執(zhí)行了一個任務(wù),如果我們想拿到這個任務(wù)的執(zhí)行結(jié)果,并執(zhí)行后面的任務(wù)呢?或者當(dāng)該任務(wù)運行拋出異常時我們想來處理這些異常時呢(后面講)?
??CompletableFuture為我們提供了幾十種方法來構(gòu)造任務(wù)鏈,這些任務(wù)鏈的構(gòu)造過程類似于Stream.map()方法,下面將結(jié)合實例詳細講解。
-
thenAccept(Consumer<T>)
該方法是非靜態(tài)方法,是用來消費上一個任務(wù)運行之后的結(jié)點的返回值的??梢詫⒃摲椒ㄖ械腃onsumer參數(shù)視為上一個任務(wù)在完成時要調(diào)用的回調(diào)函數(shù)。
??
Tip:該任務(wù)有兩種方式來執(zhí)行。第一種是如果當(dāng)前線程調(diào)用thenAccept方法時,上一個任務(wù)還沒執(zhí)行完成時(這并不影響任務(wù)鏈執(zhí)行的順序性,因為這些任務(wù)都被存放在一個容器中),這時候調(diào)用此任務(wù)的線程就是上一個執(zhí)行上任務(wù)的線程(ForkJoinPool-Thread);第二種是如果當(dāng)前線程調(diào)用該方法時,上一個任務(wù)執(zhí)行完成,這時候調(diào)用到了thenAccept方法,那么此任務(wù)會被調(diào)用thenAccept方法的線程(Main)所調(diào)用
請看下面的例子:
while (true) {
{
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I am Cool").thenAccept(msg ->
System.out.printf("[%s] %s and am also Awesome\n", Thread.currentThread().getName(), msg));
try {
cf.get();
} catch (Exception ex) {
ex.printStackTrace(System.err);
}
}
}
可以看到有多個運行結(jié)果:
[ForkJoinPool.commonPool-worker-2] I am Cool and am also Awesome
[main] I am Cool and am also Awesome
-
thenApply(Function<T, U>)
該方法接收一個Function參數(shù),T類型是上一個結(jié)點傳入,需要被消費的值,U類型是生成的,會被輸出的值。此Function會在上一個任務(wù)執(zhí)行結(jié)束時被調(diào)用。調(diào)用該Function的線程使用規(guī)則請參考上述的Tip。以下是調(diào)用該線程的實例:
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
/*這里的msg就是上一個異步任務(wù)的返回結(jié)果:I'm Awesome*/
.thenApply(msg -> String.format("%s and am Super COOL !!!", msg))
.thenAccept(msg -> System.out.printf("%s\n", msg));
輸出結(jié)果:
I'm Awesome and am Super COOL !!!
對比于thenApply,thenAccept更適用于作為任務(wù)鏈的結(jié)尾。
-
thenCombine(CompletionStage<U>, BiFunction<T, U, R>)
該方法能組合2個彼此獨立的異步任務(wù)的輸出。該方法接收2個參數(shù),一個CompletionStage引用和一個BiFunction,類型T和類型U就是2個異步任務(wù)的輸入,類型R為輸出值。BiFunction會在上一個任務(wù)和傳入第一個參數(shù)(CompletionStage)完成時被調(diào)用。下面給出實例代碼:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!"),
(s1, s2) -> String.format("%s AND %s", s1, s2), executor)
.thenAcceptAsync(msg ->
System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg), executor);
輸出結(jié)果:
[pool-1-thread-3] I'm Stunning AND am New !!!
注意這里用的是自定義線程池。當(dāng)線程池在任務(wù)執(zhí)行結(jié)束之前shutdown則會拋出rejectedExecution。
-
thenCompose(Function<T, CompletionStage<U>>)
該方法接收一個Function,也是用來消費上一個任務(wù)結(jié)點的,不過返回的是CompletionStage<U>,當(dāng)這個CompletionStage<U>執(zhí)行結(jié)束時,它會返回一個類型U的值。請看下面代碼:
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Smart")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & am NIMBLE !!!"))
.thenAccept(msg ->
System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));
輸出結(jié)果:
I'm Smart & am NIMBLE !!!
thenCompose VS thenApply
這兩個方法都是接收一個參數(shù)并且返回的是CompletableFuture對象。它們的不同之處在于Function的返回值,thenCompose返回的是CompletableFuture,是一個你自己已經(jīng)包裝好的對象;而thenApply返回的是值,它底層會將這個值包裝成對象返回給你。這就類似于Optianal中faltMap()和map()之間的區(qū)別。如果你想鏈接一個已經(jīng)存在的返回CompletableFuture的方法,thenCompose是一個更好的選擇,如下:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
-
thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>)
該方法作用如其名,該方法的功能可以為2個不相關(guān)的并行執(zhí)行的異步任務(wù)都執(zhí)行完成時,提供回調(diào)。第一個傳入的參數(shù)CompletionStage<U>就是其中一個需要執(zhí)行的異步任務(wù),BiConsumer<T, U>這個參數(shù)就是當(dāng)2個異步任務(wù)都執(zhí)行完成時執(zhí)行的回調(diào)函數(shù)。類型T是上一個任務(wù)的返回值,U就是傳入的任務(wù)完成時的返回值。下面請看代碼:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> "am Nimble !!!");
CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) ->
System.out.printf("[%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));
輸出結(jié)果:
[main] I am Fast and am Nimble !!!
-
acceptEither(CompletionStage<T>, Consumer<T>)
該方法接收2個參數(shù):一個CompletionStage的引用和一個Consumer函數(shù)式接口。該回調(diào)函數(shù)會在上一個任務(wù)執(zhí)行完成或第一個傳入的參數(shù)執(zhí)行完成時被調(diào)用。返回值為CompletableFuture<Void>。下面請看代碼:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Awesome";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Cool";
});
CompletableFuture cf3 = cf1.acceptEither(cf2, msg ->
System.out.printf("[%s] %s and am NIMBLE !!!\n", Thread.currentThread().getName(), msg));
可能的輸出結(jié)果:
[ForkJoinPool.commonPool-worker-9] I am Awesome and am NIMBLE !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Cool and am NIMBLE !!!
- applyToEither(CompletionStage<T>, Function<T, U>)該方法和上述方法類型,兩個異步任務(wù)的其中一個執(zhí)行完成時會調(diào)用Function,不過返回值為CompletableFuture<U>。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Awesome";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Bold";
});
CompletableFuture cf3 = cf1.applyToEither(cf2, msg -> String.format("%s and am Cool !!!", msg))
.thenAccept(msg -> System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));
可能出現(xiàn)以下結(jié)果
[ForkJoinPool.commonPool-worker-9] I am Awesome and am Cool !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Bold and am Cool !!!
值得注意的是:以上兩種"Either"方法在執(zhí)行完成時,另一個還沒執(zhí)行完的任務(wù)會繼續(xù)執(zhí)行。
還有2個靜態(tài)方法和Either,Both類方法大同小異
- CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 會執(zhí)行cfs中所有的異步任務(wù)。
- CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 返回cfs中第一個執(zhí)行完成的任務(wù),其他任務(wù)都會繼續(xù)執(zhí)行。此處便不再贅述。
下面來考一考大家,請看下面一道代碼題,輸出該任務(wù)鏈的最終值:
Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());
CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");
CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");
CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);
CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);
//simulatedTask第一個參數(shù)為執(zhí)行時間,第二個參數(shù)為返回值。
CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));
CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);
CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));
CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);
CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);
CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);
答案會放在文末。
至此,我們講述了絕大大部分構(gòu)造任務(wù)鏈的方法,這些方法能讓我們不斷地向后傳遞不同的返回值,并且保證了任務(wù)鏈的順序性。
任務(wù)鏈完成時的回調(diào)和異常處理
在講本節(jié)內(nèi)容之前我們先來看2個方法。
- T get() throws InterruptedException, ExecutionException
-
T join()
這兩個方法都是從CompletableFuture里面取值的,調(diào)用時會阻塞當(dāng)前線程??梢院苋菀卓闯鰆oin沒有拋出非運行時異常,不過它會拋出2個運行時異常:當(dāng)這個任務(wù)被取消時會拋出CancellationException;當(dāng)任務(wù)執(zhí)行時拋出異常時,會拋出CompletionException。當(dāng)我們需要從任務(wù)中拿值時,推薦join()方法。
我們知道上面兩種方法在調(diào)用時都會阻塞當(dāng)前線程,如果想繼續(xù)向下運行,則必須中斷或取消任務(wù),但是如果我們想正常的結(jié)束任務(wù),或者在拿值時如果值不存在,則拋出自定義的異常或返回默認值呢?CompletableFuture中為我們提供以下解決方法。
-
T getNow(T valueIfAbsent)
該方法不阻塞,如果任務(wù)尚未完成,則返回默認值。 -
boolean complete(T)
如果任務(wù)未完成,則在拿取返回值時返回T。如果此次調(diào)用將CompletableFuture轉(zhuǎn)化為“完成”狀態(tài),則返回true,否則返回false。意思是如果通過get()或join()拿到的是類型T的值,則返回true,否則返回false。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
/*循環(huán)*/
infiniteLoop();
return "I am Awesome";
});
System.out.println("complete:"+cf1.complete("Default"));
System.out.println("isDone:"+cf1.isDone());
System.out.println("result:"+cf1.join());
輸出結(jié)果:
complete:true
isDone:true
result:Default
-
boolean completeExceptionally(Throwable)
如果任務(wù)未完成,則在拿值時拋出Throwable異常,該異??梢宰远x。如果此次調(diào)用將CompletableFuture轉(zhuǎn)化為“完成”狀態(tài),則返回true,否則返回false。如果你想進一步傳遞某個異常,可以使用該方法。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
/*循環(huán)*/
infiniteLoop();
return "I am Awesome";
});
System.out.println("complete:"+cf1.completeExceptionally(new RuntimeException("completeExceptionally")));
System.out.println("isDone:"+cf1.isDone());
System.out.println("isCompletedExceptionally:"+cf1.isCompletedExceptionally());
System.out.println("result:"+cf1.join());
輸出結(jié)果:
complete:true
isDone:true
Exception in thread "main"
isCompletedExceptionally:true
java.util.concurrent.CompletionException: java.lang.RuntimeException: completeExceptionally
at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at CompletableFutureSample.main(CompletableFutureSample.java:27)
Caused by: java.lang.RuntimeException: completeExceptionally
at CompletableFutureSample.main(CompletableFutureSample.java:24)
上述方法在構(gòu)建我們不想等待太多時間的健壯系統(tǒng)時很有用。
obtrudeValue(T value)
obtrudeException(Throwable ex)
使用這兩個方法可以強制地將值設(shè)置或者將異常拋出,無論該之前任務(wù)是否完成。這兩個方法類似于complete和completeExceptionally,但是complete的功能是如果任務(wù)未完成才返回設(shè)定的值。在某些場景下,你有可能想放棄該任務(wù)的執(zhí)行,發(fā)出一個失敗的信號。請謹(jǐn)慎使用這兩個方法,因為他們會覆蓋前一個Future的值(或異常)。CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> )
該方法的作用是:當(dāng)前面的任務(wù)出了異常時,就會返回T值;否則還是返回原先前面任務(wù)應(yīng)該返回的值。請看下面的代碼:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
});
cf1.exceptionally(ex -> ex.getMessage())
/*如果拋出了異常,這里傳遞給下一個結(jié)點的值是ex.getMessage()*/
.thenAccept(System.out::println);
/*如果使用get或join拿里面的值的話,如果任務(wù)有異常,會拋出CompletionException異常的*/
System.out.println("isDone:" + cf1.isDone());
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());
輸出結(jié)果:
/*user為空時的打印結(jié)果*/
java.lang.NullPointerException
isDone:true
isCompletedExceptionally:true
/*正常的打印結(jié)果*/
Jack
isDone:true
isCompletedExceptionally:false
-
handle(BiFunction<T, Throwable, U>)
這個方法有點類似于thenApply和exceptionally的結(jié)合,如果上一個任務(wù)出了異常,則傳入的T類型為null,Throwable為拋出的異常;如果正常運行,則T類型為運行后的返回值,Throwable為null。
下面用handle來還原上個例子的代碼的功能:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
}).handle((v,ex)->{
if(v==null)
return "user is null";
return v;
}).thenAccept(System.out::println);
/*如果使用get或join拿里面的值的話,如果任務(wù)有異常,會拋出CompletionException異常的*/
System.out.println("isDone:" + cf1.isDone());
/*使用handle時,無論出不出異常,該值都為false。*/
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());
/*user為空時的打印結(jié)果*/
user is null!
isDone:true
isCompletedExceptionally:false
/*正常的打印結(jié)果*/
Jack
isDone:true
isCompletedExceptionally:false
-
whenComplete(BiConsumer<T, Throwable>)
該方法有點像thenAccept(Consumer<T>) ,只不過增加了一個異常處理的功能。傳入的參數(shù)類似于handle,如果上一個任務(wù)出了異常,則傳入的T類型為null,Throwable為拋出的異常;如果正常運行,則T類型為之前任務(wù)運行后的返回值,Throwable為null。下面上代碼:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
}).handle((v,ex)->{
if(v==null)
System.out.println("user is null");
else
System.out.println(v);
}).thenAccept(System.out::println);
輸出結(jié)果和上例相同,為了簡潔就不展示了。
Async Methods
CompletableFuture API為絕大部分的方法提供了2個額外的方法變體,它們后綴名都是“Async”。這些異步方法都會使用線程池來執(zhí)行,如果傳入的線程池為null,則還會使用默認的fork/join pool來執(zhí)行任務(wù),這可以更有效率地提高你任務(wù)的并發(fā)性。
總結(jié)
??這篇文章我們總結(jié)了CompletableFuture API的功能,從創(chuàng)建對象,到構(gòu)造任務(wù)鏈,最后再到異常的處理。
答案
THE QUICK BROWN FOX JUMPED OVER the lazy dog