本文源碼版本1.8.0_241,更高版本源碼會(huì)有所不同。
常量
CompletableFuture類里只有2個(gè)常量
volatile Object result;// Either the result or boxed AltResult
volatile Completion stack;// Top of Treiber stack of dependent actions
result是用來保存任務(wù)執(zhí)行的結(jié)果,stack是用來保存阻塞等待結(jié)果的線程封裝對(duì)象,
abstract static class Completion extends ForkJoinTask implements Runnable, AsynchronousCompletionTask {
????volatile Completion next;// Treiber stack link
????/**
????* Performs completion action if triggered, returning a
????* dependent that may need propagation, if one exists.
????*
? ? * @param mode SYNC, ASYNC, or NESTED
????*/
? ? abstract CompletableFuture tryFire(int mode);
????/** Returns true if possibly still triggerable. Used by cleanStack. */
? ? abstract boolean isLive();
????public final void run()? ? ? ? ? ? ? ? { tryFire(ASYNC); }
????public final boolean exec()? ? ? ? ? ? { tryFire(ASYNC); return true; }
????public final Void getRawResult()? ? ? {return null; }
????public final void setRawResult(Void v) {}
}
從名字可以看出,這些等待結(jié)果的線程封裝對(duì)象是以棧結(jié)構(gòu)保存的,后文會(huì)說到。2個(gè)變量都是以volatile修飾的,保證可見性。
帶返回結(jié)果的執(zhí)行方法
CompletableFuture中提供了2種執(zhí)行方法,一種沒有返回結(jié)果,另一種有返回結(jié)果,先看有返回結(jié)果的
public static?<U>??CompletableFuture<U>?supplyAsync(Supplier<U> supplier) {
????????return asyncSupplyStage(asyncPool, supplier);
}
以supplyAsync為例,進(jìn)到asyncSupplyStage方法

創(chuàng)建了一個(gè)CompletableFuture對(duì)象并作為參數(shù)傳給了AsyncSupply對(duì)象,很明顯,AsyncSupply對(duì)象是一個(gè)封裝的任務(wù)類,

里面有2個(gè)變量,一個(gè)CompletableFuture用來保存運(yùn)行結(jié)果,一個(gè)Supplier對(duì)象,函數(shù)式接口,保存執(zhí)行的方法,內(nèi)部有個(gè)run方法用來執(zhí)行方法,核心就是d.completeValue(f.get()); 進(jìn)去completeValue看看

這里使用到了大名鼎鼎的UNSAFE對(duì)象進(jìn)行cas操作,cas方法有4個(gè)參數(shù),分別是對(duì)象,變量在內(nèi)存上在對(duì)象內(nèi)部的相對(duì)偏移地址,期望值,更新值,這里先說下這個(gè)RESULT

他是在CompletableFuture初始化時(shí),通過反射獲取到result變量在CompletableFuture對(duì)象里的內(nèi)存地址偏移量,然后保存了起來;所以上面cas操作的意思是如果RESULT位置處的result變量為null,那么就更新值為后面的三元判斷表達(dá)式得出的值,而result變量默認(rèn)值是為null的,所以如果運(yùn)行的方法結(jié)果為null,那么返回NIL對(duì)象,NIL是什么

可以看到是一個(gè)封裝了null的對(duì)象,所以如果執(zhí)行的方法結(jié)果是null,那么返回一個(gè)封裝了null值的對(duì)象,否則返回執(zhí)行的結(jié)果。
回到一開始的run方法,可以看到在方法執(zhí)行結(jié)束后執(zhí)行了一個(gè)d.postComplete()方法

如果該方法執(zhí)行比較耗時(shí),而在其他地方有線程阻塞等待方法執(zhí)行的結(jié)果,那么在這里會(huì)將那些阻塞等待的方法喚醒,可以看到while循環(huán)里的表達(dá)式,(h = f.stack) !=null,如果有線程阻塞等待的話,這個(gè)h是不為null的,那么會(huì)進(jìn)入到循環(huán)里,在方法的最后執(zhí)行tryFire方法,

可以看到這個(gè)方法是將阻塞等待的線程喚醒,如果此時(shí)有多個(gè)線程阻塞等待的話,那么通過if條件里的f.casStack(h, t = h.next)這個(gè)cas操作,會(huì)依次將所有線程喚醒。
另外,該方法還有一個(gè)同名方法,有2個(gè)參數(shù),支持傳入自定義的線程池

