下面看一個(gè)服務(wù)的調(diào)用鏈

設(shè)想一下這5個(gè)查詢服務(wù),平均每次消耗50ms,那么本次調(diào)用至少是250ms,我們細(xì)想一下,在這個(gè)這五個(gè)服務(wù)其實(shí)并沒(méi)有任何的依賴,誰(shuí)先獲取誰(shuí)后獲取都可以,那么我們可以想想,是否可以用多重影分身之術(shù),同時(shí)獲取這五個(gè)服務(wù)的信息呢??jī)?yōu)化如下:

將這五個(gè)查詢服務(wù)并行查詢,在理想情況下可以優(yōu)化至50ms。當(dāng)然說(shuō)起來(lái)簡(jiǎn)單,我們真正如何落地呢?
1.CountDownLatch
CountDownLatch,可以將其看成是一個(gè)計(jì)數(shù)器,await()方法可以阻塞至超時(shí)或者計(jì)數(shù)器減至0,其他線程當(dāng)完成自己目標(biāo)的時(shí)候可以減少1,利用這個(gè)機(jī)制我們可以將其用來(lái)做并發(fā)。 可以用如下的代碼實(shí)現(xiàn)我們上面的下訂單的需求:
public class CountDownTask {
private static final int CORE_POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 12;
private static final long KEEP_ALIVE_TIME = 5L;
private final static int QUEUE_SIZE = 1600;
protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE)
);
public static void main(String[] args) throws InterruptedException {
// 新建一個(gè)為5的計(jì)數(shù)器
CountDownLatch countDownLatch = new CountDownLatch(5);
OrderInfo orderInfo = new OrderInfo();
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Customer,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setCustomerInfo(new CustomerInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Discount,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setDiscountInfo(new DiscountInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Food,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setFoodListInfo(new FoodListInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Tenant,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setTenantInfo(new TenantInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)OtherInfo,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setOtherInfo(new OtherInfo());
countDownLatch.countDown();
});
countDownLatch.await(1, TimeUnit.SECONDS);
System.out.println("主線程:"+ Thread.currentThread().getName());
}
}
建立一個(gè)線程池(具體配置根據(jù)具體業(yè)務(wù),具體機(jī)器配置),進(jìn)行并發(fā)的執(zhí)行我們的任務(wù)(生成用戶信息,菜品信息等),最后利用await方法阻塞等待結(jié)果成功返回。
2.CompletableFuture
CountDownLatch雖然能實(shí)現(xiàn)我們需要滿足的功能但是其任然有個(gè)問(wèn)題是,在我們的業(yè)務(wù)代碼需要耦合CountDownLatch的代碼,比如在我們獲取用戶信息之后我們會(huì)執(zhí)行countDownLatch.countDown(),很明顯我們的業(yè)務(wù)代碼顯然不應(yīng)該關(guān)心這一部分邏輯,并且在開(kāi)發(fā)的過(guò)程中萬(wàn)一寫(xiě)漏了,那我們的await方法將只會(huì)被各種異常喚醒。
在JDK1.8中提供了一個(gè)類CompletableFuture,它是一個(gè)多功能的非阻塞的Future。(什么是Future:用來(lái)代表異步結(jié)果,并且提供了檢查計(jì)算完成,等待完成,檢索結(jié)果完成等方法。)
我們將每個(gè)任務(wù)的計(jì)算完成的結(jié)果都用CompletableFuture來(lái)表示,利用CompletableFuture.allOf匯聚成一個(gè)大的CompletableFuture,那么利用get()方法就可以阻塞。
public class CompletableFutureParallel {
private static final int CORE_POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 12;
private static final long KEEP_ALIVE_TIME = 5L;
private final static int QUEUE_SIZE = 1600;
protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE)
);
public static void main(String[] args) throws Exception {
OrderInfo orderInfo = new OrderInfo();
List futures = new ArrayList<>();
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Customer,線程名字為:" + Thread.currentThread().getName());
orderInfo.setCustomerInfo(new CustomerInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Discount,線程名字為:" + Thread.currentThread().getName());
orderInfo.setDiscountInfo(new DiscountInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Food,線程名字為:" + Thread.currentThread().getName());
orderInfo.setFoodListInfo(new FoodListInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Other,線程名字為:" + Thread.currentThread().getName());
orderInfo.setOtherInfo(new OtherInfo());
}, THREAD_POOL));
CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
allDoneFuture.get(10, TimeUnit.SECONDS);
System.out.println(orderInfo);
}
}
可以看見(jiàn)我們使用CompletableFuture能很快的完成的需求,當(dāng)然這還不夠。
3.Fork/Join
我們上面用CompletableFuture完成了我們對(duì)多組任務(wù)并行執(zhí)行,但是其依然是依賴我們的線程池,在我們的線程池中使用的是阻塞隊(duì)列,也就是當(dāng)我們某個(gè)線程執(zhí)行完任務(wù)的時(shí)候需要通過(guò)這個(gè)阻塞隊(duì)列進(jìn)行,那么肯定會(huì)發(fā)生競(jìng)爭(zhēng),所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

