Thread, ThreadLocal, ThreadLocalMap
- Thread類比較常用,線程類內(nèi)部維持一個ThreadLocalMap類實例(t.threadLocals)
- ThreadLocal類用于存儲以線程為作用域的數(shù)據(jù),線程之間數(shù)據(jù)隔離。
- ThreadLocalMap類是ThreadLocal的靜態(tài)內(nèi)部類,通過操作Entry來存儲數(shù)據(jù)。
Thread.currentThread() :返回一個Thread實例,該實例是當(dāng)前運(yùn)行的Thread 的引用
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
get方法同樣是獲取方法調(diào)用線程,之后也是通過getMap(t)方法獲取ThreadLocalMap 的實例map。如果map!=null直接通過 ThreadLocalMap.Entry e = map.getEntry(this);獲取ThreadLocalMap.Entry返回其中的value數(shù)據(jù)。(ThreadLocalMap類將數(shù)據(jù)存儲到其持有Entry對象的value屬性中,Entry是其靜態(tài)內(nèi)部類,繼承自WeakReference)
如果map==null則會調(diào)用setInitialValue()方法:
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
protected T initialValue() {
return null;
}
setInitialValue()方法一般只調(diào)用一次(當(dāng)調(diào)用ThreadLocal的remove方法后需要再次調(diào)用此方法來初始化數(shù)據(jù)),用于初始化數(shù)據(jù),其首先調(diào)用initialValue()方法返回默認(rèn)返回一個null,我們可以通過重寫這個方法來指定初始化的默認(rèn)值。后面的代碼和上面相似,就是獲取ThreadLocalMap將初始化的value存儲到ThreadLocalMap中。
CompletableFuture
使用CompletableFuture構(gòu)建異步應(yīng)用
Future 接口的局限性
future接口可以構(gòu)建異步應(yīng)用,但依然有其局限性。它很難直接表述多個Future 結(jié)果之間的依賴性。實際開發(fā)中,我們經(jīng)常需要達(dá)成以下目的:
將兩個異步計算合并為一個——這兩個異步計算之間相互獨立,同時第二個又依賴于第
一個的結(jié)果。
等待 Future 集合中的所有任務(wù)都完成。
僅等待 Future 集合中最快結(jié)束的任務(wù)完成(有可能因為它們試圖通過不同的方式計算同
一個值),并返回它的結(jié)果。
通過編程方式完成一個 Future 任務(wù)的執(zhí)行(即以手工設(shè)定異步操作結(jié)果的方式)。
應(yīng)對 Future 的完成事件(即當(dāng) Future 的完成事件發(fā)生時會收到通知,并能使用 Future
計算的結(jié)果進(jìn)行下一步的操作,不只是簡單地阻塞等待操作的結(jié)果)
新的CompletableFuture將使得這些成為可能。
CompletableFuture
異步執(zhí)行
首先,CompletableFuture實現(xiàn)了Future接口,因此你可以像Future那樣使用它。
其次,CompletableFuture并非一定要交給線程池執(zhí)行才能實現(xiàn)異步,你可以像下面這樣實現(xiàn)異步運(yùn)行。
public static void test1() throws Exception{
CompletableFuture<String> completableFuture=new CompletableFuture();
new Thread(new Runnable() {
@Override
public void run() {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//告訴completableFuture任務(wù)已經(jīng)完成
completableFuture.complete("result");
}
}).start();
//獲取任務(wù)結(jié)果,如果沒有完成會一直阻塞等待
String result=completableFuture.get();
System.out.println("計算結(jié)果:"+result);
}
錯誤處理 completeExceptionally
如果沒有意外,上面發(fā)的代碼工作得很正常。但是,如果任務(wù)執(zhí)行過程中產(chǎn)生了異常會怎樣呢?
非常不幸,這種情況下你會得到一個相當(dāng)糟糕的結(jié)果:異常會被限制在執(zhí)行任務(wù)的線程的范圍內(nèi),最終會殺死該線程,而這會導(dǎo)致等待 get 方法返回結(jié)果的線程永久地被阻塞。
客戶端可以使用重載版本的 get 方法,它使用一個超時參數(shù)來避免發(fā)生這樣的情況。這是一種值得推薦的做法,你應(yīng)該盡量在你的代碼中添加超時判斷的邏輯,避免發(fā)生類似的問題。
使用這種方法至少能防止程序永久地等待下去,超時發(fā)生時,程序會得到通知發(fā)生了 Timeout-Exception 。不過,也因為如此,你不能指定執(zhí)行任務(wù)的線程內(nèi)到底發(fā)生了什么問題。
為了能獲取任務(wù)線程內(nèi)發(fā)生的異常,你需要使用
CompletableFuture 的completeExceptionally方法將導(dǎo)致CompletableFuture 內(nèi)發(fā)生問題的異常拋出。這樣,當(dāng)執(zhí)行任務(wù)發(fā)生異常時,調(diào)用get()方法的線程將會收到一個 ExecutionException 異常,該異常接收了一個包含失敗原因的Exception 參數(shù)。
public static void test2() throws Exception{
CompletableFuture<String> completableFuture=new CompletableFuture();
new Thread(new Runnable() {
@Override
public void run() {
try {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("拋異常了");
}catch (Exception e) {
//告訴completableFuture任務(wù)發(fā)生異常了
completableFuture.completeExceptionally(e);
}
}
}).start();
//獲取任務(wù)結(jié)果,如果沒有完成會一直阻塞等待
String result=completableFuture.get();
System.out.println("計算結(jié)果:"+result);
}
工廠方法
前面我們通過編程自己創(chuàng)建 CompletableFuture 對象以及如何獲取返回值,雖然看起來這些操作已經(jīng)比較方便,但還有進(jìn)一步提升的空間.
CompletableFuture 類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔(dān)心實現(xiàn)的細(xì)節(jié)。
supplyAsync
supplyAsync 方法接受一個生產(chǎn)者(Supplier)作為參數(shù),返回一個 CompletableFuture
對象。生產(chǎn)者方法會交由 ForkJoinPool池中的某個執(zhí)行線程( Executor )運(yùn)行,但是你也可以使用 supplyAsync 方法的重載版本,傳遞第二個參數(shù)指定線程池執(zhí)行器執(zhí)行生產(chǎn)者方法。
public static void test3() throws Exception {
//supplyAsync內(nèi)部使用ForkJoinPool線程池執(zhí)行任務(wù)
CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
//模擬執(zhí)行耗時任務(wù)
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return "result";
});
System.out.println("計算結(jié)果:"+completableFuture.get());
}
allOf 工廠方法接收一個由CompletableFuture 構(gòu)成的數(shù)組,數(shù)組中的所有 Completable-Future 對象執(zhí)行完成之后,它返回一個 CompletableFuture<Void> 對象。這意味著,如果你需要等待多個 CompletableFuture 對象執(zhí)行完畢,對 allOf 方法返回的
CompletableFuture 執(zhí)行 join 操作可以等待CompletableFuture執(zhí)行完成。
或者你可能希望只要 CompletableFuture 對象數(shù)組中有任何一個執(zhí)行完畢就不再等待,在這種情況下,你可以使用一個類似的工廠方法 anyOf 。
該方法接收一個 CompletableFuture 對象構(gòu)成的數(shù)組,返回由第一個執(zhí)行完畢的 CompletableFuture 對象的返回值構(gòu)成的 CompletableFuture<Object> 。
public static void test4() throws Exception {
CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
//模擬執(zhí)行耗時任務(wù)
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return "result1";
});
CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{
//模擬執(zhí)行耗時任務(wù)
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return "result2";
});
CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2);
System.out.println("第一個完成的任務(wù)結(jié)果:"+anyResult.get());
CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2);
//阻塞等待所有任務(wù)執(zhí)行完成
allResult.join();
System.out.println("所有任務(wù)執(zhí)行完成");
}
將兩個CompletableFuture建立聯(lián)系
通常,我們會有多個需要獨立運(yùn)行但又有所依賴的的任務(wù)。比如先等用于的訂單處理完畢然后才發(fā)送郵件通知客戶。
thenCompose
thenCompose 方法允許你對兩個異步操作進(jìn)行流水線,第一個操作完成時,將其結(jié)果作為參數(shù)傳遞給第二個操作。你可以創(chuàng)建兩個CompletableFutures 對象,對第一個 CompletableFuture 對象調(diào)用thenCompose ,并向其傳遞一個函數(shù)。當(dāng)?shù)谝粋€CompletableFuture 執(zhí)行完畢后,它的結(jié)果將作為該函數(shù)的參數(shù),這個函數(shù)的返回值是以第一個 CompletableFuture 的返回做輸入計算出的第二個 CompletableFuture 對象。
public static void test5() throws Exception {
CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
//模擬執(zhí)行耗時任務(wù)
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return "result1";
});
//等第一個任務(wù)完成后,將任務(wù)結(jié)果傳給參數(shù)result,執(zhí)行后面的任務(wù)并返回一個代表任務(wù)的completableFuture
CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{
//模擬執(zhí)行耗時任務(wù)
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return "result2";
}));
System.out.println(completableFuture2.get());
}
thenCombine
另一種比較常見的情況是,你需要將兩個完全不相干的 CompletableFuture 對象的結(jié)果整合起來,而且你也不希望等到第一個任務(wù)完全結(jié)束才開始第二項任務(wù)。
這種情況,你應(yīng)該使用 thenCombine 方法,它接收名為 BiFunction 的第二參數(shù),這個參數(shù)
定義了當(dāng)兩個 CompletableFuture 對象完成計算后,結(jié)果如何合并。
public static void test6() throws Exception {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return 100;
});
//將第一個任務(wù)與第二個任務(wù)組合一起執(zhí)行,都執(zhí)行完成后,將兩個任務(wù)的結(jié)果合并
CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine(
//第二個任務(wù)
CompletableFuture.supplyAsync(() -> {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return 2000;
}),
//合并函數(shù)
(result1, result2) -> result1 + result2);
System.out.println(completableFuture2.get());
}
響應(yīng) CompletableFuture 的 completion 事件
我們可以在每個CompletableFuture 上注冊一個操作,該操作會在 CompletableFuture 完成執(zhí)行后調(diào)用它。CompletableFuture 通過 thenAccept 方法提供了這一功能,它接收CompletableFuture 執(zhí)行完畢后的返回值做參數(shù)。
public static void test7() throws Exception {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return 100;
});
//注冊完成事件
completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result));
CompletableFuture<Integer> completableFuture2=
//第二個任務(wù)
CompletableFuture.supplyAsync(() -> {
//模擬執(zhí)行耗時任務(wù)
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結(jié)果
return 2000;
});
//注冊完成事件
completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result));
//將第一個任務(wù)與第二個任務(wù)組合一起執(zhí)行,都執(zhí)行完成后,將兩個任務(wù)的結(jié)果合并
CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2,
//合并函數(shù)
(result1, result2) -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result1 + result2;
});
System.out.println(completableFuture3.get());
}