1 Future介紹
1.1 Future的主要功能
JDK5新增了Future接口,用于描述一個(gè)異步計(jì)算的結(jié)果。
Future就是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果等操作。必要時(shí)可以通過(guò)get方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
Future類位于java.util.concurrent包下,它是一個(gè)接口:
public interface Future<V> {
/**
* 方法用來(lái)取消任務(wù),如果取消任務(wù)成功則返回true,如果取消任務(wù)失敗則返回false。 *
* @param mayInterruptIfRunning 表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務(wù),如果設(shè)置true,則表示可以取消正在執(zhí)行過(guò)程中的任務(wù)。
* @return 如果任務(wù)已經(jīng)完成,則無(wú)論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經(jīng)完成的任務(wù)會(huì)返回false;
* 如果任務(wù)正在執(zhí)行,若mayInterruptIfRunning設(shè)置為true,則返回true,若mayInterruptIfRunning設(shè)置為false,則返回false;
* 如果任務(wù)還沒有執(zhí)行,則無(wú)論mayInterruptIfRunning為true還是false,肯定返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 方法表示任務(wù)是否被取消成功
* @return 如果在任務(wù)正常完成前被取消成功,則返回 true
*/
boolean isCancelled();
/**
* 方法表示任務(wù)是否已經(jīng)完成
* @return 若任務(wù)完成,則返回true
*/
boolean isDone();
/**
* 方法用來(lái)獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)產(chǎn)生阻塞,會(huì)一直等到任務(wù)執(zhí)行完畢才返回
* @return 任務(wù)執(zhí)行的結(jié)果值
* @throws InterruptedException 線程被中斷異常
* @throws ExecutionException 任務(wù)執(zhí)行異常,如果任務(wù)被取消,還會(huì)拋出CancellationException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 用來(lái)獲取執(zhí)行結(jié)果,如果在指定時(shí)間內(nèi),還沒獲取到結(jié)果,就直接返回null(并不是拋出異常,需要注意)。
* @param timeout 超時(shí)時(shí)間
* @param unit 超時(shí)單位
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException 如果計(jì)算超時(shí),將拋出TimeoutException(待確認(rèn))
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
從上面方法的注釋可以看出,F(xiàn)utrue提供了三種功能:
1)判斷任務(wù)是否完成;
2)能夠中斷任務(wù);
3)能夠獲取任務(wù)執(zhí)行結(jié)果。(最為常用的)
1.2 Future的局限性
從本質(zhì)上說(shuō),F(xiàn)uture表示一個(gè)異步計(jì)算的結(jié)果。它提供了isDone()來(lái)檢測(cè)計(jì)算是否已經(jīng)完成,并且在計(jì)算結(jié)束后,可以通過(guò)get()方法來(lái)獲取計(jì)算結(jié)果。在異步計(jì)算中,F(xiàn)uture確實(shí)是個(gè)非常優(yōu)秀的接口。但是,它的本身也確實(shí)存在著許多限制:
- 并發(fā)執(zhí)行多任務(wù):Future只提供了get()方法來(lái)獲取結(jié)果,并且是阻塞的。所以,除了等待你別無(wú)他法;當(dāng) for 循環(huán)批量獲取 Future 的結(jié)果時(shí)容易 block,因此get 方法調(diào)用時(shí)應(yīng)使用 timeout 限制。
- 無(wú)法對(duì)多個(gè)任務(wù)進(jìn)行鏈?zhǔn)秸{(diào)用:如果你希望在計(jì)算任務(wù)完成后執(zhí)行特定動(dòng)作,比如發(fā)郵件,但Future卻沒有提供這樣的能力;
- 無(wú)法組合多個(gè)任務(wù):如果你運(yùn)行了10個(gè)任務(wù),并期望在它們?nèi)繄?zhí)行結(jié)束后執(zhí)行特定動(dòng)作,那么在Future中這是無(wú)能為力的;
- 沒有異常處理:Future接口中沒有關(guān)于異常處理的方法;
2 CompletableFuture介紹
雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過(guò)阻塞或者輪詢的方式得到任務(wù)的結(jié)果。如果遇到前面的task執(zhí)行較慢時(shí)需要阻塞等待前面的task執(zhí)行完后面task才能取得結(jié)果。
阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會(huì)耗費(fèi)無(wú)謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果。而CompletableFuture的主要功能就是一邊生成任務(wù),一邊獲取任務(wù)的返回值。讓兩件事分開執(zhí)行,任務(wù)之間不會(huì)互相阻塞,可以實(shí)現(xiàn)先執(zhí)行完的先取結(jié)果,不再依賴任務(wù)順序了。
2.1 CompletableFuture原理
內(nèi)部通過(guò)阻塞隊(duì)列+FutureTask,實(shí)現(xiàn)了任務(wù)先完成可優(yōu)先獲取到,即結(jié)果按照完成先后順序排序,內(nèi)部有一個(gè)先進(jìn)先出的阻塞隊(duì)列,用于保存已經(jīng)執(zhí)行完成的Future,通過(guò)調(diào)用它的take方法或poll方法可以獲取到一個(gè)已經(jīng)執(zhí)行完成的Future,進(jìn)而通過(guò)調(diào)用Future接口實(shí)現(xiàn)類的get方法獲取最終的結(jié)果。
2.2 應(yīng)用場(chǎng)景
當(dāng)需要批量提交異步任務(wù)的時(shí)候建議使用CompletableFuture。CompletableFuture將線程池Executor和阻塞隊(duì)列BlockingQueue的功能融合在了一起,能夠讓批量異步任務(wù)的管理更簡(jiǎn)單。
CompletableFuture能夠讓異步任務(wù)的執(zhí)行結(jié)果有序化。先執(zhí)行完的先進(jìn)入阻塞隊(duì)列,利用這個(gè)特性,你可以輕松實(shí)現(xiàn)后續(xù)處理的有序性,避免無(wú)謂的等待。
線程池隔離。CompletionService支持自己創(chuàng)建線程池,這種隔離性能避免幾個(gè)特別耗時(shí)的任務(wù)拖垮整個(gè)應(yīng)用的風(fēng)險(xiǎn)。
2.3 CompletableFuture使用詳解
簡(jiǎn)單的任務(wù),用Future獲取結(jié)果還好,但我們并行提交的多個(gè)異步任務(wù),往往并不是獨(dú)立的,很多時(shí)候業(yè)務(wù)邏輯處理存在串行[依賴]、并行、聚合的關(guān)系。如果要我們手動(dòng)用 Fueture 實(shí)現(xiàn),是非常麻煩的。
CompletableFuture是Future接口的擴(kuò)展和增強(qiáng)。CompletableFuture實(shí)現(xiàn)了Future接口,并在此基礎(chǔ)上進(jìn)行了豐富地?cái)U(kuò)展,完美地彌補(bǔ)了Future上述的種種問(wèn)題。
更為重要的是,CompletableFuture實(shí)現(xiàn)了對(duì)任務(wù)的編排能力。借助這項(xiàng)能力,我們可以輕松地組織不同任務(wù)的運(yùn)行順序、規(guī)則以及方式。從某種程度上說(shuō),這項(xiàng)能力是它的核心能力。而在以往,雖然通過(guò)CountDownLatch等工具類也可以實(shí)現(xiàn)任務(wù)的編排,但需要復(fù)雜的邏輯處理,不僅耗費(fèi)精力且難以維護(hù)。
3 CompletableFuture應(yīng)用梳理


很多方法上,可以指定線程池,而沒有指定Executor的方法會(huì)使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼。如果指定線程池,則使用指定的線程池運(yùn)行。
默認(rèn)情況下 CompletableFuture 會(huì)使用公共的 ForkJoinPool 線程池,這個(gè)線程池默認(rèn)創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過(guò) JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來(lái)設(shè)置 ForkJoinPool 線程池的線程數(shù))。
如果所有 CompletableFuture 共享一個(gè)線程池,那么一旦有任務(wù)執(zhí)行一些很慢的 I/O 操作,就會(huì)導(dǎo)致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進(jìn)而影響整個(gè)系統(tǒng)的性能。所以,強(qiáng)烈建議要根據(jù)不同的業(yè)務(wù)類型創(chuàng)建不同的線程池,以避免互相干擾。
-
等我們使用的時(shí)候,會(huì)注意到
CompletableFuture的方法命名規(guī)則:-
xxx():表示該方法將繼續(xù)在已有的線程中執(zhí)行;
-
xxxAsync():表示可能會(huì)使用其它的線程去執(zhí)行(如果使用相同的線程池,也可能會(huì)被同一個(gè)線程選中執(zhí)行)。
4 使用案例
4.1 基礎(chǔ)使用案例
串行執(zhí)行:
定義兩個(gè)CompletableFuture,第一個(gè)CompletableFuture根據(jù)證券名稱查詢證券代碼,第二個(gè)CompletableFuture根據(jù)證券代碼查詢證券價(jià)格,這兩個(gè)CompletableFuture實(shí)現(xiàn)串行操作如下:
CompletableFuture.supplyAsync():創(chuàng)建一個(gè)包含返回值的異步任務(wù);
thenApplyAsync():獲取前一個(gè)線程的結(jié)果進(jìn)行轉(zhuǎn)換,有返回值;
thenAccept():獲取前一個(gè)線程的結(jié)果進(jìn)行消費(fèi),無(wú)返回值。
public class Demo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 第一個(gè)任務(wù):創(chuàng)建一個(gè)包含返回值的CompletableFuture
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中國(guó)石油");
});
// cfQuery成功后繼續(xù)執(zhí)行下一個(gè)任務(wù):
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印結(jié)果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要結(jié)束,否則CompletableFuture默認(rèn)使用的線程池會(huì)立刻關(guān)閉:
countDownLatch.await();
}
public static void main2(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> queryCode("中國(guó)石油"))
.thenApplyAsync((code) -> fetchPrice(code))
.thenAccept((result) -> System.out.println("price: " + result));
countDownLatch.await();
}
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String code = "601857";
System.out.println("查詢證券編碼,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
Double price = 5 + Math.random() * 20;
System.out.println("根據(jù)證券編碼查詢價(jià)格,code:" + code + ";price:" + price);
return price;
}
}
并行執(zhí)行:
除了串行執(zhí)行外,多個(gè)CompletableFuture還可以并行執(zhí)行。例如,我們考慮這樣的場(chǎng)景:
同時(shí)從新浪和網(wǎng)易查詢證券代碼,只要任意一個(gè)返回結(jié)果,就進(jìn)行下一步查詢價(jià)格,查詢價(jià)格也同時(shí)從新浪和網(wǎng)易查詢,只要任意一個(gè)返回結(jié)果,就完成操作:
CompletableFuture.supplyAsync():創(chuàng)建一個(gè)包含返回值的異步任務(wù);
CompletableFuture.anyOf(cf1,cf2,cf3).join():多個(gè)異步線程任一執(zhí)行完即返回,有返回值Object;
thenAccept():獲取前一個(gè)線程的結(jié)果進(jìn)行消費(fèi),無(wú)返回值。
public class Demo2 {
private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 兩個(gè)CompletableFuture執(zhí)行異步查詢:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中國(guó)石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中國(guó)石油", "https://money.163.com/code/");
});
// 用anyOf合并為一個(gè)新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 兩個(gè)CompletableFuture執(zhí)行異步查詢:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并為一個(gè)新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最終結(jié)果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要立刻結(jié)束,否則CompletableFuture默認(rèn)使用的線程池會(huì)立刻關(guān)閉:
COUNT_DOWN_LATCH.await();
}
public static void main2(String[] args) throws Exception {
// 兩個(gè)CompletableFuture執(zhí)行異步查詢:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中國(guó)石油", "https://finance.sina.com.cn/code/"));
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中國(guó)石油", "https://money.163.com/code/"));
// 用anyOf合并為一個(gè)新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 兩個(gè)CompletableFuture執(zhí)行異步查詢:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));
// 用anyOf合并為一個(gè)新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最終結(jié)果:
cfFetch.thenAccept((result) -> System.out.println("price: " + result));
// 主線程不要立刻結(jié)束,否則CompletableFuture默認(rèn)使用的線程池會(huì)立刻關(guān)閉:
COUNT_DOWN_LATCH.await();
}
static String queryCode(String name, String url) {
System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
String code = "601857";
System.out.println(Thread.currentThread().getName() + " 查詢證券編碼,name:" + name + ",code:" + code);
return code;
}
static Double fetchPrice(String code, String url) {
System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
e.printStackTrace();
}
Double price = 5 + Math.random() * 20;
System.out.println(Thread.currentThread().getName() + " 根據(jù)證券編碼查詢價(jià)格,code:" + code + ";price:" + price);
return price;
}
}
上述邏輯實(shí)現(xiàn)的異步查詢規(guī)則實(shí)際上是:
4.2 實(shí)現(xiàn)最優(yōu)的“燒水泡茶”程序

public class Demo3 {
public static void main(String[] args) {
//任務(wù)1:洗水壺 -> 燒開水
CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
System.out.println("T1:洗水壺...開始");
sleep(1000);
return "T1:洗水壺...完成";
});
CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
System.out.println(f11Result);
System.out.println("T1:燒開水...開始");
sleep(3000);
return "T1:燒開水...完成";
});
//任務(wù)2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
System.out.println("==============T2:洗茶壺...開始");
sleep(1000);
System.out.println("==============T2:洗茶壺...完成");
});
CompletableFuture<Void> f22 = f21.thenRun(() -> {
System.out.println("==============T2:洗茶杯...開始");
sleep(2000);
System.out.println("==============T2:洗茶杯...完成");
});
CompletableFuture<String> f23 = f22.thenApply(result -> {
System.out.println("==============T2:拿茶葉...開始");
sleep(1000);
System.out.println("==============T2:拿茶葉...完成");
return "龍井";
});
//任務(wù)3:任務(wù)1和任務(wù)2完成后執(zhí)行:泡茶
CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
System.out.println(f1Result);
System.out.println("************T2:拿到茶葉:result" + f2Result);
System.out.println("************T3:泡茶...,什么茶:" + f2Result);
return "上茶:" + f2Result;
});
//等待任務(wù)3執(zhí)行結(jié)果
System.out.println(f3.join());
}
static void sleep(int t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}