CompletableFuture深入理解

1.Future接口

1.1 什么是Future?

在jdk的官方的注解中寫(xiě)道

A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.

在上面的注釋中我們能知道Future用來(lái)代表異步的結(jié)果,并且提供了檢查計(jì)算完成,等待完成,檢索結(jié)果完成等方法。簡(jiǎn)而言之就是提供一個(gè)異步運(yùn)算結(jié)果的一個(gè)建模。它可以讓我們把耗時(shí)的操作從我們本身的調(diào)用線程中釋放出來(lái),只需要完成后再進(jìn)行回調(diào)。就好像我們?nèi)ワ埖昀锩娉燥垼恍枰闳ブ箫?,而你這個(gè)時(shí)候可以做任何事,然后飯煮好后就會(huì)回調(diào)你去吃。

1.2 JDK8以前的Future

在JDK8以前的Future使用比較簡(jiǎn)單,我們只需要把我們需要用來(lái)異步計(jì)算的過(guò)程封裝在Callable或者Runnable中,比如一些很耗時(shí)的操作(不能占用我們的調(diào)用線程時(shí)間的),然后再將它提交給我們的線程池ExecutorService。代碼例子如下:

public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName();
            }
        });

        doSomethingElse();//在我們異步操作的同時(shí)一樣可以做其他操作
        try {
            String res = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

上面展示了我們的線程可以并發(fā)方式調(diào)用另一個(gè)線程去做我們耗時(shí)的操作。當(dāng)我們必須依賴我們的異步結(jié)果的時(shí)候我們就可以調(diào)用get方法去獲得。當(dāng)我們調(diào)用get方法的時(shí)候如果我們的任務(wù)完成就可以立馬返回,但是如果任務(wù)沒(méi)有完成就會(huì)阻塞,直到超時(shí)為止。

Future底層是怎么實(shí)現(xiàn)的呢?
我們首先來(lái)到我們ExecutorService的代碼中submit方法這里會(huì)返回一個(gè)Future

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

在sumbmit中會(huì)對(duì)我們的Callable進(jìn)行包裝封裝成我們的FutureTask,我們最后的Future其實(shí)也是Future的實(shí)現(xiàn)類FutureTask,F(xiàn)utureTask實(shí)現(xiàn)了Runnable接口所以這里直接調(diào)用execute。在FutureTask代碼中的run方法代碼如下:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
        .......
    }

可以看見(jiàn)當(dāng)我們執(zhí)行完成之后會(huì)set(result)來(lái)通知我們的結(jié)果完成了。set(result)代碼如下:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

首先用CAS置換狀態(tài)為完成,以及替換結(jié)果,當(dāng)替換結(jié)果完成之后,才會(huì)替換為我們的最終狀態(tài),這里主要是怕我們?cè)O(shè)置完COMPLETING狀態(tài)之后最終值還沒(méi)有真正的賦值出去,而我們的get就去使用了,所以還會(huì)有個(gè)最終狀態(tài)。我們的get()方法的代碼如下:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

首先獲得當(dāng)前狀態(tài),然后判斷狀態(tài)是否完成,如果沒(méi)有完成則進(jìn)入awaitDone循環(huán)等待,這也是我們阻塞的代碼,然后返回我們的最終結(jié)果。

1.2.1缺陷

我們的Future使用很簡(jiǎn)單,這也導(dǎo)致了如果我們想完成一些復(fù)雜的任務(wù)可能就比較難。比如下面一些例子:

  • 將兩個(gè)異步計(jì)算合成一個(gè)異步計(jì)算,這兩個(gè)異步計(jì)算互相獨(dú)立,同時(shí)第二個(gè)又依賴第一個(gè)的結(jié)果。
  • 當(dāng)Future集合中某個(gè)任務(wù)最快結(jié)束時(shí),返回結(jié)果。
  • 等待Future結(jié)合中的所有任務(wù)都完成。
  • 通過(guò)編程方式完成一個(gè)Future任務(wù)的執(zhí)行。
  • 應(yīng)對(duì)Future的完成時(shí)間。也就是我們的回調(diào)通知。

