CompletableFuture API詳解

??這篇文章介紹了CompletableFuture 類的功能和一些使用實例。在我們介紹開始之前,先來了解一下這個類的背景。在JAVA中,一個異步任務(wù)的調(diào)用可以使用Threads。然而,為了獲得最佳性能,需要仔細規(guī)劃業(yè)務(wù)流程中的各個步驟的編排,這對于不了解JAVA整個并發(fā)體系的人來說,非常容易出錯。如果JAVA提供了一個即用的容器來連接一系列任務(wù),并且能為任務(wù)的運行提供并發(fā)性但是卻不用編寫復(fù)雜的多線程代碼呢?CompletableFuture就是這樣一個別致的小東西。

創(chuàng)建CompletableFuture對象。

??我們可以直接new一個對象出來,也可以使用CompletableFuture為我們提供的靜態(tài)方法。
??注意:這種方法直接new出來的CompletableFuture對象是無法運行的,因為他并沒有處于一個“完成”狀態(tài),也就是說你調(diào)用get()方法是會被阻塞的。

CompletableFuture futrue = new CompletableFuture();

??推薦下面這種方法,使用CompletableFuture的靜態(tài)方法completedFuture(U value)。直接會拿到一個“完成”狀態(tài)的對象,當(dāng)用get()方法拿值時,你拿到的值也就是value。

String expectedValue = "the expected value";
CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);
assertThat(alreadyCompleted.get(), is(expectedValue));

開始運行我們的第一個Task

  • runAsync(Runnable)
  • runAsync(Runnable,Executor)
  • supplyAsync(Supplier<U>)
  • supplyAsync(Supplier<U>,Executor)

這里有2種方法來初始化我們的異步任務(wù)——使用runAsync()或者supplyAsnync()。先上一段代碼:

CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
         System.out.printf("[%s] I am Cool\n", Thread.currentThread().getName());
     });

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
         System.out.printf("[%s] Am Awesome\n", Thread.currentThread().getName());
         return null;
      });

打印結(jié)果:

[ForkJoinPool.commonPool-worker-3] I am Cool
[ForkJoinPool.commonPool-worker-3] Am Awesome

??從上面的例子中可以看出,有2種初始化CompletableFuture對象并運行我們的異步任務(wù)的方法。使用runAsync()supplyAsync()??梢院苋菀椎目闯鰜韮烧咧g的差別,supplyAsync()有返回值,這個返回值可以用于被下一個任務(wù)鏈結(jié)點所消費,后面我們會講到。除此之外,上述方法還提供了重載方法,當(dāng)我們傳入Executor時,該Task會使用傳入的Executor去執(zhí)行,否則默認去執(zhí)行任務(wù)的線程池就是fork-join thread pool,關(guān)于該線程池,暫不贅述。

PS:我個人覺得supplyAsync方法中傳入Callable比傳入Supplier更合適。它們倆都是函數(shù)式接口,但是Callable和異步任務(wù)的聯(lián)系更緊,并且可以拋出非運行時異常。

構(gòu)造任務(wù)鏈

??上述方法中我們只是異步的去執(zhí)行了一個任務(wù),如果我們想拿到這個任務(wù)的執(zhí)行結(jié)果,并執(zhí)行后面的任務(wù)呢?或者當(dāng)該任務(wù)運行拋出異常時我們想來處理這些異常時呢(后面講)?
??CompletableFuture為我們提供了幾十種方法來構(gòu)造任務(wù)鏈,這些任務(wù)鏈的構(gòu)造過程類似于Stream.map()方法,下面將結(jié)合實例詳細講解。

  • thenAccept(Consumer<T>)
    該方法是非靜態(tài)方法,是用來消費上一個任務(wù)運行之后的結(jié)點的返回值的??梢詫⒃摲椒ㄖ械腃onsumer參數(shù)視為上一個任務(wù)在完成時要調(diào)用的回調(diào)函數(shù)。
    ??

Tip:該任務(wù)有兩種方式來執(zhí)行。第一種是如果當(dāng)前線程調(diào)用thenAccept方法時,上一個任務(wù)還沒執(zhí)行完成時(這并不影響任務(wù)鏈執(zhí)行的順序性,因為這些任務(wù)都被存放在一個容器中),這時候調(diào)用此任務(wù)的線程就是上一個執(zhí)行上任務(wù)的線程(ForkJoinPool-Thread);第二種是如果當(dāng)前線程調(diào)用該方法時,上一個任務(wù)執(zhí)行完成,這時候調(diào)用到了thenAccept方法,那么此任務(wù)會被調(diào)用thenAccept方法的線程(Main)所調(diào)用

請看下面的例子:

