CompletableFuture 詳解

CompletableFuture類實(shí)現(xiàn)了CompletionStage和Future接口。Future是Java 5添加的類,用來(lái)描述一個(gè)異步計(jì)算的結(jié)果,但是獲取一個(gè)結(jié)果時(shí)方法較少,要么通過輪詢isDone,確認(rèn)完成后,調(diào)用get()獲取值,要么調(diào)用get()設(shè)置一個(gè)超時(shí)時(shí)間。但是這個(gè)get()方法會(huì)阻塞住調(diào)用線程,這種阻塞的方式顯然和我們的異步編程的初衷相違背。
為了解決這個(gè)問題,JDK吸收了guava的設(shè)計(jì)思想,加入了Future的諸多擴(kuò)展功能形成了CompletableFuture。

CompletionStage是一個(gè)接口,從命名上看得知是一個(gè)完成的階段,它里面的方法也標(biāo)明是在某個(gè)運(yùn)行階段得到了結(jié)果之后要做的事情。

  1. 進(jìn)行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

首先說明一下已Async結(jié)尾的方法都是可以異步執(zhí)行的,如果指定了線程池,會(huì)在指定的線程池中執(zhí)行,如果沒有指定,默認(rèn)會(huì)在ForkJoinPool.commonPool()中執(zhí)行,下文中將會(huì)有好多類似的,都不詳細(xì)解釋了。關(guān)鍵的入?yún)⒅挥幸粋€(gè)Function,它是函數(shù)式接口,所以使用Lambda表示起來(lái)會(huì)更加優(yōu)雅。它的入?yún)⑹巧弦粋€(gè)階段計(jì)算后的結(jié)果,返回值是經(jīng)過轉(zhuǎn)化后結(jié)果。
例如:

    @Test
    public void thenApply() {
        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
        System.out.println(result);
    }

結(jié)果為:

hello world
  1. 進(jìn)行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept是針對(duì)結(jié)果進(jìn)行消耗,因?yàn)樗娜雲(yún)⑹荂onsumer,有入?yún)o(wú)返回值。
例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}

結(jié)果為:

hello world
  1. 對(duì)上一步的計(jì)算結(jié)果不關(guān)心,執(zhí)行下一個(gè)操作。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入?yún)⑹且粋€(gè)Runnable的實(shí)例,表示當(dāng)?shù)玫缴弦徊降慕Y(jié)果時(shí)的操作。
例如:

    @Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world"));
        while (true){}
    }

結(jié)果為:

hello world

4.結(jié)合兩個(gè)CompletionStage的結(jié)果,進(jìn)行轉(zhuǎn)化后返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

它需要原來(lái)的處理返回值,并且other代表的CompletionStage也要返回值之后,利用這兩個(gè)返回值,進(jìn)行轉(zhuǎn)換后返回指定類型的值。
例如:

    @Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }

結(jié)果為:

hello world
  1. 結(jié)合兩個(gè)CompletionStage的結(jié)果,進(jìn)行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

它需要原來(lái)的處理返回值,并且other代表的CompletionStage也要返回值之后,利用這兩個(gè)返回值,進(jìn)行消耗。
例如:

    @Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2));
        while (true){}
    }

結(jié)果為:

hello world
  1. 在兩個(gè)CompletionStage都運(yùn)行完執(zhí)行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

不關(guān)心這兩個(gè)CompletionStage的結(jié)果,只關(guān)心這兩個(gè)CompletionStage執(zhí)行完畢,之后在進(jìn)行操作(Runnable)。
例如:

    @Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true){}
    }

結(jié)果為

hello world

6.兩個(gè)CompletionStage,誰(shuí)計(jì)算的快,我就用那個(gè)CompletionStage的結(jié)果進(jìn)行下一步的轉(zhuǎn)化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

我們現(xiàn)實(shí)開發(fā)場(chǎng)景中,總會(huì)碰到有兩種渠道完成同一個(gè)事情,所以就可以調(diào)用這個(gè)方法,找一個(gè)最快的結(jié)果進(jìn)行處理。
例如:

    @Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result);
    }

結(jié)果為:

hello world
  1. 兩個(gè)CompletionStage,誰(shuí)計(jì)算的快,我就用那個(gè)CompletionStage的結(jié)果進(jìn)行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

例如:

    @Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }

結(jié)果為:

hello world
  1. 兩個(gè)CompletionStage,任何一個(gè)完成了都會(huì)執(zhí)行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

例如:

    @Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }

結(jié)果為:

hello world
  1. 當(dāng)運(yùn)行時(shí)出現(xiàn)了異常,可以通過exceptionally進(jìn)行補(bǔ)償。
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例如:

    @Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測(cè)試一下異常情況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結(jié)果為:

java.lang.RuntimeException: 測(cè)試一下異常情況
hello world
  1. 當(dāng)運(yùn)行完成時(shí),對(duì)結(jié)果的記錄。這里的完成時(shí)有兩種情況,一種是正常執(zhí)行,返回值。另外一種是遇到異常拋出造成程序的中斷。這里為什么要說成記錄,因?yàn)檫@幾個(gè)方法都會(huì)返回CompletableFuture,當(dāng)Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture的計(jì)算結(jié)果或者返回異常。所以不會(huì)對(duì)結(jié)果產(chǎn)生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

例如:

    @Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測(cè)試一下異常情況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結(jié)果為:

null
java.lang.RuntimeException: 測(cè)試一下異常情況
java.lang.RuntimeException: 測(cè)試一下異常情況
hello world

這里也可以看出,如果使用了exceptionally,就會(huì)對(duì)最終的結(jié)果產(chǎn)生影響,它沒有口子返回如果沒有異常時(shí)的正確的值,這也就引出下面我們要介紹的handle。

  1. 運(yùn)行完成時(shí),對(duì)結(jié)果的處理。這里的完成時(shí)有兩種情況,一種是正常執(zhí)行,返回值。另外一種是遇到異常拋出造成程序的中斷。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

例如:
出現(xiàn)異常時(shí)

    @Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現(xiàn)異常
            if (1 == 1) {
                throw new RuntimeException("測(cè)試一下異常情況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結(jié)果為:

hello world

未出現(xiàn)異常時(shí)

    @Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結(jié)果為:

s1

上面就是CompletionStage接口中方法的使用實(shí)例,CompletableFuture同樣也同樣實(shí)現(xiàn)了Future,所以也同樣可以使用get進(jìn)行阻塞獲取值,總的來(lái)說,CompletableFuture使用起來(lái)還是比較爽的,看起來(lái)也比較優(yōu)雅一點(diǎn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容