java并發(fā)編程(7):CompletableFuture異步框架源碼詳解及實(shí)例

CompletableFuture為異步編程框架,當(dāng)我們?cè)谑褂镁€程池處理任務(wù)時(shí),我們只能通過(guò)阻塞的Future#get()獲取異步的結(jié)果,當(dāng)任務(wù)處理需要的時(shí)間比較長(zhǎng)時(shí),效率和性能就會(huì)比較差。而CompletableFuture彌補(bǔ)了Future,其主要是在任務(wù)處理完成后,調(diào)用應(yīng)用的回調(diào)函數(shù),這樣應(yīng)用就無(wú)需通過(guò)Future#get()的方式獲取處理結(jié)果,而是通過(guò)任務(wù)的回調(diào)來(lái)通知應(yīng)用結(jié)果,這樣極大的提高了應(yīng)用的效率。同時(shí)CompletableFuture還提供了任務(wù)串行、并行等處理,方便了任務(wù)的異步邏輯組合。

1、CompletableFuture繼承關(guān)系

CompletableFuture繼承關(guān)系.png

CompletableFuture主要繼承于Future接口及CompletionStage接口,F(xiàn)uture為異步結(jié)果接口,CompletionStage定義了CompletableFuture異步處理及依賴接口。

2、Completion繼承關(guān)系

Completion為CompletableFuture的任務(wù)依賴堆,保存了當(dāng)前CompletableFuture依賴的任務(wù)。其繼承于ForkJoinTask,主要繼承結(jié)構(gòu)如下:

Completion繼承關(guān)系.png

UniCompletion為基礎(chǔ)抽象類,其包含了任務(wù)的線程池信息、依賴任務(wù)及任務(wù)執(zhí)行體。

2.1、Completion解析

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    //堆中的下個(gè)任務(wù)
    volatile Completion next;      // Treiber stack link
    //執(zhí)行被觸發(fā)的任務(wù),返回需要傳播的依賴任務(wù)
    abstract CompletableFuture<?> tryFire(int mode);

    //任務(wù)是否可觸發(fā)
    abstract boolean isLive();

    public final void run()                { tryFire(ASYNC); }
    public final boolean exec()            { tryFire(ASYNC); return true; }
    public final Void getRawResult()       { return null; }
    public final void setRawResult(Void v) {}
}

2.2、UniCompletion解析

abstract static class UniCompletion<T,V> extends Completion {
    //執(zhí)行當(dāng)前任務(wù)的線程池
    Executor executor;                 // executor to use (null if none)
    //當(dāng)然依賴的任務(wù)
    CompletableFuture<V> dep;          // the dependent to complete
    //當(dāng)前任務(wù)的執(zhí)行實(shí)體
    CompletableFuture<T> src;          // source for action

    UniCompletion(Executor executor, CompletableFuture<V> dep,
                  CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }

   //若當(dāng)前任務(wù)可執(zhí)行,則返回true。若異步執(zhí)行,則提交當(dāng)前任務(wù)
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }

    final boolean isLive() { return dep != null; }
}

2.3、BiCompletion解析

BiCompletion主要增加了一個(gè)任務(wù)。

abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
    CompletableFuture<U> snd; // second source for action
    BiCompletion(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src, CompletableFuture<U> snd) {
        super(executor, dep, src); this.snd = snd;
    }
}

3、主要方法詳解

3.1、工廠方法創(chuàng)建CompletableFuture

CompletableFuture的工廠方法方便用戶創(chuàng)建及使用CompletableFuture。主要分為兩類,執(zhí)行有返回值的任務(wù)(Callable)和無(wú)返回值的任務(wù)(Runnable)

//在線程池中異步執(zhí)行一個(gè)有返回值的任務(wù),返回結(jié)果封裝在CompletableFuture中,
//任務(wù)體為supplier中
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

//在線程池中異步執(zhí)行一個(gè)有返回值的任務(wù),返回結(jié)果封裝在CompletableFuture中,
//顯式提供線程池executor
//任務(wù)體為supplier中
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

//在線程池中異步執(zhí)行一個(gè)無(wú)返回值的任務(wù),返回結(jié)果封裝在CompletableFuture中;
//任務(wù)體為supplier中
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

//在線程池中異步執(zhí)行一個(gè)無(wú)返回值的任務(wù),返回結(jié)果封裝在CompletableFuture中;
//顯式提供線程池executor
//任務(wù)體為supplier中
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                               Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

