前言
JDK8 為我們帶來了 CompletableFuture 這個(gè)有意思的新類,它提供比 Future 更靈活更強(qiáng)大的回調(diào)功能,借助 CompletableFuture 我們可以更方便的編排異步任務(wù)。
由于 CompletableFuture 默認(rèn)的線程池是 ForkJoinPool,在講 CompletableFuture 之前覺得有必要先簡(jiǎn)單介紹一下 ForkJoinPool。
一、ForkJoinPool 工作原理
ForkJoin 框架,另一種風(fēng)格的線程池(相比于 ThreadPoolExecutor),采用分治算法,以及工作竊取策略,極大地提高了并行性。對(duì)于那種大任務(wù)分割小任務(wù)的(分治)又或者并行計(jì)算場(chǎng)景尤其有用。
1.1 ForkJoinPool
線程池,F(xiàn)orkJoinPool 類中有一個(gè)commonPool,默認(rèn)并發(fā)數(shù)為邏輯處理器個(gè)數(shù) - 1;
ForkJoinPool commonPool = ForkJoinPool.commonPool();//makeCommonPool
//看源碼發(fā)現(xiàn)
int parallelism = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
// 如果有自定義,那么取自定義的作為 parallelism
// 如果沒有 邏輯處理器個(gè)數(shù)-1
parallelism = Runtime.getRuntime().availableProcessors() - 1
當(dāng)然了,如果你不想使用 commonPool,你完全可以直接new 一個(gè)
ForkJoinPool fjp = new ForkJoinPool(4);// 最大并發(fā)數(shù)4
其實(shí)我當(dāng)時(shí)有個(gè)疑問,明明可以直接 new 一個(gè) ForkJoinPool,還可以很方便的指定parallelism(我寫的ForkJoinTest就是通過這種方式),為什么 ForkJoinPool 中還定義一個(gè)static的commonPool?
commonPool一般用于 Java8 中的并發(fā)流計(jì)算中或者 CompletableFuture 沒有指定線程池時(shí)使用的一個(gè)commonPool。
1.2 ForkJoinTask
ForkJoinPool 與 ForkJoinTask 之間的關(guān)系,可以類比 ThreadPoolExecutor 和 Runnable 的關(guān)系,都可以理解為提交任務(wù)到線程池,只不過分治任務(wù)有自己獨(dú)特類型 ForkJoinTask。它是一個(gè)抽象類。主要有兩個(gè)實(shí)現(xiàn)RecursiveAction(有返回值) 和 RecursiveTask(無返回值)
1.3 ForkJoinWorkerThread
運(yùn)行 ForkJoinTask 任務(wù)的工作線程(worker),最大并發(fā)數(shù)不會(huì)超過parallelism。
1.4 WorkQueue
任務(wù)隊(duì)列,每個(gè)worker對(duì)應(yīng)一個(gè)queue,這是和 ThreadPoolExecutor 最大不同的地方之一。
1.5 WorkQueue[]
ForkJoinPool 中的任務(wù)分為兩種,一種是本地提交的任務(wù)Submission task,通過execute()、submit()、invoke()等方法提交的任務(wù);比如 CompletableFuture 中不提供線程池時(shí),提交的都是Submission task。
另外一種是工作線程fork出的子任務(wù)Worker task。
兩種任務(wù)都會(huì)存放在WorkQueue數(shù)組中,Submission task存放在WorkQueue數(shù)組的偶數(shù)索引位置,Worker task存放在奇數(shù)索引位置。工作線程只會(huì)分配在奇數(shù)索引的工作隊(duì)列。

1.6 工作竊取機(jī)制
工作竊取是指當(dāng)某個(gè)線程的任務(wù)隊(duì)列中沒有可執(zhí)行任務(wù)的時(shí)候,從其他線程的任務(wù)隊(duì)列中竊取任務(wù)來執(zhí)行,以充分利用工作線程的計(jì)算能力,減少線程由于獲取不到任務(wù)而造成的空閑浪費(fèi)。在 ForkJoinPool 中,工作任務(wù)的隊(duì)列都采用雙端隊(duì)列容器。我們知道,在通常使用隊(duì)列的過程中,我們都在隊(duì)尾插入,而在隊(duì)頭消費(fèi)以實(shí)現(xiàn) FIFO。而為了實(shí)現(xiàn)工作竊取。一般我們會(huì)改成工作線程在工作隊(duì)列上 LIFO,而竊取其他線程的任務(wù)的時(shí)候,從隊(duì)列頭部取獲取。