while (true) {
            {
                CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I am Cool").thenAccept(msg ->
                        System.out.printf("[%s] %s and am also Awesome\n", Thread.currentThread().getName(), msg));
                try {
                    cf.get();
                } catch (Exception ex) {
                    ex.printStackTrace(System.err);
                }
            }
        }

可以看到有多個運行結(jié)果:

[ForkJoinPool.commonPool-worker-2] I am Cool and am also Awesome
[main] I am Cool and am also Awesome
  • thenApply(Function<T, U>)
    該方法接收一個Function參數(shù),T類型是上一個結(jié)點傳入,需要被消費的值,U類型是生成的,會被輸出的值。此Function會在上一個任務(wù)執(zhí)行結(jié)束時被調(diào)用。調(diào)用該Function的線程使用規(guī)則請參考上述的Tip。以下是調(diào)用該線程的實例:
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
/*這里的msg就是上一個異步任務(wù)的返回結(jié)果:I'm Awesome*/
        .thenApply(msg -> String.format("%s and am Super COOL !!!", msg))
        .thenAccept(msg -> System.out.printf("%s\n", msg));

輸出結(jié)果:

I'm Awesome and am Super COOL !!!

對比于thenApply,thenAccept更適用于作為任務(wù)鏈的結(jié)尾。

  • thenCombine(CompletionStage<U>, BiFunction<T, U, R>)
    該方法能組合2個彼此獨立的異步任務(wù)的輸出。該方法接收2個參數(shù),一個CompletionStage引用和一個BiFunction,類型T和類型U就是2個異步任務(wù)的輸入,類型R為輸出值。BiFunction會在上一個任務(wù)和傳入第一個參數(shù)(CompletionStage)完成時被調(diào)用。下面給出實例代碼:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)
        .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!"),
            (s1, s2) -> String.format("%s AND %s", s1, s2), executor)
         .thenAcceptAsync(msg -> 
                System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg), executor);

輸出結(jié)果:

[pool-1-thread-3] I'm Stunning AND am New !!!

注意這里用的是自定義線程池。當(dāng)線程池在任務(wù)執(zhí)行結(jié)束之前shutdown則會拋出rejectedExecution。

  • thenCompose(Function<T, CompletionStage<U>>)
    該方法接收一個Function,也是用來消費上一個任務(wù)結(jié)點的,不過返回的是CompletionStage<U>,當(dāng)這個CompletionStage<U>執(zhí)行結(jié)束時,它會返回一個類型U的值。請看下面代碼:
 CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Smart")
         .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & am NIMBLE !!!"))
         .thenAccept(msg -> 
         System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));

輸出結(jié)果:

I'm Smart & am NIMBLE !!!

thenCompose VS thenApply
這兩個方法都是接收一個參數(shù)并且返回的是CompletableFuture對象。它們的不同之處在于Function的返回值,thenCompose返回的是CompletableFuture,是一個你自己已經(jīng)包裝好的對象;而thenApply返回的是值,它底層會將這個值包裝成對象返回給你。這就類似于Optianal中faltMap()和map()之間的區(qū)別。如果你想鏈接一個已經(jīng)存在的返回CompletableFuture的方法,thenCompose是一個更好的選擇,如下:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
  • thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>)
    該方法作用如其名,該方法的功能可以為2個不相關(guān)的并行執(zhí)行的異步任務(wù)執(zhí)行完成時,提供回調(diào)。第一個傳入的參數(shù)CompletionStage<U>就是其中一個需要執(zhí)行的異步任務(wù),BiConsumer<T, U>這個參數(shù)就是當(dāng)2個異步任務(wù)都執(zhí)行完成時執(zhí)行的回調(diào)函數(shù)。類型T是上一個任務(wù)的返回值,U就是傳入的任務(wù)完成時的返回值。下面請看代碼:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> "am Nimble !!!");
CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) -> 
System.out.printf("[%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));

輸出結(jié)果:

[main] I am Fast and am Nimble !!!
  • acceptEither(CompletionStage<T>, Consumer<T>)
    該方法接收2個參數(shù):一個CompletionStage的引用和一個Consumer函數(shù)式接口。該回調(diào)函數(shù)會在上一個任務(wù)執(zhí)行完成第一個傳入的參數(shù)執(zhí)行完成時被調(diào)用。返回值為CompletableFuture<Void>。下面請看代碼:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
        randomDelay();
        return "I am Awesome";
            });
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
        randomDelay();
        return "I am Cool";
            });
CompletableFuture cf3 = cf1.acceptEither(cf2, msg ->
System.out.printf("[%s] %s and am NIMBLE !!!\n", Thread.currentThread().getName(), msg));

可能的輸出結(jié)果:

