Java:CompletableFuture一文搞定

Prefix

CompletableFuture本身是為了解決任務(wù)的執(zhí)行和銜接。在后面我們可以看到,CompletableFuture的功能就是分派任務(wù)到線程,并以近乎于pipeline的概念將各任務(wù)進(jìn)行編排,下游任務(wù)可以獲取上游任務(wù)的結(jié)果。從方法名上我們可見pipeline

complete -> whenComplete -> then* 

后面我們會(huì)從例子探索pipeline的概念。

注意以Async結(jié)尾的方法:

不以Async結(jié)尾的方法由原來的線程計(jì)算;
以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運(yùn)行。

About CompletableFuture

同步式
盡管Future可以代表在另外的線程中執(zhí)行的一段異步代碼,但是你還是可以在本身線程(例如main)中執(zhí)行:

CompletableFuture<Integer> future = new CompletableFuture<>();

上面的代碼中future沒有關(guān)聯(lián)任何的Callback、線程池、異步任務(wù)等,如果客戶端調(diào)用future.get就會(huì)一直傻等下去。

future.get(); 
//一直等下去。源碼可以看到get調(diào)用了waitingGet,是循環(huán)獲取結(jié)果。
//因?yàn)闆]有調(diào)用complete這種方法生成結(jié)果,它就要一直循環(huán)

你可以通過下面的代碼完成一個(gè)計(jì)算,觸發(fā)客戶端的等待:

future.complete(100);

CompletableFuture.complete()只能調(diào)用一次,后續(xù)調(diào)用將被忽略。但也有一個(gè)后門叫做CompletableFuture.obtrudeValue(…)覆蓋一個(gè)新Future之前的價(jià)值,請小心使用。
Java 8:CompletableFuture終極指南

異步式
以下四個(gè)靜態(tài)方法用來為一段異步執(zhí)行的代碼創(chuàng)建CompletableFuture對象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

當(dāng)運(yùn)行完成時(shí)的回調(diào)操作

  • 當(dāng)運(yùn)行時(shí)出現(xiàn)了異常,可以通過exceptionally進(jìn)行補(bǔ)償(消化)
  • whenComplete,當(dāng)計(jì)算完成,或者拋出異常的時(shí)候,可以對其結(jié)果或異常進(jìn)行消費(fèi)
  • handle,當(dāng)計(jì)算完成,或者拋出異常的時(shí)候,可以對其結(jié)果或異常進(jìn)行轉(zhuǎn)換

handle使用場景:
exceptionally消化任務(wù)拋出的異常后,我們會(huì)有機(jī)會(huì)將此異常轉(zhuǎn)換為和Future類型的兼容的一些值來進(jìn)行恢復(fù)。safe進(jìn)一步的轉(zhuǎn)換將不再產(chǎn)生一個(gè)異常而是從提供功能的函數(shù)返回一個(gè)String值。
Java 8:CompletableFuture終極指南

這里有個(gè)關(guān)于任務(wù)的概念需要了解:

whenCompletehandle、exceptionally里面的運(yùn)算代碼算不算任務(wù)?可能在某些業(yè)務(wù)語境中是的,但至少在CompletableFuture的語義中我覺得不算。它們僅僅是任務(wù)的附屬,而不能獨(dú)立成為任務(wù)。從下面的例子就能反映:

    @Test
    public void testNoExceptionally() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.completeExceptionally(new Exception("測試拋異常"));
        future.whenComplete((s, t) -> {
            // 1
            log.info("1:{}", s);
            if(t != null)
                log.error(t.getMessage());})
        .whenComplete((s, t) -> {
            // 2
            log.info("2:{}", s);
            if(t != null)
                log.error(t.getMessage());})
        .thenApply(s -> {  // task2
            // 3
            log.info("3:{}", s);
            return s;})
        .exceptionally(e -> {
            // 4       異常中斷了task2,在這里被消化
            log.error("4:{}", e.getMessage());
            return e.getMessage();
        })
        .join();
    }

task1拋出異常,2個(gè)whenCompleteexceptionally都執(zhí)行了,但是task2并沒有被執(zhí)行。說明了2個(gè)問題:

  • whenComplete實(shí)際上是歸于task1
  • 只有exceptionally可以進(jìn)行任務(wù)中異常的消化,否則異常會(huì)傳遞下去,中斷后面的任務(wù)被執(zhí)行,直到有exceptionally消化異常

任務(wù)之間的承接

注意:下游任務(wù)并不承接上游任務(wù)的異常。如果上游任務(wù)拋出異常而沒有exceptionally消化,則下游任務(wù)不會(huì)被執(zhí)行,所以這里的方法并沒有像上面whenComplete、handle有異常參數(shù):

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)

