導(dǎo)讀
原創(chuàng)文章,轉(zhuǎn)載請注明出處。
本文源碼地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼
Pipeline這個(gè)詞翻譯過來就是“流水線”的意思,讀到這里有了解過設(shè)計(jì)模式的同學(xué)應(yīng)該已經(jīng)想到了,這里用到的是“責(zé)任鏈模式”。本文我們將以DefaultChannelPipeline為例看一下Pipeline的構(gòu)造以及其中重要的數(shù)據(jù)結(jié)構(gòu)。
1 和Pipeline相關(guān)的其他組件
1.1 ChannnelHandler
這是ChannelHandler中的注釋,翻譯過來就是“處理IO事件或者攔截IO操作,并且將其向ChannelPipeline中的下一個(gè)handler傳遞”,說白了就是在責(zé)任鏈中注冊的一系列回調(diào)方法。
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
its ChannelPipeline
這里的I/O event就是很多書中提到的“入站事件”,而I/O operation就是很多書中提到的“出站事件”,前面我說過,這里我并不準(zhǔn)備這么叫,按我的理解我習(xí)慣把這兩者稱之為“事件”和“命令”。很顯然這里event和operation的含義是不一樣的,event更多地多表示事件發(fā)生了,我們被動(dòng)地收到,而operation則表示我們主動(dòng)地發(fā)起一個(gè)動(dòng)作或者命令。
1.2 ChannelHandlerContext
每一個(gè)ChannelHandler在被添加進(jìn)ChannelPipeline時(shí)會(huì)被包裝進(jìn)一個(gè)ChannelHandlerContext。有兩個(gè)特殊的ChannelHandlerContext除外,分別是HeadContext和TailContext,HeadContext繼承了ChannelInBoundHandler和ChannelOutBoundHandler,而TailContext繼承了ChannelInBoundHandler。
每個(gè)ChannelHandlerContext中有兩個(gè)指針next和prev,這是用來將多個(gè)ChannelHandlerContext構(gòu)成雙向鏈表的。
2 Pipeline的構(gòu)造方法
我們以DefaultChannelPipeline為例,從它的構(gòu)造方法開始。這里首先將Channel保存到Pipeline的屬性中,又初始化了兩個(gè)屬性succeedFuture和voidPromise。這是兩個(gè)特殊的可以共享的Promise,這兩個(gè)Promise不是重點(diǎn),不理解也沒關(guān)系。
接下來的tail和head是兩個(gè)特殊的ChannelHandlerContext,這兩個(gè)是Pipeline中的重要組件。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Pipeline在執(zhí)行完構(gòu)造方法以后的結(jié)構(gòu)如下圖所示,head和tail構(gòu)成了最簡單的雙向鏈表。