//獲取一個(gè)已完成的CompletableFuture,并用value作為結(jié)果。
//任務(wù)體為supplier中
public static <U> CompletableFuture<U> completedFuture(U value) {
    return new CompletableFuture<U>((value == null) ? NIL : value);
}

//執(zhí)行有返回值的任務(wù),主要是將任務(wù)封裝為一個(gè)AsyncSupply并交由線程池執(zhí)行
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

//執(zhí)行無(wú)返回值的任務(wù),主要是將任務(wù)封裝為一個(gè)AsyncRun并交由線程池執(zhí)行
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    e.execute(new AsyncRun(d, f));
    return d;
}

AsyncSupply及AsyncRun實(shí)現(xiàn):

//封裝的task,用于執(zhí)行無(wú)返回值的任務(wù)
static final class AsyncRun extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    //dep:當(dāng)前任務(wù)的異步執(zhí)行結(jié)果的Future;fn:當(dāng)前任務(wù)的執(zhí)行體,函數(shù)式編程        
    CompletableFuture<Void> dep; Runnable fn;
    AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

    public void run() {
        CompletableFuture<Void> d; Runnable f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //任務(wù)未執(zhí)行結(jié)束?
            if (d.result == null) {
                try {
                    //執(zhí)行任務(wù)
                    f.run();
                    //設(shè)置執(zhí)行結(jié)果問(wèn)null的AltResult
                    d.completeNull();
                } catch (Throwable ex) {
                    //若異常則設(shè)置異常結(jié)果
                    d.completeThrowable(ex);
                }
            }
            //傳播任務(wù)完成的消息,執(zhí)行所有依賴此任務(wù)的其他任務(wù),依賴任務(wù)存儲(chǔ)在棧中
            d.postComplete();
        }
    }
}

//封裝的task,用于執(zhí)行有返回值的任務(wù)
static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    //dep:當(dāng)前任務(wù)的異步執(zhí)行結(jié)果的Future;fn:當(dāng)前任務(wù)的執(zhí)行體,函數(shù)式編程             
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //任務(wù)未執(zhí)行?
            if (d.result == null) {
                try {
                    //執(zhí)行任務(wù),并設(shè)置任務(wù)的執(zhí)行結(jié)果
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    //執(zhí)行異常則設(shè)置異常結(jié)果
                    d.completeThrowable(ex);
                }
            }
            //傳播任務(wù)完成的消息,執(zhí)行所有依賴此任務(wù)的其他任務(wù),依賴任務(wù)存儲(chǔ)在棧中
            d.postComplete();
        }
    }
}


final void postComplete() {
    //f:當(dāng)前CompletableFuture
    CompletableFuture<?> f = this; Completion h;
    //當(dāng)前CompletableFuture的依賴棧不為空;
    //或當(dāng)f的stack為空時(shí),使f重新指向當(dāng)前的CompletableFuture,繼續(xù)后面的結(jié)點(diǎn)
    //一次執(zhí)行一個(gè)依賴任務(wù)的處理
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        //更新堆的頭節(jié)點(diǎn)下個(gè)節(jié)點(diǎn)
        if (f.casStack(h, t = h.next)) {
            //頭結(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)不為空?
            if (t != null) {
                // 如果f不是當(dāng)前CompletableFuture,則將它的頭結(jié)點(diǎn)壓入到當(dāng)前CompletableFuture的stack中,
                // 使樹(shù)形結(jié)構(gòu)變成鏈表結(jié)構(gòu),避免遞歸層次過(guò)深
                if (f != this) {
                    // 繼續(xù)下一個(gè)結(jié)點(diǎn),批量壓入到當(dāng)前棧中
                    pushStack(h);
                    continue;
                }
                // 如果是當(dāng)前CompletableFuture, 解除頭節(jié)點(diǎn)與棧的聯(lián)系
                h.next = null;    // detach
            }
            // 調(diào)用頭節(jié)點(diǎn)的tryFire()方法,該方法可看作Completion的鉤子方法,
            // 執(zhí)行完邏輯后,會(huì)向后傳播的
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

3.2、獲取CompletableFuture的異步結(jié)果

CompletableFuture繼承于Future,實(shí)現(xiàn)了獲取異步執(zhí)行結(jié)果的一些方法。

//異步任務(wù)是否已經(jīng)完成
public boolean isDone() {
    return result != null;
}

//獲取異步的執(zhí)行結(jié)果,若任務(wù)未執(zhí)行完成,則阻塞等待;
//若執(zhí)行結(jié)果中有異常,則直接拋出異常
public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

//在給定的超時(shí)時(shí)間內(nèi)獲取異步結(jié)果
public T get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    Object r;
    long nanos = unit.toNanos(timeout);
    return reportGet((r = result) == null ? timedGet(nanos) : r);
}

//阻塞等待任務(wù)執(zhí)行完成并獲取任務(wù)結(jié)果
public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

//立即獲取執(zhí)行結(jié)果,若任務(wù)還未執(zhí)行完成則直接使用給定的默認(rèn)值,否則返回結(jié)果;
//若執(zhí)行結(jié)果中有異常,則直接拋出異常
public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

//獲取異步執(zhí)行結(jié)果,若結(jié)果有異常,則直接拋出異常
private static <T> T reportJoin(Object r) {
    if (r instanceof AltResult) {
        Throwable x;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if (x instanceof CompletionException)
            throw (CompletionException)x;
        throw new CompletionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

3.3、計(jì)算結(jié)果完成后的相關(guān)處理(UniWhenComplete)

當(dāng)CompletableFuture計(jì)算結(jié)果完成時(shí),我們需要對(duì)結(jié)果進(jìn)行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時(shí)候需要對(duì)異常進(jìn)行處理。方法中以Async結(jié)尾的會(huì)在新的線程池中執(zhí)行,沒(méi)有Async結(jié)尾的會(huì)在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。

//當(dāng)完成后同步執(zhí)行action
public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}

//完成后異步執(zhí)行action
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}

//完成后異步執(zhí)行action,帶線程池
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

//異常后執(zhí)行
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}