A->B

  • thenApply,針對上一步(A)結(jié)果進(jìn)行轉(zhuǎn)換(B)。
  • thenAccept,針對上一步結(jié)果進(jìn)行消費(fèi)
  • thenRun,對上一步的計(jì)算結(jié)果不關(guān)心

A,B -> C

  • thenCombine,結(jié)合上一步(A)結(jié)果和本次(B)的結(jié)果,然后進(jìn)行轉(zhuǎn)換
  • thenAcceptBoth,結(jié)合上一步結(jié)果和本次的結(jié)果,然后進(jìn)行消耗。消耗計(jì)算是在本次線程(本次線程就是上文提到的原來的線程)上執(zhí)行;而Async是在線程池上執(zhí)行。下面以此類推
  • runAfterBoth,不關(guān)心上一步和本次的結(jié)果,只關(guān)心它們運(yùn)算完畢,然后進(jìn)行下步操作
  • acceptEither,上一步和本次哪個(gè)快就用哪個(gè)的結(jié)果,然后消費(fèi)

輔助方法 allOf 和 anyOf

  • allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算
  • anyOf方法是當(dāng)任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同。

探索一下pipeline的概念

下面的代碼中,我先后執(zhí)行2個(gè)任務(wù):task1、task2

task1中拋出異常
1.task1如果拋異常,則打印異常信息;
2.等task1完成后,task后處理打印task1的輸出字符串或異常信息;
3.task后處理如果拋異常,則打印異常信息;
4.task2task后處理完成后,打印task1的輸出字符串

    @Test
    public void testPipeline() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.completeExceptionally(new Exception("測試拋異常"));
        future.exceptionally(e -> {
            // 1
            log.error("1:{}", e.getMessage());
            return e.getMessage();
        }).whenComplete((s, t) -> { 
            // 2
            log.info("2:{}", s);
            if(t != null)   // 注意,t如果為null,會(huì)導(dǎo)致拋異常
                log.error(t.getMessage());
        }).exceptionally(e -> {
            // 3    1將異常消化了,所以不會(huì)觸發(fā)3
            log.error("3:{}", e.getMessage());
            return e.getMessage();
        }).thenAccept(s -> {  // task2
            // 4
            log.info("4:{}", s+" world");})
         .join();     //main線程等任務(wù)執(zhí)行完再結(jié)束
    }

結(jié)果是1、2、4 會(huì)被觸發(fā)。
下面我們做個(gè)修改:task1不拋出異常。

    @Test
    public void testPipeline2() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.complete("正常");
        future.exceptionally(e -> {
            // 1
            log.error("1:{}", e.getMessage());
            return e.getMessage();
        }).whenComplete((s, t) -> {   
            // 2
            log.info("2:{}", s);
            if(t != null) 
                log.error(t.getMessage());
        }).exceptionally(e -> {
            // 3
            log.error("3:{}", e.getMessage());
            return e.getMessage();
        }).thenAccept(s -> {  // task2
            // 4
            log.info("4:{}", s+" world");})
                .join(); 
    }

那么只有2、4被觸發(fā)。

About thread

allOf

Q:allOfthenApply中的任務(wù)哪個(gè)線程執(zhí)行?

可以清楚的是,thenApply中的任務(wù)是在allOf返回的CompletableFuture的線程上執(zhí)行。那么allOf返回的CompletableFuture的線程是什么?allOf返回的CompletableFuture的線程一般是allOf中最后一個(gè)完成的任務(wù)的線程。

不過凡事總有例外。實(shí)際上無論是new CompletableFuture()還是CompletableFuture.supplyAsync都是立刻執(zhí)行任務(wù)。如果main執(zhí)行到allOf時(shí),像下面的例子里allOf內(nèi)的任務(wù)都已經(jīng)結(jié)束了,即使future1是異步的,thenApply仍然是同步的(此處為main線程)。
比如:

    @Test
    public void testCmopletableFuture() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(this::getThreadName);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(this::getThreadName);

        sleep(3000);
        CompletableFuture.allOf(future1, future2)  //  執(zhí)行到allof,future1和 future2已經(jīng)結(jié)束
                        .thenApply(t ->{
                            log.info(Thread.currentThread().getName());
                            return getThreadName(future1, future2);})
                        .exceptionally(e->{
                            log.info(Thread.currentThread().getName());
                            return Lists.newArrayList();})
                        .join();
    }
    private String getThreadName(){
        sleep(1000);
        String res = Thread.currentThread().getName();
        return res;
    }

    private List<String> getThreadName(CompletableFuture<String> future1, CompletableFuture<String> future2){
        List<String> res = Lists.newLinkedList();
        try{
            res.add(future1.get());
            res.add(future2.get());
        }catch(Exception e){

        }
        log.info("{}", res);
        return res;
    }

    private void sleep(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Ref

簡明扼要:CompletableFuture 詳解 - 簡書
全面:Java CompletableFuture 詳解 | 鳥窩

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容