Netty中PipeLine處理流程

inbound 事件和 outbound 事件的流向是不一樣的, inbound 事件的流行是從下至上, 而 outbound 剛好相反, 是從上到下. 并且 inbound 的傳遞方式是通過(guò)調(diào)用相應(yīng)的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過(guò)調(diào)用 ChannelHandlerContext.OUT_EVT() 方法.

例如 ChannelHandlerContext.fireChannelRegistered() 調(diào)用會(huì)發(fā)送一個(gè) ChannelRegistered 的 inbound 給下一個(gè)ChannelHandlerContext, 而 ChannelHandlerContext.bind 調(diào)用會(huì)發(fā)送一個(gè) bind 的 outbound 事件給 下一個(gè) ChannelHandlerContext.

Inbound 事件傳播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Oubound 事件傳輸方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

注意, 如果我們捕獲了一個(gè)事件, 并且想讓這個(gè)事件繼續(xù)傳遞下去, 那么需要調(diào)用 Context 相應(yīng)的傳播方法.

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

PipeLine中事件傳遞

Outbound 事件都是請(qǐng)求事件(request event), 即請(qǐng)求某件事情的發(fā)生, 然后通過(guò) Outbound 事件進(jìn)行通知.
Outbound 事件的傳播方向是 tail -> customContext -> head.

鏈接請(qǐng)求
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
這樣形成一個(gè)鏈條處理

Inbound 事件是一個(gè)通知事件, 即某件事已經(jīng)發(fā)生了, 然后通過(guò) Inbound 事件進(jìn)行通知. Inbound 通常發(fā)生在 Channel 的狀態(tài)的改變或 IO 事件就緒.

Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive

總結(jié)

對(duì)于 Outbound事件:

Outbound 事件是請(qǐng)求事件(由 Connect 發(fā)起一個(gè)請(qǐng)求, 并最終由 unsafe 處理這個(gè)請(qǐng)求)

Outbound 事件的發(fā)起者是 Channel

Outbound 事件的處理者是 unsafe

Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.

在 ChannelHandler 中處理事件時(shí), 如果這個(gè) Handler 不是最后一個(gè) Hnalder, 則需要調(diào)用 ctx.xxx (例如 ctx.connect) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會(huì)提前終止.

Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

對(duì)于 Inbound 事件:

Inbound 事件是通知事件, 當(dāng)某件事情已經(jīng)就緒后, 通知上層.

Inbound 事件發(fā)起者是 unsafe

Inbound 事件的處理者是 Channel, 如果用戶沒(méi)有實(shí)現(xiàn)自定義的處理方法, 那么Inbound 事件默認(rèn)的處理者是 TailContext, 并且其處理方法是空實(shí)現(xiàn).

Inbound 事件在 Pipeline 中傳輸方向是 head -> tail

在 ChannelHandler 中處理事件時(shí), 如果這個(gè) Handler 不是最后一個(gè) Hnalder, 則需要調(diào)用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會(huì)提前終止.

Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT

如何區(qū)分是In還是Out事件?

ch.pipeline().addLast→DefaultChannelHandlerContext newCtx =new DefaultChannelHandlerContext(this, invoker, name, handler);→ skipFlags = skipFlags(handler);
skipFlags字段標(biāo)識(shí)跳過(guò)/捕獲那個(gè)事件。

5.0版本中不分開(kāi)區(qū)分In/Out事件了,全部都繼承ChannelHandlerAdapter,需要處理什么方法,重寫什么方法就可以。

設(shè)置標(biāo)識(shí)的skipFlags0方法部分源碼。

    /**
     * Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
     */
    private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
        int flags = 0;
        try {
            if (handlerType.getMethod(
                    "handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_HANDLER_ADDED;
            }
            if (handlerType.getMethod(
                    "handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_HANDLER_REMOVED;
            }
            if (handlerType.getMethod(
                    "exceptionCaught", ChannelHandlerContext.class, Throwable.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_EXCEPTION_CAUGHT;
            }
            if (handlerType.getMethod(
                    "channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_CHANNEL_REGISTERED;
            }
            if (handlerType.getMethod(
                    "channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_CHANNEL_ACTIVE;
            }
            if (handlerType.getMethod(
                    "channelInactive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_CHANNEL_INACTIVE;
            }
            if (handlerType.getMethod(
                    "channelRead", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
                flags |= MASK_CHANNEL_READ;
            }
           .......
        return flags;
    }

在父類ChannelHandlerAdapter中所有的方法都添加了@Skip注解,子類繼承它,重寫方法后,該方法的@Skip注解即失效。
handlerType.getMethod( "channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)方法如果有@Skip注解的話,將flags對(duì)應(yīng)的位置1。即如果重寫了的話對(duì)應(yīng)位是0

最后有事件時(shí)fireChannelRead等函數(shù)會(huì)調(diào)用

 DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);

不斷循環(huán),找到最近的一個(gè)Handler處理。

    private DefaultChannelHandlerContext findContextInbound(int mask) {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.skipFlags & mask) != 0);
        return ctx;
    }

這個(gè)循環(huán)是當(dāng)對(duì)應(yīng)位是0時(shí),停止循環(huán)。需要處理。這樣處理一直到head/tail。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容