Java并發(fā)編程——CompletableFuture源碼解析

前言

JDK8 為我們帶來了 CompletableFuture 這個(gè)有意思的新類,它提供比 Future 更靈活更強(qiáng)大的回調(diào)功能,借助 CompletableFuture 我們可以更方便的編排異步任務(wù)。

由于 CompletableFuture 默認(rèn)的線程池是 ForkJoinPool,在講 CompletableFuture 之前覺得有必要先簡(jiǎn)單介紹一下 ForkJoinPool。

一、ForkJoinPool 工作原理

ForkJoin 框架,另一種風(fēng)格的線程池(相比于 ThreadPoolExecutor),采用分治算法,以及工作竊取策略,極大地提高了并行性。對(duì)于那種大任務(wù)分割小任務(wù)的(分治)又或者并行計(jì)算場(chǎng)景尤其有用。

1.1 ForkJoinPool

線程池,F(xiàn)orkJoinPool 類中有一個(gè)commonPool,默認(rèn)并發(fā)數(shù)為邏輯處理器個(gè)數(shù) - 1;

ForkJoinPool commonPool = ForkJoinPool.commonPool();//makeCommonPool

//看源碼發(fā)現(xiàn) 
int parallelism = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
// 如果有自定義,那么取自定義的作為 parallelism
// 如果沒有 邏輯處理器個(gè)數(shù)-1 
parallelism = Runtime.getRuntime().availableProcessors() - 1

當(dāng)然了,如果你不想使用 commonPool,你完全可以直接new 一個(gè)

ForkJoinPool fjp = new ForkJoinPool(4);// 最大并發(fā)數(shù)4

其實(shí)我當(dāng)時(shí)有個(gè)疑問,明明可以直接 new 一個(gè) ForkJoinPool,還可以很方便的指定parallelism(我寫的ForkJoinTest就是通過這種方式),為什么 ForkJoinPool 中還定義一個(gè)static的commonPool?

commonPool一般用于 Java8 中的并發(fā)流計(jì)算中或者 CompletableFuture 沒有指定線程池時(shí)使用的一個(gè)commonPool。

1.2 ForkJoinTask

ForkJoinPool 與 ForkJoinTask 之間的關(guān)系,可以類比 ThreadPoolExecutor 和 Runnable 的關(guān)系,都可以理解為提交任務(wù)到線程池,只不過分治任務(wù)有自己獨(dú)特類型 ForkJoinTask。它是一個(gè)抽象類。主要有兩個(gè)實(shí)現(xiàn)RecursiveAction(有返回值) 和 RecursiveTask(無返回值)

1.3 ForkJoinWorkerThread

運(yùn)行 ForkJoinTask 任務(wù)的工作線程(worker),最大并發(fā)數(shù)不會(huì)超過parallelism。

1.4 WorkQueue

任務(wù)隊(duì)列,每個(gè)worker對(duì)應(yīng)一個(gè)queue,這是和 ThreadPoolExecutor 最大不同的地方之一。

1.5 WorkQueue[]

ForkJoinPool 中的任務(wù)分為兩種,一種是本地提交的任務(wù)Submission task,通過execute()、submit()、invoke()等方法提交的任務(wù);比如 CompletableFuture 中不提供線程池時(shí),提交的都是Submission task。

另外一種是工作線程fork出的子任務(wù)Worker task。

兩種任務(wù)都會(huì)存放在WorkQueue數(shù)組中,Submission task存放在WorkQueue數(shù)組的偶數(shù)索引位置,Worker task存放在奇數(shù)索引位置。工作線程只會(huì)分配在奇數(shù)索引的工作隊(duì)列。

1.6 工作竊取機(jī)制

工作竊取是指當(dāng)某個(gè)線程的任務(wù)隊(duì)列中沒有可執(zhí)行任務(wù)的時(shí)候,從其他線程的任務(wù)隊(duì)列中竊取任務(wù)來執(zhí)行,以充分利用工作線程的計(jì)算能力,減少線程由于獲取不到任務(wù)而造成的空閑浪費(fèi)。在 ForkJoinPool 中,工作任務(wù)的隊(duì)列都采用雙端隊(duì)列容器。我們知道,在通常使用隊(duì)列的過程中,我們都在隊(duì)尾插入,而在隊(duì)頭消費(fèi)以實(shí)現(xiàn) FIFO。而為了實(shí)現(xiàn)工作竊取。一般我們會(huì)改成工作線程在工作隊(duì)列上 LIFO,而竊取其他線程的任務(wù)的時(shí)候,從隊(duì)列頭部取獲取。