另外,值得注意的一點(diǎn)是,如果不傳入自定義線程池的話,使用的是底層自帶的ForkJoinPool池,該池的線程均為守護(hù)線程,所以使用時(shí)需要注意,可能出現(xiàn)線程池里的線程還沒執(zhí)行完,外面的主線程已經(jīng)執(zhí)行結(jié)束了,可以調(diào)用ForkJoinPool的awaitQuiescence方法等待線程池里的線程執(zhí)行結(jié)束。
不帶返回結(jié)果的執(zhí)行方法
以runAsync方法為例

同樣的套路,進(jìn)入asyncRunStage方法

一樣的封裝了個(gè)AsyncRun對(duì)象

可以看到大體的結(jié)構(gòu)和之前是一樣的,唯一的不同在于,在方法執(zhí)行結(jié)束后調(diào)用了d.completeNull()這個(gè)方法,

這個(gè)cas操作判斷result變量是否為null,為null就更新為之前說的那個(gè)封裝了null值的對(duì)象,而result變量默認(rèn)是為null的,所以直接返回一個(gè)封裝了null值的對(duì)象,因?yàn)閞un方法是沒有返回值的,所以直接就這樣返回了。其他的邏輯和之前說的一樣。
get方法
接下來看看get方法

如果調(diào)用get方法時(shí),方法還未執(zhí)行完成,那么此時(shí)result是為null的,調(diào)用waitingGet方法

可以看到會(huì)進(jìn)入一個(gè)while循環(huán)里,前2個(gè)if是自旋一段非常短的時(shí)間,在更高版本的源碼中,這2個(gè)if被刪除了,此處有個(gè)有意思的東西,可以看到有一個(gè)SPINS常量,這個(gè)常量是什么呢,

他調(diào)用了系統(tǒng)函數(shù)判斷機(jī)器是否是多核CPU,如果是則賦值256,否則為0,這個(gè)變量在之前的版本里是寫在方法里的,但是會(huì)有性能問題,所以后來抽出來了,有興趣的可以看看https://bugs.openjdk.java.net/projects/JDK/issues/,我們接著往下看,當(dāng)前2個(gè)if執(zhí)行完后,執(zhí)行到第三個(gè)if,會(huì)構(gòu)建一個(gè)Signaller對(duì)象

可以看到里面有一個(gè)thread變量保存了阻塞等待的線程,前面說的tryFire方法喚醒線程也在這里,接著走到第四個(gè)if,執(zhí)行tryPushStack方法

這里就是把所有阻塞等待的線程用棧保存起來,怎么做的呢,首先獲取CompletableFuture中的stack對(duì)象,是個(gè)null值,然后將封裝阻塞等待線程的Signaller對(duì)象中的next變量指向這個(gè)stack,

可以看到這里直接進(jìn)行了賦值,沒有走cas操作,因?yàn)檫@個(gè)Signaller對(duì)象,不同的線程是不一樣的,相互獨(dú)立的,所以沒有必要進(jìn)行cas操作,但是下一步UNSAFE.compareAndSwapObject(this,STACK, h, c),將CompletableFuture中的stack變量替換為封裝自身線程的Signaller對(duì)象時(shí),會(huì)有競(jìng)爭(zhēng)存在,所以需要走cas保證線程安全性,執(zhí)行失敗的線程會(huì)重新走一遍while循環(huán)走到這里,直到設(shè)置成功。下面一個(gè)if是判斷線程中斷的,我們跳過,最后一個(gè)if會(huì)執(zhí)行managedBlock方法

里面核心是 !blocker.isReleasable() && !blocker.block()這個(gè)條件,isReleasable方法恒為false,我們看block方法,

可以看到里面將線程掛起了。當(dāng)方法執(zhí)行完成后會(huì)將他們喚醒,回到get方法里,當(dāng)拿到結(jié)果后執(zhí)行reportGet方法

前面說過,如果方法執(zhí)行結(jié)果為null或者沒有返回結(jié)果時(shí),會(huì)返回一個(gè)封裝了null值的AltResult對(duì)象,這里判斷對(duì)象是否是AltResult對(duì)象,如果是,則判斷封裝的結(jié)果是否為null,是則返回null,不是則拋出相應(yīng)的異常,如果都不是返回正常執(zhí)行的結(jié)果。
所以,如果有多個(gè)線程阻塞等待的話,所有線程會(huì)先排個(gè)隊(duì),用stack數(shù)據(jù)結(jié)構(gòu)保存起來,然后各自將自己掛起,等方法運(yùn)行結(jié)束后會(huì)將這些阻塞線程全部喚醒。
以上即為CompletableFuture方法運(yùn)行和獲取值的原理,還有很多其他的方法,但是底層方法基本都是上面說的那幾個(gè)。