自頂向下深入分析Netty(七)--ChannelHandlerContext源碼實(shí)現(xiàn)

7.2.2 ChannelHandlerContext

7.2.2.1 AbstractChannelHandlerContext

AbstractChannelHandlerContext的類簽名如下:

    abstract class AbstractChannelHandlerContext extends
              DefaultAttributeMap implements ChannelHandlerContext

該類作為其他ChannelHandlerContext的基類,Netty4.0中繼承自DefaultAttributeMap實(shí)現(xiàn)屬性鍵值對(duì)的存儲(chǔ)和獲取,由于此種用法與channel.attr()存在混淆,故不建議使用。確有需要時(shí),請(qǐng)直接使用channel.attr()。
下面介紹其中的關(guān)鍵字段:

    // Context形成雙向鏈表,next和prev分別是后繼節(jié)點(diǎn)和前驅(qū)節(jié)點(diǎn)
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    
    private final boolean inbound;  // 對(duì)應(yīng)處理器為InboudHandler
    private final boolean outbound; // 對(duì)應(yīng)處理器為OutboudHandler
    private final DefaultChannelPipeline pipeline;  
    private final String name;  // Context的名稱
    private final boolean ordered;  // 事件順序標(biāo)記
    
    final EventExecutor executor; // 事件執(zhí)行線程
    
    private volatile int handlerState = INIT;   // 狀態(tài)

其中handlerState的字面意思容易使人誤解,為此列出四種狀態(tài)并加以解釋:

    // 初始狀態(tài)
    private static final int INIT = 0; 
    // 對(duì)應(yīng)Handler的handlerAdded方法將要被調(diào)用但還未調(diào)用
    private static final int ADD_PENDING = 1;
    // 對(duì)應(yīng)Handler的handlerAdded方法被調(diào)用
    private static final int ADD_COMPLETE = 2;
    // 對(duì)應(yīng)Handler的handlerRemoved方法被調(diào)用
    private static final int REMOVE_COMPLETE = 3;

AbstractChannelHandlerContext只有一個(gè)構(gòu)造方法:

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, 
              boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        
        // 只有執(zhí)行線程為EventLoop或者標(biāo)記為OrderedEventExecutor才是順序的
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

由于Netty將事件抽象為入站事件和出站事件,AbstractChannelHandlerContext對(duì)事件的處理也分為兩類,分別以ChannelRead和read事件為例分析。首先分析ChannelRead事件:

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg){
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

回顧之前的分析,事件在ChannelPipeline中不會(huì)自動(dòng)流動(dòng),入站事件需要調(diào)用fireXXX方法將事件傳遞到下一個(gè)處理器,出站事件需要調(diào)用諸如write方法傳遞給下一個(gè)處理器處理。查找入站處理器的代碼如下:

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next; // 入站事件向后查找
        } while (!ctx.inbound);
        return ctx;
    }

