netty事件和異常的傳播

關(guān)于ChannelRead事件的傳播

在自定義handler的時(shí)候,通常要重寫channelRead函數(shù),如果想要將該事件向后傳播(注意,傳播順序與handler添加順序相同),需要調(diào)用fireChannelRead函數(shù),ChannelRead事件便在這里中斷

通常在重寫的channelRead函數(shù)里,有兩種傳播ChannelRead事件的方式

public void channelRead(ChannelHandlerContext ctx, Object msg) {   
 //第一種   
 ctx.fireChannelRead(msg);    
//第二種   
 ctx.channel().pipeline().fireChannelRead(msg);
}

這兩種方式的主要區(qū)別在于接下來(lái)傳播的起始位置,非常重要

  • 使用第一種方式,事件會(huì)從該節(jié)點(diǎn)開(kāi)始繼續(xù)向后傳播
  • 使用第二種方式,事件會(huì)從head節(jié)點(diǎn)開(kāi)始傳播

下面分析源碼來(lái)做說(shuō)明

第一種傳播方式

跟進(jìn)到AbstractChannelHandlerContext#fireChannelRead方法

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

findContextInbound方法就是在尋找下一個(gè)節(jié)點(diǎn),看看這個(gè)方法的代碼

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

繼續(xù)看看skipContext方法

private static boolean skipContext(
    AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
        (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

可以看到,這里判斷的關(guān)鍵就在executionMask這個(gè)成員變量,而這個(gè)成員變量就在AbstractChannelHandlerContext里被賦值

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
                              EventExecutor executor,
                              String name, 
                              Class<? extends ChannelHandler> handlerClass) {
    //省略其他代碼
    this.executionMask = mask(handlerClass);
}

mask方法最終調(diào)用到mask0方法

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;
 
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            //省略部分代碼
        }
 
        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;
 
            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                            SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            //省略部分代碼
        }
 
        //省略部分代碼
    } catch (Exception e) {
        //省略部分代碼
    }
 
    return mask;
}

isSkippable函數(shù)只有在找不到函數(shù)或者函數(shù)被@Skip注解時(shí)才返回false

可以看到,實(shí)際上executionMask就是用來(lái)記錄handler的類型信息和方法注解信息

skipContext方法實(shí)際上就是在尋找下一個(gè)沒(méi)有用@Skip注解了ChannelRead方法的inbound節(jié)點(diǎn)

private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

繼續(xù)看fireChannelRead函數(shù)里調(diào)用到的invokeChannelRead函數(shù)

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        //不在當(dāng)前eventloop,放到異步任務(wù)隊(duì)列里
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
 
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //省略
        }
    } else {
        fireChannelRead(msg);
    }
}

可以看到,這里就是在調(diào)用下一個(gè)節(jié)點(diǎn)的channelRead方法

第二種傳播方式

跟進(jìn)到DefaultChannelPipeline#fireChannelRead方法

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

繼續(xù)跟進(jìn)invokeChannelRead方法

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

可以看到,這里就是傳入head節(jié)點(diǎn),從head節(jié)點(diǎn)開(kāi)始向后傳播channelRead事件

資源釋放相關(guān)問(wèn)題

若自定義的handler繼承自ChannelInboundHandlerAdapter,并且在ChannelRead函數(shù)里沒(méi)有將事件向后傳播,那么需要自行調(diào)用函數(shù)處理資源釋放,如下

ReferenceCountUtil.release(msg);

Write事件傳播

write事件的傳播順序與handler的添加順序相反(即最后添加的outboundHandler最先處理write事件)

類似的,在用戶代碼里傳播write事件也有兩種方式

第一種方式,從當(dāng)前節(jié)點(diǎn),往前尋找outbound,繼續(xù)傳播
ctx.write(msg);

 第二種方式,從tail節(jié)點(diǎn)開(kāi)始,往前尋找outbound傳播
ctx.channel().write(msg);
如果沒(méi)有中斷,最終write事件會(huì)傳播到head節(jié)點(diǎn),然后head節(jié)點(diǎn)會(huì)調(diào)用unsafe的write方法

異常的傳播

異常的產(chǎn)生

首先,異常是在ChannelReadChannelRegister等這些函數(shù)中拋出的,然后在形如invokeChannelXXX(例如invokeChannelRead)中捕獲,例如

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //捕獲異常
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

看看invokeExceptionCaught方法

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            //省略
        }
    } else {
        fireExceptionCaught(cause);
    }
}

可以看到,這里調(diào)用exceptionCaught方法處理異常

傳播異常

異常的傳播方向與handler的添加方向一致,并且不區(qū)分是inboundHandler還是outboundHandler(即異??梢詮膇nboundHandler傳播到outboundHandler,反之亦可)

默認(rèn)情況下,如果不重寫exceptionCaught方法,那么會(huì)把該異常繼續(xù)向后傳播,最終會(huì)傳播到tail節(jié)點(diǎn),tail節(jié)點(diǎn)會(huì)打印一條日志表明該異常未被處理

如果重寫了exceptionCaught方法,并且想將該異常繼續(xù)向后傳播,那么需要調(diào)用fireExceptionCaught方法

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    throws Exception {
    //其他處理代碼
    ctx.fireExceptionCaught(cause);
}

關(guān)聯(lián):Netty4中Handler的執(zhí)行順序以及ctx.close() 與 ctx.channel().close()的區(qū)別

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容