uniWhenCompleteStage相關(guān)處理:

private CompletableFuture<T> uniWhenCompleteStage(
    Executor e, BiConsumer<? super T, ? super Throwable> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<T> d = new CompletableFuture<T>();
    //若線程池為空,則調(diào)用uniWhenComplete方法進(jìn)行任務(wù)狀態(tài)判斷及處理
    //若線程池非空,則構(gòu)建UniWhenComplete任務(wù)并將任務(wù)入隊(duì),同時(shí)調(diào)用tryFire()進(jìn)行同步處理
    if (e != null || !d.uniWhenComplete(this, f, null)) {
        UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
        push(c);
        //調(diào)用鉤子方法,對(duì)任務(wù)進(jìn)行處理并處理相關(guān)依賴
        c.tryFire(SYNC);
    }
    return d;
}


final boolean uniWhenComplete(CompletableFuture<T> a,
                              BiConsumer<? super T,? super Throwable> f,
                              UniWhenComplete<T> c) {
    Object r; T t; Throwable x = null;
    //檢查依賴的任務(wù)是否完成,未完成則直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //當(dāng)前任務(wù)未完成?
    if (result == null) {
        try {
            //uniWhenComplete中所有c都為空,無(wú)需考慮
            if (c != null && !c.claim())
                return false;
            //判斷執(zhí)行結(jié)果是否異常    
            if (r instanceof AltResult) {
                x = ((AltResult)r).ex;
                t = null;
            } else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            //執(zhí)行任務(wù)
            f.accept(t, x);
            if (x == null) {
                internalComplete(r);
                return true;
            }
        } catch (Throwable ex) {
            if (x == null)
                x = ex;
        }
        //設(shè)置異常結(jié)果
        completeThrowable(x, r);
    }
    return true;
}

//whenComplete任務(wù)的封裝
static final class UniWhenComplete<T> extends UniCompletion<T,T> {
    BiConsumer<? super T, ? super Throwable> fn;
    UniWhenComplete(Executor executor, CompletableFuture<T> dep,
                    CompletableFuture<T> src,
                    BiConsumer<? super T, ? super Throwable> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<T> tryFire(int mode) {
        CompletableFuture<T> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

3.4、計(jì)算結(jié)果完成時(shí)的轉(zhuǎn)換處理(thenApply)

計(jì)算結(jié)果完成時(shí)的轉(zhuǎn)換的處理會(huì)將上個(gè)計(jì)算結(jié)果轉(zhuǎn)換為當(dāng)前任務(wù)的輸入?yún)?shù)。Async結(jié)尾的方法由原來(lái)的線程計(jì)算,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運(yùn)行。

//
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

uniApplyStage()處理解析:

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    //當(dāng)線程池為空時(shí),直接調(diào)用uniApply對(duì)任務(wù)進(jìn)行處理
    //當(dāng)線程池非空時(shí),將任務(wù)加入堆棧,并調(diào)用tryFire對(duì)任務(wù)進(jìn)行處理
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {
    Object r; Throwable x;
    //依賴任務(wù)未完成?直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //當(dāng)前任務(wù)未完成?    
    tryComplete: if (result == null) {
        //依賴的任務(wù)處理異常?則設(shè)置當(dāng)前任務(wù)異常結(jié)果
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            //thenApply中所有c都為空,無(wú)需考慮
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            //執(zhí)行任務(wù)并設(shè)置任務(wù)結(jié)果
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            //執(zhí)行任務(wù)異常則設(shè)置異常結(jié)果
            completeThrowable(ex);
        }
    }
    return true;
}