工作線程worker1、worker2以及worker3都從WorkQueue的尾部popping獲取task,而任務(wù)也從尾部Pushing,當(dāng)worker3隊(duì)列中沒有任務(wù)的時(shí)候,就會(huì)從其他線程的隊(duì)列中竊取stealing,這樣就使得 worker3 不會(huì)由于沒有任務(wù)而空閑。這就是工作竊取算法的基本原理。 可以想象,要是不使用工作竊取算法,那么我們?cè)诓粩?fork 的過程中,可能某些 worker 就會(huì)一直處于 join 的等待中。

因?yàn)檫@種機(jī)制,它能夠讓所有線程的工作量基本均衡,不會(huì)出現(xiàn)有的線程很忙,而有的線程很閑的狀況,所以性能很好。

  • 1、為什么說 ForkJoinPool 并發(fā)訪問一個(gè) IO 計(jì)算可能會(huì)拖垮整個(gè)系統(tǒng)?
    這主要說的是 ForkJoinPool 中的 commonPool,commonPool 是整個(gè)·系統(tǒng)共享的。比如你在 Stream 并行流中并發(fā)訪問一個(gè) IO 計(jì)算,某一時(shí)刻會(huì)導(dǎo)致commonPool 中大部分線程甚至所有線程都阻礙在這里。這可能就會(huì)造成其他程序使用到 commonPool 的程序線程饑餓,比如 CompletableFuture 中沒有指定線程池時(shí)。

  • 2、為什么說 cpu 密集型的任務(wù)使用 ForkJoinPool 性能更好?
    網(wǎng)上說主要有工作竊取機(jī)制。工作竊取的目的不就是充分利用 cpu,那普通線程不也可以嗎,把核心線程調(diào)成和 ForkJoinPool 默認(rèn)線程。

CompletableFuture 使用總結(jié)

關(guān)于 CompletableFuture(異步計(jì)算)的使用,比如訂單列表的并發(fā)請(qǐng)求遠(yuǎn)程服務(wù),服務(wù)中異步處理一些任務(wù),又或者異步導(dǎo)出,并發(fā)處理數(shù)據(jù)等等。

按功能分類

  • xxx():表示該方法將繼續(xù)在已有的線程中執(zhí)行;
  • xxxAsync():表示將異步在線程池中執(zhí)行。
  • 異步執(zhí)行方法默認(rèn)一個(gè)參數(shù)的話任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的(帶看源碼),帶executor 參數(shù)的使用 executor線程池異步執(zhí)行。

按邏輯和組織方式來分的話(completableFuture 中大約有50個(gè)來方法)

  • 一種是 then 的邏輯,即前一個(gè)計(jì)算完成的時(shí)候調(diào)度后一個(gè)計(jì)算
  • 一種是 both 的邏輯,即等待兩個(gè)計(jì)算都完成之后執(zhí)行下一個(gè)計(jì)算,只要能組合一個(gè)和另一個(gè),我們就可以無限復(fù)用這個(gè) +1 的邏輯組合任意多的計(jì)算。(如果任務(wù)多,可以考慮可用allOf)
  • 另一種是 either 的邏輯,即等待兩個(gè)計(jì)算的其中一個(gè)完成之后執(zhí)行下一個(gè)計(jì)算。注意這樣的計(jì)算可以說是非確定性的。因?yàn)楸唤M合的兩個(gè)計(jì)算中先觸發(fā)下一個(gè)計(jì)算執(zhí)行的那個(gè)會(huì)被作為前一個(gè)計(jì)算,而這兩個(gè)前置的計(jì)算到底哪一個(gè)先完成是不可預(yù)知的(anyOf)

從依賴關(guān)系和出入?yún)?shù)類型區(qū)別,基本分為三類

  • apply 字樣的方式意味著組合方式是 Function,即接受前一個(gè)計(jì)算的結(jié)果,應(yīng)用函數(shù)之后返回一個(gè)新的結(jié)果。
  • accept 字樣的方式意味著組合方式是 Consumer,即接受前一個(gè)計(jì)算的結(jié)果,執(zhí)行消費(fèi)后不返回有意義的值。
  • run 字樣的方式意味著組合方式是 Runnable,即忽略前一個(gè)計(jì)算的結(jié)果,僅等待它完成后執(zhí)行動(dòng)作。

其中出入?yún)?shù)主要有 Java8 Function,Consumer 或 Runnable三中函數(shù)型接口,每一種都決定了是怎么樣一種依賴關(guān)系

二、數(shù)據(jù)結(jié)構(gòu)

2.1、CompletableFuture

CompletableFuture 實(shí)現(xiàn)了 Future 接口和 CompletionStage,F(xiàn)uture 不必多說,而 CompletionStage 定義了各種任務(wù)編排的 API:

CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
}