[ForkJoinPool.commonPool-worker-9] I am Awesome and am NIMBLE !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Cool and am NIMBLE !!!
  • applyToEither(CompletionStage<T>, Function<T, U>)該方法和上述方法類型,兩個異步任務(wù)的其中一個執(zhí)行完成時會調(diào)用Function,不過返回值為CompletableFuture<U>。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
      randomDelay();
      return "I am Awesome";
      });
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
      randomDelay();
      return "I am Bold";
      });
CompletableFuture cf3 = cf1.applyToEither(cf2, msg -> String.format("%s and am Cool !!!", msg))
            .thenAccept(msg -> System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));

可能出現(xiàn)以下結(jié)果

[ForkJoinPool.commonPool-worker-9] I am Awesome and am Cool !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Bold and am Cool !!!

值得注意的是:以上兩種"Either"方法在執(zhí)行完成時,另一個還沒執(zhí)行完的任務(wù)會繼續(xù)執(zhí)行。

還有2個靜態(tài)方法和Either,Both類方法大同小異
  • CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 會執(zhí)行cfs中所有的異步任務(wù)。
  • CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 返回cfs中第一個執(zhí)行完成的任務(wù),其他任務(wù)都會繼續(xù)執(zhí)行。此處便不再贅述。

下面來考一考大家,請看下面一道代碼題,輸出該任務(wù)鏈的最終值:

        Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());
        CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");
        CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");
        CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);
        CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);
        //simulatedTask第一個參數(shù)為執(zhí)行時間,第二個參數(shù)為返回值。
        CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));
        CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);
        CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));
        CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);
        CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);
        CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);

答案會放在文末。

至此,我們講述了絕大大部分構(gòu)造任務(wù)鏈的方法,這些方法能讓我們不斷地向后傳遞不同的返回值,并且保證了任務(wù)鏈的順序性。

任務(wù)鏈完成時的回調(diào)和異常處理

在講本節(jié)內(nèi)容之前我們先來看2個方法。

  • T get() throws InterruptedException, ExecutionException
  • T join()
    這兩個方法都是從CompletableFuture里面取值的,調(diào)用時會阻塞當(dāng)前線程??梢院苋菀卓闯鰆oin沒有拋出非運行時異常,不過它會拋出2個運行時異常:當(dāng)這個任務(wù)被取消時會拋出CancellationException;當(dāng)任務(wù)執(zhí)行時拋出異常時,會拋出CompletionException。當(dāng)我們需要從任務(wù)中拿值時,推薦join()方法。

我們知道上面兩種方法在調(diào)用時都會阻塞當(dāng)前線程,如果想繼續(xù)向下運行,則必須中斷或取消任務(wù),但是如果我們想正常的結(jié)束任務(wù),或者在拿值時如果值不存在,則拋出自定義的異常或返回默認值呢?CompletableFuture中為我們提供以下解決方法。

  • T getNow(T valueIfAbsent)
    該方法不阻塞,如果任務(wù)尚未完成,則返回默認值。
  • boolean complete(T)
    如果任務(wù)未完成,則在拿取返回值時返回T。如果此次調(diào)用將CompletableFuture轉(zhuǎn)化為“完成”狀態(tài),則返回true,否則返回false。意思是如果通過get()或join()拿到的是類型T的值,則返回true,否則返回false。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
                /*循環(huán)*/
                infiniteLoop();
                return "I am Awesome";
            });
System.out.println("complete:"+cf1.complete("Default"));
System.out.println("isDone:"+cf1.isDone());
System.out.println("result:"+cf1.join());

輸出結(jié)果:

complete:true
isDone:true
result:Default
  • boolean completeExceptionally(Throwable)
    如果任務(wù)未完成,則在拿值時拋出Throwable異常,該異??梢宰远x。如果此次調(diào)用將CompletableFuture轉(zhuǎn)化為“完成”狀態(tài),則返回true,否則返回false。如果你想進一步傳遞某個異常,可以使用該方法。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
       /*循環(huán)*/
       infiniteLoop();
       return "I am Awesome";
       });
System.out.println("complete:"+cf1.completeExceptionally(new RuntimeException("completeExceptionally")));
System.out.println("isDone:"+cf1.isDone());
System.out.println("isCompletedExceptionally:"+cf1.isCompletedExceptionally());
System.out.println("result:"+cf1.join());

輸出結(jié)果:

complete:true
isDone:true
Exception in thread "main" 
isCompletedExceptionally:true
java.util.concurrent.CompletionException: java.lang.RuntimeException: completeExceptionally
    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
    at CompletableFutureSample.main(CompletableFutureSample.java:27)
Caused by: java.lang.RuntimeException: completeExceptionally
    at CompletableFutureSample.main(CompletableFutureSample.java:24)