工作線程worker1、worker2以及worker3都從WorkQueue的尾部popping獲取task,而任務(wù)也從尾部Pushing,當(dāng)worker3隊(duì)列中沒有任務(wù)的時(shí)候,就會(huì)從其他線程的隊(duì)列中竊取stealing,這樣就使得 worker3 不會(huì)由于沒有任務(wù)而空閑。這就是工作竊取算法的基本原理。 可以想象,要是不使用工作竊取算法,那么我們?cè)诓粩?fork 的過程中,可能某些 worker 就會(huì)一直處于 join 的等待中。
因?yàn)檫@種機(jī)制,它能夠讓所有線程的工作量基本均衡,不會(huì)出現(xiàn)有的線程很忙,而有的線程很閑的狀況,所以性能很好。
1、為什么說 ForkJoinPool 并發(fā)訪問一個(gè) IO 計(jì)算可能會(huì)拖垮整個(gè)系統(tǒng)?
這主要說的是 ForkJoinPool 中的 commonPool,commonPool 是整個(gè)·系統(tǒng)共享的。比如你在 Stream 并行流中并發(fā)訪問一個(gè) IO 計(jì)算,某一時(shí)刻會(huì)導(dǎo)致commonPool 中大部分線程甚至所有線程都阻礙在這里。這可能就會(huì)造成其他程序使用到 commonPool 的程序線程饑餓,比如 CompletableFuture 中沒有指定線程池時(shí)。2、為什么說 cpu 密集型的任務(wù)使用 ForkJoinPool 性能更好?
網(wǎng)上說主要有工作竊取機(jī)制。工作竊取的目的不就是充分利用 cpu,那普通線程不也可以嗎,把核心線程調(diào)成和 ForkJoinPool 默認(rèn)線程。
CompletableFuture 使用總結(jié)
關(guān)于 CompletableFuture(異步計(jì)算)的使用,比如訂單列表的并發(fā)請(qǐng)求遠(yuǎn)程服務(wù),服務(wù)中異步處理一些任務(wù),又或者異步導(dǎo)出,并發(fā)處理數(shù)據(jù)等等。
按功能分類
- xxx():表示該方法將繼續(xù)在已有的線程中執(zhí)行;
- xxxAsync():表示將異步在線程池中執(zhí)行。
- 異步執(zhí)行方法默認(rèn)一個(gè)參數(shù)的話任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的(帶看源碼),帶executor 參數(shù)的使用 executor線程池異步執(zhí)行。
按邏輯和組織方式來分的話(completableFuture 中大約有50個(gè)來方法)
- 一種是 then 的邏輯,即前一個(gè)計(jì)算完成的時(shí)候調(diào)度后一個(gè)計(jì)算
- 一種是 both 的邏輯,即等待兩個(gè)計(jì)算都完成之后執(zhí)行下一個(gè)計(jì)算,只要能組合一個(gè)和另一個(gè),我們就可以無限復(fù)用這個(gè) +1 的邏輯組合任意多的計(jì)算。(如果任務(wù)多,可以考慮可用allOf)
- 另一種是 either 的邏輯,即等待兩個(gè)計(jì)算的其中一個(gè)完成之后執(zhí)行下一個(gè)計(jì)算。注意這樣的計(jì)算可以說是非確定性的。因?yàn)楸唤M合的兩個(gè)計(jì)算中先觸發(fā)下一個(gè)計(jì)算執(zhí)行的那個(gè)會(huì)被作為前一個(gè)計(jì)算,而這兩個(gè)前置的計(jì)算到底哪一個(gè)先完成是不可預(yù)知的(anyOf)
從依賴關(guān)系和出入?yún)?shù)類型區(qū)別,基本分為三類
- apply 字樣的方式意味著組合方式是 Function,即接受前一個(gè)計(jì)算的結(jié)果,應(yīng)用函數(shù)之后返回一個(gè)新的結(jié)果。
- accept 字樣的方式意味著組合方式是 Consumer,即接受前一個(gè)計(jì)算的結(jié)果,執(zhí)行消費(fèi)后不返回有意義的值。
- run 字樣的方式意味著組合方式是 Runnable,即忽略前一個(gè)計(jì)算的結(jié)果,僅等待它完成后執(zhí)行動(dòng)作。
其中出入?yún)?shù)主要有 Java8 Function,Consumer 或 Runnable三中函數(shù)型接口,每一種都決定了是怎么樣一種依賴關(guān)系

