CompletableFuture 異步處理任務(wù)總結(jié)

oracle JDK8 有關(guān)內(nèi)容的文檔:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

創(chuàng)建異步任務(wù)

runAsync 執(zhí)行 CompletableFuture 任務(wù),沒有返回值

static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync 執(zhí)行 CompletableFuture 任務(wù),可有返回值

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

如果不指定 Executor 實現(xiàn),則使用 ForkJoinPool.commonPool() 作為執(zhí)行異步代碼的線程池

創(chuàng)建異步任務(wù)后,可根據(jù)需求進行如下的操作:

方法名稱 類型 傳參 返回值
thenRun 單任務(wù)消費 無傳參 無返回值
thenRunAsync 單任務(wù)消費 無傳參 無返回值
thenApply 單任務(wù)消費 要傳參 有返回值
thenApplyAsync 單任務(wù)消費 要傳參 有返回值
thenAccept 單任務(wù)消費 要傳參 無返回值
thenAcceptAsync 單任務(wù)消費 要傳參 無返回值
thenCombine 雙任務(wù)消費(與) 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) 有返回值
thenCombineAsync 雙任務(wù)消費(與) 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) 有返回值
thenAcceptBoth 雙任務(wù)消費(與) 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) 無返回值
thenAcceptBothAsync 雙任務(wù)消費(與) 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) 無返回值
runAfterBoth 雙任務(wù)消費(與) 無傳參 無返回值
runAfterBothAsync 雙任務(wù)消費(與) 無傳參 無返回值
applyToEither 雙任務(wù)消費(或) 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) 有返回值
applyToEitherAsync 雙任務(wù)消費(或) 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) 有返回值
acceptEither 雙任務(wù)消費(或) 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) 無返回值
acceptEitherAsync 雙任務(wù)消費(或) 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) 無返回值
runAfterEither 雙任務(wù)消費(或) 無傳參 無返回值
runAfterEitherAsync 雙任務(wù)消費(或) 無傳參 無返回值
whenComplete 單任務(wù)消費 要傳參(正常返回值和異常) 無返回值
whenCompleteAsync 單任務(wù)消費 要傳參(正常返回值和異常) 無返回值
handle 單任務(wù)消費 要傳參(正常返回值和異常) 有返回值
handleAsync 單任務(wù)消費 要傳參(正常返回值和異常) 有返回值
exceptionally 單任務(wù)消費 要傳參 (異常) 無返回值
thenCompose 單任務(wù)消費 要傳參 有返回值
allOf 多任務(wù)消費(與) 要傳參(任務(wù)列表) 無返回值
anyOf 多任務(wù)消費(或) 要傳參(任務(wù)列表) 無返回值

不帶 Async 版本由上一個任務(wù)的線程繼續(xù)執(zhí)行該任務(wù),Async 版本可以指定執(zhí)行該異步任務(wù)的 Executor 實現(xiàn),如果不指定,默認使用 ForkJoinPool.commonPool()

單任務(wù)消費

回調(diào)方法 類型 傳參 返回值
thenRun 單任務(wù)消費 無傳參 無返回值
thenRunAsync 單任務(wù)消費 無傳參 無返回值
thenAccept 單任務(wù)消費 要傳參 無返回值
thenAcceptAsync 單任務(wù)消費 要傳參 無返回值
thenApply 單任務(wù)消費 要傳參 有返回值
thenApplyAsync 單任務(wù)消費 要傳參 有返回值
public static void main(String[] args) throws Exception {
    var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsyncTask=" + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "";
    }, executor);

    // thenApply
    var thenApplyTask = supplyAsyncTask.thenApply((param) -> {
        System.out.println("thenApplyTask=" + Thread.currentThread().getName());
        return "";
    });

    // thenApplyAsync不指定線程池
    var thenApplyAsyncTask = supplyAsyncTask.thenApplyAsync((param) -> {
        System.out.println("thenApplyAsyncTask=" + Thread.currentThread().getName());
        return "";
    });

    // thenApplyAsync指定線程池
    var thenApplyAsyncTask2 = supplyAsyncTask.thenApplyAsync((param) -> {
        System.out.println("thenApplyAsyncTask2=" + Thread.currentThread().getName());
        return "";
    }, executor);

    // 不調(diào)用get()將不執(zhí)行回調(diào)
    thenApplyAsyncTask.get();
    thenApplyAsyncTask2.get();

    // 關(guān)閉線程池
    executor.shutdown();
}