上述方法在構(gòu)建我們不想等待太多時間的健壯系統(tǒng)時很有用。

  • obtrudeValue(T value)

  • obtrudeException(Throwable ex)
    使用這兩個方法可以強制地將值設(shè)置或者將異常拋出,無論該之前任務(wù)是否完成。這兩個方法類似于complete和completeExceptionally,但是complete的功能是如果任務(wù)未完成才返回設(shè)定的值。在某些場景下,你有可能想放棄該任務(wù)的執(zhí)行,發(fā)出一個失敗的信號。請謹(jǐn)慎使用這兩個方法,因為他們會覆蓋前一個Future的值(或異常)。

  • CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> )
    該方法的作用是:當(dāng)前面的任務(wù)出了異常時,就會返回T值;否則還是返回原先前面任務(wù)應(yīng)該返回的值。請看下面的代碼:

CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
      return user.getName();
      });
cf1.exceptionally(ex -> ex.getMessage())
    /*如果拋出了異常,這里傳遞給下一個結(jié)點的值是ex.getMessage()*/
    .thenAccept(System.out::println);
    /*如果使用get或join拿里面的值的話,如果任務(wù)有異常,會拋出CompletionException異常的*/
System.out.println("isDone:" + cf1.isDone());
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());

輸出結(jié)果:

/*user為空時的打印結(jié)果*/
java.lang.NullPointerException
isDone:true
isCompletedExceptionally:true
/*正常的打印結(jié)果*/
Jack
isDone:true
isCompletedExceptionally:false
  • handle(BiFunction<T, Throwable, U>)
    這個方法有點類似于thenApply和exceptionally的結(jié)合,如果上一個任務(wù)出了異常,則傳入的T類型為null,Throwable為拋出的異常;如果正常運行,則T類型為運行后的返回值,Throwable為null。
    下面用handle來還原上個例子的代碼的功能:
 CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
         return user.getName();
      }).handle((v,ex)->{
          if(v==null)
              return "user is null";
          return v;
          }).thenAccept(System.out::println);
            /*如果使用get或join拿里面的值的話,如果任務(wù)有異常,會拋出CompletionException異常的*/
System.out.println("isDone:" + cf1.isDone());
            /*使用handle時,無論出不出異常,該值都為false。*/
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());
/*user為空時的打印結(jié)果*/
user is null!
isDone:true
isCompletedExceptionally:false
/*正常的打印結(jié)果*/
Jack
isDone:true
isCompletedExceptionally:false
  • whenComplete(BiConsumer<T, Throwable>)
    該方法有點像thenAccept(Consumer<T>) ,只不過增加了一個異常處理的功能。傳入的參數(shù)類似于handle,如果上一個任務(wù)出了異常,則傳入的T類型為null,Throwable為拋出的異常;如果正常運行,則T類型為之前任務(wù)運行后的返回值,Throwable為null。下面上代碼:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
                return user.getName();
     }).handle((v,ex)->{
            if(v==null)
                 System.out.println("user is null");
            else
                 System.out.println(v);
       }).thenAccept(System.out::println);

輸出結(jié)果和上例相同,為了簡潔就不展示了。

Async Methods

CompletableFuture API為絕大部分的方法提供了2個額外的方法變體,它們后綴名都是“Async”。這些異步方法都會使用線程池來執(zhí)行,如果傳入的線程池為null,則還會使用默認的fork/join pool來執(zhí)行任務(wù),這可以更有效率地提高你任務(wù)的并發(fā)性。

總結(jié)

??這篇文章我們總結(jié)了CompletableFuture API的功能,從創(chuàng)建對象,到構(gòu)造任務(wù)鏈,最后再到異常的處理。

答案

THE QUICK BROWN FOX JUMPED OVER the lazy dog

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 在現(xiàn)代軟件開發(fā)中,系統(tǒng)功能越來越復(fù)雜,管理復(fù)雜度的方法就是分而治之,系統(tǒng)的很多功能可能會被切分為小的服務(wù),對外提供...
    天堂鳥6閱讀 7,291評論 0 23
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,621評論 1 32
  • Java 8 CompletableFuture Java 8 有大量的新特性和增強如 Lambda 表達式,St...
    單純小碼農(nóng)閱讀 2,209評論 0 8
  • Java 8 有大量的新特性和增強如 Lambda 表達式,Streams,CompletableFuture等。...
    YDDMAX_Y閱讀 4,849評論 0 15
  • 為什么有男人和女人呢?他們是那樣的不同,不能相互理解,但又相互愛戀、依靠,必然互相傷害。有時候我想,設(shè)計男女這樣一...
    孤獨小說家閱讀 460評論 0 0

友情鏈接更多精彩內(nèi)容