圖中藍(lán)色填充的就是ChannelHandlerContext,目前只有HeadContext和TailContext,ChannelHandlerContext中的較窄的矩形表示ChannelHandler,由于HeadContext和TailContext并沒有包含ChannelHandler,而是繼承ChannelHandler,所以這里我們用虛線表示。上下貫通的ChannelHandler表示既是ChannelInBoundHandler又是ChannelOutBoundHandler,只有上半部分的表示是ChannelInBoundHandler,只有下半部分的表示是ChannelOutBoundHandler。
3 添加ChannelHandler
在ChannelPipeline中有很多以add開頭的方法,這些方法就是向ChannelPipeline中添加ChannelHandler的方法。
-
addAfter:向某個(gè)ChannelHandler后邊添加 -
addBefore:向某個(gè)ChannelHandler前面添加 -
addFirst:添加到頭部,不能在head的前面,而是緊挨著head,在head的后面 -
addLast:添加到尾部,不能在tail的后面,而是緊挨著tail,在tail的前面
我們以最常用的的addLast方法為例來分析一下Pipeline中添加ChannelHandler的操作。
這里所貼出的addLast方法其實(shí)我們已經(jīng)在“服務(wù)端啟動(dòng)流程”這篇文章中打過照面了。方法參數(shù)中的EventExecutorGroup意味著我們可以為這個(gè)ChannelHandler單獨(dú)設(shè)置Excutor而不使用Channel所綁定的EventLoop,一般情況下我們不這么做,所以group參數(shù)為null。
這里先把ChannelHandler包裝成ChannelHandlerContext,再添加到尾部,隨后調(diào)用ChannelHandler的HandlerAdded方法。
在調(diào)用HandlerAdded方法時(shí)有一點(diǎn)問題,添加ChannelHandler的操作不需要在EventLoop線程中進(jìn)行,而HandlerAdded方法則必須在EventLoop線程中進(jìn)行。也就是說存在添加Handler時(shí)還未綁定EventLoop的情況,此時(shí)則調(diào)用newCtx.setAddPending()將當(dāng)前HandlerContext設(shè)置為ADD_PENDING狀態(tài),并且調(diào)用callHandlerCallbackLater(newCtx, true)將一個(gè)異步任務(wù)添加到一個(gè)單向隊(duì)鏈表中,即pendingHandlerCallbackHead這個(gè)鏈表。
如果當(dāng)前已經(jīng)綁定了EventLoop,則看當(dāng)前調(diào)用線程是否為EventLoop線程,如果不是則向EventLoop提交一個(gè)異步任務(wù)調(diào)用callHandlerAdded0方法,否則直接調(diào)用callHandlerAdded0方法。
下面咱們依次分析一下newContext,callHandlerCallbackLater和callHandlerAdd0方法。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//先把`handler`包裝成`HandlerContext`
newCtx = newContext(group, filterName(name, handler), handler);
//添加到尾部
addLast0(newCtx);
//如果還未綁定`EventLoop`則稍后再發(fā)起對`HandlerAdded`方法的調(diào)用。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//如果已經(jīng)綁定了EventLoop,并且當(dāng)前線程非EventLoop線程的話就提交一個(gè)異步任務(wù),就發(fā)起一個(gè)異步任務(wù)去調(diào)用HandlerAdded方法。
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//如果當(dāng)前線程是EventLoop線程,就直接調(diào)用HandlerAdded方法。
callHandlerAdded0(newCtx);
return this;
}
3.1 newContext
先看來一下newContext方法,這里直接調(diào)用了DefaultChannelHandlerContext的構(gòu)造方法,咱們跟進(jìn)去看看。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
在DefaultChannelHandlerContext的構(gòu)造方法中又調(diào)用了父類AbstractChannelHandlerContext的構(gòu)造方法,保存了handler屬性。在調(diào)用父類構(gòu)造方法之前調(diào)用了isInboud和isOutbound方法判斷當(dāng)前的Handler是否為ChannelInBoundHandler或者ChannelOutBoundHandler,這兩個(gè)方法很簡單,不再展開。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
接下來看AbstractChannelHandlerContext的構(gòu)造方法,這里非常簡單,保存了幾個(gè)屬性,咱們看一下ordered這個(gè)屬性。ordered表示EventExecutor在執(zhí)行異步任務(wù)時(shí)是否按添加順序執(zhí)行,這里一般情況下executor為null,表示使用Channel所綁定的EventLoop線程,而EventLoop線程都是OrderedEventExecutor的實(shí)現(xiàn)類。所以這里我們不考慮ordered為false的情況。
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
上面提到了ChannelHandlerContext可以在構(gòu)造方法里單獨(dú)指定EventExecutor,如果沒有單獨(dú)指定的話就使用Channel所綁定的EventLoop,代碼在哪里呢,就在AbstractChannelHandlerContext#executor方法,非常簡單,如果沒有為當(dāng)前ChannelHandler指定excutor則返回Channel所綁定的EventLoop。
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
3.2 callHandlerCallbackLater
在添加完ChannelHandler之后將調(diào)用ChannledHandler的handlerAdded方法,但是此時(shí)有可能還未綁定EventLoop,而handlerAdded方法的調(diào)用必須在EventLoop線程內(nèi)執(zhí)行,此時(shí)就需要調(diào)用callHandlerCallbackLater方法在pendingHandlerCallbackHead鏈表中添加一個(gè)PendingHandlerAddedTask。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
接下來咱們看一下PendingHandlerAddedTask的代碼,邏輯在execute方法里,這里直接調(diào)用了callHandlerAdded0。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
}
}
}
}
3.3 callHandlerAdded0
不管是在未綁定EventLoop的情況下延遲調(diào)用handlerAdded還是在已經(jīng)綁定了EventLoop的情況下立即調(diào)用HandlerAdded,最終都會(huì)調(diào)用到callHandlerAdded0方法。這里干了兩件事,一是調(diào)用ChannelHandler的handlerAdded方法,二是將HandlerContext的狀態(tài)設(shè)置為ADD_COMPLETE狀態(tài)。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
}
3.4 添加多個(gè)ChannelHandler后的Pipeline
還記得咱們的“Netty整體架構(gòu)圖”嗎,在這里咱們把Pipeline部分單獨(dú)放大拿出來看一下,在添加完多個(gè)ChannelHandler之后,Pipeline的結(jié)構(gòu)是這樣的。