ForkJoinPool中每個(gè)線程都有自己的工作隊(duì)列,并且采用Work-Steal算法防止線程饑餓。 Worker線程用LIFO的方法取出任務(wù),但是會(huì)用FIFO的方法去偷取別人隊(duì)列的任務(wù),這樣就減少了鎖的沖突。

網(wǎng)上這個(gè)框架的例子很多,我們看看如何使用代碼其完成我們上面的下訂單需求:
public class OrderTask extends RecursiveTask {
@Override
protected OrderInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
CustomerTask customerTask = new CustomerTask();
TenantTask tenantTask = new TenantTask();
DiscountTask discountTask = new DiscountTask();
FoodTask foodTask = new FoodTask();
OtherTask otherTask = new OtherTask();
invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
returnorderInfo;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
System.out.println(forkJoinPool.invoke(new OrderTask()));
}
}
class CustomerTask extends RecursiveTask {
@Override
protected CustomerInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new CustomerInfo();
}
}
class TenantTask extends RecursiveTask {
@Override
protected TenantInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new TenantInfo();
}
}
class DiscountTask extends RecursiveTask {
@Override
protected DiscountInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new DiscountInfo();
}
}
class FoodTask extends RecursiveTask {
@Override
protected FoodListInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new FoodListInfo();
}
}
class OtherTask extends RecursiveTask {
@Override
protected OtherInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new OtherInfo();
}
}
我們定義一個(gè)OrderTask并且定義五個(gè)獲取信息的任務(wù),在compute中分別fork執(zhí)行這五個(gè)任務(wù),最后在將這五個(gè)任務(wù)的結(jié)果通過(guò)Join獲得,最后完成我們的并行化的需求。
4. parallelStream
在jdk1.8中提供了并行流的API,當(dāng)我們使用集合的時(shí)候能很好的進(jìn)行并行處理,下面舉了一個(gè)簡(jiǎn)單的例子從1加到100:
public class ParallelStream {
public static void main(String[] args) {
ArrayList list = new ArrayList();
for (int i = 1; i <= 100; i++) {
list.add(i);
}
LongAdder sum = new LongAdder();
list.parallelStream().forEach(integer -> {
System.out.println("當(dāng)前線程" + Thread.currentThread().getName());
sum.add(integer);
});
System.out.println(sum);
}
}
parallelStream中底層使用的那一套也是Fork/Join的那一套,默認(rèn)的并發(fā)程度是可用CPU數(shù)-1。
5.分片
可以想象有這么一個(gè)需求,每天定時(shí)對(duì)id在某個(gè)范圍之間的用戶發(fā)券,比如這個(gè)范圍之間的用戶有幾百萬(wàn),如果給一臺(tái)機(jī)器發(fā)的話,可能全部發(fā)完需要很久的時(shí)間,所以分布式調(diào)度框架比如:elastic-job。都提供了分片的功能,比如你用50臺(tái)機(jī)器,那么id%50=0的在第0臺(tái)機(jī)器上,=1的在第1臺(tái)機(jī)器上發(fā)券,那么我們的執(zhí)行時(shí)間其實(shí)就分?jǐn)偟搅瞬煌臋C(jī)器上了。
并行化注意事項(xiàng)
線程安全:在parallelStream中我們列舉的代碼中使用的是LongAdder,并沒(méi)有直接使用我們的Integer和Long,這個(gè)是因?yàn)樵诙嗑€程環(huán)境下Integer和Long線程不安全。所以線程安全我們需要特別注意。
合理參數(shù)配置:可以看見(jiàn)我們需要配置的參數(shù)比較多,比如我們的線程池的大小,等待隊(duì)列大小,并行度大小以及我們的等待超時(shí)時(shí)間等等,我們都需要根據(jù)自己的業(yè)務(wù)不斷的調(diào)優(yōu)防止出現(xiàn)隊(duì)列不夠用或者超時(shí)時(shí)間不合理等等。
CompletableFuture詳細(xì)介紹
JDK8以前的Future
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
doSomethingElse();//在我們異步操作的同時(shí)一樣可以做其他操作
try {
String res = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
上面展示了我們的線程可以并發(fā)方式調(diào)用另一個(gè)線程去做我們耗時(shí)的操作。當(dāng)我們必須依賴我們的異步結(jié)果的時(shí)候我們就可以調(diào)用get方法去獲得。當(dāng)我們調(diào)用get方法的時(shí)候如果我們的任務(wù)完成就可以立馬返回,但是如果任務(wù)沒(méi)有完成就會(huì)阻塞,直到超時(shí)為止。
Future底層是怎么實(shí)現(xiàn)的呢? 我們首先來(lái)到我們ExecutorService的代碼中submit方法這里會(huì)返回一個(gè)Future
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在sumbmit中會(huì)對(duì)我們的Callable進(jìn)行包裝封裝成我們的FutureTask,我們最后的Future其實(shí)也是Future的實(shí)現(xiàn)類FutureTask,F(xiàn)utureTask實(shí)現(xiàn)了Runnable接口所以這里直接調(diào)用execute。在FutureTask代碼中的run方法代碼如下:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
.......
}
可以看見(jiàn)當(dāng)我們執(zhí)行完成之后會(huì)set(result)來(lái)通知我們的結(jié)果完成了。set(result)代碼如下:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
首先用CAS置換狀態(tài)為完成,以及替換結(jié)果,當(dāng)替換結(jié)果完成之后,才會(huì)替換為我們的最終狀態(tài),這里主要是怕我們?cè)O(shè)置完COMPLETING狀態(tài)之后最終值還沒(méi)有真正的賦值出去,而我們的get就去使用了,所以還會(huì)有個(gè)最終狀態(tài)。我們的get()方法的代碼如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
首先獲得當(dāng)前狀態(tài),然后判斷狀態(tài)是否完成,如果沒(méi)有完成則進(jìn)入awaitDone循環(huán)等待,這也是我們阻塞的代碼,然后返回我們的最終結(jié)果。
缺陷
我們的Future使用很簡(jiǎn)單,這也導(dǎo)致了如果我們想完成一些復(fù)雜的任務(wù)可能就比較難。比如下面一些例子:
將兩個(gè)異步計(jì)算合成一個(gè)異步計(jì)算,這兩個(gè)異步計(jì)算互相獨(dú)立,同時(shí)第二個(gè)又依賴第一個(gè)的結(jié)果。
當(dāng)Future集合中某個(gè)任務(wù)最快結(jié)束時(shí),返回結(jié)果。
等待Future結(jié)合中的所有任務(wù)都完成。
通過(guò)編程方式完成一個(gè)Future任務(wù)的執(zhí)行。
應(yīng)對(duì)Future的完成時(shí)間。也就是我們的回調(diào)通知。
CompletableFuture
CompletableFuture是JDK8提出的一個(gè)支持非阻塞的多功能的Future,同樣也是實(shí)現(xiàn)了Future接口。
下面會(huì)寫(xiě)一個(gè)比較簡(jiǎn)單的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的其他操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
用法上來(lái)說(shuō)和Future有一點(diǎn)不同,我們這里fork了一個(gè)新的線程來(lái)完成我們的異步操作,在異步操作中我們會(huì)設(shè)置值,然后在外部做我們其他操作。在complete中會(huì)用CAS替換result,然后當(dāng)我們get如果可以獲取到值得時(shí)候就可以返回了。
錯(cuò)誤處理
上面介紹了正常情況下但是當(dāng)我們?cè)谖覀儺惒骄€程中產(chǎn)生了錯(cuò)誤的話就會(huì)非常的不幸,錯(cuò)誤的異常不會(huì)告知給你,會(huì)被扼殺在我們的異步線程中,而我們的get方法會(huì)被阻塞。
對(duì)于我們的CompletableFuture提供了completeException方法可以讓我們返回我們異步線程中的異常,代碼如下:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.completeExceptionally(new RuntimeException("error"));
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的耗時(shí)操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
at futurepackge.jdk8Future.main(jdk8Future.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
在我們新建的異步線程中直接New一個(gè)異常拋出,在我們客戶端中依然可以獲得異常。
工廠方法創(chuàng)建CompletableFuture
我們的上面的代碼雖然不復(fù)雜,但是我們的java8依然對(duì)其提供了大量的工廠方法,用這些方法更容易完成整個(gè)流程。如下面的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
return Thread.currentThread().getName();
});
doSomethingelse();//做你想做的耗時(shí)操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
---------
輸出:
ForkJoinPool.commonPool-worker-1
上面的例子通過(guò)工廠方法supplyAsync提供了一個(gè)Completable,在異步線程中的輸出是ForkJoinPool可以看出當(dāng)我們不指定線程池的時(shí)候會(huì)使用ForkJoinPool,而我們上面的compelte的操作在我們的run方法中做了,源代碼如下:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null;
fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
上面代碼中通過(guò)d.completeValue(f.get());設(shè)置了我們的值。同樣的構(gòu)造方法還有runasync等等。
計(jì)算結(jié)果完成時(shí)的處理
當(dāng)CompletableFuture計(jì)算結(jié)果完成時(shí),我們需要對(duì)結(jié)果進(jìn)行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時(shí)候需要對(duì)異常進(jìn)行處理。有如下幾種方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面的四種方法都返回了CompletableFuture,當(dāng)我們Action執(zhí)行完畢的時(shí)候,future返回的值和我們?cè)嫉腃ompletableFuture的值是一樣的。上面以Async結(jié)尾的會(huì)在新的線程池中執(zhí)行,上面沒(méi)有一Async結(jié)尾的會(huì)在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(v);
});
System.out.println("Main" + Thread.currentThread().getName());
System.out.println(f.get());
}
exceptionally方法返回一個(gè)新的CompletableFuture,當(dāng)原始的CompletableFuture拋出異常的時(shí)候,就會(huì)觸發(fā)這個(gè)CompletableFuture的計(jì)算,調(diào)用function計(jì)算值,否則如果原始的CompletableFuture正常計(jì)算完后,這個(gè)新的CompletableFuture也計(jì)算完成,它的值和原始的CompletableFuture的計(jì)算的值相同。也就是這個(gè)exceptionally方法用來(lái)處理異常的情況。
上面我們討論了如何計(jì)算結(jié)果完成時(shí)進(jìn)行的處理,接下來(lái)我們討論如何對(duì)計(jì)算結(jié)果完成時(shí),對(duì)結(jié)果進(jìn)行轉(zhuǎn)換。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
這里同樣也是返回CompletableFuture,但是這個(gè)結(jié)果會(huì)由我們自定義返回去轉(zhuǎn)換他,同樣的不以Async結(jié)尾的方法由原來(lái)的線程計(jì)算,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運(yùn)行。Java的CompletableFuture類總是遵循這樣的原則,下面就不一一贅述了。
例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
System.out.println(f.get());
}
上面的最終結(jié)果會(huì)輸出11,我們成功將其用兩個(gè)thenApply轉(zhuǎn)換為String。
計(jì)算結(jié)果完成時(shí)的消費(fèi)
上面已經(jīng)講了結(jié)果完成時(shí)的處理和轉(zhuǎn)換,他們最后的CompletableFuture都會(huì)返回對(duì)應(yīng)的值,這里還會(huì)有一個(gè)只會(huì)對(duì)計(jì)算結(jié)果消費(fèi)不會(huì)返回任何結(jié)果的方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
函數(shù)接口為Consumer,就知道了只會(huì)對(duì)函數(shù)進(jìn)行消費(fèi),例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
future.thenAccept(System.out::println);
}
這個(gè)方法用法很簡(jiǎn)單我就不多說(shuō)了.Accept家族還有個(gè)方法是用來(lái)合并結(jié)果當(dāng)兩個(gè)CompletionStage都正常執(zhí)行的時(shí)候就會(huì)執(zhí)行提供的action,它用來(lái)組合另外一個(gè)異步的結(jié)果。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth是當(dāng)兩個(gè)CompletionStage都正常完成計(jì)算的時(shí)候,執(zhí)行一個(gè)Runnable,這個(gè)Runnable并不使用計(jì)算的結(jié)果。 示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> System.out.println(x+y)).get());
}
CompletableFuture也提供了執(zhí)行Runnable的辦法,這里我們就不能使用我們future中的值了。
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
對(duì)計(jì)算結(jié)果的組合
首先是介紹一下連接兩個(gè)future的方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
對(duì)于Compose可以連接兩個(gè)CompletableFuture,其內(nèi)部處理邏輯是當(dāng)?shù)谝粋€(gè)CompletableFuture處理沒(méi)有完成時(shí)會(huì)合并成一個(gè)CompletableFuture,如果處理完成,第二個(gè)future會(huì)緊接上一個(gè)CompletableFuture進(jìn)行處理。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
}
我們上面的thenAcceptBoth講了合并兩個(gè)future,但是沒(méi)有返回值這里將介紹一個(gè)有返回值的方法,如下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
例子比較簡(jiǎn)單如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> {return "計(jì)算結(jié)果:"+x+y;});
System.out.println(f.get());
}
上面介紹了兩個(gè)future完成的時(shí)候應(yīng)該完成的工作,接下來(lái)介紹任意一個(gè)future完成時(shí)需要執(zhí)行的工作,方法如下:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
上面兩個(gè)是一個(gè)是純消費(fèi)不返回結(jié)果,一個(gè)是計(jì)算后返回結(jié)果。
其他方法
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算。
anyOf方法是當(dāng)任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同。