//thenApply任務(wù)的封裝
static final class UniApply<T,V> extends UniCompletion<T,V> {
    Function<? super T,? extends V> fn;
    UniApply(Executor executor, CompletableFuture<V> dep,
             CompletableFuture<T> src,
             Function<? super T,? extends V> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            !d.uniApply(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

3.5、計(jì)算結(jié)果完成時(shí)的消費(fèi)處理(thenAccept)

計(jì)算結(jié)果完成時(shí)的消費(fèi)的處理是將上一步任務(wù)處理的結(jié)果作為本次任務(wù)處理的輸入?yún)?shù),并且thenAccept的處理只會(huì)對(duì)上一步的結(jié)果進(jìn)行處理,而不會(huì)返回任何處理結(jié)果。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

uniAcceptStage()的處理流程:

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //線程池為空則調(diào)用uniAccept同步處理任務(wù);
    //線程池非空則將任務(wù)封裝為UniAccept并推入堆棧,同時(shí)調(diào)用tryFire()進(jìn)行任務(wù)處理
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniAccept(CompletableFuture<S> a,
                            Consumer<? super S> f, UniAccept<S> c) {
    Object r; Throwable x;
    //依賴任務(wù)未完成?直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //當(dāng)前任務(wù)未完成?    
    tryComplete: if (result == null) {
        //依賴任務(wù)結(jié)果異常,則設(shè)置當(dāng)前的異常結(jié)果
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            //uniAccept中c全部為null,無(wú)需考慮
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            //執(zhí)行當(dāng)前任務(wù)
            f.accept(s);
            //設(shè)置空結(jié)果
            completeNull();
        } catch (Throwable ex) {
            //執(zhí)行異常則設(shè)置異常結(jié)果
            completeThrowable(ex);
        }
    }
    return true;
}

//uniAccept的任務(wù)封裝
static final class UniAccept<T> extends UniCompletion<T,Void> {
    Consumer<? super T> fn;
    UniAccept(Executor executor, CompletableFuture<Void> dep,
              CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            !d.uniAccept(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

3.6、多個(gè)結(jié)果完成時(shí)消費(fèi)(thenAcceptBoth、runAfterBoth)

多個(gè)結(jié)果完成時(shí)處理會(huì)等待當(dāng)前結(jié)果CompletableFuture及依賴的other完成時(shí)執(zhí)行action,thenAcceptBoth會(huì)將依賴的當(dāng)前CompletableFuture及other的執(zhí)行結(jié)果作為action的輸入?yún)?shù)。

runAfterBoth則只等待兩個(gè)依賴的任務(wù)執(zhí)行完成再執(zhí)行。

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(asyncPool, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor) {
    return biAcceptStage(screenExecutor(executor), other, action);
}


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action) {
    return biRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor) {
    return biRunStage(screenExecutor(executor), other, action);
}

biAcceptStage()及biRunStage()的處理流程基本相同,不同點(diǎn)為biAcceptStage()會(huì)將依賴的兩個(gè)任務(wù)作為執(zhí)行處理的入?yún)?,而biRunStage()不會(huì)。

以下是biAcceptStage()的處理流程:

//處理thenAcceptBoth類型的任務(wù)
//e:線程池;
//o:依賴的一個(gè)任務(wù)
//f:具體執(zhí)行邏輯,會(huì)將當(dāng)前CompletableFuture及o的執(zhí)行結(jié)果作為輸入
private <U> CompletableFuture<Void> biAcceptStage(
    Executor e, CompletionStage<U> o,
    BiConsumer<? super T,? super U> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //線程池為空時(shí),調(diào)用biAccept()同步執(zhí)行處理
    //線程池非空,則將任務(wù)封裝為BiAccept并推入堆棧,調(diào)用tryFire()進(jìn)行任務(wù)處理
    if (e != null || !d.biAccept(this, b, f, null)) {
        BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}


//任務(wù)的同步處理
//a,b:依賴的任務(wù)
//f:具體執(zhí)行邏輯
//c:為空,無(wú)需考慮
final <R,S> boolean biAccept(CompletableFuture<R> a,
                             CompletableFuture<S> b,
                             BiConsumer<? super R,? super S> f,
                             BiAccept<R,S> c) {
    Object r, s; Throwable x;
    //判斷依賴的任務(wù)a,b是否執(zhí)行完畢
    if (a == null || (r = a.result) == null ||
        b == null || (s = b.result) == null || f == null)
        return false;
    //當(dāng)前任務(wù)未執(zhí)行完成?    
    tryComplete: if (result == null) {
        //若a或b任務(wù)執(zhí)行有異常,則設(shè)置當(dāng)前任務(wù)的異常結(jié)果
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        if (s instanceof AltResult) {
            if ((x = ((AltResult)s).ex) != null) {
                completeThrowable(x, s);
                break tryComplete;
            }
            s = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") R rr = (R) r;
            @SuppressWarnings("unchecked") S ss = (S) s;
            //執(zhí)行任務(wù),并將a,b任務(wù)的執(zhí)行結(jié)果作為參數(shù)輸入
            f.accept(rr, ss);
            //設(shè)置返回結(jié)果為null
            completeNull();
        } catch (Throwable ex) {
            //任務(wù)執(zhí)行異常則設(shè)置異常結(jié)果
            completeThrowable(ex);
        }
    }
    return true;
}

//thenAcceptBoth類型任務(wù)的封裝
static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
    BiConsumer<? super T,? super U> fn;
    BiAccept(Executor executor, CompletableFuture<Void> dep,
             CompletableFuture<T> src, CompletableFuture<U> snd,
             BiConsumer<? super T,? super U> fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null ||
            !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}

3.7、某個(gè)結(jié)果完成時(shí)消費(fèi)(applyToEither、acceptEither,runAfterEither)

applyToEither及acceptEither會(huì)將兩個(gè)結(jié)果中任意一個(gè)的執(zhí)行結(jié)果作為當(dāng)前執(zhí)行的輸入?yún)?shù),而applyToEither會(huì)返回執(zhí)行結(jié)果,acceptEither則返回空的執(zhí)行結(jié)果。runAfterEither則不會(huì)將依賴的執(zhí)行結(jié)果作為參數(shù),其只是當(dāng)依賴的任意一個(gè)任務(wù)完成時(shí)進(jìn)行處理,并返回空的執(zhí)行結(jié)果。

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(asyncPool, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}

orApplyStage()、orAcceptStage()、orRunStage()的處理基本相同。以下以orApplyStage()為例來(lái)分析其處理流程:

private <U extends T,V> CompletableFuture<V> orApplyStage(
    Executor e, CompletionStage<U> o,
    Function<? super T, ? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    //若線程池為空,則調(diào)用orApply()進(jìn)行任務(wù)的同步處理
    //若線程池非空,則將依賴及處理封裝為OrApply并推入堆棧,然后調(diào)用tryFire()進(jìn)行任務(wù)處理
    if (e != null || !d.orApply(this, b, f, null)) {
        OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
        orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}

final <R,S extends R> boolean orApply(CompletableFuture<R> a,
                                      CompletableFuture<S> b,
                                      Function<? super R, ? extends T> f,
                                      OrApply<R,S,T> c) {
    Object r; Throwable x;
    //依賴的任務(wù)a,b都未執(zhí)行完成?
    if (a == null || b == null ||
        ((r = a.result) == null && (r = b.result) == null) || f == null)
        return false;
    //當(dāng)前任務(wù)未完成?    
    tryComplete: if (result == null) {
        try {
            if (c != null && !c.claim())
                return false;
            //依賴任務(wù)處理異常,設(shè)置當(dāng)前異常結(jié)果
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            @SuppressWarnings("unchecked") R rr = (R) r;
             //進(jìn)行任務(wù)處理,并設(shè)置處理結(jié)果              
             completeValue(f.apply(rr));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

//orApplyStage的任務(wù)封裝。
static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
    Function<? super T,? extends V> fn;
    OrApply(Executor executor, CompletableFuture<V> dep,
            CompletableFuture<T> src,
            CompletableFuture<U> snd,
            Function<? super T,? extends V> fn) {
        super(executor, dep, src, snd); this.fn = fn;
    }
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null ||
            !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}

3.8、異步結(jié)果的組合處理(thenCompose)

thenCompose會(huì)連接兩個(gè)CompletableFuture,其處理是當(dāng)前CompletableFuture完成時(shí)將結(jié)果作為fn處理的入?yún)⑦M(jìn)行處理。

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn,
    Executor executor) {
    return uniComposeStage(screenExecutor(executor), fn);
}

uniComposeStage()處理流程:

private <V> CompletableFuture<V> uniComposeStage(
    Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
    if (f == null) throw new NullPointerException();
    Object r; Throwable x;
    //無(wú)線程池,且當(dāng)前任務(wù)處理完成
    if (e == null && (r = result) != null) {
        //若當(dāng)前處理結(jié)果異常,則直接返回異常結(jié)果
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                return new CompletableFuture<V>(encodeThrowable(x, r));
            }
            r = null;
        }
        try {
            //將當(dāng)前處理結(jié)果作為f的輸入,并執(zhí)行f處理
            @SuppressWarnings("unchecked") T t = (T) r;
            CompletableFuture<V> g = f.apply(t).toCompletableFuture();
            Object s = g.result;
            //f處理完成?則直接返回處理結(jié)果
            //未完成則封裝處理并將任務(wù)入棧
            if (s != null)
                return new CompletableFuture<V>(encodeRelay(s));
            CompletableFuture<V> d = new CompletableFuture<V>();
            UniRelay<V> copy = new UniRelay<V>(d, g);
            g.push(copy);
            copy.tryFire(SYNC);
            return d;
        } catch (Throwable ex) {
            return new CompletableFuture<V>(encodeThrowable(ex));
        }
    }
    //當(dāng)前任務(wù)未處理完成,則封裝當(dāng)前任務(wù)及依賴并入棧
    CompletableFuture<V> d = new CompletableFuture<V>();
    UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
    push(c);
    c.tryFire(SYNC);
    return d;
}

3.9、等待多個(gè)執(zhí)行結(jié)果完成

//所有任務(wù)都執(zhí)行完畢
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

//某個(gè)任務(wù)執(zhí)行完畢
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                        int lo, int hi) {
    CompletableFuture<Object> d = new CompletableFuture<Object>();
    //遞歸將任務(wù)進(jìn)行遍歷,若某個(gè)任務(wù)已經(jīng)完成,則直接設(shè)置結(jié)果為已完成任務(wù)的結(jié)果
    if (lo <= hi) {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

3.10、對(duì)異步結(jié)果進(jìn)行處理(handle)

handle()主要獲取當(dāng)前任務(wù)的執(zhí)行結(jié)果,并將其作為fn函數(shù)的輸入?yún)?shù),并接執(zhí)行結(jié)果設(shè)置為返回的CompletableFuture的結(jié)果。

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

uniHandleStage()的處理流程:

private <V> CompletableFuture<V> uniHandleStage(
    Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    //若線程池為空,則直接調(diào)用uniHandle同步執(zhí)行任務(wù),
    //否則將任務(wù)及依賴信息封裝為UniHandle入棧,然后調(diào)用tryFire()進(jìn)行任務(wù)處理
    if (e != null || !d.uniHandle(this, f, null)) {
        UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}


final <S> boolean uniHandle(CompletableFuture<S> a,
                            BiFunction<? super S, Throwable, ? extends T> f,
                            UniHandle<S,T> c) {
    Object r; S s; Throwable x;
    //依賴任務(wù)未完成?返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //當(dāng)前結(jié)果為完成?    
    if (result == null) {
        try {
            if (c != null && !c.claim())
                return false;
            //依賴執(zhí)行結(jié)果異常?則設(shè)置當(dāng)前結(jié)果為異常結(jié)果    
            if (r instanceof AltResult) {
                x = ((AltResult)r).ex;
                s = null;
            } else {
                x = null;
                @SuppressWarnings("unchecked") S ss = (S) r;
                s = ss;
            }
            //將依賴的結(jié)果作為當(dāng)前函數(shù)的輸入?yún)?shù),并執(zhí)行函數(shù),設(shè)置當(dāng)前執(zhí)行結(jié)果
            completeValue(f.apply(s, x));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容