二、數(shù)據(jù)結(jié)構(gòu)
2.1、CompletableFuture
CompletableFuture 實(shí)現(xiàn)了 Future 接口和 CompletionStage,F(xiàn)uture 不必多說,而 CompletionStage 定義了各種任務(wù)編排的 API:
CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
}
CompletableFuture 的數(shù)據(jù)結(jié)構(gòu)包括用于記錄結(jié)果的 result,以及用于持有任務(wù)以及任務(wù)間依賴關(guān)系的 Completion 類型的成員變量 stack。
如果閱讀過 spring 注解相關(guān)功能的源碼的同學(xué),對(duì)于 CompletableFuture 和 Completion 應(yīng)該會(huì)有一種 TypeMappedAnnotation 和 AnnotationTypeMapping 的既視感,實(shí)際上他們兩者之間的關(guān)系確實(shí)也非常相似。
2.2、Completion
數(shù)據(jù)結(jié)構(gòu)
Completion 是 CompletableFuture 中的一個(gè)內(nèi)部類,我們可以簡(jiǎn)單的認(rèn)為它就是我們一般所說的“操作”。
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next;
}
它通過 next 指針在 CompletableFuture 中形成一個(gè)鏈表結(jié)構(gòu)。
依賴關(guān)系
它還有兩個(gè)抽象的實(shí)現(xiàn)類 UniCompletion 和 BiCompletion:
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // executor to use (null if none)
CompletableFuture<V> dep; // the dependent to complete
CompletableFuture<T> src; // source for action
}
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
CompletableFuture<U> snd; // second source for action
}
其中 executor 表示該操作的執(zhí)行者,而 src 和 snd 兩個(gè)指針表示要執(zhí)行的操作對(duì)應(yīng)的 CompletableFuture 實(shí)例,而 dep 則表示要執(zhí)行的操作依賴的前置操作對(duì)應(yīng)的 CompletableFuture 實(shí)例。多個(gè) Completion 彼此之間通過這些指針維護(hù)彼此的依賴關(guān)系。
實(shí)現(xiàn)類
在 CompletableFuture,我們會(huì)看到很多格式為 UniXXX 或者 BiXXX 的內(nèi)部類,它們大多數(shù)都基于上述兩抽象類實(shí)現(xiàn),分別對(duì)應(yīng)不同的操作。我們以 UniApply 為例:
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
}
其本質(zhì)上就是一個(gè)額外掛載了 Function 接口的 UniCompletion,同理,XXXAccept 就是掛載了 Consumer 的 Completion,而 XXXRun 就是掛載的 Runnable 接口的 Completion。
三、源碼分析
樣例
后面介紹的源碼都會(huì)以下面的用例為切入點(diǎn),循著調(diào)用軌跡理解源碼。如果任務(wù)很耗時(shí),記得傳Executor, 或者方法末尾加上future.get(); 因?yàn)镃ompletableFuture默認(rèn)使用ForkJoinPool, 而ForkJoinPool里面的線程都是daemon線程,主線程跑完了,虛擬機(jī)也就over了。
public void whenComplete() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
future.whenComplete((l, r) -> System.out.println(l));
}
public void thenApply() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
future.thenApply(i -> -i);
}
public void thenAccept() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
future.thenAccept(System.out::println);
}
public void thenRun() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
future.thenRun(() -> System.out.println("Done"));
}
public void thenAcceptBoth() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
}
public void acceptEither() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
future.acceptEither(other, System.out::println);
}
public void allOf() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
CompletableFuture.allOf(future, second, third);
}
public void anyOf() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
CompletableFuture.anyOf(future, second, third);
}
3.1 supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
// asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(實(shí)現(xiàn)了Executor接口,里面的內(nèi)容是{new Thread(r).start();})
return asyncSupplyStage(asyncPool, supplier);
}
3.2 asyncSupplyStage(Executor e, Supplier<U> f)
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null)
throw new NullPointerException();
// 構(gòu)建一個(gè)新的CompletableFuture, 以此構(gòu)建AsyncSupply作為Executor的執(zhí)行參數(shù)
CompletableFuture<U> d = new CompletableFuture<U>();
// AsyncSupply繼承了ForkJoinTask, 實(shí)現(xiàn)了Runnable, AsynchronousCompletionTask接口
e.execute(new AsyncSupply<U>(d, f));
// 返回d,立返
return d;
}
3.3 AsyncSupply
// CompletableFuture的靜態(tài)內(nèi)部類,作為一個(gè)ForkJoinTask
static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
// AsyncSupply作為一個(gè)依賴Task,dep作為這個(gè)Task的Future
CompletableFuture<T> dep;
// fn作為這個(gè)Task的具體執(zhí)行邏輯,函數(shù)式編程
Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep;
this.fn = fn;
}
public final Void getRawResult() {
return null;
}
public final void setRawResult(Void v) {
}
public final boolean exec() {
run();
return true;
}
public void run() {
CompletableFuture<T> d;
Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) { // 非空判斷
dep = null;
fn = null;
// 查看任務(wù)是否結(jié)束,如果已經(jīng)結(jié)束(result != null),直接調(diào)用postComplete()方法
if (d.result == null) {
try {
// 等待任務(wù)結(jié)束,并設(shè)置結(jié)果
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex); // 異常
}
}
// 任務(wù)結(jié)束后,會(huì)執(zhí)行所有依賴此任務(wù)的其他任務(wù),這些任務(wù)以一個(gè)無鎖并發(fā)棧的形式存在
d.postComplete();
}
}
}
3.4 postComplete()
final void postComplete() {
// 當(dāng)前CompletableFuture
CompletableFuture<?> f = this;
// 無鎖并發(fā)棧,(Completion next), 保存的是依靠當(dāng)前的CompletableFuture一串任務(wù),完成即觸發(fā)(回調(diào))
Completion h;
// 當(dāng)f的stack為空時(shí),使f重新指向當(dāng)前的CompletableFuture,繼續(xù)后面的結(jié)點(diǎn)
while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d;
Completion t;
// 從頭遍歷stack,并更新頭元素
if (f.casStack(h, t = h.next)) {
if (t != null) {
// 如果f不是當(dāng)前CompletableFuture,則將它的頭結(jié)點(diǎn)壓入到當(dāng)前CompletableFuture的stack中,使樹形結(jié)構(gòu)變成鏈表結(jié)構(gòu),避免遞歸層次過深
if (f != this) {
pushStack(h);
// 繼續(xù)下一個(gè)結(jié)點(diǎn),批量壓入到當(dāng)前棧中
continue;
}
// 如果是當(dāng)前CompletableFuture, 解除頭節(jié)點(diǎn)與棧的聯(lián)系
h.next = null;
}
// 調(diào)用頭節(jié)點(diǎn)的tryFire()方法,該方法可看作Completion的鉤子方法,執(zhí)行完邏輯后,會(huì)向后傳播的
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
示意圖
每個(gè)CompletableFuture持有一個(gè)Completion棧stack, 每個(gè)Completion持有一個(gè)CompletableFuture -> dep, 如此遞歸循環(huán)下去,是層次很深的樹形結(jié)構(gòu),所以想辦法將其變成鏈表結(jié)構(gòu)。

首先取出頭結(jié)點(diǎn),下圖中灰色Completion結(jié)點(diǎn),它會(huì)返回一個(gè)CompletableFuture, 同樣也擁有一個(gè)stack,策略是遍歷這個(gè)CompletableFuture的stack的每個(gè)結(jié)點(diǎn),依次壓入到當(dāng)前CompletableFuture的stack中,關(guān)系如下箭頭所示,灰色結(jié)點(diǎn)指的是處理過的結(jié)點(diǎn)。

第一個(gè)Completion結(jié)點(diǎn)返回的CompletableFuture, 將擁有的stack里面的所有結(jié)點(diǎn)都?jí)喝肓水?dāng)前CompletableFuture的stack里面。

