Netty的ChannelPipeline和ChannelHandler機(jī)制類似于Servlet和Filter過濾器,在設(shè)計(jì)模式中是一種責(zé)任鏈模式。ChannelPipeline持有一系列ChannelHandler的鏈表,每個(gè)ChannelHandler可以對(duì)I/O事件進(jìn)行攔截和處理。這樣,I/O事件消息在ChannelPipeline中流動(dòng)和傳遞時(shí),可以根據(jù)配置的ChannelHandler實(shí)現(xiàn)不同的業(yè)務(wù)邏輯定制。
1.ChannelPipeline
ChannelPipeline負(fù)責(zé)ChannelHandler的管理和事件攔截調(diào)度。
1.1ChannelPipeline處理流程
下圖展示了一個(gè)I/O事件消息通過ChannelPipeline進(jìn)行處理的全過程。
1)讀事件,底層的Socket.read()方法(such as {@link SocketChannel#read(ByteBuffer)})讀取ByteBuf,然后出發(fā)channelRead事件,通過NioEventLoop會(huì)調(diào)用pipeline的fireChannelRead(Object msg)方法;然后消息依次被Inbound Handler鏈條攔截和調(diào)用。
2)寫事件,當(dāng)調(diào)用ChannelHandlerContext的write方法發(fā)送消息時(shí),消息也會(huì)依次被Outbound Handler鏈條攔截和調(diào)用,并最終調(diào)用socket的write()方法將數(shù)據(jù)寫出去。

由上也可以得知,Netty中的事件也分為InBound事件和OutBound事件,并有分別對(duì)應(yīng)的Handler鏈條去處理。并且事件在Handler之間的傳遞是通過
ChannelHandlerContext的fireIN_EVT()和OUT_EVT()方法觸發(fā)和傳遞的。對(duì)于InBound事件,這些觸發(fā)方法有:
* <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
* <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
* <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
* <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
* <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
對(duì)于outBound事件,這些觸發(fā)方法有:
* <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#flush()}</li>
* <li>{@link ChannelHandlerContext#read()}</li>
* <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
1.2 ChannelPipeline源碼分析
Netty中pipeline的默認(rèn)實(shí)現(xiàn)類是DefaultChannelPipeline??匆幌翫efaultChannelPipeline的實(shí)現(xiàn):
1)類型為AbstractChannelHandlerContext的兩個(gè)對(duì)象head、tail,DefaultChannelPipeline是通過AbstractChannelHandlerContext將Handler進(jìn)行串聯(lián)成一個(gè)鏈條的。具體可見下邊的添加Handler的過程分析。
2)該pipeline對(duì)應(yīng)的channel。
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
}
1.2.1 添加一個(gè)Handler過程分析
舉例addLast方法是如何添加一個(gè)新的Handler的,這個(gè)方法值得我們非常仔細(xì)地去探討一下。
addLast(EventExecutorGroup group, String name, ChannelHandler handler)
入?yún)?/strong>:1)EventExecutorGroup group,表示的是最終執(zhí)行Handler的線程池;2)String name,代表該Handler的名字;3)ChannelHandler handler是需要添加的具體執(zhí)行操作的Handler。
執(zhí)行過程分析:
①. newContext會(huì)創(chuàng)建一個(gè)AbstractChannelHandlerContext,將EventExecutorGroup、ChannelHandler、name等封裝到該對(duì)象中。
②.addLast0會(huì)將該AbstractChannelHandlerContext加入值ChannelPipeline得鏈條中去。代碼可見下邊,典型的鏈表追加操作
③.if (!registered)判斷該Channel是否已經(jīng)成功注冊(cè)到EventLoop中:
1)如果沒有的話,會(huì)創(chuàng)建一個(gè)CallbackTask(該task會(huì)執(zhí)行ChannelHandler.handlerAdded),等到channel注冊(cè)到EventLoop后回調(diào)執(zhí)行該task
2)已經(jīng)注冊(cè)的話,后續(xù)會(huì)執(zhí)行callHandlerAdded0,根據(jù)executor.inEventLoop()判斷決定是在當(dāng)前線程執(zhí)行還是在新線程中執(zhí)行。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
1.2.2 I/O事件執(zhí)行過程分析
我們以一個(gè)I/O讀事件作為一個(gè)代表對(duì)ChannelPipeline的執(zhí)行過程進(jìn)行分析,ChannelPipeline中對(duì)讀事件的執(zhí)行方法是fireChannelRead(Object msg)。
通過代碼分析,我們可以看到會(huì)直接執(zhí)行到AbstractChannelHandlerContext類的方法invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),入?yún)轭^指針head和對(duì)象msg。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
在invokeChannelRead的方法中,還是會(huì)根據(jù)executor.inEventLoop()方法,根據(jù)用戶的線程設(shè)置,最終調(diào)用到對(duì)應(yīng)handler的channelRead方法。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
//to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called yet. If not return {@code false} and if called or could not detect return {@code true}.
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
2. ChannelHandler
先看一些類的繼承圖:

2.1 ChannelHandler中的方法
Netty定義了良好的類型層次結(jié)構(gòu)來表示不同的處理程序類型,所有的類型的父類是ChannelHandler。ChannelHandler提供了在其生命周期內(nèi)添加或從ChannelPipeline中刪除的方法。
1). handlerAdded,ChannelHandler添加到實(shí)際上下文中準(zhǔn)備處理事件
2). handlerRemoved,將ChannelHandler從實(shí)際上下文中刪除,不再處理事件
3). exceptionCaught,處理拋出的異常
2、ChannelInboundHandler
ChannelInboundHandler提供了一些方法再接收數(shù)據(jù)或Channel狀態(tài)改變時(shí)被調(diào)用。下面是ChannelInboundHandler的一些方法: 1). channelRegistered,ChannelHandlerContext的Channel被注冊(cè)到EventLoop; 2). channelUnregistered,ChannelHandlerContext的Channel從EventLoop中注銷 3). channelActive,ChannelHandlerContext的Channel已激活 4). channelInactive,ChannelHanderContxt的Channel結(jié)束生命周期 5). channelRead,從當(dāng)前Channel的對(duì)端讀取消息 6). channelReadComplete,消息讀取完成后執(zhí)行 7). userEventTriggered,一個(gè)用戶事件被觸發(fā) 8). channelWritabilityChanged,改變通道的可寫狀態(tài),可以使用Channel.isWritable()檢查 9). exceptionCaught,重寫父類ChannelHandler的方法,處理異常.
舉一個(gè)最常用的MessageToMessageDecoder作為例子,執(zhí)行decode將msg對(duì)象進(jìn)行轉(zhuǎn)換后,如果想繼續(xù)在Pipeline中繼續(xù)傳遞下去,必須顯示地去執(zhí)行ctx.fireChannelRead方法,會(huì)通過AbstractChannelHandlerContext繼續(xù)輪轉(zhuǎn)到下一個(gè)ChannelHandler去執(zhí)行。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}