自頂向下深入分析Netty(五)--Future

Netty架構(gòu)模式

再次回顧這幅圖,在上一章中,我們分析了Reactor的完整實(shí)現(xiàn)。由于Java NIO事件驅(qū)動的模型,要求Netty的事件處理采用異步的方式,異步處理則需要表示異步操作的結(jié)果。Future正是用來表示異步操作結(jié)果的對象,F(xiàn)uture的類簽名為:

    public interface Future<V>;

其中的泛型參數(shù)V即表示異步結(jié)果的類型。

5.1 總述

也許你已經(jīng)使用過JDK的Future對象,該接口的方法如下:

    // 取消異步操作
    boolean cancel(boolean mayInterruptIfRunning);
    // 異步操作是否取消
    boolean isCancelled();
    // 異步操作是否完成,正常終止、異常、取消都是完成
    boolean isDone();
    // 阻塞直到取得異步操作結(jié)果
    V get() throws InterruptedException, ExecutionException;
    // 同上,但最長阻塞時(shí)間為timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

我們的第一印象會覺得這樣的設(shè)計(jì)并不壞,但仔細(xì)思考,便會發(fā)現(xiàn)問題:
(1).接口中只有isDone()方法判斷一個(gè)異步操作是否完成,但是對于完成的定義過于模糊,JDK文檔指出正常終止、拋出異常、用戶取消都會使isDone()方法返回真。在我們的使用中,我們極有可能是對這三種情況分別處理,而JDK這樣的設(shè)計(jì)不能滿足我們的需求。
(2).對于一個(gè)異步操作,我們更關(guān)心的是這個(gè)異步操作觸發(fā)或者結(jié)束后能否再執(zhí)行一系列動作。比如說,我們?yōu)g覽網(wǎng)頁時(shí)點(diǎn)擊一個(gè)按鈕后實(shí)現(xiàn)用戶登錄。在javascript中,處理代碼如下:

    $("#login").click(function(){
        login();
    });

可見在這樣的情況下,JDK中的Future便不能處理,所以,Netty擴(kuò)展了JDK的Future接口,使其能解決上面的兩個(gè)問題。擴(kuò)展的方法如下(類似方法只列出一個(gè)):

    // 異步操作完成且正常終止
    boolean isSuccess();
    // 異步操作是否可以取消
    boolean isCancellable();
    // 異步操作失敗的原因
    Throwable cause();
    // 添加一個(gè)監(jiān)聽者,異步操作完成時(shí)回調(diào),類比javascript的回調(diào)函數(shù)
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到異步操作完成
    Future<V> await() throws InterruptedException;
    // 同上,但異步操作失敗時(shí)拋出異常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回異步結(jié)果,如果尚未完成返回null
    V getNow();

如果你對Future的狀態(tài)還有疑問,放上代碼注釋中的ascii圖打消你的疑慮:

 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+

可知,F(xiàn)uture對象有兩種狀態(tài)尚未完成和已完成,其中已完成又有三種狀態(tài):成功、失敗、用戶取消。各狀態(tài)的狀態(tài)斷言請?jiān)诖藞D中查找。
仔細(xì)看完上面的圖并聯(lián)系Future接口中的方法,你是不是也會和我有相同的疑問:Future接口中的方法都是getter方法而沒有setter方法,也就是說這樣實(shí)現(xiàn)的Future子類的狀態(tài)是不可變的,如果我們想要變化,那該怎么辦呢?Netty提供的解決方法是:使用可寫的Future即Promise。Promise接口擴(kuò)展的方法如下:

    // 標(biāo)記異步操作結(jié)果為成功,如果已被設(shè)置(不管成功還是失敗)則拋出異常IllegalStateException
    Promise<V> setSuccess(V result);
    // 同上,只是結(jié)果已被設(shè)置時(shí)返回False
    boolean trySuccess(V result);

    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

   // 設(shè)置結(jié)果為不可取消,結(jié)果已被取消返回False
    boolean setUncancellable();

需要注意的是:Promise接口繼承自Future接口,它提供的setter方法與常見的setter方法大為不同。Promise從Uncompleted-->Completed的狀態(tài)轉(zhuǎn)變有且只能有一次,也就是說setSuccess和setFailure方法最多只會成功一個(gè),此外,在setSuccess和setFailure方法中會通知注冊到其上的監(jiān)聽者。為了加深對Future和Promise的理解,我們可以將Future類比于定額發(fā)票,Promise類比于機(jī)打發(fā)票。當(dāng)商戶拿到稅務(wù)局的發(fā)票時(shí),如果是定額發(fā)票,則已經(jīng)確定好金額是100還是50或其他,商戶再也不能更改;如果是機(jī)打發(fā)票,商戶相當(dāng)于拿到了一個(gè)發(fā)票模板,需要多少金額按實(shí)際情況填到模板指定處。顯然,不能兩次使用同一張機(jī)打發(fā)票打印,這會使發(fā)票失效,而Promise做的更好,它使第二次調(diào)用setter方法失敗。
至此,我們從總體上了解了Future和Promise的原理。我們再看一下類圖:

Future類圖
Future類圖

類圖給我們的第一印象是:繁雜。我們抓住關(guān)鍵點(diǎn):Future和Promise兩條分支,分而治之。我們使用自頂向下的方法分析其實(shí)現(xiàn)細(xì)節(jié),使用兩條線索:

    AbstractFuture<--CompleteFuture<--CompleteChannelFuture<--Succeeded/FailedChannelFuture
    
    DefaultPromise<--DefaultChannelPromise

5.2 Future

5.2.1 AbstractFuture

AbstractFuture主要實(shí)現(xiàn)Future的get()方法,取得Future關(guān)聯(lián)的異步操作結(jié)果:

    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();    // 阻塞直到異步操作完成

        Throwable cause = cause();
        if (cause == null) {
            return getNow();    // 成功則返回關(guān)聯(lián)結(jié)果
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;    // 由用戶取消
        }
        throw new ExecutionException(cause);    // 失敗拋出異常
    }

其中的實(shí)現(xiàn)簡單明了,但關(guān)鍵調(diào)用方法的具體實(shí)現(xiàn)并沒有,我們將在子類實(shí)現(xiàn)中分析。對應(yīng)的加入超時(shí)時(shí)間的get(long timeout, TimeUnit unit)實(shí)現(xiàn)也類似,不再列出。

5.2.2 CompleteFuture

Complete表示操作已完成,所以CompleteFuture表示一個(gè)異步操作已完成的結(jié)果,由此可推知:該類的實(shí)例在異步操作完成時(shí)創(chuàng)建,返回給用戶,用戶則使用addListener()方法定義一個(gè)異步操作。如果你熟悉javascript,將Listener類比于回調(diào)函數(shù)callback()可方便理解。
我們首先看其中的字段和構(gòu)造方法:

    // 執(zhí)行器,執(zhí)行Listener中定義的操作
    private final EventExecutor executor;
    
    // 這有一個(gè)構(gòu)造方法,可知executor是必須的
    protected CompleteFuture(EventExecutor executor) {
        this.executor = executor;
    }

CompleteFuture類定義了一個(gè)EventExecutor,可視為一個(gè)線程,用于執(zhí)行Listener中的操作。我們再看addListener()和removeListener()方法:

    public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 由于這是一個(gè)已完成的Future,所以立即通知Listener執(zhí)行
        DefaultPromise.notifyListener(executor(), this, listener);
        return this;
    }
    
    public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 由于已完成,Listener中的操作已完成,沒有需要?jiǎng)h除的Listener
        return this;
    }

其中的實(shí)現(xiàn)也很簡單,我們看一下GenericFutureListener接口,其中只定義了一個(gè)方法:

    // 異步操作完成是調(diào)用
    void operationComplete(F future) throws Exception;

關(guān)于Listener我們再關(guān)注一下ChannelFutureListener,它并沒有擴(kuò)展GenericFutureListener接口,所以類似于一個(gè)標(biāo)記接口。我們看其中實(shí)現(xiàn)的三個(gè)通用ChannelFutureListener:

    ChannelFutureListener CLOSE = (future) --> {
        future.channel().close();   //操作完成時(shí)關(guān)閉Channel
    };
    
    ChannelFutureListener CLOSE_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            future.channel().close();   // 操作失敗時(shí)關(guān)閉Channel
        }
    };
    
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = (future) --> {
        if (!future.isSuccess()) {
            // 操作失敗時(shí)觸發(fā)一個(gè)ExceptionCaught事件
            future.channel().pipeline().fireExceptionCaught(future.cause());
        }
    };

這三個(gè)Listener對象定義了對Channel處理時(shí)常用的操作,如果符合需求,可以直接使用。
由于CompleteFuture表示一個(gè)已完成的異步操作,所以可推知sync()和await()方法都將立即返回。此外,可推知線程的狀態(tài)如下,不再列出代碼:

    isDone() = true; isCancelled() = false; 

5.2.3 CompleteChannelFuture

CompleteChannelFuture的類簽名如下:

    abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture

ChannelFuture是不是覺得很親切?你肯定已經(jīng)使用過ChannelFuture。ChannelFuture接口相比于Future只擴(kuò)展了一個(gè)方法channel()用于取得關(guān)聯(lián)的Channel對象。CompleteChannelFuture還繼承了CompleteFuture<Void>,尖括號中的泛型表示Future關(guān)聯(lián)的結(jié)果,此結(jié)果為Void,意味著CompleteChannelFuture不關(guān)心這個(gè)特定結(jié)果即get()相關(guān)方法返回null。也就是說,我們可以將CompleteChannelFuture純粹的視為一種回調(diào)函數(shù)機(jī)制。
CompleteChannelFuture的字段只有一個(gè):

    private final Channel channel; // 關(guān)聯(lián)的Channel對象