代碼十分簡(jiǎn)潔,向雙向鏈表前進(jìn)方向查找到最近的入站處理器即可;可推知,出站事件則向雙向鏈表后退方向查找最近的出站處理器。這也是出站\入站事件傳播方向不同的原因。
找到事件傳播的下一個(gè)處理器后,在處理器中執(zhí)行處理過程的代碼如下:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, 
              final Object msg) {
        ObjectUtil.checkNotNull(msg, "msg");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 使用當(dāng)前IO線程執(zhí)行用戶自定義處理
            next.invokeChannelRead(msg);
        } else {
            // 使用用戶定義的線程執(zhí)行處理過程
            executor.execute(() -> {next.invokeChannelRead(msg);});
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {   
                // 處理器執(zhí)行用戶自定義的處理過程
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

Netty為用戶提供了方便的線程切換處理,特別是在版本4.1后,可以直接使用JDK提供的線程池。最佳實(shí)踐:在一個(gè)處理器中如果有耗時(shí)較大的業(yè)務(wù)邏輯,可以指定該處理器的處理線程Executor,以避免占用IO線程造成性能損失。
之后再看一下invokeHandler()notifyHandlerException(t)方法,invokeHandler決定是否調(diào)用處理器,進(jìn)行業(yè)務(wù)邏輯處理:

    private boolean invokeHandler() {
        // handlerState為volatile變量,存儲(chǔ)為本地變量,以便減少volatile讀
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

可知,當(dāng)一個(gè)處理器還沒有調(diào)用HandlerAdded方法時(shí),只有處理器的執(zhí)行線程是非順序線程池的實(shí)例才能執(zhí)行業(yè)務(wù)處理邏輯;否則必須等待已調(diào)用handlerAdded方法,才能處理業(yè)務(wù)邏輯。這部分的處理,保證了ChannelPipeline的線程安全性,由此用戶可以隨意增加刪除Handler。
notifyHandlerException方法對(duì)處理過程中出現(xiàn)的異常進(jìn)行處理:

    private void notifyHandlerException(Throwable cause) {
        if (inExceptionCaught(cause)) {
            // 處理異常的過程中出現(xiàn)異常
            logger.warn("...)
            return;
        }
        invokeExceptionCaught(cause);
    }
    
    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                logger.warn("...")
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

invokeExceptionCaught方法可類比invokeChannelRead方法,其形式一致。由代碼可知,當(dāng)一個(gè)Handler的處理過程出現(xiàn)異常時(shí),會(huì)調(diào)用該Handler的exceptionCaught()方法進(jìn)行處理。注意:異常事件的傳播也是由用戶使用fireExceptionCaught方法控制。
至此,ChannelRead入站事件的處理已分析完畢,read出站事件的處理也可類比,不同的是:出站事件需要反向找出站處理器Handler,不再贅述。

7.2.2.2 DefaultChannelHandlerContext

DefaultChannelHandlerContext是使用Netty時(shí)經(jīng)常使用的事實(shí)Context類,其中的大部分功能已在AbstractChannelHandlerContext中完成,所以該類十分簡(jiǎn)單,其構(gòu)造方法如下:

    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

對(duì)出站\入站Handler的判別使用粗暴的instanceof:

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }

實(shí)現(xiàn)中引入了一個(gè)新的字段:

    private final ChannelHandler handler;

回憶ChannelPipeline,pipeline其中形成Context的雙向鏈表,而處理邏輯則在Handler中。設(shè)計(jì)模式中指出,關(guān)聯(lián)Handler與Context的方法有兩種:組合繼承。本例中,使用的是組合,下面分析繼承的方法。

7.2.2.3 HeadContext和TailContext

HeadContext和TailContext使用繼承的方式關(guān)聯(lián)Handler,作為ChannelPipeline雙向鏈表的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)。首先分析HeadContext,類簽名如下:

    final class HeadContext extends AbstractChannelHandlerContext
                 implements ChannelOutboundHandler, ChannelInboundHandler

由于既繼承OutboundHandler又繼承InboundHandler,可知HeadContext需要同時(shí)處理出站事件和入站事件。以read和ChannelRead事件為例,當(dāng)用戶調(diào)用read出站事件時(shí),是在告訴IO線程:我需要向網(wǎng)絡(luò)讀數(shù)據(jù)做處理;當(dāng)IO線程讀到數(shù)據(jù)后,則使用ChannelRead事件通知用戶:已讀取到數(shù)據(jù)。read()方法代碼如下:

    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

其中unsafe是Netty內(nèi)部實(shí)現(xiàn)底層IO細(xì)節(jié)的類,beginRead()方法設(shè)定底層Selector關(guān)心read事件,如果read事件就緒,則會(huì)調(diào)用unsafe.read()方法讀取數(shù)據(jù),然后調(diào)用channelPipe.fireChannelRead()方法通知用戶已讀取到數(shù)據(jù),可進(jìn)行業(yè)務(wù)處理。 HeadContext的ChannelRead()方法代碼如下:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

可知HeadContext只是簡(jiǎn)單的將事件傳播到下一個(gè)入站處理器。
也許你會(huì)有疑問:自己根本沒使用過read出站事件,為什么數(shù)據(jù)自動(dòng)讀取了呢?這是因?yàn)槟J(rèn)設(shè)置自動(dòng)讀取autoRead。通過HeadContext的channelActive代碼分析:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead(); // 自動(dòng)讀取
    }
        
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

當(dāng)channel激活后,如果配置了自動(dòng)讀取,則會(huì)調(diào)用channel.read(),注意這將在ChannelPipeline中傳播出站事件read,最終傳播到HeadContext的read()方法,最后調(diào)用unsafe.beginRead()設(shè)置關(guān)心底層read事件,從而實(shí)現(xiàn)激活后自動(dòng)讀取數(shù)據(jù)。而當(dāng)讀取完一組數(shù)據(jù)后,在channelReadComplete()方法中繼續(xù)下一組數(shù)據(jù)的自動(dòng)讀取。

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead(); // 自動(dòng)讀取下一組數(shù)據(jù)
    }

分析完HeadContext,再分析TailContext。首先看方法簽名:

    final class TailContext extends AbstractChannelHandlerContext 
            implements ChannelInboundHandler

TailContext只繼承入站事件處理器,作為雙向鏈表的尾節(jié)點(diǎn),它對(duì)入站事件的處理極其簡(jiǎn)單,那就是:什么也不做。但有兩個(gè)方法除外:

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
            throws Exception {
        onUnhandledInboundException(cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

而這也僅僅是做個(gè)簡(jiǎn)單提示,告知用戶這兩個(gè)方法必須在ChannelPipeline中有處理器處理,否則很有可能是代碼出了問題:

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
              "Discarded inbound message {} that reached at the tail of the pipeline. " +
              "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

如果開啟了日志的Debug級(jí)別,不難相信,你已經(jīng)接觸過這樣的提示。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容