Netty源碼分析-ChannelPipeline

Netty的ChannelPipeline和ChannelHandler機(jī)制類似于Servlet和Filter過濾器,在設(shè)計(jì)模式中是一種責(zé)任鏈模式。ChannelPipeline持有一系列ChannelHandler的鏈表,每個(gè)ChannelHandler可以對(duì)I/O事件進(jìn)行攔截和處理。這樣,I/O事件消息在ChannelPipeline中流動(dòng)和傳遞時(shí),可以根據(jù)配置的ChannelHandler實(shí)現(xiàn)不同的業(yè)務(wù)邏輯定制。

1.ChannelPipeline

ChannelPipeline負(fù)責(zé)ChannelHandler的管理和事件攔截調(diào)度。

1.1ChannelPipeline處理流程

下圖展示了一個(gè)I/O事件消息通過ChannelPipeline進(jìn)行處理的全過程。
1)讀事件,底層的Socket.read()方法(such as {@link SocketChannel#read(ByteBuffer)})讀取ByteBuf,然后出發(fā)channelRead事件,通過NioEventLoop會(huì)調(diào)用pipeline的fireChannelRead(Object msg)方法;然后消息依次被Inbound Handler鏈條攔截和調(diào)用。
2)寫事件,當(dāng)調(diào)用ChannelHandlerContext的write方法發(fā)送消息時(shí),消息也會(huì)依次被Outbound Handler鏈條攔截和調(diào)用,并最終調(diào)用socket的write()方法將數(shù)據(jù)寫出去。

ChannelPipeline事件處理流程圖

由上也可以得知,Netty中的事件也分為InBound事件和OutBound事件,并有分別對(duì)應(yīng)的Handler鏈條去處理。并且事件在Handler之間的傳遞是通過ChannelHandlerContext的fireIN_EVT()和OUT_EVT()方法觸發(fā)和傳遞的。
對(duì)于InBound事件,這些觸發(fā)方法有:

 *     <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
 *     <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
 *     <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>

對(duì)于outBound事件,這些觸發(fā)方法有:

 *     <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#flush()}</li>
 *     <li>{@link ChannelHandlerContext#read()}</li>
 *     <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>

1.2 ChannelPipeline源碼分析

Netty中pipeline的默認(rèn)實(shí)現(xiàn)類是DefaultChannelPipeline??匆幌翫efaultChannelPipeline的實(shí)現(xiàn):
1)類型為AbstractChannelHandlerContext的兩個(gè)對(duì)象head、tail,DefaultChannelPipeline是通過AbstractChannelHandlerContext將Handler進(jìn)行串聯(lián)成一個(gè)鏈條的。具體可見下邊的添加Handler的過程分析。
2)該pipeline對(duì)應(yīng)的channel。

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;
}

1.2.1 添加一個(gè)Handler過程分析

舉例addLast方法是如何添加一個(gè)新的Handler的,這個(gè)方法值得我們非常仔細(xì)地去探討一下。
addLast(EventExecutorGroup group, String name, ChannelHandler handler)
入?yún)?/strong>:1)EventExecutorGroup group,表示的是最終執(zhí)行Handler的線程池;2)String name,代表該Handler的名字;3)ChannelHandler handler是需要添加的具體執(zhí)行操作的Handler。
執(zhí)行過程分析:
①. newContext會(huì)創(chuàng)建一個(gè)AbstractChannelHandlerContext,將EventExecutorGroup、ChannelHandler、name等封裝到該對(duì)象中。
②.addLast0會(huì)將該AbstractChannelHandlerContext加入值ChannelPipeline得鏈條中去。代碼可見下邊,典型的鏈表追加操作
③.if (!registered)判斷該Channel是否已經(jīng)成功注冊(cè)到EventLoop中:
1)如果沒有的話,會(huì)創(chuàng)建一個(gè)CallbackTask(該task會(huì)執(zhí)行ChannelHandler.handlerAdded),等到channel注冊(cè)到EventLoop后回調(diào)執(zhí)行該task
2)已經(jīng)注冊(cè)的話,后續(xù)會(huì)執(zhí)行callHandlerAdded0,根據(jù)executor.inEventLoop()判斷決定是在當(dāng)前線程執(zhí)行還是在新線程中執(zhí)行。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

1.2.2 I/O事件執(zhí)行過程分析

我們以一個(gè)I/O讀事件作為一個(gè)代表對(duì)ChannelPipeline的執(zhí)行過程進(jìn)行分析,ChannelPipeline中對(duì)讀事件的執(zhí)行方法是fireChannelRead(Object msg)。
通過代碼分析,我們可以看到會(huì)直接執(zhí)行到AbstractChannelHandlerContext類的方法invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),入?yún)轭^指針head和對(duì)象msg。

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

invokeChannelRead的方法中,還是會(huì)根據(jù)executor.inEventLoop()方法,根據(jù)用戶的線程設(shè)置,最終調(diào)用到對(duì)應(yīng)handler的channelRead方法。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
    //to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called yet. If not return {@code false} and if called or could not detect return {@code true}.
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

2. ChannelHandler

先看一些類的繼承圖:

image.png

2.1 ChannelHandler中的方法

Netty定義了良好的類型層次結(jié)構(gòu)來表示不同的處理程序類型,所有的類型的父類是ChannelHandler。ChannelHandler提供了在其生命周期內(nèi)添加或從ChannelPipeline中刪除的方法。
1). handlerAdded,ChannelHandler添加到實(shí)際上下文中準(zhǔn)備處理事件
2). handlerRemoved,將ChannelHandler從實(shí)際上下文中刪除,不再處理事件
3). exceptionCaught,處理拋出的異常
2、ChannelInboundHandler
ChannelInboundHandler提供了一些方法再接收數(shù)據(jù)或Channel狀態(tài)改變時(shí)被調(diào)用。下面是ChannelInboundHandler的一些方法: 1). channelRegistered,ChannelHandlerContext的Channel被注冊(cè)到EventLoop; 2). channelUnregistered,ChannelHandlerContext的Channel從EventLoop中注銷 3). channelActive,ChannelHandlerContext的Channel已激活 4). channelInactive,ChannelHanderContxt的Channel結(jié)束生命周期 5). channelRead,從當(dāng)前Channel的對(duì)端讀取消息 6). channelReadComplete,消息讀取完成后執(zhí)行 7). userEventTriggered,一個(gè)用戶事件被觸發(fā) 8). channelWritabilityChanged,改變通道的可寫狀態(tài),可以使用Channel.isWritable()檢查 9). exceptionCaught,重寫父類ChannelHandler的方法,處理異常.
舉一個(gè)最常用的MessageToMessageDecoder作為例子,執(zhí)行decode將msg對(duì)象進(jìn)行轉(zhuǎn)換后,如果想繼續(xù)在Pipeline中繼續(xù)傳遞下去,必須顯示地去執(zhí)行ctx.fireChannelRead方法,會(huì)通過AbstractChannelHandlerContext繼續(xù)輪轉(zhuǎn)到下一個(gè)ChannelHandler去執(zhí)行。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容