CompletableFuture 的數(shù)據(jù)結(jié)構(gòu)包括用于記錄結(jié)果的 result,以及用于持有任務(wù)以及任務(wù)間依賴關(guān)系的 Completion 類型的成員變量 stack。

如果閱讀過 spring 注解相關(guān)功能的源碼的同學(xué),對(duì)于 CompletableFuture 和 Completion 應(yīng)該會(huì)有一種 TypeMappedAnnotation 和 AnnotationTypeMapping 的既視感,實(shí)際上他們兩者之間的關(guān)系確實(shí)也非常相似。

2.2、Completion

數(shù)據(jù)結(jié)構(gòu)

Completion 是 CompletableFuture 中的一個(gè)內(nèi)部類,我們可以簡(jiǎn)單的認(rèn)為它就是我們一般所說的“操作”。

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;
}

它通過 next 指針在 CompletableFuture 中形成一個(gè)鏈表結(jié)構(gòu)。

依賴關(guān)系

它還有兩個(gè)抽象的實(shí)現(xiàn)類 UniCompletion 和 BiCompletion:

abstract static class UniCompletion<T,V> extends Completion {
        Executor executor;                 // executor to use (null if none)
        CompletableFuture<V> dep;          // the dependent to complete
        CompletableFuture<T> src;          // source for action
}

abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
        CompletableFuture<U> snd; // second source for action
}

其中 executor 表示該操作的執(zhí)行者,而 src 和 snd 兩個(gè)指針表示要執(zhí)行的操作對(duì)應(yīng)的 CompletableFuture 實(shí)例,而 dep 則表示要執(zhí)行的操作依賴的前置操作對(duì)應(yīng)的 CompletableFuture 實(shí)例。多個(gè) Completion 彼此之間通過這些指針維護(hù)彼此的依賴關(guān)系。

實(shí)現(xiàn)類

在 CompletableFuture,我們會(huì)看到很多格式為 UniXXX 或者 BiXXX 的內(nèi)部類,它們大多數(shù)都基于上述兩抽象類實(shí)現(xiàn),分別對(duì)應(yīng)不同的操作。我們以 UniApply 為例:

static final class UniApply<T,V> extends UniCompletion<T,V> {
    Function<? super T,? extends V> fn;
}

其本質(zhì)上就是一個(gè)額外掛載了 Function 接口的 UniCompletion,同理,XXXAccept 就是掛載了 Consumer 的 Completion,而 XXXRun 就是掛載的 Runnable 接口的 Completion。

三、源碼分析

樣例

后面介紹的源碼都會(huì)以下面的用例為切入點(diǎn),循著調(diào)用軌跡理解源碼。如果任務(wù)很耗時(shí),記得傳Executor, 或者方法末尾加上future.get(); 因?yàn)镃ompletableFuture默認(rèn)使用ForkJoinPool, 而ForkJoinPool里面的線程都是daemon線程,主線程跑完了,虛擬機(jī)也就over了。

public void whenComplete() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    future.whenComplete((l, r) -> System.out.println(l));
}

public void thenApply() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    future.thenApply(i -> -i);
}

public void thenAccept() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    future.thenAccept(System.out::println);
}

public void thenRun() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    future.thenRun(() -> System.out.println("Done"));
}

public void thenAcceptBoth() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
    future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
}

public void acceptEither() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
    future.acceptEither(other, System.out::println);
}

public void allOf() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
    CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
    CompletableFuture.allOf(future, second, third);
}

public void anyOf() throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
    CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
    CompletableFuture.anyOf(future, second, third);
}

3.1 supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(實(shí)現(xiàn)了Executor接口,里面的內(nèi)容是{new Thread(r).start();})
    return asyncSupplyStage(asyncPool, supplier); 
}

3.2 asyncSupplyStage(Executor e, Supplier<U> f)

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null)
        throw new NullPointerException();
    // 構(gòu)建一個(gè)新的CompletableFuture, 以此構(gòu)建AsyncSupply作為Executor的執(zhí)行參數(shù)
    CompletableFuture<U> d = new CompletableFuture<U>(); 
    // AsyncSupply繼承了ForkJoinTask, 實(shí)現(xiàn)了Runnable, AsynchronousCompletionTask接口
    e.execute(new AsyncSupply<U>(d, f)); 
    // 返回d,立返
    return d; 
}

3.3 AsyncSupply