后續(xù)的Completion結(jié)點(diǎn)返回的CompletableFuture, 將擁有的stack里面的所有結(jié)點(diǎn)都?jí)喝肓水?dāng)前CompletableFuture的stack里面,重新構(gòu)成了一個(gè)鏈表結(jié)構(gòu),后續(xù)也按照前面的邏輯操作,如此反復(fù),便會(huì)遍歷完所有的CompletableFuture, 這些CompletableFuture(葉子結(jié)點(diǎn))的stack為空,也是結(jié)束條件。

postComplete()最后調(diào)用的是Completion#tryFire()方法,先看下Completion的數(shù)據(jù)結(jié)構(gòu)
Completion
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 無鎖并發(fā)棧
/**
* 鉤子方法,有三種模式,postComplete()方法里面使用的是NESTED模式,避免過深的遞歸調(diào)用 SYNC, ASYNC, or NESTED
*/
abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都調(diào)用了這個(gè)鉤子方法
/** cleanStack()方法里有用到 */
abstract boolean isLive();
public final void run() {
tryFire(ASYNC);
}
public final boolean exec() {
tryFire(ASYNC);
return true;
}
public final Void getRawResult() {
return null;
}
public final void setRawResult(Void v) {
}
}
static final int SYNC = 0; 同步
static final int ASYNC = 1; 異步
static final int NESTED = -1; 嵌套
繼承了ForkJoinTask, 實(shí)現(xiàn)了Runnable, AsynchronousCompletionTask接口,它有諸多子類,如下圖

