
目錄大綱:
- 前言
- 針對(duì) Netty 例子源碼做了哪些修改?
- 看 pipeline 是如何將數(shù)據(jù)送到自定義 handler 的
- 看 pipeline 是如何將數(shù)據(jù)從自定義 handler 送出的
- 總結(jié)
前言
在 Netty 核心組件 Pipeline 源碼分析(一)之剖析 pipeline 三巨頭 中,我們?cè)敿?xì)闡述了 pipeline,context,handler 的設(shè)計(jì)與實(shí)現(xiàn)。知道了 Netty 是如何處理網(wǎng)絡(luò)數(shù)據(jù)的,但到目前為止,我們都沒(méi)有實(shí)打?qū)嵉淖咭槐榱鞒?,?shí)際上,debug 一遍流程,會(huì)讓我們對(duì) Netty 處理整個(gè)數(shù)據(jù)流更加深刻理解。
樓主此次使用的依然還是 Netty 自帶的 ServerExample 和 Client Example,我想大家應(yīng)該早就下好源碼了吧。當(dāng)然,針對(duì)源碼,我們也做了一些修改,方便讓我們更加的容易測(cè)試。
1. 針對(duì) Netty 例子源碼做了哪些修改?
針對(duì) EchoInServerHandler 的channelRead 方法做了如下修改:

讀取客戶端發(fā)送來(lái)的數(shù)據(jù),并打印,然后發(fā)送一串字符串給客戶端。當(dāng)然,其余方法都加入了日志打印。
針對(duì) EchoClientHandler 的 channelActive 方法做了如下修改:

當(dāng)連接服務(wù)器成功時(shí),發(fā)送一串字符串。
針對(duì) EchoClientHandler 的 channelRead 方法做了如下修改:

解碼客戶端發(fā)送來(lái)的數(shù)據(jù)并打印。
同時(shí)新增了一個(gè) EchoOutServerHandler 類,繼承了 ChannelOutboundHandlerAdapter 類,用于打印出站事件:

運(yùn)行后的結(jié)果如下:
Server 控制臺(tái):

Client 控制臺(tái):

從上面紅色字可以看出,打印出了我們想要的結(jié)果,Server 接收到了 Client 的信息并打印,Client 接收到了 Server 的信息并打印。
下面就讓我們 debug,看看一個(gè)請(qǐng)求是如何在 pipeline 中游走的吧!
2. 看 pipeline 是如何將數(shù)據(jù)送到自定義 handler 的
首先我們 debug 模式啟動(dòng) EchoServer,讓整個(gè) Server 處于待命狀態(tài)。斷點(diǎn)打在 EventLoop 類的 processSelectedKey 方法中,監(jiān)聽(tīng) accpet 事件和 read 事件。

同時(shí)啟動(dòng)客戶端,這個(gè)時(shí)候 Server 斷點(diǎn)開(kāi)始卡住,我們開(kāi)始 debug。

這里的 readOps 是16,Accept 事件,這里的 unsafe 是 ServerSocket 的 unsafe,如果還記的 Netty 接受請(qǐng)求過(guò)程源碼分析 (基于4.1.23) 文中所說(shuō),在這之后,會(huì)創(chuàng)建一個(gè) 客戶端的 ChannelSocket,然后該 Socket 會(huì)向 selector 注冊(cè)讀事件,所以,我們這里需要放開(kāi)斷點(diǎn),得到讀事件才是真正請(qǐng)求的開(kāi)始。
好,我們使用 IDEA 的 Force run to cursor 功能,讓線程直接卡到這里,這時(shí),你會(huì)發(fā)現(xiàn),EventLoop-3-1 卡住了,而不是之前的 EventLoop-2-1,3-1 是上面線程大家應(yīng)該知道吧,就是 worker group 線程池中的 eventLoop,也就是剛剛注冊(cè)的 Socket。

從上面的斷點(diǎn)可以看出,這里確實(shí)是讀事件,斷點(diǎn)提示也指出這個(gè) unsafe 是 NioSocketChannel 的 內(nèi)部類 NioSocketChannelUnsafe,我們跟進(jìn)去看看。
進(jìn)入的是 NioSocketChannelUnsafe 的抽象父類 AbstractNioByteChannel 的 read 方法。精簡(jiǎn)過(guò)的代碼如下:
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 讀取數(shù)據(jù)到容器
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 讓 handler 處理容器中的數(shù)據(jù)
pipeline.fireChannelRead(byteBuf);
// 告訴容器處理完畢了,觸發(fā)完成事件
pipeline.fireChannelReadComplete();
}
這里樓主簡(jiǎn)化了很多代碼,留下的是對(duì)本次分析比較重要的內(nèi)容。注釋已經(jīng)寫的很清除,首先從 unsafe 中讀取數(shù)據(jù),然后,將讀好的數(shù)據(jù)交給 pipeline,pipeline 調(diào)用 inbound 的 channelRead 方法,讀取成功后,調(diào)用 inbound 的 handler 的 ChannelReadComplete 方法。
在進(jìn)入方法之前,樓主向祭出上文中的圖,讓我們看后面的代碼更清晰:

該圖詮釋了一個(gè)請(qǐng)求在 pipeline 的流動(dòng)過(guò)程。請(qǐng)記住他。
整個(gè)過(guò)程還是比較清晰的。我們首先進(jìn)入 pipeline 的 fireChannelReadComplete 方法,這個(gè)方法是實(shí)現(xiàn)了 invoker 的方法。

內(nèi)部調(diào)用的是 AbstractChannelHandlerContext.invokeChannelRead(head, msg) 靜態(tài)方法,并傳入了 head,我們知道入站數(shù)據(jù)都是從 head 開(kāi)始的,以保證后面所有的 handler 都由機(jī)會(huì)處理數(shù)據(jù)流。
我們看看這個(gè)靜態(tài)方法內(nèi)部是怎么樣的:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
調(diào)用這個(gè) Context (也就是 head) 的 invokeChannelRead 方法,并傳入數(shù)據(jù)。我們?cè)倏纯?invokeChannelRead 方法的實(shí)現(xiàn):
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
這里和我們的圖畫的是一致的,調(diào)用了 Context 包裝的 handler 的 channelRead 方法。注意:直到目前,這個(gè) Context 還是 head,也就是調(diào)用 head 的 channelRead 方法。那么這個(gè)方法是怎么實(shí)現(xiàn)的呢?
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
什么都沒(méi)做,和我們圖中一樣,調(diào)用 Context 的 fire 系列方法,將請(qǐng)求轉(zhuǎn)發(fā)給下一個(gè)節(jié)點(diǎn)。我們這里是 fireChannelRead 方法,注意,這里方法名字都挺像的。需要細(xì)心區(qū)分。下面我們看看 Context 的成員方法 fireChannelRead:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
這個(gè)是 head 的抽象父類 AbstractChannelHandlerContext 的實(shí)現(xiàn),該方法再次調(diào)用了靜態(tài) fire 系列方法,但和上次不同的是,不再放入 head 參數(shù)了,而是使用 findContextInbound 方法的返回值。從這個(gè)方法的名字可以看出,是找到入站類型的 handler。我們看看方法實(shí)現(xiàn):
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
該方法很簡(jiǎn)單,找到當(dāng)前 Context 的 next 節(jié)點(diǎn)(inbound 類型的)并返回。這樣就能將請(qǐng)求傳遞給后面的 inbound handler 了。
重復(fù)上面的邏輯,終于數(shù)據(jù)到了我們自己寫的 handler-------EchoInServerHandler。

好,到這里,我們已經(jīng)知道了一個(gè)請(qǐng)求時(shí)怎么到達(dá)我們自定義的 handler 的,再來(lái)看看我們的圖:

請(qǐng)求進(jìn)來(lái)時(shí),pipeline 會(huì)從 head 節(jié)點(diǎn)開(kāi)始輸送,通過(guò)配合 invoker 接口的 fire 系列方法,實(shí)現(xiàn) Context 鏈在 pipeline 中的完美傳遞。最終到達(dá)我們自定義的 handler。
到了自定義 handler,我們會(huì)輸出客戶端發(fā)送的內(nèi)容,我們截圖看看:

成功輸出。
注意:此時(shí)如果我們想繼續(xù)向后傳遞該怎么辦呢?我們前面說(shuō)過(guò),可以調(diào)用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一樣,調(diào)用 fire 系列方法,直接向后傳遞就 ok 了。
當(dāng)然,我們這里不需要,我們需要發(fā)送一條數(shù)據(jù)客戶端。那么,我們就來(lái)看看一條數(shù)據(jù)是如何到達(dá)客戶端的。
3. 看 pipeline 是如何將數(shù)據(jù)從自定義 handler 送出的
在打印了客戶端的內(nèi)容后,我們調(diào)用了 Context 的 writeAndFlush 方法,從 inbound 和 outbound 的定義來(lái)看,這個(gè)方法是 outbound 定義的,也就是出站方法。
在debug 進(jìn)去看看之前,我們能否猜測(cè)一下呢,這個(gè) Context 肯定會(huì)調(diào)用他的抽象父類 AbstractChannelHandlerContext 方法, 我們跟進(jìn)去看看:

果不其然。調(diào)用了 AbstractChannelHandlerContext 的 writeAndFlush 方法,然后,調(diào)用了他的重載方法,多傳入了一個(gè) promise 實(shí)例。看看是如何創(chuàng)建的:
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
我們?cè)俑M(jìn)去看看 writeAndFlush :

這里調(diào)用了 write 方法,并直接返回了 promise。繼續(xù)跟進(jìn)查看:

注意:這里調(diào)用了 findContextOutbound,尋找下一個(gè) outbound 節(jié)點(diǎn)。我們看看是如何實(shí)現(xiàn)的:

根據(jù)當(dāng)前節(jié)點(diǎn),找到之前的節(jié)點(diǎn)并且是 outbound 類型。
可以看到,數(shù)據(jù)開(kāi)始出站,從后向前開(kāi)始流動(dòng),和入站的方向是反的。
回到 write 方法,得到下一個(gè)節(jié)點(diǎn)后,調(diào)用下一個(gè)節(jié)點(diǎn)的 invokeWriteAndFlush 方法,這個(gè)是 invoker 接口的方法。

調(diào)用 invokeWrite0 方法,注意,Netty 很多方法都以 0 結(jié)尾,表示這是最底層的方法了,而再 JDK 中,結(jié)尾是 0 表示這是一個(gè)本地方法。我們進(jìn)入該方法查看:

調(diào)用了這個(gè) Context 的 worite 方法。還記得我們也寫了一個(gè) EchoOutServerHandler 類嗎,可能會(huì)進(jìn)入我們自己寫入的類的方法嗎?當(dāng)然不會(huì),因?yàn)槲覀兲砑拥捻樞蚴窍旅孢@樣的:

inbound 在前,outbound 在后,當(dāng)程序走到 inbound 就調(diào)用 outbound 的方法了,并找當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),而我們寫的 outbound 是這個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),永遠(yuǎn)不會(huì)走到這里的。
那么會(huì)走到哪里呢,當(dāng)然是走到 head 節(jié)點(diǎn),因?yàn)?head 節(jié)點(diǎn)就是 outbound 類型的 handler。
進(jìn)入到 head 的 write 方法查看:

調(diào)用了 底層的 unsafe 操作數(shù)據(jù),到這里,我們就不跟了,基于我們今天的目的,我們只想知道一個(gè)請(qǐng)求在 pipeline 是如何流轉(zhuǎn)的。底層數(shù)據(jù)傳播的細(xì)節(jié)就不再贅述。留在以后研究。
當(dāng)執(zhí)行完這個(gè) write 方法后,方法開(kāi)始退棧。逐步退到 unsafe 的 read 方法,回到最初開(kāi)始的地方,然后繼續(xù)調(diào)用 pipeline.fireChannelReadComplete() 方法,重復(fù)之前 pipeline 的設(shè)計(jì)。

到這里,我們應(yīng)該已經(jīng)清楚了一個(gè)請(qǐng)求時(shí)如何在 pipeline 中周轉(zhuǎn)的了。
4. 總結(jié)
總結(jié)一下一個(gè)請(qǐng)求在 pipeline 中的流轉(zhuǎn)過(guò)程:
- 調(diào)用 pipeline 的 fire 系列方法,這些方法是接口 invoker 設(shè)計(jì)的,pipeline 實(shí)現(xiàn)了 invoker 的所有方法,inbound 事件從 head 開(kāi)始流入,outbound 事件從 tail 開(kāi)始流出。
- pipeline 會(huì)將請(qǐng)求交給 Context,然后 Context 通過(guò)抽象父類 AbstractChannelHandlerContext 的 invoke 系列方法(靜態(tài)和非靜態(tài)的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各個(gè) Context 的數(shù)據(jù)流轉(zhuǎn)。
- 當(dāng)入站過(guò)程中,調(diào)用 了出站的方法,那么請(qǐng)求就不會(huì)向后走了。后面的處理器將不會(huì)有任何作用。想繼續(xù)相會(huì)傳遞就調(diào)用 Context 的 fire 系列方法,讓 Netty 在內(nèi)部幫你傳遞數(shù)據(jù)到下一個(gè)節(jié)點(diǎn)。如果你想在整個(gè)通道傳遞,就在 handler 中調(diào)用 channel 或者 pipeline 的對(duì)應(yīng)方法,這兩個(gè)方法會(huì)將數(shù)據(jù)從頭到尾或者從尾到頭的流轉(zhuǎn)一遍。
最后,再次祭上我們的圖,配合 debug 堆棧信息:

上圖就是 pipeline 一個(gè)通用的數(shù)據(jù)流動(dòng)過(guò)程。
好。good luck ?。。?!