// CompletableFuture的靜態(tài)內(nèi)部類,作為一個(gè)ForkJoinTask
static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    // AsyncSupply作為一個(gè)依賴Task,dep作為這個(gè)Task的Future
    CompletableFuture<T> dep; 
    
    // fn作為這個(gè)Task的具體執(zhí)行邏輯,函數(shù)式編程
    Supplier<T> fn; 

    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep;
        this.fn = fn;
    }

    public final Void getRawResult() {
        return null;
    }

    public final void setRawResult(Void v) {
    }

    public final boolean exec() {
        run();
        return true;
    }

    public void run() {
        CompletableFuture<T> d;
        Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) { // 非空判斷
            dep = null;
            fn = null;
            // 查看任務(wù)是否結(jié)束,如果已經(jīng)結(jié)束(result != null),直接調(diào)用postComplete()方法
            if (d.result == null) { 
                try {
                    // 等待任務(wù)結(jié)束,并設(shè)置結(jié)果
                    d.completeValue(f.get()); 
                } catch (Throwable ex) {
                    d.completeThrowable(ex); // 異常
                }
            }
            // 任務(wù)結(jié)束后,會(huì)執(zhí)行所有依賴此任務(wù)的其他任務(wù),這些任務(wù)以一個(gè)無鎖并發(fā)棧的形式存在
            d.postComplete(); 
        }
    }
}

3.4 postComplete()

final void postComplete() {
    // 當(dāng)前CompletableFuture
    CompletableFuture<?> f = this; 
    // 無鎖并發(fā)棧,(Completion next), 保存的是依靠當(dāng)前的CompletableFuture一串任務(wù),完成即觸發(fā)(回調(diào))
    Completion h; 
    
    // 當(dāng)f的stack為空時(shí),使f重新指向當(dāng)前的CompletableFuture,繼續(xù)后面的結(jié)點(diǎn)
    while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { 
        CompletableFuture<?> d;
        Completion t;
        
        // 從頭遍歷stack,并更新頭元素
        if (f.casStack(h, t = h.next)) { 
            if (t != null) {
                // 如果f不是當(dāng)前CompletableFuture,則將它的頭結(jié)點(diǎn)壓入到當(dāng)前CompletableFuture的stack中,使樹形結(jié)構(gòu)變成鏈表結(jié)構(gòu),避免遞歸層次過深
                if (f != this) { 
                    pushStack(h);
                    // 繼續(xù)下一個(gè)結(jié)點(diǎn),批量壓入到當(dāng)前棧中
                    continue; 
                }
                // 如果是當(dāng)前CompletableFuture, 解除頭節(jié)點(diǎn)與棧的聯(lián)系
                h.next = null; 
            }
            // 調(diào)用頭節(jié)點(diǎn)的tryFire()方法,該方法可看作Completion的鉤子方法,執(zhí)行完邏輯后,會(huì)向后傳播的
            f = (d = h.tryFire(NESTED)) == null ? this : d; 
        }
    }
}

示意圖

每個(gè)CompletableFuture持有一個(gè)Completion棧stack, 每個(gè)Completion持有一個(gè)CompletableFuture -> dep, 如此遞歸循環(huán)下去,是層次很深的樹形結(jié)構(gòu),所以想辦法將其變成鏈表結(jié)構(gòu)。


首先取出頭結(jié)點(diǎn),下圖中灰色Completion結(jié)點(diǎn),它會(huì)返回一個(gè)CompletableFuture, 同樣也擁有一個(gè)stack,策略是遍歷這個(gè)CompletableFuture的stack的每個(gè)結(jié)點(diǎn),依次壓入到當(dāng)前CompletableFuture的stack中,關(guān)系如下箭頭所示,灰色結(jié)點(diǎn)指的是處理過的結(jié)點(diǎn)。


第一個(gè)Completion結(jié)點(diǎn)返回的CompletableFuture, 將擁有的stack里面的所有結(jié)點(diǎn)都?jí)喝肓水?dāng)前CompletableFuture的stack里面。


后續(xù)的Completion結(jié)點(diǎn)返回的CompletableFuture, 將擁有的stack里面的所有結(jié)點(diǎn)都?jí)喝肓水?dāng)前CompletableFuture的stack里面,重新構(gòu)成了一個(gè)鏈表結(jié)構(gòu),后續(xù)也按照前面的邏輯操作,如此反復(fù),便會(huì)遍歷完所有的CompletableFuture, 這些CompletableFuture(葉子結(jié)點(diǎn))的stack為空,也是結(jié)束條件。


postComplete()最后調(diào)用的是Completion#tryFire()方法,先看下Completion的數(shù)據(jù)結(jié)構(gòu)

Completion

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    volatile Completion next; // 無鎖并發(fā)棧

    /**
     * 鉤子方法,有三種模式,postComplete()方法里面使用的是NESTED模式,避免過深的遞歸調(diào)用 SYNC, ASYNC, or NESTED
     */
    abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都調(diào)用了這個(gè)鉤子方法

    /** cleanStack()方法里有用到 */
    abstract boolean isLive();

    public final void run() {
        tryFire(ASYNC);
    }

    public final boolean exec() {
        tryFire(ASYNC);
        return true;
    }

    public final Void getRawResult() {
        return null;
    }

    public final void setRawResult(Void v) {
    }
}