CompleteChannelFuture的大部分方法實(shí)現(xiàn)中,只是將方法返回的Future覆蓋為ChannelFuture對象(ChannelFuture接口的要求),代碼不在列出。我們看一下executor()方法:

    @Override
    protected EventExecutor executor() {
        EventExecutor e = super.executor(); // 構(gòu)造方法指定
        if (e == null) {
            return channel().eventLoop();   // 構(gòu)造方法未指定使用channel注冊到的eventLoop
        } else {
            return e;
        }
    }

5.2.4 Succeeded/FailedChannelFuture

Succeeded/FailedChannelFuture為特定的兩個(gè)異步操作結(jié)果,回憶總述中關(guān)于Future狀態(tài)的講解,成功意味著

    Succeeded: isSuccess() == true, cause() == null;
    Failed:    isSuccess() == false, cause() == non-null        

代碼中的實(shí)現(xiàn)也很簡單,不再列出。需要注意的是,其中的構(gòu)造方法不建議用戶調(diào)用,一般使用Channel對象的方法newSucceededFuture()和newFailedFuture(Throwable)代替。

5.3 Promise

5.3.1 DefaultPromise

我們首先看其中的static字段:

    // 可以嵌套的Listener的最大層數(shù),可見最大值為8
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    // result字段由使用RESULT_UPDATER更新
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
    // 此處的Signal是Netty定義的類,繼承自Error,異步操作成功且結(jié)果為null時(shí)設(shè)置為改值
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
    // 異步操作不可取消
    private static final Signal UNCANCELLABLE = Signal.valueOf(...);
    // 異步操作失敗時(shí)保存異常原因
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);

嵌套的Listener,是指在listener的operationComplete方法中,可以再次使用future.addListener()繼續(xù)添加listener,Netty限制的最大層數(shù)是8,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth設(shè)置。
再看其中的私有字段:

    // 異步操作結(jié)果
    private volatile Object result;
    // 執(zhí)行l(wèi)istener操作的執(zhí)行器
    private final EventExecutor executor;
    // 監(jiān)聽者
    private Object listeners;
    // 阻塞等待該結(jié)果的線程數(shù)
    private short waiters;
    // 通知正在進(jìn)行標(biāo)識
    private boolean notifyingListeners;

也許你已經(jīng)注意到,listeners是一個(gè)Object類型。這似乎不合常理,一般情況下我們會使用一個(gè)集合或者一個(gè)數(shù)組。Netty之所以這樣設(shè)計(jì),是因?yàn)榇蠖鄶?shù)情況下listener只有一個(gè),用集合和數(shù)組都會造成浪費(fèi)。當(dāng)只有一個(gè)listener時(shí),該字段為一個(gè)GenericFutureListener對象;當(dāng)多余一個(gè)listener時(shí),該字段為DefaultFutureListeners,可以儲存多個(gè)listener。明白了這些,我們分析關(guān)鍵方法addListener():

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        synchronized (this) {
            addListener0(listener); // 保證多線程情況下只有一個(gè)線程執(zhí)行添加操作
        }

        if (isDone()) {
            notifyListeners();  // 異步操作已經(jīng)完成通知監(jiān)聽者
        }
        return this;
    }
    
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;   // 只有一個(gè)
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener); // 大于兩個(gè)
        } else {
            // 從一個(gè)擴(kuò)展為兩個(gè)
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);   
        }
    }

從代碼中可以看出,在添加Listener時(shí),如果異步操作已經(jīng)完成,則會notifyListeners():

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {   //執(zhí)行線程為指定線程
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth(); // 嵌套層數(shù)
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                // 執(zhí)行前增加嵌套層數(shù)
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);   
                try {
                    notifyListenersNow();
                } finally {
                    // 執(zhí)行完畢,無論如何都要回滾嵌套層數(shù)
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // 外部線程則提交任務(wù)給執(zhí)行線程
        safeExecute(executor, () -> { notifyListenersNow(); });
    }
    
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

