Netty 核心組件 Pipeline 源碼分析(二)一個請求的 pipeline 之旅

目錄大綱:

  1. 前言
  2. 針對 Netty 例子源碼做了哪些修改?
  3. 看 pipeline 是如何將數(shù)據(jù)送到自定義 handler 的
  4. 看 pipeline 是如何將數(shù)據(jù)從自定義 handler 送出的
  5. 總結(jié)

前言

Netty 核心組件 Pipeline 源碼分析(一)之剖析 pipeline 三巨頭 中,我們詳細闡述了 pipeline,context,handler 的設計與實現(xiàn)。知道了 Netty 是如何處理網(wǎng)絡數(shù)據(jù)的,但到目前為止,我們都沒有實打?qū)嵉淖咭槐榱鞒蹋瑢嶋H上,debug 一遍流程,會讓我們對 Netty 處理整個數(shù)據(jù)流更加深刻理解。

樓主此次使用的依然還是 Netty 自帶的 ServerExample 和 Client Example,我想大家應該早就下好源碼了吧。當然,針對源碼,我們也做了一些修改,方便讓我們更加的容易測試。

1. 針對 Netty 例子源碼做了哪些修改?

針對 EchoInServerHandler 的channelRead 方法做了如下修改:

讀取客戶端發(fā)送來的數(shù)據(jù),并打印,然后發(fā)送一串字符串給客戶端。當然,其余方法都加入了日志打印。

針對 EchoClientHandler 的 channelActive 方法做了如下修改:


當連接服務器成功時,發(fā)送一串字符串。

針對 EchoClientHandler 的 channelRead 方法做了如下修改:


解碼客戶端發(fā)送來的數(shù)據(jù)并打印。

同時新增了一個 EchoOutServerHandler 類,繼承了 ChannelOutboundHandlerAdapter 類,用于打印出站事件:


運行后的結(jié)果如下:

Server 控制臺:


Client 控制臺:


從上面紅色字可以看出,打印出了我們想要的結(jié)果,Server 接收到了 Client 的信息并打印,Client 接收到了 Server 的信息并打印。

下面就讓我們 debug,看看一個請求是如何在 pipeline 中游走的吧!

2. 看 pipeline 是如何將數(shù)據(jù)送到自定義 handler 的

首先我們 debug 模式啟動 EchoServer,讓整個 Server 處于待命狀態(tài)。斷點打在 EventLoop 類的 processSelectedKey 方法中,監(jiān)聽 accpet 事件和 read 事件。

同時啟動客戶端,這個時候 Server 斷點開始卡住,我們開始 debug。

這里的 readOps 是16,Accept 事件,這里的 unsafe 是 ServerSocket 的 unsafe,如果還記的 Netty 接受請求過程源碼分析 (基于4.1.23) 文中所說,在這之后,會創(chuàng)建一個 客戶端的 ChannelSocket,然后該 Socket 會向 selector 注冊讀事件,所以,我們這里需要放開斷點,得到讀事件才是真正請求的開始。

好,我們使用 IDEA 的 Force run to cursor 功能,讓線程直接卡到這里,這時,你會發(fā)現(xiàn),EventLoop-3-1 卡住了,而不是之前的 EventLoop-2-1,3-1 是上面線程大家應該知道吧,就是 worker group 線程池中的 eventLoop,也就是剛剛注冊的 Socket。

從上面的斷點可以看出,這里確實是讀事件,斷點提示也指出這個 unsafe 是 NioSocketChannel 的 內(nèi)部類 NioSocketChannelUnsafe,我們跟進去看看。

進入的是 NioSocketChannelUnsafe 的抽象父類 AbstractNioByteChannel 的 read 方法。精簡過的代碼如下:

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

    // 讀取數(shù)據(jù)到容器
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    // 讓 handler 處理容器中的數(shù)據(jù)
    pipeline.fireChannelRead(byteBuf);

    // 告訴容器處理完畢了,觸發(fā)完成事件
    pipeline.fireChannelReadComplete();

}