static final int SYNC = 0; 同步
static final int ASYNC = 1; 異步
static final int NESTED = -1; 嵌套

繼承了ForkJoinTask, 實(shí)現(xiàn)了Runnable, AsynchronousCompletionTask接口,它有諸多子類,如下圖


后面的方法都對(duì)應(yīng)著不同的子類。

先看一個(gè)子類UniCompletion

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // 執(zhí)行器
    CompletableFuture<V> dep;          // 依賴的任務(wù)
    CompletableFuture<T> src;          // 被依賴的任務(wù)

    UniCompletion(Executor executor, CompletableFuture<V> dep,
                    CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }

    // 如果當(dāng)前任務(wù)可以被執(zhí)行,返回true,否則,返回false; 保證任務(wù)只被執(zhí)行一次
    final boolean claim() { 
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // 設(shè)置為不可用
            e.execute(this);
        }
        return false;
    }

    final boolean isLive() {
        return dep != null; 
    }
}

claim()方法保證任務(wù)只被執(zhí)行一次。

whenComplete

whenComplete()/whenCompleteAsync()

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}

xxx和xxxAsync方法的區(qū)別是,有沒有asyncPool作為入?yún)ⅲ械脑?,任?wù)直接入?yún)?,不檢查任務(wù)是否完成。uniWhenCompleteStage方法有說明。

uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)

private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
    if (f == null)
        throw new NullPointerException();
        // 構(gòu)建future
        CompletableFuture<T> d = new CompletableFuture<T>(); 
    if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果線程池不為空,直接構(gòu)建任務(wù)入棧,并調(diào)用tryFire()方法;否則,調(diào)用uniWhenComplete()方法,檢查依賴的那個(gè)任務(wù)是否完成,沒有完成返回false,
                                                         // 完成了返回true, 以及后續(xù)一些操作。
         UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete繼承了UniCompletion
         push(c);
         c.tryFire(SYNC); // 先調(diào)一下鉤子方法,檢查一下任務(wù)是否結(jié)束
    }
    return d;
}

uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)

final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {
    Object r;
    T t;
    Throwable x = null;
    if (a == null || (r = a.result) == null || f == null) // 被依賴的任務(wù)還未完成
        return false;
    if (result == null) { // 被依賴的任務(wù)完成了
        try {
            if (c != null && !c.claim()) // 判斷任務(wù)是否能被執(zhí)行
                return false;
            if (r instanceof AltResult) { // 判斷異常,AltResult類型很簡(jiǎn)單,里面只有一個(gè)屬性Throwable ex; 
                x = ((AltResult) r).ex;
                t = null;
            } else {
                @SuppressWarnings("unchecked")
                T tr = (T) r; // 正常的結(jié)果
                t = tr;
            }
            f.accept(t, x); // 執(zhí)行任務(wù)
            if (x == null) {
                internalComplete(r); // 任務(wù)的結(jié)果設(shè)置為被依賴任務(wù)的結(jié)果
                return true;
            }
        } catch (Throwable ex) {
        if (x == null)
            x = ex; // 記錄異常
        }
        completeThrowable(x, r); // 設(shè)置異常和結(jié)果
    }
    return true;
}

push()

final void push(UniCompletion<?, ?> c) {
    if (c != null) {
        while (result == null && !tryPushStack(c))
            lazySetNext(c, null); // 失敗重置c的next域
    }
}

final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}

static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);
}

UniWhenComplete

static final class UniWhenComplete<T> extends UniCompletion<T, T> {
    BiConsumer<? super T, ? super Throwable> fn;

    UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src,
         BiConsumer<? super T, ? super Throwable> fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final CompletableFuture<T> tryFire(int mode) { // 鉤子方法
        CompletableFuture<T> d; // 依賴的任務(wù)
        CompletableFuture<T> a; // 被依賴的任務(wù)
        if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是異步模式(mode = 1),就不判斷任務(wù)是否結(jié)束
            return null; // dep為空,說明已經(jīng)調(diào)用過了
        dep = null;
        src = null;
        fn = null;
        return d.postFire(a, mode); // 鉤子方法之后的處理
    }
}

postFire(CompletableFuture<?> a, int mode)

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) { // 被依賴的任務(wù)存在,且stack不為空,先處理它
        if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任務(wù)的結(jié)果為空,直接清空棧
            a.cleanStack();
        else
            a.postComplete(); // 否則,調(diào)用postComplete()方法
    }
    if (result != null && stack != null) { // 再處理當(dāng)前任務(wù)
        if (mode < 0) // 嵌套模式,直接返回自身(樹 -> 鏈表,避免過深的遞歸調(diào)用)
            return this;
        else
            postComplete(); // 調(diào)用postComplete()方法
    }
    return null;
}