所以,外部線程不能執(zhí)行監(jiān)聽者Listener中定義的操作,只能提交任務(wù)到指定Executor,其中的操作最終由指定Executor執(zhí)行。我們再看notifyListenersNow()方法:

    private void notifyListenersNow() {
        Object listeners;
        // 此時(shí)外部線程可能會執(zhí)行添加Listener操作,所以需要同步
        synchronized (this) { 
            if (notifyingListeners || this.listeners == null) {
                // 正在通知或已沒有監(jiān)聽者(外部線程刪除)直接返回
                return; 
            }
            notifyingListeners = true;  
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) { // 通知單個(gè)
                notifyListeners0((DefaultFutureListeners) listeners);
            } else { // 通知多個(gè)(遍歷集合調(diào)用單個(gè))
                notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
            }
            synchronized (this) {
                // 執(zhí)行完畢且外部線程沒有再添加監(jiān)聽者
                if (this.listeners == null) {
                    notifyingListeners = false; 
                    return; 
                }
                // 外部線程添加了監(jiān)聽者繼續(xù)執(zhí)行
                listeners = this.listeners; 
                this.listeners = null;
            }
        }
    }
    
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

到此為止,我們分析完了Promise最重要的addListener()和notifyListener()方法。在源碼中還有static的notifyListener()方法,這些方法是CompleteFuture使用的,對于CompleteFuture,添加監(jiān)聽者的操作不需要緩存,直接執(zhí)行Listener中的方法即可,執(zhí)行線程為調(diào)用線程,相關(guān)代碼可回顧C(jī)ompleteFuture。addListener()相對的removeListener()方法實(shí)現(xiàn)簡單,我們不再分析。
回憶result字段,修飾符有volatile,所以使用RESULT_UPDATER更新,保證更新操作為原子操作。Promise不攜帶特定的結(jié)果(即攜帶Void)時(shí),成功時(shí)設(shè)置為靜態(tài)字段的Signal對象SUCCESS;如果攜帶泛型參數(shù)結(jié)果,則設(shè)置為泛型一致的結(jié)果。對于Promise,設(shè)置成功、設(shè)置失敗、取消操作,三個(gè)操作至多只能調(diào)用一個(gè)且同一個(gè)方法至多生效一次,再次調(diào)用會拋出異常(set)或返回失敗(try)。這些設(shè)置方法原理相同,我們以setSuccess()為例分析:

    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();  // 可以設(shè)置結(jié)果說明異步操作已完成,故通知監(jiān)聽者
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setSuccess0(V result) {
        // 為空設(shè)置為Signal對象Success
        return setValue0(result == null ? SUCCESS : result);
    }
    
    private boolean setValue0(Object objResult) {
        // 只有結(jié)果為null或者UNCANCELLABLE時(shí)才可設(shè)置且只可以設(shè)置一次
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();   // 通知等待的線程
            return true;
        }
        return false;
    }

checkNotifyWaiters()方法喚醒調(diào)用await()和sync()方法等待該異步操作結(jié)果的線程,代碼如下:

    private synchronized void checkNotifyWaiters() {
        // 確實(shí)有等待的線程才notifyAll
        if (waiters > 0) {  
            notifyAll();    // JDK方法
        }
    }

有了喚醒操作,那么sync()和await()的實(shí)現(xiàn)是怎么樣的呢?我們首先看sync()的代碼:

    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();  // 異步操作失敗拋出異常
        return this;
    }

可見,sync()和await()很類似,區(qū)別只是sync()調(diào)用,如果異步操作失敗,則會拋出異常。我們接著看await()的實(shí)現(xiàn):

    public Promise<V> await() throws InterruptedException {
        // 異步操作已經(jīng)完成,直接返回
        if (isDone()) {
            return this;    
        }
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // 死鎖檢測
        checkDeadLock();
        // 同步使修改waiters的線程只有一個(gè)
        synchronized (this) {
            while (!isDone()) { // 等待直到異步操作完成
                incWaiters();   // ++waiters;
                try {
                    wait(); // JDK方法
                } finally {
                    decWaiters(); // --waiters
                }
            }
        }
        return this;
    }

其中的實(shí)現(xiàn)簡單明了,其他await()方法也類似,不再分析。我們注意其中的checkDeadLock()方法用來進(jìn)行死鎖檢測:

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

也就是說,不能在同一個(gè)線程中調(diào)用await()相關(guān)的方法。為了更好的理解這句話,我們使用代碼注釋中的例子來解釋。Handler中的channelRead()方法是由Channel注冊到的eventLoop執(zhí)行的,其中的Future的Executor也是這個(gè)eventLoop,所以不能在channelRead()方法中調(diào)用await這一類(包括sync)方法。

    // 錯(cuò)誤的例子
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.awaitUninterruptibly();
        // ...
    }

    // 正確的做法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ChannelFuture future = ctx.channel().close();
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                // ... 使用異步操作
            }
        });
    }

到了這里,我們已經(jīng)分析完Future和Promise的主要實(shí)現(xiàn)。剩下的DefaultChannelPromise、VoidChannelPromise實(shí)現(xiàn)都很簡單,我們不再分析。ProgressivePromise表示異步的進(jìn)度結(jié)果,也不再進(jìn)行分析。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容