Java 8 CompletableFuture
Java 8 有大量的新特性和增強如 Lambda 表達(dá)式,Streams,CompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture?
在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運行的任務(wù)在一個單獨的線程,與主線程隔離,并且會通知主線程它的進(jìn)度,成功或者失敗。
在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。
使用這種并行方式,可以極大的提高程序的性能。
Future vs CompletableFuture
CompletableFuture 是 Future API的擴(kuò)展。
Future 被用于作為一個異步計算結(jié)果的引用。提供一個 isDone() 方法來檢查計算任務(wù)是否完成。當(dāng)任務(wù)完成時,get() 方法用來接收計算任務(wù)的結(jié)果。
從 Callbale和 Future 教程可以學(xué)習(xí)更多關(guān)于 Future 知識.
Future API 是非常好的 Java 異步編程進(jìn)階,但是它缺乏一些非常重要和有用的特性。
Future 的局限性
- 不能手動完成 當(dāng)你寫了一個函數(shù),用于通過一個遠(yuǎn)程API獲取一個電子商務(wù)產(chǎn)品最新價格。因為這個 API 太耗時,你把它允許在一個獨立的線程中,并且從你的函數(shù)中返回一個 Future?,F(xiàn)在假設(shè)這個API服務(wù)宕機了,這時你想通過該產(chǎn)品的最新緩存價格手工完成這個Future 。你會發(fā)現(xiàn)無法這樣做。
- Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進(jìn)一步的操作 Future 不會通知你它已經(jīng)完成了,它提供了一個阻塞的
get()方法通知你結(jié)果。你無法給 Future 植入一個回調(diào)函數(shù),當(dāng) Future 結(jié)果可用的時候,用該回調(diào)函數(shù)自動的調(diào)用 Future 的結(jié)果。 - 多個 Future 不能串聯(lián)在一起組成鏈?zhǔn)秸{(diào)用 有時候你需要執(zhí)行一個長時間運行的計算任務(wù),并且當(dāng)計算任務(wù)完成的時候,你需要把它的計算結(jié)果發(fā)送給另外一個長時間運行的計算任務(wù)等等。你會發(fā)現(xiàn)你無法使用 Future 創(chuàng)建這樣的一個工作流。
- 不能組合多個 Future 的結(jié)果 假設(shè)你有10個不同的Future,你想并行的運行,然后在它們運行未完成后運行一些函數(shù)。你會發(fā)現(xiàn)你也無法使用 Future 這樣做。
- 沒有異常處理 Future API 沒有任務(wù)的異常處理結(jié)構(gòu)居然有如此多的限制,幸好我們有CompletableFuture,你可以使用 CompletableFuture 達(dá)到以上所有目的。
CompletableFuture 實現(xiàn)了 Future 和 CompletionStage接口,并且提供了許多關(guān)于創(chuàng)建,鏈?zhǔn)秸{(diào)用和組合多個 Future 的便利方法集,而且有廣泛的異常處理支持。
創(chuàng)建 CompletableFuture
1. 簡單的例子 可以使用如下無參構(gòu)造函數(shù)簡單的創(chuàng)建 CompletableFuture:
CompletableFuture<String> completableFuture = new CompletableFuture<String>();
這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結(jié)果可以使用 CompletableFuture.get() 方法:
String result = completableFuture.get()
get() 方法會一直阻塞直到 Future 完成。因此,以上的調(diào)用將被永遠(yuǎn)阻塞,因為該Future一直不會完成。
你可以使用 CompletableFuture.complete() 手工的完成一個 Future:
completableFuture.complete("Future's Result")
所有等待這個 Future 的客戶端都將得到一個指定的結(jié)果,并且 completableFuture.complete()之后的調(diào)用將被忽略。
2. 使用 runAsync() 運行異步計算 如果你想異步的運行一個后臺任務(wù)并且不想改任務(wù)返回任務(wù)東西,這時候可以使用 CompletableFuture.runAsync()方法,它持有一個Runnable 對象,并返回 CompletableFuture<Void>。
// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
}
});
// Block and wait for the future to complete
future.get()
你也可以以 lambda 表達(dá)式的形式傳入 Runnable 對象:
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
});
在本文中,我使用lambda表達(dá)式會比較頻繁,如果以前你沒有使用過,建議你也多使用lambda 表達(dá)式。
3. 使用 supplyAsync() 運行一個異步任務(wù)并且返回結(jié)果 當(dāng)任務(wù)不需要返回任何東西的時候, CompletableFuture.runAsync() 非常有用。但是如果你的后臺任務(wù)需要返回一些結(jié)果應(yīng)該要怎么樣?
CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier<T> 并且返回CompletableFuture<T>,T 是通過調(diào)用 傳入的supplier取得的值的類型。
// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}
});
// Block and get the result of the Future
String result = future.get();
System.out.println(result);
Supplier<T> 是一個簡單的函數(shù)式接口,表示supplier的結(jié)果。它有一個get()方法,該方法可以寫入你的后臺任務(wù)中,并且返回結(jié)果。
你可以使用lambda表達(dá)式使得上面的示例更加簡明:
// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
});
一個關(guān)于Executor 和Thread Pool筆記
CompletableFuture的方法如果以
Async結(jié)尾,它會異步的執(zhí)行(沒有指定executor的情況下), 異步執(zhí)行通過ForkJoinPool實現(xiàn), 它使用守護(hù)線程去執(zhí)行任務(wù)。注意這是CompletableFuture的特性, 其它CompletionStage可以override這個默認(rèn)的行為。CompletableFuture API 的所有方法都有兩個變體一個接受Executor作為參數(shù),另一個不接受Executor:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
創(chuàng)建一個線程池,并傳遞給其中一個方法:
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}, executor);
在 CompletableFuture 轉(zhuǎn)換和運行
CompletableFuture.get()方法是阻塞的。它會一直等到Future完成并且在完成后返回結(jié)果。 但是,這是我們想要的嗎?對于構(gòu)建異步系統(tǒng),我們應(yīng)該附上一個回調(diào)給CompletableFuture,當(dāng)Future完成的時候,自動的獲取結(jié)果。 如果我們不想等待結(jié)果返回,我們可以把需要等待Future完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。
可以使用 thenApply(), thenAccept() 和thenRun()方法附上一個回調(diào)給CompletableFuture。
1. thenApply() 可以使用 thenApply() 處理和改變CompletableFuture的結(jié)果。持有一個Function<R,T>作為參數(shù)。Function<R,T>是一個簡單的函數(shù)式接口,接受一個T類型的參數(shù),產(chǎn)出一個R類型的結(jié)果。
// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
return "Hello " + name;
});
// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
你也可以通過附加一系列的thenApply()在回調(diào)方法 在CompletableFuture寫一個連續(xù)的轉(zhuǎn)換。這樣的話,結(jié)果中的一個 thenApply方法就會傳遞給該系列的另外一個 thenApply方法。
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome to the CalliCoder Blog";
});
System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog
2. thenAccept() 和 thenRun() 如果你不想從你的回調(diào)函數(shù)中返回任何東西,僅僅想在Future完成后運行一些代碼片段,你可以使用thenAccept()和 thenRun()方法,這些方法經(jīng)常在調(diào)用鏈的最末端的最后一個回調(diào)函數(shù)中使用。 CompletableFuture.thenAccept()持有一個Consumer<T>,返回一個CompletableFuture<Void>。它可以訪問CompletableFuture的結(jié)果:
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
System.out.println("Got product detail from remote service " + product.getName())
});
雖然thenAccept()可以訪問CompletableFuture的結(jié)果,但thenRun()不能訪Future的結(jié)果,它持有一個Runnable返回CompletableFuture:
// thenRun() example
CompletableFuture.supplyAsync(() -> {
// Run some computation
}).thenRun(() -> {
// Computation Finished.
});
異步回調(diào)方法的筆記 CompletableFuture提供的所有回調(diào)方法都有兩個變體:
// thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)這些異步回調(diào)變體通過在獨立的線程中執(zhí)行回調(diào)任務(wù)幫助你進(jìn)一步執(zhí)行并行計算。 以下示例:
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Some Result"
}).thenApply(result -> {
/*
Executed in the same thread where the supplyAsync() task is executed
or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
*/
return "Processed Result"
})
在以上示例中,在thenApply()中的任務(wù)和在supplyAsync()中的任務(wù)執(zhí)行在相同的線程中。任何supplyAsync()立即執(zhí)行完成,那就是執(zhí)行在主線程中(嘗試刪除sleep測試下)。 為了控制執(zhí)行回調(diào)任務(wù)的線程,你可以使用異步回調(diào)。如果你使用thenApplyAsync()回調(diào),將從ForkJoinPool.commonPool()獲取不同的線程執(zhí)行。
CompletableFuture.supplyAsync(() -> {
return "Some Result"
}).thenApplyAsync(result -> {
// Executed in a different thread from ForkJoinPool.commonPool()
return "Processed Result"
})
此外,如果你傳入一個Executor到thenApplyAsync()回調(diào)中,,任務(wù)將從Executor線程池獲取一個線程執(zhí)行。
Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
return "Some result"
}).thenApplyAsync(result -> {
// Executed in a thread obtained from the executor
return "Processed Result"
}, executor);
組合兩個CompletableFuture
1. 使用 thenCompose()組合兩個獨立的future 假設(shè)你想從一個遠(yuǎn)程API中獲取一個用戶的詳細(xì)信息,一旦用戶信息可用,你想從另外一個服務(wù)中獲取他的貸方。 考慮下以下兩個方法getUserDetail()和getCreditRating()的實現(xiàn):
CompletableFuture<User> getUsersDetail(String userId) {
return CompletableFuture.supplyAsync(() -> {
UserService.getUserDetails(userId);
});
}
CompletableFuture<Double> getCreditRating(User user) {
return CompletableFuture.supplyAsync(() -> {
CreditRatingService.getCreditRating(user);
});
}
現(xiàn)在讓我們弄明白當(dāng)使用了thenApply()后是否會達(dá)到我們期望的結(jié)果-
CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));
在更早的示例中,Supplier函數(shù)傳入thenApply將返回一個簡單的值,但是在本例中,將返回一個CompletableFuture。以上示例的最終結(jié)果是一個嵌套的CompletableFuture。 如果你想獲取最終的結(jié)果給最頂層future,使用 thenCompose()方法代替-
CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));
因此,規(guī)則就是-如果你的回調(diào)函數(shù)返回一個CompletableFuture,但是你想從CompletableFuture鏈中獲取一個直接合并后的結(jié)果,這時候你可以使用thenCompose()。
2. 使用thenCombine()組合兩個獨立的 future 雖然thenCompose()被用于當(dāng)一個future依賴另外一個future的時候用來組合兩個future。thenCombine()被用來當(dāng)兩個獨立的Future都完成的時候,用來做一些事情。
System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 65.0;
});
System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 177.8;
});
System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
.thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
Double heightInMeter = heightInCm/100;
return weightInKg/(heightInMeter*heightInMeter);
});
System.out.println("Your BMI is - " + combinedFuture.get());
當(dāng)兩個Future都完成的時候,傳給``thenCombine()的回調(diào)函數(shù)將被調(diào)用。
組合多個CompletableFuture
我們使用thenCompose()和 thenCombine()把兩個CompletableFuture組合在一起?,F(xiàn)在如果你想組合任意數(shù)量的CompletableFuture,應(yīng)該怎么做?我們可以使用以下兩個方法組合任意數(shù)量的CompletableFuture。
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
1. CompletableFuture.allOf() CompletableFuture.allOf的使用場景是當(dāng)你一個列表的獨立future,并且你想在它們都完成后并行的做一些事情。
假設(shè)你想下載一個網(wǎng)站的100個不同的頁面。你可以串行的做這個操作,但是這非常消耗時間。因此你想寫一個函數(shù),傳入一個頁面鏈接,返回一個CompletableFuture,異步的下載頁面內(nèi)容。
CompletableFuture<String> downloadWebPage(String pageLink) {
return CompletableFuture.supplyAsync(() -> {
// Code to download and return the web page's content
});
}
現(xiàn)在,當(dāng)所有的頁面已經(jīng)下載完畢,你想計算包含關(guān)鍵字CompletableFuture頁面的數(shù)量??梢允褂?code>CompletableFuture.allOf()達(dá)成目的。
List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links
// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
.map(webPageLink -> downloadWebPage(webPageLink))
.collect(Collectors.toList());
// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
使用CompletableFuture.allOf()的問題是它返回CompletableFuture。但是我們可以通過寫一些額外的代碼來獲取所有封裝的CompletableFuture結(jié)果。
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
return pageContentFutures.stream()
.map(pageContentFuture -> pageContentFuture.join())
.collect(Collectors.toList());
});
花一些時間理解下以上代碼片段。當(dāng)所有future完成的時候,我們調(diào)用了future.join(),因此我們不會在任何地方阻塞。
join()方法和get()方法非常類似,這唯一不同的地方是如果最頂層的CompletableFuture完成的時候發(fā)生了異常,它會拋出一個未經(jīng)檢查的異常。
現(xiàn)在讓我們計算包含關(guān)鍵字頁面的數(shù)量。
// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
return pageContents.stream()
.filter(pageContent -> pageContent.contains("CompletableFuture"))
.count();
});
System.out.println("Number of Web Pages having CompletableFuture keyword - " +
countFuture.get());
2. CompletableFuture.anyOf()
CompletableFuture.anyOf()和其名字介紹的一樣,當(dāng)任何一個CompletableFuture完成的時候【相同的結(jié)果類型】,返回一個新的CompletableFuture。以下示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(anyOfFuture.get()); // Result of Future 2
在以上示例中,當(dāng)三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。因為future2的休眠時間最少,因此她最先完成,最終的結(jié)果將是future2的結(jié)果。
CompletableFuture.anyOf()傳入一個Future可變參數(shù),返回CompletableFuture。CompletableFuture.anyOf()的問題是如果你的CompletableFuture返回的結(jié)果是不同類型的,這時候你講會不知道你最終CompletableFuture是什么類型。
CompletableFuture 異常處理
我們探尋了怎樣創(chuàng)建CompletableFuture,轉(zhuǎn)換它們,并組合多個CompletableFuture?,F(xiàn)在讓我們弄明白當(dāng)發(fā)生錯誤的時候我們應(yīng)該怎么做。
首先讓我們明白在一個回調(diào)鏈中錯誤是怎么傳遞的。思考下以下回調(diào)鏈:
CompletableFuture.supplyAsync(() -> {
// Code which might throw an exception
return "Some result";
}).thenApply(result -> {
return "processed result";
}).thenApply(result -> {
return "result after further processing";
}).thenAccept(result -> {
// do something with the final result
});
如果在原始的supplyAsync()任務(wù)中發(fā)生一個錯誤,這時候沒有任何thenApply會被調(diào)用并且future將以一個異常結(jié)束。如果在第一個thenApply發(fā)生錯誤,這時候第二個和第三個將不會被調(diào)用,同樣的,future將以異常結(jié)束。
1. 使用 exceptionally() 回調(diào)處理異常 exceptionally()回調(diào)給你一個從原始Future中生成的錯誤恢復(fù)的機會。你可以在這里記錄這個異常并返回一個默認(rèn)值。
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).exceptionally(ex -> {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
});
System.out.println("Maturity : " + maturityFuture.get());
2. 使用 handle() 方法處理異常 API提供了一個更通用的方法 - handle()從異常恢復(fù),無論一個異常是否發(fā)生它都會被調(diào)用。
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).handle((res, ex) -> {
if(ex != null) {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
}
return res;
});
System.out.println("Maturity : " + maturityFuture.get());
如果異常發(fā)生,res參數(shù)將是 null,否則,ex將是 null。
參考文章:
https://juejin.im/post/5adbf8226fb9a07aac240a67
http://www.importnew.com/28319.html
completablefuture和parallel stream比較
如果你是I/O密集型的計算,可以使用自定義線程池,增加線程池大小,如果是CPU密集型計算,就不能增加比你的處理器個數(shù)再多的線程了,還是使用parallel stream比較好一些。
參考文章:
https://www.jdon.com/idea/java/java-8-completablefuture-vs-parallel-stream.html