cleanStack()

final void cleanStack() { // 過濾掉已經(jīng)死掉的結(jié)點(diǎn)(Not isLive)
    for (Completion p = null, q = stack; q != null;) { // q指針從頭節(jié)點(diǎn)開始,向右移動(dòng),s一直執(zhí)行q的下一個(gè)結(jié)點(diǎn),p要么為空,要么指向遍歷過的最后一個(gè)活著的結(jié)點(diǎn),一旦發(fā)現(xiàn)q死掉了,就斷開q, 連接p, s
        Completion s = q.next;
        if (q.isLive()) { // 還活著,p指向遍歷過的最后一個(gè)結(jié)點(diǎn),q向右移動(dòng)
            p = q;
            q = s;
        } else if (p == null) { // 說明第一個(gè)結(jié)點(diǎn)就是死掉的,cas stack, q指向stack
            casStack(q, s);
            q = stack;
        } else { // 否則的話,連接p, s
        p.next = s;
            if (p.isLive()) // 再次判斷p結(jié)點(diǎn)是否還或者(在這期間是否有別的線程改動(dòng)了)
                q = s; // 還活著,q繼續(xù)向右移動(dòng)
            else {
                p = null; // 過期的值,從新開始
                q = stack;
            }
        }
    }
}

如下圖

    1. 第1個(gè)結(jié)點(diǎn)是無效結(jié)點(diǎn),更新stack,更新指針


    1. 第2個(gè)結(jié)點(diǎn)是有效結(jié)點(diǎn),更新指針


    1. 第3個(gè)結(jié)點(diǎn)是無效結(jié)點(diǎn),更新指針


    1. 第4個(gè)結(jié)點(diǎn)是有效結(jié)點(diǎn),更新指針


thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
    if (f == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T, V> c = new UniApply<T, V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) {
    Object r;
    Throwable x;
    if (a == null || (r = a.result) == null || f == null)
        return false;
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult) r).ex) != null) {
                completeThrowable(x, r); // 有異常,直接跳出
                break tryComplete;
            }
            r = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked")
            S s = (S) r;
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

static final class UniApply<T, V> extends UniCompletion<T, V> {
    Function<? super T, ? extends V> fn;

    UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src,
            Function<? super T, ? extends V> fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d;
        CompletableFuture<T> a;
        if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null;
        src = null;
        fn = null;
        return d.postFire(a, mode);
    }
}

一樣的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire

thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
    Object r;
    Throwable x;
    if (a == null || (r = a.result) == null || f == null)
        return false;
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult) r).ex) != null) {
                completeThrowable(x, r); // 有異常直接跳出
                break tryComplete;
            }
            r = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked")
            S s = (S) r;
            f.accept(s);
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

static final class UniAccept<T> extends UniCompletion<T, Void> {
    Consumer<? super T> fn;

    UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null;
        src = null;
        fn = null;
        return d.postFire(a, mode);
    }
}

thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire

thenRun

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    if (f == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniRun(this, f, null)) {
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
    Object r;
    Throwable x;
    if (a == null || (r = a.result) == null || f == null)
        return false;
    if (result == null) {
        if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
            completeThrowable(x, r);
        else
            try {
                if (c != null && !c.claim())
                    return false;
                f.run();
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
    }
    return true;
}

static final class UniRun<T> extends UniCompletion<T, Void> {
    Runnable fn;

    UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null;
        src = null;
        fn = null;
        return d.postFire(a, mode);
    }
}

thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire

thenAcceptBoth

thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(asyncPool, other, action);
}

biAcceptStage

private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o,
        BiConsumer<? super T, ? super U> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.biAccept(this, b, f, null)) {
        BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f);
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}

bipush

final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
    if (c != null) {
        Object r;
        while ((r = result) == null && !tryPushStack(c)) // a的result還沒準(zhǔn)備好,c壓入棧
            lazySetNext(c, null); // 失敗重置c的next域
        if (b != null && b != this && b.result == null) { // b的result也還沒準(zhǔn)備好
            Completion q = (r != null) ? c : new CoCompletion(c); // 根據(jù)a的result決定是否構(gòu)建CoCompletion, 如果a未結(jié)束,則構(gòu)建一個(gè)CoCompletion, CoCompletion最后調(diào)用的也是BiCompletion的tryFire
            while (b.result == null && !b.tryPushStack(q)) // 將q壓入棧
                lazySetNext(q, null); // 失敗重置q的next域
        }
    }
}

CoCompletion