1.3CompletableFuture

CompletableFuture是JDK8提出的一個(gè)支持非阻塞的多功能的Future,同樣也是實(shí)現(xiàn)了Future接口。

1.3.1CompletableFuture基本實(shí)現(xiàn)

下面會(huì)寫(xiě)一個(gè)比較簡(jiǎn)單的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(()->{
            completableFuture.complete(Thread.currentThread().getName());
        }).start();
        doSomethingelse();//做你想做的其他操作
        
        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

用法上來(lái)說(shuō)和Future有一點(diǎn)不同,我們這里fork了一個(gè)新的線程來(lái)完成我們的異步操作,在異步操作中我們會(huì)設(shè)置值,然后在外部做我們其他操作。在complete中會(huì)用CAS替換result,然后當(dāng)我們get如果可以獲取到值得時(shí)候就可以返回了。

1.3.2錯(cuò)誤處理

上面介紹了正常情況下但是當(dāng)我們?cè)谖覀儺惒骄€程中產(chǎn)生了錯(cuò)誤的話就會(huì)非常的不幸,錯(cuò)誤的異常不會(huì)告知給你,會(huì)被扼殺在我們的異步線程中,而我們的get方法會(huì)被阻塞。

對(duì)于我們的CompletableFuture提供了completeException方法可以讓我們返回我們異步線程中的異常,代碼如下:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(()->{
            completableFuture.completeExceptionally(new RuntimeException("error"));
            completableFuture.complete(Thread.currentThread().getName());
        }).start();
//        doSomethingelse();//做你想做的耗時(shí)操作

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
    at futurepackge.jdk8Future.main(jdk8Future.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
    at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
    at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

在我們新建的異步線程中直接New一個(gè)異常拋出,在我們客戶端中依然可以獲得異常。

1.3.2工廠方法創(chuàng)建CompletableFuture

我們的上面的代碼雖然不復(fù)雜,但是我們的java8依然對(duì)其提供了大量的工廠方法,用這些方法更容易完成整個(gè)流程。如下面的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
                return Thread.currentThread().getName();
        });
//        doSomethingelse();//做你想做的耗時(shí)操作

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
---------
輸出:
ForkJoinPool.commonPool-worker-1

上面的例子通過(guò)工廠方法supplyAsync提供了一個(gè)Completable,在異步線程中的輸出是ForkJoinPool可以看出當(dāng)我們不指定線程池的時(shí)候會(huì)使用ForkJoinPool,而我們上面的compelte的操作在我們的run方法中做了,源代碼如下:

public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }

上面代碼中通過(guò)d.completeValue(f.get());設(shè)置了我們的值。同樣的構(gòu)造方法還有runasync等等。

1.3.3計(jì)算結(jié)果完成時(shí)的處理

當(dāng)CompletableFuture計(jì)算結(jié)果完成時(shí),我們需要對(duì)結(jié)果進(jìn)行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時(shí)候需要對(duì)異常進(jìn)行處理。有如下幾種方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面的四種方法都返回了CompletableFuture,當(dāng)我們Action執(zhí)行完畢的時(shí)候,future返回的值和我們?cè)嫉腃ompletableFuture的值是一樣的。上面以Async結(jié)尾的會(huì)在新的線程池中執(zhí)行,上面沒(méi)有一Async結(jié)尾的會(huì)在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println(v);
        });
        System.out.println("Main" + Thread.currentThread().getName());
        System.out.println(f.get());
    }

exceptionally方法返回一個(gè)新的CompletableFuture,當(dāng)原始的CompletableFuture拋出異常的時(shí)候,就會(huì)觸發(fā)這個(gè)CompletableFuture的計(jì)算,調(diào)用function計(jì)算值,否則如果原始的CompletableFuture正常計(jì)算完后,這個(gè)新的CompletableFuture也計(jì)算完成,它的值和原始的CompletableFuture的計(jì)算的值相同。也就是這個(gè)exceptionally方法用來(lái)處理異常的情況。

1.3.4計(jì)算結(jié)果完成時(shí)的轉(zhuǎn)換

