1.Future接口
1.1 什么是Future?
在jdk的官方的注解中寫(xiě)道
A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation.
在上面的注釋中我們能知道Future用來(lái)代表異步的結(jié)果,并且提供了檢查計(jì)算完成,等待完成,檢索結(jié)果完成等方法。簡(jiǎn)而言之就是提供一個(gè)異步運(yùn)算結(jié)果的一個(gè)建模。它可以讓我們把耗時(shí)的操作從我們本身的調(diào)用線程中釋放出來(lái),只需要完成后再進(jìn)行回調(diào)。就好像我們?nèi)ワ埖昀锩娉燥垼恍枰闳ブ箫?,而你這個(gè)時(shí)候可以做任何事,然后飯煮好后就會(huì)回調(diào)你去吃。
1.2 JDK8以前的Future
在JDK8以前的Future使用比較簡(jiǎn)單,我們只需要把我們需要用來(lái)異步計(jì)算的過(guò)程封裝在Callable或者Runnable中,比如一些很耗時(shí)的操作(不能占用我們的調(diào)用線程時(shí)間的),然后再將它提交給我們的線程池ExecutorService。代碼例子如下:
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
doSomethingElse();//在我們異步操作的同時(shí)一樣可以做其他操作
try {
String res = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
上面展示了我們的線程可以并發(fā)方式調(diào)用另一個(gè)線程去做我們耗時(shí)的操作。當(dāng)我們必須依賴我們的異步結(jié)果的時(shí)候我們就可以調(diào)用get方法去獲得。當(dāng)我們調(diào)用get方法的時(shí)候如果我們的任務(wù)完成就可以立馬返回,但是如果任務(wù)沒(méi)有完成就會(huì)阻塞,直到超時(shí)為止。
Future底層是怎么實(shí)現(xiàn)的呢?
我們首先來(lái)到我們ExecutorService的代碼中submit方法這里會(huì)返回一個(gè)Future
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在sumbmit中會(huì)對(duì)我們的Callable進(jìn)行包裝封裝成我們的FutureTask,我們最后的Future其實(shí)也是Future的實(shí)現(xiàn)類FutureTask,F(xiàn)utureTask實(shí)現(xiàn)了Runnable接口所以這里直接調(diào)用execute。在FutureTask代碼中的run方法代碼如下:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
.......
}
可以看見(jiàn)當(dāng)我們執(zhí)行完成之后會(huì)set(result)來(lái)通知我們的結(jié)果完成了。set(result)代碼如下:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
首先用CAS置換狀態(tài)為完成,以及替換結(jié)果,當(dāng)替換結(jié)果完成之后,才會(huì)替換為我們的最終狀態(tài),這里主要是怕我們?cè)O(shè)置完COMPLETING狀態(tài)之后最終值還沒(méi)有真正的賦值出去,而我們的get就去使用了,所以還會(huì)有個(gè)最終狀態(tài)。我們的get()方法的代碼如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
首先獲得當(dāng)前狀態(tài),然后判斷狀態(tài)是否完成,如果沒(méi)有完成則進(jìn)入awaitDone循環(huán)等待,這也是我們阻塞的代碼,然后返回我們的最終結(jié)果。
1.2.1缺陷
我們的Future使用很簡(jiǎn)單,這也導(dǎo)致了如果我們想完成一些復(fù)雜的任務(wù)可能就比較難。比如下面一些例子:
- 將兩個(gè)異步計(jì)算合成一個(gè)異步計(jì)算,這兩個(gè)異步計(jì)算互相獨(dú)立,同時(shí)第二個(gè)又依賴第一個(gè)的結(jié)果。
- 當(dāng)Future集合中某個(gè)任務(wù)最快結(jié)束時(shí),返回結(jié)果。
- 等待Future結(jié)合中的所有任務(wù)都完成。
- 通過(guò)編程方式完成一個(gè)Future任務(wù)的執(zhí)行。
- 應(yīng)對(duì)Future的完成時(shí)間。也就是我們的回調(diào)通知。
1.3CompletableFuture
CompletableFuture是JDK8提出的一個(gè)支持非阻塞的多功能的Future,同樣也是實(shí)現(xiàn)了Future接口。
1.3.1CompletableFuture基本實(shí)現(xiàn)
下面會(huì)寫(xiě)一個(gè)比較簡(jiǎn)單的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的其他操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
用法上來(lái)說(shuō)和Future有一點(diǎn)不同,我們這里fork了一個(gè)新的線程來(lái)完成我們的異步操作,在異步操作中我們會(huì)設(shè)置值,然后在外部做我們其他操作。在complete中會(huì)用CAS替換result,然后當(dāng)我們get如果可以獲取到值得時(shí)候就可以返回了。
1.3.2錯(cuò)誤處理
上面介紹了正常情況下但是當(dāng)我們?cè)谖覀儺惒骄€程中產(chǎn)生了錯(cuò)誤的話就會(huì)非常的不幸,錯(cuò)誤的異常不會(huì)告知給你,會(huì)被扼殺在我們的異步線程中,而我們的get方法會(huì)被阻塞。
對(duì)于我們的CompletableFuture提供了completeException方法可以讓我們返回我們異步線程中的異常,代碼如下:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.completeExceptionally(new RuntimeException("error"));
completableFuture.complete(Thread.currentThread().getName());
}).start();
// doSomethingelse();//做你想做的耗時(shí)操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
at futurepackge.jdk8Future.main(jdk8Future.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
在我們新建的異步線程中直接New一個(gè)異常拋出,在我們客戶端中依然可以獲得異常。
1.3.2工廠方法創(chuàng)建CompletableFuture
我們的上面的代碼雖然不復(fù)雜,但是我們的java8依然對(duì)其提供了大量的工廠方法,用這些方法更容易完成整個(gè)流程。如下面的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
return Thread.currentThread().getName();
});
// doSomethingelse();//做你想做的耗時(shí)操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
---------
輸出:
ForkJoinPool.commonPool-worker-1
上面的例子通過(guò)工廠方法supplyAsync提供了一個(gè)Completable,在異步線程中的輸出是ForkJoinPool可以看出當(dāng)我們不指定線程池的時(shí)候會(huì)使用ForkJoinPool,而我們上面的compelte的操作在我們的run方法中做了,源代碼如下:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
上面代碼中通過(guò)d.completeValue(f.get());設(shè)置了我們的值。同樣的構(gòu)造方法還有runasync等等。
1.3.3計(jì)算結(jié)果完成時(shí)的處理
當(dāng)CompletableFuture計(jì)算結(jié)果完成時(shí),我們需要對(duì)結(jié)果進(jìn)行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時(shí)候需要對(duì)異常進(jìn)行處理。有如下幾種方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面的四種方法都返回了CompletableFuture,當(dāng)我們Action執(zhí)行完畢的時(shí)候,future返回的值和我們?cè)嫉腃ompletableFuture的值是一樣的。上面以Async結(jié)尾的會(huì)在新的線程池中執(zhí)行,上面沒(méi)有一Async結(jié)尾的會(huì)在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(v);
});
System.out.println("Main" + Thread.currentThread().getName());
System.out.println(f.get());
}
exceptionally方法返回一個(gè)新的CompletableFuture,當(dāng)原始的CompletableFuture拋出異常的時(shí)候,就會(huì)觸發(fā)這個(gè)CompletableFuture的計(jì)算,調(diào)用function計(jì)算值,否則如果原始的CompletableFuture正常計(jì)算完后,這個(gè)新的CompletableFuture也計(jì)算完成,它的值和原始的CompletableFuture的計(jì)算的值相同。也就是這個(gè)exceptionally方法用來(lái)處理異常的情況。
1.3.4計(jì)算結(jié)果完成時(shí)的轉(zhuǎn)換
上面我們討論了如何計(jì)算結(jié)果完成時(shí)進(jìn)行的處理,接下來(lái)我們討論如何對(duì)計(jì)算結(jié)果完成時(shí),對(duì)結(jié)果進(jìn)行轉(zhuǎn)換。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
這里同樣也是返回CompletableFuture,但是這個(gè)結(jié)果會(huì)由我們自定義返回去轉(zhuǎn)換他,同樣的不以Async結(jié)尾的方法由原來(lái)的線程計(jì)算,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運(yùn)行。Java的CompletableFuture類總是遵循這樣的原則,下面就不一一贅述了。
例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
System.out.println(f.get());
}
上面的最終結(jié)果會(huì)輸出11,我們成功將其用兩個(gè)thenApply轉(zhuǎn)換為String。
1.3.5計(jì)算結(jié)果完成時(shí)的消費(fèi)
上面已經(jīng)講了結(jié)果完成時(shí)的處理和轉(zhuǎn)換,他們最后的CompletableFuture都會(huì)返回對(duì)應(yīng)的值,這里還會(huì)有一個(gè)只會(huì)對(duì)計(jì)算結(jié)果消費(fèi)不會(huì)返回任何結(jié)果的方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
函數(shù)接口為Consumer,就知道了只會(huì)對(duì)函數(shù)進(jìn)行消費(fèi),例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
future.thenAccept(System.out::println);
}
這個(gè)方法用法很簡(jiǎn)單我就不多說(shuō)了.Accept家族還有個(gè)方法是用來(lái)合并結(jié)果當(dāng)兩個(gè)CompletionStage都正常執(zhí)行的時(shí)候就會(huì)執(zhí)行提供的action,它用來(lái)組合另外一個(gè)異步的結(jié)果。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth是當(dāng)兩個(gè)CompletionStage都正常完成計(jì)算的時(shí)候,執(zhí)行一個(gè)Runnable,這個(gè)Runnable并不使用計(jì)算的結(jié)果。
示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> System.out.println(x+y)).get());
}
CompletableFuture也提供了執(zhí)行Runnable的辦法,這里我們就不能使用我們future中的值了。
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
1.3.6對(duì)計(jì)算結(jié)果的組合
首先是介紹一下連接兩個(gè)future的方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
對(duì)于Compose可以連接兩個(gè)CompletableFuture,其內(nèi)部處理邏輯是當(dāng)?shù)谝粋€(gè)CompletableFuture處理沒(méi)有完成時(shí)會(huì)合并成一個(gè)CompletableFuture,如果處理完成,第二個(gè)future會(huì)緊接上一個(gè)CompletableFuture進(jìn)行處理。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
}
我們上面的thenAcceptBoth講了合并兩個(gè)future,但是沒(méi)有返回值這里將介紹一個(gè)有返回值的方法,如下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
例子比較簡(jiǎn)單如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> {return "計(jì)算結(jié)果:"+x+y;});
System.out.println(f.get());
}
上面介紹了兩個(gè)future完成的時(shí)候應(yīng)該完成的工作,接下來(lái)介紹任意一個(gè)future完成時(shí)需要執(zhí)行的工作,方法如下:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
上面兩個(gè)是一個(gè)是純消費(fèi)不返回結(jié)果,一個(gè)是計(jì)算后返回結(jié)果。
1.3.6其他方法
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算。
anyOf方法是當(dāng)任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同。
1.3.7建議
CompletableFuture和Java8的Stream搭配使用對(duì)于一些并行訪問(wèn)的耗時(shí)操作有很大的性能提高,可以自行了解。
2.參考
- <<java8實(shí)戰(zhàn)>>
- Java CompletableFuture 詳解