static final class CoCompletion extends Completion {
    BiCompletion<?, ?, ?> base;

    CoCompletion(BiCompletion<?, ?, ?> base) {
        this.base = base;
    }

    final CompletableFuture<?> tryFire(int mode) {
        BiCompletion<?, ?, ?> c;
        CompletableFuture<?> d;
        if ((c = base) == null || (d = c.tryFire(mode)) == null) // 調(diào)用的還是BiCompletion的tryFire方法
            return null;
        base = null;
        return d;
    }

    final boolean isLive() {
        BiCompletion<?, ?, ?> c;
        return (c = base) != null && c.dep != null;
    }
}

biAccept

final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f,
    BiAccept<R, S> c) {
    Object r, s;
    Throwable x;
    if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null)
        return false; // a和b都完成了,才會(huì)往下走
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult) r).ex) != null) { // a的異常檢查
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        if (s instanceof AltResult) {
            if ((x = ((AltResult) s).ex) != null) { // b的異常檢查
                completeThrowable(x, s);
                break tryComplete;
            }
            s = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked")
            R rr = (R) r;
            @SuppressWarnings("unchecked")
            S ss = (S) s;
            f.accept(rr, ss); // 執(zhí)行任務(wù)
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

BiAccept

static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {
    BiConsumer<? super T, ? super U> fn;

    BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
        BiConsumer<? super T, ? super U> fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
            return null;
        dep = null;
        src = null;
        snd = null;
        fn = null;
        return d.postFire(a, b, mode);
    }
}

abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> {
    CompletableFuture<U> snd; // second source for action

    BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
        super(executor, dep, src);
        this.snd = snd;
    }
}

thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire

acceptEither

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(asyncPool, other, action);
}

private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o,
        Consumer<? super T> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.orAccept(this, b, f, null)) {
        OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f);
        orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}

final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f,
        OrAccept<R, S> c) {
    Object r;
    Throwable x;
    if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null)
        return false; // a和b有一個(gè)完成了就往下走
    tryComplete: if (result == null) {
        try {
            if (c != null && !c.claim())
                return false;
            if (r instanceof AltResult) { // 異常
                if ((x = ((AltResult) r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            @SuppressWarnings("unchecked")
            R rr = (R) r;
            f.accept(rr); // 執(zhí)行
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {
    Consumer<? super T> fn;

    OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
            Consumer<? super T> fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
            return null;
        dep = null;
        src = null;
        snd = null;
        fn = null;
        return d.postFire(a, b, mode);
    }
}

final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
    if (c != null) {
        while ((b == null || b.result == null) && result == null) { // a和b的result都沒好,才會(huì)考慮入棧
            if (tryPushStack(c)) { // 先入a的棧
                if (b != null && b != this && b.result == null) { // 入a的棧成功,b的result還沒好
                    Completion q = new CoCompletion(c); // a還未結(jié)束,用c構(gòu)建CoCompletion
                    while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判斷,a和b的result都沒好,才會(huì)考慮入棧
                    lazySetNext(q, null); // 失敗置空q的next域
                }
                break;
            }
            lazySetNext(c, null); // 失敗置空c的next域
        }
    }
}

acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire

allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個(gè)數(shù)組構(gòu)建成一棵樹,二叉樹,動(dòng)態(tài)規(guī)劃
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (lo > hi) // empty
        d.result = NIL;
    else {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null
                || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?, ?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c); // both
            c.tryFire(SYNC);
        }
    }
    return d;
}

static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And
    BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
        super(null, dep, src, snd);
    }

    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.biRelay(a = src, b = snd))
            return null;
        src = null;
        snd = null;
        dep = null;
        return d.postFire(a, b, mode);
    }
}

boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
    Object r, s;
    Throwable x;
    if (a == null || (r = a.result) == null || b == null || (s = b.result) == null)
        return false; // a和b都結(jié)束了才往下執(zhí)行
    if (result == null) {
        if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
            completeThrowable(x, r);
        else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null)
            completeThrowable(x, s);
        else
            completeNull(); // 輔助結(jié)點(diǎn),什么都不做
    }
    return true;
}

allOf -> andTree -> biRelay -> tryFire -> postFire

anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個(gè)數(shù)組構(gòu)建成一棵樹,二叉樹,動(dòng)態(tài)規(guī)劃
    CompletableFuture<Object> d = new CompletableFuture<Object>();
    if (lo <= hi) {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null
                || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?, ?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or
    OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
        super(null, dep, src, snd);
    }

    final CompletableFuture<Object> tryFire(int mode) {
        CompletableFuture<Object> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.orRelay(a = src, b = snd))
            return null;
        src = null;
        snd = null;
        dep = null;
        return d.postFire(a, b, mode);
    }
}