這里樓主簡化了很多代碼,留下的是對本次分析比較重要的內(nèi)容。注釋已經(jīng)寫的很清除,首先從 unsafe 中讀取數(shù)據(jù),然后,將讀好的數(shù)據(jù)交給 pipeline,pipeline 調(diào)用 inbound 的 channelRead 方法,讀取成功后,調(diào)用 inbound 的 handler 的 ChannelReadComplete 方法。

在進入方法之前,樓主向祭出上文中的圖,讓我們看后面的代碼更清晰:

該圖詮釋了一個請求在 pipeline 的流動過程。請記住他。

整個過程還是比較清晰的。我們首先進入 pipeline 的 fireChannelReadComplete 方法,這個方法是實現(xiàn)了 invoker 的方法。

內(nèi)部調(diào)用的是 AbstractChannelHandlerContext.invokeChannelRead(head, msg) 靜態(tài)方法,并傳入了 head,我們知道入站數(shù)據(jù)都是從 head 開始的,以保證后面所有的 handler 都由機會處理數(shù)據(jù)流。

我們看看這個靜態(tài)方法內(nèi)部是怎么樣的:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

調(diào)用這個 Context (也就是 head) 的 invokeChannelRead 方法,并傳入數(shù)據(jù)。我們再看看 invokeChannelRead 方法的實現(xiàn):

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

這里和我們的圖畫的是一致的,調(diào)用了 Context 包裝的 handler 的 channelRead 方法。注意:直到目前,這個 Context 還是 head,也就是調(diào)用 head 的 channelRead 方法。那么這個方法是怎么實現(xiàn)的呢?

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }

什么都沒做,和我們圖中一樣,調(diào)用 Context 的 fire 系列方法,將請求轉(zhuǎn)發(fā)給下一個節(jié)點。我們這里是 fireChannelRead 方法,注意,這里方法名字都挺像的。需要細心區(qū)分。下面我們看看 Context 的成員方法 fireChannelRead:

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

這個是 head 的抽象父類 AbstractChannelHandlerContext 的實現(xiàn),該方法再次調(diào)用了靜態(tài) fire 系列方法,但和上次不同的是,不再放入 head 參數(shù)了,而是使用 findContextInbound 方法的返回值。從這個方法的名字可以看出,是找到入站類型的 handler。我們看看方法實現(xiàn):

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

該方法很簡單,找到當前 Context 的 next 節(jié)點(inbound 類型的)并返回。這樣就能將請求傳遞給后面的 inbound handler 了。

重復上面的邏輯,終于數(shù)據(jù)到了我們自己寫的 handler-------EchoInServerHandler。

好,到這里,我們已經(jīng)知道了一個請求時怎么到達我們自定義的 handler 的,再來看看我們的圖:

請求進來時,pipeline 會從 head 節(jié)點開始輸送,通過配合 invoker 接口的 fire 系列方法,實現(xiàn) Context 鏈在 pipeline 中的完美傳遞。最終到達我們自定義的 handler。

到了自定義 handler,我們會輸出客戶端發(fā)送的內(nèi)容,我們截圖看看:

成功輸出。

注意:此時如果我們想繼續(xù)向后傳遞該怎么辦呢?我們前面說過,可以調(diào)用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一樣,調(diào)用 fire 系列方法,直接向后傳遞就 ok 了。

當然,我們這里不需要,我們需要發(fā)送一條數(shù)據(jù)客戶端。那么,我們就來看看一條數(shù)據(jù)是如何到達客戶端的。

3. 看 pipeline 是如何將數(shù)據(jù)從自定義 handler 送出的

在打印了客戶端的內(nèi)容后,我們調(diào)用了 Context 的 writeAndFlush 方法,從 inbound 和 outbound 的定義來看,這個方法是 outbound 定義的,也就是出站方法。

在debug 進去看看之前,我們能否猜測一下呢,這個 Context 肯定會調(diào)用他的抽象父類 AbstractChannelHandlerContext 方法, 我們跟進去看看:

果不其然。調(diào)用了 AbstractChannelHandlerContext 的 writeAndFlush 方法,然后,調(diào)用了他的重載方法,多傳入了一個 promise 實例??纯词侨绾蝿?chuàng)建的:

  @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }

我們再跟進去看看 writeAndFlush :

這里調(diào)用了 write 方法,并直接返回了 promise。繼續(xù)跟進查看:

注意:這里調(diào)用了 findContextOutbound,尋找下一個 outbound 節(jié)點。我們看看是如何實現(xiàn)的:

根據(jù)當前節(jié)點,找到之前的節(jié)點并且是 outbound 類型。

可以看到,數(shù)據(jù)開始出站,從后向前開始流動,和入站的方向是反的。

回到 write 方法,得到下一個節(jié)點后,調(diào)用下一個節(jié)點的 invokeWriteAndFlush 方法,這個是 invoker 接口的方法。

調(diào)用 invokeWrite0 方法,注意,Netty 很多方法都以 0 結(jié)尾,表示這是最底層的方法了,而再 JDK 中,結(jié)尾是 0 表示這是一個本地方法。我們進入該方法查看:

調(diào)用了這個 Context 的 worite 方法。還記得我們也寫了一個 EchoOutServerHandler 類嗎,可能會進入我們自己寫入的類的方法嗎?當然不會,因為我們添加的順序是下面這樣的:

inbound 在前,outbound 在后,當程序走到 inbound 就調(diào)用 outbound 的方法了,并找當前節(jié)點的上一個節(jié)點,而我們寫的 outbound 是這個節(jié)點的下一個節(jié)點,永遠不會走到這里的。

那么會走到哪里呢,當然是走到 head 節(jié)點,因為 head 節(jié)點就是 outbound 類型的 handler。

進入到 head 的 write 方法查看:

調(diào)用了 底層的 unsafe 操作數(shù)據(jù),到這里,我們就不跟了,基于我們今天的目的,我們只想知道一個請求在 pipeline 是如何流轉(zhuǎn)的。底層數(shù)據(jù)傳播的細節(jié)就不再贅述。留在以后研究。

當執(zhí)行完這個 write 方法后,方法開始退棧。逐步退到 unsafe 的 read 方法,回到最初開始的地方,然后繼續(xù)調(diào)用 pipeline.fireChannelReadComplete() 方法,重復之前 pipeline 的設計。

到這里,我們應該已經(jīng)清楚了一個請求時如何在 pipeline 中周轉(zhuǎn)的了。

4. 總結(jié)

總結(jié)一下一個請求在 pipeline 中的流轉(zhuǎn)過程:

  1. 調(diào)用 pipeline 的 fire 系列方法,這些方法是接口 invoker 設計的,pipeline 實現(xiàn)了 invoker 的所有方法,inbound 事件從 head 開始流入,outbound 事件從 tail 開始流出。
  2. pipeline 會將請求交給 Context,然后 Context 通過抽象父類 AbstractChannelHandlerContext 的 invoke 系列方法(靜態(tài)和非靜態(tài)的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各個 Context 的數(shù)據(jù)流轉(zhuǎn)。
  3. 當入站過程中,調(diào)用 了出站的方法,那么請求就不會向后走了。后面的處理器將不會有任何作用。想繼續(xù)相會傳遞就調(diào)用 Context 的 fire 系列方法,讓 Netty 在內(nèi)部幫你傳遞數(shù)據(jù)到下一個節(jié)點。如果你想在整個通道傳遞,就在 handler 中調(diào)用 channel 或者 pipeline 的對應方法,這兩個方法會將數(shù)據(jù)從頭到尾或者從尾到頭的流轉(zhuǎn)一遍。

最后,再次祭上我們的圖,配合 debug 堆棧信息:

上圖就是 pipeline 一個通用的數(shù)據(jù)流動過程。

好。good luck ?。。。?/p>

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容