oracle JDK8 有關(guān)內(nèi)容的文檔:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
創(chuàng)建異步任務(wù)
runAsync 執(zhí)行 CompletableFuture 任務(wù),沒有返回值
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
supplyAsync 執(zhí)行 CompletableFuture 任務(wù),可有返回值
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
如果不指定 Executor 實現(xiàn),則使用
ForkJoinPool.commonPool()作為執(zhí)行異步代碼的線程池
創(chuàng)建異步任務(wù)后,可根據(jù)需求進行如下的操作:
| 方法名稱 | 類型 | 傳參 | 返回值 |
|---|---|---|---|
| thenRun | 單任務(wù)消費 | 無傳參 | 無返回值 |
| thenRunAsync | 單任務(wù)消費 | 無傳參 | 無返回值 |
| thenApply | 單任務(wù)消費 | 要傳參 | 有返回值 |
| thenApplyAsync | 單任務(wù)消費 | 要傳參 | 有返回值 |
| thenAccept | 單任務(wù)消費 | 要傳參 | 無返回值 |
| thenAcceptAsync | 單任務(wù)消費 | 要傳參 | 無返回值 |
| thenCombine | 雙任務(wù)消費(與) | 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| thenCombineAsync | 雙任務(wù)消費(與) | 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| thenAcceptBoth | 雙任務(wù)消費(與) | 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| thenAcceptBothAsync | 雙任務(wù)消費(與) | 要傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| runAfterBoth | 雙任務(wù)消費(與) | 無傳參 | 無返回值 |
| runAfterBothAsync | 雙任務(wù)消費(與) | 無傳參 | 無返回值 |
| applyToEither | 雙任務(wù)消費(或) | 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| applyToEitherAsync | 雙任務(wù)消費(或) | 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| acceptEither | 雙任務(wù)消費(或) | 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| acceptEitherAsync | 雙任務(wù)消費(或) | 要傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| runAfterEither | 雙任務(wù)消費(或) | 無傳參 | 無返回值 |
| runAfterEitherAsync | 雙任務(wù)消費(或) | 無傳參 | 無返回值 |
| whenComplete | 單任務(wù)消費 | 要傳參(正常返回值和異常) | 無返回值 |
| whenCompleteAsync | 單任務(wù)消費 | 要傳參(正常返回值和異常) | 無返回值 |
| handle | 單任務(wù)消費 | 要傳參(正常返回值和異常) | 有返回值 |
| handleAsync | 單任務(wù)消費 | 要傳參(正常返回值和異常) | 有返回值 |
| exceptionally | 單任務(wù)消費 | 要傳參 (異常) | 無返回值 |
| thenCompose | 單任務(wù)消費 | 要傳參 | 有返回值 |
| allOf | 多任務(wù)消費(與) | 要傳參(任務(wù)列表) | 無返回值 |
| anyOf | 多任務(wù)消費(或) | 要傳參(任務(wù)列表) | 無返回值 |
不帶 Async 版本由上一個任務(wù)的線程繼續(xù)執(zhí)行該任務(wù),Async 版本可以指定執(zhí)行該異步任務(wù)的 Executor 實現(xiàn),如果不指定,默認使用
ForkJoinPool.commonPool()
單任務(wù)消費
| 回調(diào)方法 | 類型 | 傳參 | 返回值 |
|---|---|---|---|
| thenRun | 單任務(wù)消費 | 無傳參 | 無返回值 |
| thenRunAsync | 單任務(wù)消費 | 無傳參 | 無返回值 |
| thenAccept | 單任務(wù)消費 | 要傳參 | 無返回值 |
| thenAcceptAsync | 單任務(wù)消費 | 要傳參 | 無返回值 |
| thenApply | 單任務(wù)消費 | 要傳參 | 有返回值 |
| thenApplyAsync | 單任務(wù)消費 | 要傳參 | 有返回值 |
public static void main(String[] args) throws Exception {
var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsyncTask=" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "";
}, executor);
// thenApply
var thenApplyTask = supplyAsyncTask.thenApply((param) -> {
System.out.println("thenApplyTask=" + Thread.currentThread().getName());
return "";
});
// thenApplyAsync不指定線程池
var thenApplyAsyncTask = supplyAsyncTask.thenApplyAsync((param) -> {
System.out.println("thenApplyAsyncTask=" + Thread.currentThread().getName());
return "";
});
// thenApplyAsync指定線程池
var thenApplyAsyncTask2 = supplyAsyncTask.thenApplyAsync((param) -> {
System.out.println("thenApplyAsyncTask2=" + Thread.currentThread().getName());
return "";
}, executor);
// 不調(diào)用get()將不執(zhí)行回調(diào)
thenApplyAsyncTask.get();
thenApplyAsyncTask2.get();
// 關(guān)閉線程池
executor.shutdown();
}
輸出結(jié)果:
supplyAsyncTask=pool-1-thread-1
thenApplyAsyncTask2=pool-1-thread-2
thenApplyTask=pool-1-thread-2
thenApplyAsyncTask=ForkJoinPool.commonPool-worker-3
雙任務(wù)消費(與)
將兩個 CompletableFuture 組合起來,只有這兩個都正常執(zhí)行完了,才會執(zhí)行某個任務(wù)。
| 方法名稱 | 類型 | 傳參 | 返回值 |
|---|---|---|---|
| thenCombine | 雙任務(wù)消費(與) | 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| thenCombineAsync | 雙任務(wù)消費(與) | 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| thenAcceptBoth | 雙任務(wù)消費(與) | 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| thenAcceptBothAsync | 雙任務(wù)消費(與) | 有傳參(兩個任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| runAfterBoth | 雙任務(wù)消費(與) | 無傳參 | 無返回值 |
| runAfterBothAsync | 雙任務(wù)消費(與) | 無傳參 | 無返回值 |
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> "task1");
var task2 = CompletableFuture.supplyAsync(() -> "task2");
var task3 = CompletableFuture.supplyAsync(() -> "task3");
task1.thenCombine(task2, (param1, param2) -> {
// task1task2
System.out.println(param1 + param2);
return param1 + param2;
}).thenCombine(task3, (param12, param3) -> {
// task1task2task3
System.out.println(param12 + param3);
return param12 + param3;
});
task1.thenAcceptBoth(task2, (param1, param2) -> {
// task1task2
System.out.println(param1 + param2);
}).thenAcceptBoth(task3, (param12, param3) -> {
// nulltask3
System.out.println(param12 + param3);
});
task1.runAfterBoth(task2, () -> {
// task1 and task2
System.out.println("task1 and task2");
});
}
雙任務(wù)消費(或)
將兩個 CompletableFuture 組合起來,只要其中一個執(zhí)行完了,就執(zhí)行回調(diào)方法。
| 方法名稱 | 類型 | 傳參 | 返回值 |
|---|---|---|---|
| applyToEither | 雙任務(wù)消費(或) | 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| applyToEitherAsync | 雙任務(wù)消費(或) | 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 有返回值 |
| acceptEither | 雙任務(wù)消費(或) | 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| acceptEitherAsync | 雙任務(wù)消費(或) | 有傳參(已完成任務(wù)的執(zhí)行結(jié)果) | 無返回值 |
| runAfterEither | 雙任務(wù)消費(或) | 無傳參 | 無返回值 |
| runAfterEitherAsync | 雙任務(wù)消費(或) | 無傳參 | 無返回值 |
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1";
});
var task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task2";
});
var task3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task3";
});
task1.applyToEither(task2, (param) -> {
// applyToEither=task2
System.out.println("applyToEither=" + param);
return param;
}).acceptEither(task3, (param) -> {
// acceptEither=task2 或 acceptEither=task3
System.out.println("acceptEither=" + param);
}).get();
// task1 or task2
task1.runAfterEither(task2,()-> System.out.println("task1 or task2"));
}
其他
whenComplete、whenCompleteAsync
某個任務(wù)執(zhí)行完成后,執(zhí)行的回調(diào)方法,無返回值。可以訪問 CompletableFuture 的結(jié)果和異常作為參數(shù),使用它們并執(zhí)行想要的操作。此方法并不能轉(zhuǎn)換完成的結(jié)果。會內(nèi)部拋出異常。其正常返回的 CompletableFuture 的結(jié)果來自上個任務(wù)。
handle、handleAsync
不論正常返回還是出異常都會進入 handle,參數(shù)通常為 new BiFunction<T, Throwable, R>();,其中
- T:上一任務(wù)傳入的對象類型
- Throwable:上一任務(wù)傳入的異常
- R:返回的對象類型
handle 和 thenApply 的區(qū)別:如果任務(wù)出現(xiàn)異常不會進入 thenApply;任務(wù)出現(xiàn)異常也會進入 handle,可對異常處理。
handle 和 whenComplete 的區(qū)別:handle 可對傳入值 T 進行轉(zhuǎn)換,并產(chǎn)生自己的返回結(jié)果 R;whenComplete 的返回值和上級任務(wù)傳入的結(jié)果一致,不能轉(zhuǎn)換。
whenComplete、whenCompleteAsync、handle 和 handleAsync 的輸入?yún)?shù)一個是正常結(jié)果一個是異常結(jié)果,而 exceptionally 的輸入?yún)?shù)為異常結(jié)果。
public static void main(String[] args) throws Exception {
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
// 制造一個異常
// int value = 1 / 0;
return "supplyAsyncTask";
});
var handle = supplyAsyncTask.handle((s, throwable) -> {
if (Optional.ofNullable(throwable).isPresent()) {
return throwable.getMessage();
}
return new ArrayList() {{
add(s);
}};
});
// supplyAsyncTask異常時,輸出1:java.lang.ArithmeticException: / by zero
// 輸出2:[supplyAsyncTask]
System.out.println(handle.get());
}
exceptionally
某個任務(wù)執(zhí)行拋出異常時執(zhí)行的回調(diào)方法。拋出異常作為參數(shù),傳遞到回調(diào)方法。僅處理異常情況。如果任務(wù)成功完成,那么將被跳過。
public static void main(String[] args) throws Exception {
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
double error=1/0;
return "ok";
}, executor).exceptionally((e)->{
// java.lang.ArithmeticException: / by zero
System.out.println(e.getMessage());
return "error";
});
// "error"
System.out.println(supplyAsyncTask.get());
}
complete
如果尚未完成,則將 get() 和相關(guān)方法返回的值設(shè)置為給定值。如果此調(diào)用導(dǎo)致此 CompletableFuture 轉(zhuǎn)換到完成狀態(tài),則返回 true,否則返回 false。文檔描述:
If not already completed, sets the value returned by get() and related methods to the given value.
Params:
value – the result value
Returns:
true if this invocation caused this CompletableFuture to transition to a completed state, else false
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10;
});
// 若get放在此處,一直等待task1完成,輸出10
// System.out.println(task1.get());
// 強制task1完成,輸出true
System.out.println(task1.complete(5));
// 輸出5
System.out.println(task1.get());
// task1已完成,輸出false
System.out.println(task1.complete(50));
}
thenCompose
源碼定義
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenCompose 方法會在某個任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為入?yún)ⅲ瑘?zhí)行指定的方法。該方法會返回一個新的 CompletableFuture 實例
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> 10);
var task2 = task1.thenCompose(param -> {
System.out.println("this is task2 param=" + param);
return CompletableFuture.supplyAsync(() -> {
System.out.println("this is task2 square");
return Math.pow(param, 2);
});
}).thenApply(param -> {
System.out.println("thenApply get the square=" + param);
return param;
});
var task3 = task1.thenCompose(param -> {
System.out.println("this is task3 param=" + param);
return CompletableFuture.runAsync(() -> {
System.out.println("this is task3 square");
System.out.println(Math.pow(param, 2));
});
});
System.out.println("task2 get=" + task2.get());
System.out.println("task3 get=" + task3.get());
}
輸出:
this is task2 param=10
this is task2 square
thenApply get the square=100.0
this is task3 param=10
this is task3 square
100.0
task2 get=100.0
task3 get=null
allOf
靜態(tài)方法,阻塞等待所有給定的 CompletableFuture 執(zhí)行結(jié)束后,返回一個 CompletableFuture<Void> 結(jié)果。所有任務(wù)都執(zhí)行完成后,才執(zhí)行 allOf 的回調(diào)方法。如果任意一個任務(wù)異常,執(zhí)行 get 方法時會拋出異常。
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1";
});
var task2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int value = 1 / 0;
System.out.println("task2 is over");
});
CompletableFuture.allOf(task1, task2).whenComplete((param, throwable) -> {
// null
System.out.println(param);
}).exceptionally(throwable -> {
// task3 allOf throwable=java.lang.ArithmeticException: / by zero
System.out.println("task3 allOf throwable=" + throwable.getMessage());
return null;
}).get();
}
anyOf
靜態(tài)方法,阻塞等待任意一個給定的 CompletableFuture 對象執(zhí)行結(jié)束后,返回一個 CompletableFuture<Void> 結(jié)果。任意一個任務(wù)執(zhí)行完,就執(zhí)行 anyOf 的回調(diào)方法。如果執(zhí)行的任務(wù)異常,執(zhí)行 get 方法時會拋出異常。