后面的方法都對(duì)應(yīng)著不同的子類。
先看一個(gè)子類UniCompletion
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // 執(zhí)行器
CompletableFuture<V> dep; // 依賴的任務(wù)
CompletableFuture<T> src; // 被依賴的任務(wù)
UniCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src) {
this.executor = executor; this.dep = dep; this.src = src;
}
// 如果當(dāng)前任務(wù)可以被執(zhí)行,返回true,否則,返回false; 保證任務(wù)只被執(zhí)行一次
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // 設(shè)置為不可用
e.execute(this);
}
return false;
}
final boolean isLive() {
return dep != null;
}
}
claim()方法保證任務(wù)只被執(zhí)行一次。
whenComplete
whenComplete()/whenCompleteAsync()
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
xxx和xxxAsync方法的區(qū)別是,有沒有asyncPool作為入?yún)ⅲ械脑?,任?wù)直接入?yún)?,不檢查任務(wù)是否完成。uniWhenCompleteStage方法有說明。
uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)
private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
if (f == null)
throw new NullPointerException();
// 構(gòu)建future
CompletableFuture<T> d = new CompletableFuture<T>();
if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果線程池不為空,直接構(gòu)建任務(wù)入棧,并調(diào)用tryFire()方法;否則,調(diào)用uniWhenComplete()方法,檢查依賴的那個(gè)任務(wù)是否完成,沒有完成返回false,
// 完成了返回true, 以及后續(xù)一些操作。
UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete繼承了UniCompletion
push(c);
c.tryFire(SYNC); // 先調(diào)一下鉤子方法,檢查一下任務(wù)是否結(jié)束
}
return d;
}
uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)
final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {
Object r;
T t;
Throwable x = null;
if (a == null || (r = a.result) == null || f == null) // 被依賴的任務(wù)還未完成
return false;
if (result == null) { // 被依賴的任務(wù)完成了
try {
if (c != null && !c.claim()) // 判斷任務(wù)是否能被執(zhí)行
return false;
if (r instanceof AltResult) { // 判斷異常,AltResult類型很簡(jiǎn)單,里面只有一個(gè)屬性Throwable ex;
x = ((AltResult) r).ex;
t = null;
} else {
@SuppressWarnings("unchecked")
T tr = (T) r; // 正常的結(jié)果
t = tr;
}
f.accept(t, x); // 執(zhí)行任務(wù)
if (x == null) {
internalComplete(r); // 任務(wù)的結(jié)果設(shè)置為被依賴任務(wù)的結(jié)果
return true;
}
} catch (Throwable ex) {
if (x == null)
x = ex; // 記錄異常
}
completeThrowable(x, r); // 設(shè)置異常和結(jié)果
}
return true;
}
push()
final void push(UniCompletion<?, ?> c) {
if (c != null) {
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // 失敗重置c的next域
}
}
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
static void lazySetNext(Completion c, Completion next) {
UNSAFE.putOrderedObject(c, NEXT, next);
}
UniWhenComplete
static final class UniWhenComplete<T> extends UniCompletion<T, T> {
BiConsumer<? super T, ? super Throwable> fn;
UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src,
BiConsumer<? super T, ? super Throwable> fn) {
super(executor, dep, src);
this.fn = fn;
}
final CompletableFuture<T> tryFire(int mode) { // 鉤子方法
CompletableFuture<T> d; // 依賴的任務(wù)
CompletableFuture<T> a; // 被依賴的任務(wù)
if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是異步模式(mode = 1),就不判斷任務(wù)是否結(jié)束
return null; // dep為空,說明已經(jīng)調(diào)用過了
dep = null;
src = null;
fn = null;
return d.postFire(a, mode); // 鉤子方法之后的處理
}
}
postFire(CompletableFuture<?> a, int mode)
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) { // 被依賴的任務(wù)存在,且stack不為空,先處理它
if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任務(wù)的結(jié)果為空,直接清空棧
a.cleanStack();
else
a.postComplete(); // 否則,調(diào)用postComplete()方法
}
if (result != null && stack != null) { // 再處理當(dāng)前任務(wù)
if (mode < 0) // 嵌套模式,直接返回自身(樹 -> 鏈表,避免過深的遞歸調(diào)用)
return this;
else
postComplete(); // 調(diào)用postComplete()方法
}
return null;
}
cleanStack()
final void cleanStack() { // 過濾掉已經(jīng)死掉的結(jié)點(diǎn)(Not isLive)
for (Completion p = null, q = stack; q != null;) { // q指針從頭節(jié)點(diǎn)開始,向右移動(dòng),s一直執(zhí)行q的下一個(gè)結(jié)點(diǎn),p要么為空,要么指向遍歷過的最后一個(gè)活著的結(jié)點(diǎn),一旦發(fā)現(xiàn)q死掉了,就斷開q, 連接p, s
Completion s = q.next;
if (q.isLive()) { // 還活著,p指向遍歷過的最后一個(gè)結(jié)點(diǎn),q向右移動(dòng)
p = q;
q = s;
} else if (p == null) { // 說明第一個(gè)結(jié)點(diǎn)就是死掉的,cas stack, q指向stack
casStack(q, s);
q = stack;
} else { // 否則的話,連接p, s
p.next = s;
if (p.isLive()) // 再次判斷p結(jié)點(diǎn)是否還或者(在這期間是否有別的線程改動(dòng)了)
q = s; // 還活著,q繼續(xù)向右移動(dòng)
else {
p = null; // 過期的值,從新開始
q = stack;
}
}
}
}
如下圖
-
第1個(gè)結(jié)點(diǎn)是無效結(jié)點(diǎn),更新stack,更新指針
-
-
第2個(gè)結(jié)點(diǎn)是有效結(jié)點(diǎn),更新指針
-
-
第3個(gè)結(jié)點(diǎn)是無效結(jié)點(diǎn),更新指針
-
-
第4個(gè)結(jié)點(diǎn)是有效結(jié)點(diǎn),更新指針
-
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
if (f == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T, V> c = new UniApply<T, V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) {
Object r;
Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult) r).ex) != null) {
completeThrowable(x, r); // 有異常,直接跳出
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked")
S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class UniApply<T, V> extends UniCompletion<T, V> {
Function<? super T, ? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src,
Function<? super T, ? extends V> fn) {
super(executor, dep, src);
this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d;
CompletableFuture<T> a;
if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null;
src = null;
fn = null;
return d.postFire(a, mode);
}
}
一樣的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire
thenAccept
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
if (f == null)
throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
Object r;
Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult) r).ex) != null) {
completeThrowable(x, r); // 有異常直接跳出
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked")
S s = (S) r;
f.accept(s);
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class UniAccept<T> extends UniCompletion<T, Void> {
Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
super(executor, dep, src);
this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
return null;
dep = null;
src = null;
fn = null;
return d.postFire(a, mode);
}
}
thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire
thenRun
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null)
throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniRun(this, f, null)) {
UniRun<T> c = new UniRun<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
Object r;
Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
completeThrowable(x, r);
else
try {
if (c != null && !c.claim())
return false;
f.run();
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class UniRun<T> extends UniCompletion<T, Void> {
Runnable fn;
UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
super(executor, dep, src);
this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this))
return null;
dep = null;
src = null;
fn = null;
return d.postFire(a, mode);
}
}
thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire
thenAcceptBoth
thenAcceptBoth
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
biAcceptStage
private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o,
BiConsumer<? super T, ? super U> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.biAccept(this, b, f, null)) {
BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f);
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
bipush
final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
if (c != null) {
Object r;
while ((r = result) == null && !tryPushStack(c)) // a的result還沒準(zhǔn)備好,c壓入棧
lazySetNext(c, null); // 失敗重置c的next域
if (b != null && b != this && b.result == null) { // b的result也還沒準(zhǔn)備好
Completion q = (r != null) ? c : new CoCompletion(c); // 根據(jù)a的result決定是否構(gòu)建CoCompletion, 如果a未結(jié)束,則構(gòu)建一個(gè)CoCompletion, CoCompletion最后調(diào)用的也是BiCompletion的tryFire
while (b.result == null && !b.tryPushStack(q)) // 將q壓入棧
lazySetNext(q, null); // 失敗重置q的next域
}
}
}
CoCompletion
static final class CoCompletion extends Completion {
BiCompletion<?, ?, ?> base;
CoCompletion(BiCompletion<?, ?, ?> base) {
this.base = base;
}
final CompletableFuture<?> tryFire(int mode) {
BiCompletion<?, ?, ?> c;
CompletableFuture<?> d;
if ((c = base) == null || (d = c.tryFire(mode)) == null) // 調(diào)用的還是BiCompletion的tryFire方法
return null;
base = null;
return d;
}
final boolean isLive() {
BiCompletion<?, ?, ?> c;
return (c = base) != null && c.dep != null;
}
}
biAccept
final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f,
BiAccept<R, S> c) {
Object r, s;
Throwable x;
if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null)
return false; // a和b都完成了,才會(huì)往下走
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult) r).ex) != null) { // a的異常檢查
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
if (s instanceof AltResult) {
if ((x = ((AltResult) s).ex) != null) { // b的異常檢查
completeThrowable(x, s);
break tryComplete;
}
s = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked")
R rr = (R) r;
@SuppressWarnings("unchecked")
S ss = (S) s;
f.accept(rr, ss); // 執(zhí)行任務(wù)
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
BiAccept
static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {
BiConsumer<? super T, ? super U> fn;
BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
BiConsumer<? super T, ? super U> fn) {
super(executor, dep, src, snd);
this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
return null;
dep = null;
src = null;
snd = null;
fn = null;
return d.postFire(a, b, mode);
}
}
abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> {
CompletableFuture<U> snd; // second source for action
BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
super(executor, dep, src);
this.snd = snd;
}
}
thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire
acceptEither
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}
private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o,
Consumer<? super T> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.orAccept(this, b, f, null)) {
OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f);
orpush(b, c);
c.tryFire(SYNC);
}
return d;
}
final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f,
OrAccept<R, S> c) {
Object r;
Throwable x;
if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null)
return false; // a和b有一個(gè)完成了就往下走
tryComplete: if (result == null) {
try {
if (c != null && !c.claim())
return false;
if (r instanceof AltResult) { // 異常
if ((x = ((AltResult) r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
@SuppressWarnings("unchecked")
R rr = (R) r;
f.accept(rr); // 執(zhí)行
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {
Consumer<? super T> fn;
OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
Consumer<? super T> fn) {
super(executor, dep, src, snd);
this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
return null;
dep = null;
src = null;
snd = null;
fn = null;
return d.postFire(a, b, mode);
}
}
final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
if (c != null) {
while ((b == null || b.result == null) && result == null) { // a和b的result都沒好,才會(huì)考慮入棧
if (tryPushStack(c)) { // 先入a的棧
if (b != null && b != this && b.result == null) { // 入a的棧成功,b的result還沒好
Completion q = new CoCompletion(c); // a還未結(jié)束,用c構(gòu)建CoCompletion
while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判斷,a和b的result都沒好,才會(huì)考慮入棧
lazySetNext(q, null); // 失敗置空q的next域
}
break;
}
lazySetNext(c, null); // 失敗置空c的next域
}
}
}
acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire
allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個(gè)數(shù)組構(gòu)建成一棵樹,二叉樹,動(dòng)態(tài)規(guī)劃
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture<?> a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null
|| (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay<?, ?> c = new BiRelay<>(d, a, b);
a.bipush(b, c); // both
c.tryFire(SYNC);
}
}
return d;
}
static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And
BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
super(null, dep, src, snd);
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.biRelay(a = src, b = snd))
return null;
src = null;
snd = null;
dep = null;
return d.postFire(a, b, mode);
}
}
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r, s;
Throwable x;
if (a == null || (r = a.result) == null || b == null || (s = b.result) == null)
return false; // a和b都結(jié)束了才往下執(zhí)行
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null)
completeThrowable(x, s);
else
completeNull(); // 輔助結(jié)點(diǎn),什么都不做
}
return true;
}
allOf -> andTree -> biRelay -> tryFire -> postFire
anyOf
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個(gè)數(shù)組構(gòu)建成一棵樹,二叉樹,動(dòng)態(tài)規(guī)劃
CompletableFuture<Object> d = new CompletableFuture<Object>();
if (lo <= hi) {
CompletableFuture<?> a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null
|| (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null)
throw new NullPointerException();
if (!d.orRelay(a, b)) {
OrRelay<?, ?> c = new OrRelay<>(d, a, b);
a.orpush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or
OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
super(null, dep, src, snd);
}
final CompletableFuture<Object> tryFire(int mode) {
CompletableFuture<Object> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.orRelay(a = src, b = snd))
return null;
src = null;
snd = null;
dep = null;
return d.postFire(a, b, mode);
}
}
final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r;
if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null))
return false; // a和b有一個(gè)結(jié)束就往下進(jìn)行
if (result == null)
completeRelay(r);
return true;
}
anyOf -> orTree -> orRelay -> tryFire -> postFire
數(shù)組構(gòu)建樹
allOf和anyOf都用到了數(shù)組構(gòu)建成樹的策略。
假設(shè)有一個(gè)任務(wù)Z(虛擬的,什么都不做),依賴一組任務(wù)[A, B, C, D, E, F, G, H]
對(duì)于allOf, 當(dāng)這組任務(wù)都完成時(shí),才會(huì)執(zhí)行Z;對(duì)于anyOf, 當(dāng)這組任務(wù)中有任何一個(gè)完成,就執(zhí)行任務(wù)Z。
如果這組任務(wù)是數(shù)組結(jié)構(gòu)或者鏈表結(jié)構(gòu),我們?cè)撊绾谓鉀Q呢?遍歷數(shù)組或者是鏈表,當(dāng)任務(wù)都完成或者有一個(gè)完成時(shí),就執(zhí)行Z,需要不停地遍歷,這是輪詢的方法,不合適。
整個(gè)基調(diào)是回調(diào),是指,當(dāng)一個(gè)任務(wù)完成時(shí),會(huì)接著執(zhí)行所有依賴于它的任務(wù)。
作為一個(gè)數(shù)組或者鏈表,該如何應(yīng)用回調(diào)呢?誰在先,誰在后呢?因?yàn)椴恢滥膫€(gè)任務(wù)會(huì)先完成,所以沒法確定次序。而且這組任務(wù)之間也不應(yīng)該相互依賴,它們只不過都是被Z依賴。
如果這組任務(wù)只有一個(gè)的話,那就演變成了X.thenXXX(Z), 如果這組任務(wù)有兩個(gè)的話,allOf -> Both,anyOf -> Either
如果Z依賴Z1,Z2兩個(gè)個(gè)任務(wù),Z1和Z2依賴Z11,Z12和Z21,Z22四個(gè)任務(wù),依次類推,當(dāng)虛擬的任務(wù)的個(gè)數(shù)達(dá)到真實(shí)任務(wù)的個(gè)數(shù)的一半時(shí),就讓虛擬任務(wù)監(jiān)聽真實(shí)的任務(wù),動(dòng)態(tài)規(guī)劃加二叉樹,時(shí)間復(fù)雜度也只是logn級(jí)別的。
static String array2Tree(String[] cfs, int lo, int hi) {
String d = new String(cfs[lo] + cfs[hi]);
if (lo <= hi) {
String a, b;
int mid = (lo + hi) >>> 1; // 二分
if (lo == mid) { // a作為左半部分的的結(jié)果
a = cfs[lo]; // 當(dāng)只有不超過兩個(gè)元素時(shí),a直接取第一個(gè)值
} else {
a = array2Tree(cfs, lo, mid);
}
if (lo == hi) { // 當(dāng)只有一個(gè)元素的時(shí)候,b取a的值
b = a;
} else {
if (hi == mid + 1) { // 右半部分只有兩個(gè)元素時(shí),b取第二個(gè)元素的值
b = cfs[hi];
} else {
b = array2Tree(cfs, mid + 1, hi);
}
}
if (a == null || b == null) {
throw new NullPointerException();
}
System.out.println("[" + a + "][" + b + "]->[" + d + "]");
}
return d;
}
Console
[A][B]->[AB]
[C][D]->[CD]
[AB][CD]->[AD]
[E][F]->[EF]
[G][H]->[GH]
[EF][GH]->[EH]
[AD][EH]->[AH]
如下圖

對(duì)于allOf, Z只要保證Z1和Z2都完成了就行,Z1和Z2分別保證Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22則分別保證了A-H任務(wù)都完成。
對(duì)應(yīng)anyOf, Z 只要保證Z1和Z2有一個(gè)完成了就像,Z1和Z2聯(lián)合保證了Z11,Z12,Z21,Z22這4個(gè)任務(wù)只要有一個(gè)完成了就行,同理,Z11,Z12,Z21,Z22則聯(lián)合保證了A-H中有一個(gè)任務(wù)完成了就行。
然后,Z就可以執(zhí)行了,其實(shí)Z什么也沒做,只是從這組任務(wù)里得出一個(gè)結(jié)果。
參考:
https://www.cnblogs.com/Createsequence/p/16963895.html
https://www.cnblogs.com/aniao/p/aniao_cf.html