final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
    Object r;
    if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null))
        return false; // a和b有一個(gè)結(jié)束就往下進(jìn)行
    if (result == null)
        completeRelay(r);
    return true;
}

anyOf -> orTree -> orRelay -> tryFire -> postFire

數(shù)組構(gòu)建樹

allOf和anyOf都用到了數(shù)組構(gòu)建成樹的策略。

假設(shè)有一個(gè)任務(wù)Z(虛擬的,什么都不做),依賴一組任務(wù)[A, B, C, D, E, F, G, H]

對(duì)于allOf, 當(dāng)這組任務(wù)都完成時(shí),才會(huì)執(zhí)行Z;對(duì)于anyOf, 當(dāng)這組任務(wù)中有任何一個(gè)完成,就執(zhí)行任務(wù)Z。

如果這組任務(wù)是數(shù)組結(jié)構(gòu)或者鏈表結(jié)構(gòu),我們?cè)撊绾谓鉀Q呢?遍歷數(shù)組或者是鏈表,當(dāng)任務(wù)都完成或者有一個(gè)完成時(shí),就執(zhí)行Z,需要不停地遍歷,這是輪詢的方法,不合適。

整個(gè)基調(diào)是回調(diào),是指,當(dāng)一個(gè)任務(wù)完成時(shí),會(huì)接著執(zhí)行所有依賴于它的任務(wù)。

作為一個(gè)數(shù)組或者鏈表,該如何應(yīng)用回調(diào)呢?誰在先,誰在后呢?因?yàn)椴恢滥膫€(gè)任務(wù)會(huì)先完成,所以沒法確定次序。而且這組任務(wù)之間也不應(yīng)該相互依賴,它們只不過都是被Z依賴。

如果這組任務(wù)只有一個(gè)的話,那就演變成了X.thenXXX(Z), 如果這組任務(wù)有兩個(gè)的話,allOf -> Both,anyOf -> Either

如果Z依賴Z1,Z2兩個(gè)個(gè)任務(wù),Z1和Z2依賴Z11,Z12和Z21,Z22四個(gè)任務(wù),依次類推,當(dāng)虛擬的任務(wù)的個(gè)數(shù)達(dá)到真實(shí)任務(wù)的個(gè)數(shù)的一半時(shí),就讓虛擬任務(wù)監(jiān)聽真實(shí)的任務(wù),動(dòng)態(tài)規(guī)劃加二叉樹,時(shí)間復(fù)雜度也只是logn級(jí)別的。

static String array2Tree(String[] cfs, int lo, int hi) {
    String d = new String(cfs[lo] + cfs[hi]);
    if (lo <= hi) {
        String a, b;
        int mid = (lo + hi) >>> 1; // 二分
        if (lo == mid) { // a作為左半部分的的結(jié)果
            a = cfs[lo]; // 當(dāng)只有不超過兩個(gè)元素時(shí),a直接取第一個(gè)值
        } else {
            a = array2Tree(cfs, lo, mid);
        }
        if (lo == hi) { // 當(dāng)只有一個(gè)元素的時(shí)候,b取a的值
            b = a;
        } else {
            if (hi == mid + 1) { // 右半部分只有兩個(gè)元素時(shí),b取第二個(gè)元素的值
                b = cfs[hi];
            } else {
                b = array2Tree(cfs, mid + 1, hi);
            }
        }
        if (a == null || b == null) {
            throw new NullPointerException();
        }
        System.out.println("[" + a + "][" + b + "]->[" + d + "]");
    }
    return d;
}

Console

[A][B]->[AB]
[C][D]->[CD]
[AB][CD]->[AD]
[E][F]->[EF]
[G][H]->[GH]
[EF][GH]->[EH]
[AD][EH]->[AH]

如下圖


對(duì)于allOf, Z只要保證Z1和Z2都完成了就行,Z1和Z2分別保證Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22則分別保證了A-H任務(wù)都完成。

對(duì)應(yīng)anyOf, Z 只要保證Z1和Z2有一個(gè)完成了就像,Z1和Z2聯(lián)合保證了Z11,Z12,Z21,Z22這4個(gè)任務(wù)只要有一個(gè)完成了就行,同理,Z11,Z12,Z21,Z22則聯(lián)合保證了A-H中有一個(gè)任務(wù)完成了就行。

然后,Z就可以執(zhí)行了,其實(shí)Z什么也沒做,只是從這組任務(wù)里得出一個(gè)結(jié)果。

參考:
https://www.cnblogs.com/Createsequence/p/16963895.html

https://www.cnblogs.com/aniao/p/aniao_cf.html

https://blog.csdn.net/PNGYUL/article/details/119838961

https://blog.csdn.net/qq_33512765/article/details/126427491

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容