輸出結(jié)果:

supplyAsyncTask=pool-1-thread-1
thenApplyAsyncTask2=pool-1-thread-2
thenApplyTask=pool-1-thread-2
thenApplyAsyncTask=ForkJoinPool.commonPool-worker-3

雙任務(wù)消費(與)

將兩個 CompletableFuture 組合起來,只有這兩個都正常執(zhí)行完了,才會執(zhí)行某個任務(wù)。

方法名稱 類型 傳參 返回值
thenCombine 雙任務(wù)消費(與) 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) 有返回值
thenCombineAsync 雙任務(wù)消費(與) 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) 有返回值
thenAcceptBoth 雙任務(wù)消費(與) 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) 無返回值
thenAcceptBothAsync 雙任務(wù)消費(與) 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) 無返回值
runAfterBoth 雙任務(wù)消費(與) 無傳參 無返回值
runAfterBothAsync 雙任務(wù)消費(與) 無傳參 無返回值
public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> "task1");
    var task2 = CompletableFuture.supplyAsync(() -> "task2");
    var task3 = CompletableFuture.supplyAsync(() -> "task3");

    task1.thenCombine(task2, (param1, param2) -> {
        // task1task2
        System.out.println(param1 + param2);
        return param1 + param2;
    }).thenCombine(task3, (param12, param3) -> {
        // task1task2task3
        System.out.println(param12 + param3);
        return param12 + param3;
    });

    task1.thenAcceptBoth(task2, (param1, param2) -> {
        // task1task2
        System.out.println(param1 + param2);
    }).thenAcceptBoth(task3, (param12, param3) -> {
        // nulltask3
        System.out.println(param12 + param3);
    });

    task1.runAfterBoth(task2, () -> {
        // task1 and task2
        System.out.println("task1 and task2");
    });
}

雙任務(wù)消費(或)

將兩個 CompletableFuture 組合起來,只要其中一個執(zhí)行完了,就執(zhí)行回調(diào)方法。

方法名稱 類型 傳參 返回值
applyToEither 雙任務(wù)消費(或) 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) 有返回值
applyToEitherAsync 雙任務(wù)消費(或) 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) 有返回值
acceptEither 雙任務(wù)消費(或) 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) 無返回值
acceptEitherAsync 雙任務(wù)消費(或) 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) 無返回值
runAfterEither 雙任務(wù)消費(或) 無傳參 無返回值
runAfterEitherAsync 雙任務(wù)消費(或) 無傳參 無返回值
public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task1";
    });
    var task2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task2";
    });
    var task3 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task3";
    });

    task1.applyToEither(task2, (param) -> {
        // applyToEither=task2
        System.out.println("applyToEither=" + param);
        return param;
    }).acceptEither(task3, (param) -> {
        // acceptEither=task2 或 acceptEither=task3
        System.out.println("acceptEither=" + param);
    }).get();

    // task1 or task2
    task1.runAfterEither(task2,()-> System.out.println("task1 or task2"));
}

其他

whenComplete、whenCompleteAsync

某個任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值。可以訪問 CompletableFuture 的結(jié)果和異常作為參數(shù),使用它們并執(zhí)行想要的操作。此方法并不能轉(zhuǎn)換完成的結(jié)果。會內(nèi)部拋出異常。其正常返回的 CompletableFuture 的結(jié)果來自上個任務(wù)。

handle、handleAsync

不論正常返回還是出異常都會進入 handle,參數(shù)通常為 new BiFunction<T, Throwable, R>();,其中

  • T:上一任務(wù)傳入的對象類型
  • Throwable:上一任務(wù)傳入的異常
  • R:返回的對象類型

handle 和 thenApply 的區(qū)別:如果任務(wù)出現(xiàn)異常不會進入 thenApply;任務(wù)出現(xiàn)異常也會進入 handle,可對異常處理。

handle 和 whenComplete 的區(qū)別:handle 可對傳入值 T 進行轉(zhuǎn)換,并產(chǎn)生自己的返回結(jié)果 R;whenComplete 的返回值和上級任務(wù)傳入的結(jié)果一致,不能轉(zhuǎn)換。