上面我們討論了如何計(jì)算結(jié)果完成時(shí)進(jìn)行的處理,接下來(lái)我們討論如何對(duì)計(jì)算結(jié)果完成時(shí),對(duì)結(jié)果進(jìn)行轉(zhuǎn)換。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

這里同樣也是返回CompletableFuture,但是這個(gè)結(jié)果會(huì)由我們自定義返回去轉(zhuǎn)換他,同樣的不以Async結(jié)尾的方法由原來(lái)的線程計(jì)算,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運(yùn)行。Java的CompletableFuture類總是遵循這樣的原則,下面就不一一贅述了。
例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
        System.out.println(f.get());
    }

上面的最終結(jié)果會(huì)輸出11,我們成功將其用兩個(gè)thenApply轉(zhuǎn)換為String。

1.3.5計(jì)算結(jié)果完成時(shí)的消費(fèi)

上面已經(jīng)講了結(jié)果完成時(shí)的處理和轉(zhuǎn)換,他們最后的CompletableFuture都會(huì)返回對(duì)應(yīng)的值,這里還會(huì)有一個(gè)只會(huì)對(duì)計(jì)算結(jié)果消費(fèi)不會(huì)返回任何結(jié)果的方法。

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

函數(shù)接口為Consumer,就知道了只會(huì)對(duì)函數(shù)進(jìn)行消費(fèi),例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        future.thenAccept(System.out::println);
    }

這個(gè)方法用法很簡(jiǎn)單我就不多說(shuō)了.Accept家族還有個(gè)方法是用來(lái)合并結(jié)果當(dāng)兩個(gè)CompletionStage都正常執(zhí)行的時(shí)候就會(huì)執(zhí)行提供的action,它用來(lái)組合另外一個(gè)異步的結(jié)果。

public <U> CompletableFuture<Void>  thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)

runAfterBoth是當(dāng)兩個(gè)CompletionStage都正常完成計(jì)算的時(shí)候,執(zhí)行一個(gè)Runnable,這個(gè)Runnable并不使用計(jì)算的結(jié)果。
示例代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> System.out.println(x+y)).get());
    }

CompletableFuture也提供了執(zhí)行Runnable的辦法,這里我們就不能使用我們future中的值了。

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)

1.3.6對(duì)計(jì)算結(jié)果的組合

首先是介紹一下連接兩個(gè)future的方法:

public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

對(duì)于Compose可以連接兩個(gè)CompletableFuture,其內(nèi)部處理邏輯是當(dāng)?shù)谝粋€(gè)CompletableFuture處理沒(méi)有完成時(shí)會(huì)合并成一個(gè)CompletableFuture,如果處理完成,第二個(gè)future會(huì)緊接上一個(gè)CompletableFuture進(jìn)行處理。

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
    }

我們上面的thenAcceptBoth講了合并兩個(gè)future,但是沒(méi)有返回值這里將介紹一個(gè)有返回值的方法,如下:

public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

例子比較簡(jiǎn)單如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> {return "計(jì)算結(jié)果:"+x+y;});
        System.out.println(f.get());
    }

上面介紹了兩個(gè)future完成的時(shí)候應(yīng)該完成的工作,接下來(lái)介紹任意一個(gè)future完成時(shí)需要執(zhí)行的工作,方法如下:

public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

上面兩個(gè)是一個(gè)是純消費(fèi)不返回結(jié)果,一個(gè)是計(jì)算后返回結(jié)果。

1.3.6其他方法

public static CompletableFuture<Void>       allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>     anyOf(CompletableFuture<?>... cfs)

allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算。

anyOf方法是當(dāng)任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同。

1.3.7建議

CompletableFuture和Java8的Stream搭配使用對(duì)于一些并行訪問(wèn)的耗時(shí)操作有很大的性能提高,可以自行了解。

2.參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、并發(fā) 進(jìn)程:每個(gè)進(jìn)程都擁有自己的一套變量 線程:線程之間共享數(shù)據(jù) 1.線程 Java中為多線程任務(wù)提供了很多的...
    SeanMa閱讀 2,807評(píng)論 0 11

友情鏈接更多精彩內(nèi)容