4 刪除ChannelHandler
在Pipeline中有幾個(gè)以remove開頭的方法,這些方法的作用就是刪除ChannelHandler。
-
remove(ChannelHandler handler):從head向tail查找,用==判斷是否為同一實(shí)例,只刪除第1個(gè)。 -
remove(Class<T> handlerType):從head向tail查找,用isAssignableFrom方法判斷是否為符合條件的類型,只刪除第1個(gè)。 -
remove(String name):從head向tail查找,用name精確匹配查找,只刪除第1個(gè),因?yàn)?code>name不能重復(fù),所以這里刪除第1個(gè)也是唯一的1個(gè)。 -
removeFirst:刪除head的后一個(gè),不能刪除tail。 -
removeLast:刪除tail的前一個(gè),不能刪除head。
上述無論哪種刪除方式在查找到對應(yīng)的HandlerContext后都會(huì)調(diào)用到remove(final AbstractChannelHandlerContext ctx)方法,查找過程比較簡單,咱們不再展開,直接看remove(final AbstractChannelHandlerContext ctx)方法。
看看這個(gè)方法的實(shí)現(xiàn),是不是和addLast(EventExecutorGroup group, String name, ChannelHandler handler)很相似,非常相似。首先從雙向鏈表中刪除ChannelHandlerContext,再調(diào)用callHandlerRemoved0方法,callHandlerRemoved0方法內(nèi)會(huì)調(diào)用handlerRemoved方法,這個(gè)調(diào)用必須在EventLoop線程內(nèi)進(jìn)行。如果刪除時(shí)還未綁定EventLoop則添加一個(gè)異步任務(wù)到鏈表pendingHandlerCallbackHead中。
如果已經(jīng)綁定了EventLoop并且當(dāng)前線程非EventLoop線程則向EventLoop提交一個(gè)異步任務(wù),否則直接調(diào)用callHandlerRemoved0方法。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
synchronized (this) {
//從雙向鏈表中刪除`ChannelHandlerContext`
remove0(ctx);
//如果還未綁定`EventLoop`,則稍后調(diào)用`handlerRemoved`方法
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
//如果已經(jīng)綁定了`EventLoop`,但是當(dāng)前線程非`EventLoop`線程的話,就發(fā)起一個(gè)異步任務(wù)調(diào)用callHandlerRemoved0方法
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
//如果當(dāng)前線程就是`EventLoop`線程,則直接調(diào)用callHandlerRemoved0方法。
callHandlerRemoved0(ctx);
return ctx;
}
callHandlerCallbackLater方法咱們前面已經(jīng)分析過,和添加ChannelHandler時(shí)不同的是,這里向鏈表添加的是PendingHandlerRemovedTask,這個(gè)類也很簡單,不再展開。
這里咱們只看一下callHandlerRemoved0方法。這個(gè)方法很簡單,調(diào)用handlerRemoved方法,再把ChannelHandlerContext的狀態(tài)設(shè)置為REMOVE_COMPLETE。
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
4 pendingHandlerCallbackHead鏈表中的任務(wù)什么時(shí)候調(diào)用
在AbstractUnsafe的register0方法中,在綁定EventLoop以后,會(huì)調(diào)用pipeline.invokeHandlerAddedIfNeeded()方法,我們看一下pipeline.invokeHandlerAddedIfNeeded()方法。
private void register0(ChannelPromise promise) {
try {
// 去完成那些在綁定EventLoop之前觸發(fā)的添加handler操作,這些操作被放在pipeline中的pendingHandlerCallbackHead中,是個(gè)鏈表
pipeline.invokeHandlerAddedIfNeeded();
}
invokeHandlerAddedIfNeeded方法調(diào)用了callHandlerAddedForAllHandlers方法,咱們接著看下去。
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
callHandlerAddedForAllHandlers方法的邏輯咱就不再展開來說了,非常簡單,就是遍歷pendingHandlerCallbackHead這個(gè)單向鏈表,依次調(diào)用每個(gè)元素的execute方法,并且清空這個(gè)單向鏈表。
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
5 總結(jié)
Pipeline中的最重要的數(shù)據(jù)結(jié)構(gòu)就是由多個(gè)ChannelHandlerContext組成的雙向鏈表,而每個(gè)ChannelHandlerContext中包含一個(gè)ChannelHandler,ChannelHandler既可以添加也可以刪除。在Pipeline中有兩個(gè)特殊的ChannelHandlerContext分別是HeadContext及TailContext,這兩個(gè)ChannelHandlerContext中不包含ChannelHandler,而是采用繼承的方式。HeadContext實(shí)現(xiàn)了ChannelOutBoundHandler和ChannelInBoundHandler,而TailContext實(shí)現(xiàn)了ChannelInBoundHandler。
關(guān)于作者
王建新,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負(fù)責(zé)服務(wù)治理、RPC框架、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等。愛技術(shù)、愛學(xué)習(xí),歡迎聯(lián)系交流。
原創(chuàng)文章,碼字不易,點(diǎn)贊分享,手有余香。