whenComplete、whenCompleteAsync、handle 和 handleAsync 的輸入?yún)?shù)一個是正常結(jié)果一個是異常結(jié)果,而 exceptionally 的輸入?yún)?shù)為異常結(jié)果。

public static void main(String[] args) throws Exception {

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        // 制造一個異常
        // int value = 1 / 0;
        return "supplyAsyncTask";
    });

    var handle = supplyAsyncTask.handle((s, throwable) -> {
        if (Optional.ofNullable(throwable).isPresent()) {
            return throwable.getMessage();
        }
        return new ArrayList() {{
            add(s);
        }};
    });

    // supplyAsyncTask異常時,輸出1:java.lang.ArithmeticException: / by zero
    // 輸出2:[supplyAsyncTask]
    System.out.println(handle.get());
}

exceptionally

某個任務(wù)執(zhí)行拋出異常時執(zhí)行的回調(diào)方法。拋出異常作為參數(shù),傳遞到回調(diào)方法。僅處理異常情況。如果任務(wù)成功完成,那么將被跳過。

public static void main(String[] args) throws Exception {

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        double error=1/0;
        return "ok";
    }, executor).exceptionally((e)->{
        // java.lang.ArithmeticException: / by zero
        System.out.println(e.getMessage());
        return "error";
    });

    // "error"
    System.out.println(supplyAsyncTask.get());
}

complete

如果尚未完成,則將 get() 和相關(guān)方法返回的值設(shè)置為給定值。如果此調(diào)用導(dǎo)致此 CompletableFuture 轉(zhuǎn)換到完成狀態(tài),則返回 true,否則返回 false。文檔描述:

If not already completed, sets the value returned by get() and related methods to the given value.

Params:
value – the result value
Returns:
true if this invocation caused this CompletableFuture to transition to a completed state, else false
public static void main(String[] args) throws Exception {
    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(15);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 10;
    });

    // 若get放在此處,一直等待task1完成,輸出10
    // System.out.println(task1.get());
    // 強制task1完成,輸出true
    System.out.println(task1.complete(5));
    // 輸出5
    System.out.println(task1.get());
    // task1已完成,輸出false
    System.out.println(task1.complete(50));
}

thenCompose

源碼定義

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenCompose 方法會在某個任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為入?yún)ⅲ瑘?zhí)行指定的方法。該方法會返回一個新的 CompletableFuture 實例

public static void main(String[] args) throws Exception {
    var task1 = CompletableFuture.supplyAsync(() -> 10);

    var task2 = task1.thenCompose(param -> {
        System.out.println("this is task2 param=" + param);
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("this is task2 square");
            return Math.pow(param, 2);
        });
    }).thenApply(param -> {
        System.out.println("thenApply get the square=" + param);
        return param;
    });

    var task3 = task1.thenCompose(param -> {
        System.out.println("this is task3 param=" + param);
        return CompletableFuture.runAsync(() -> {
            System.out.println("this is task3 square");
            System.out.println(Math.pow(param, 2));
        });
    });

    System.out.println("task2 get=" + task2.get());
    System.out.println("task3 get=" + task3.get());
}

輸出:
this is task2 param=10
this is task2 square
thenApply get the square=100.0
this is task3 param=10
this is task3 square
100.0
task2 get=100.0
task3 get=null

allOf

靜態(tài)方法,阻塞等待所有給定的 CompletableFuture 執(zhí)行結(jié)束后,返回一個 CompletableFuture<Void> 結(jié)果。所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf 的回調(diào)方法。如果任意一個任務(wù)異常,執(zhí)行 get 方法時會拋出異常。

public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task1";
    });

    var task2 = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int value = 1 / 0;
        System.out.println("task2 is over");
    });

    CompletableFuture.allOf(task1, task2).whenComplete((param, throwable) -> {
        // null
        System.out.println(param);
    }).exceptionally(throwable -> {
        // task3 allOf throwable=java.lang.ArithmeticException: / by zero
        System.out.println("task3 allOf throwable=" + throwable.getMessage());
        return null;
    }).get();
}

anyOf

靜態(tài)方法,阻塞等待任意一個給定的 CompletableFuture 對象執(zhí)行結(jié)束后,返回一個 CompletableFuture<Void> 結(jié)果。任意一個任務(wù)執(zhí)行完,就執(zhí)行 anyOf 的回調(diào)方法。如果執(zhí)行的任務(wù)異常,執(zhí)行 get 方法時會拋出異常。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容