Pipeline
設(shè)計(jì)模式中有一種設(shè)計(jì)模式叫做責(zé)任鏈模式,netty pipeline就是責(zé)任鏈模式的一種實(shí)現(xiàn),鏈上每個(gè)節(jié)點(diǎn)按照不同的添加方式和添加順序排列在鏈上不同的位置,這條鏈?zhǔn)且粭l雙向鏈,在netty中用戶創(chuàng)建的handler的都會(huì)通過(guò)DefaultChannelHandlerContext包裝成鏈上的節(jié)點(diǎn)。
DefaultChannelPipeline
netty默認(rèn)創(chuàng)建的pipeline類型是DefaultChannelPipeline,DefaultChannelPipeline默認(rèn)會(huì)創(chuàng)建一個(gè)head節(jié)點(diǎn)和一個(gè)tail節(jié)點(diǎn),用戶根據(jù)業(yè)務(wù)需求創(chuàng)建的業(yè)務(wù)處理handler都會(huì)被添加到這兩個(gè)節(jié)點(diǎn)中間,我們知道java io事件包含兩種類型:讀,寫(xiě)
netty定義了ChannelInboundHandler和ChannelOutboundHandler作為這兩種事件處理的抽象接口提供給開(kāi)發(fā)者,實(shí)現(xiàn)ChannelInboundHandler的handler用來(lái)處理讀事件,實(shí)現(xiàn)ChannelOutboundHandler的handler用來(lái)處理寫(xiě)事件。這里需要注意一點(diǎn):在pipeline鏈上對(duì)讀寫(xiě)事件的處理方向是不同的
- 對(duì)于讀事件netty從pipeline的head開(kāi)始向tail方向依次把讀事件交給實(shí)現(xiàn)了ChannelInboundHandler的handler處理
- 對(duì)于寫(xiě)事件netty從pipeline的tail開(kāi)始向head方向依次把寫(xiě)事件交給實(shí)現(xiàn)了ChannelOutboundHandler的handler處理

Handler
handler是pipeline上處理事件的基本單元,用戶處理IO事件的業(yè)務(wù)邏輯都是寫(xiě)在自定義的handler中
ChannelHandlerContext
ChannelHandlerContext是handler上下文信息類,實(shí)際上pipeline鏈上節(jié)點(diǎn)的類型是ChannelHandlerContext,默認(rèn)實(shí)現(xiàn)是DefaultChannelHandlerContext
我們看下DefaultChannelHandlerContext源代碼
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
//DefaultChannelHandlerContext綁定了用戶定義的handler
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
DefaultChannelHandlerContext作為pipeline鏈上的節(jié)點(diǎn),它的父類AbstractChannelHandlerContext有pre和next兩個(gè)屬性分別指向前一個(gè)節(jié)點(diǎn)和后一個(gè)節(jié)點(diǎn),可以看出pipeline是一個(gè)雙向鏈表。
DefaultChannelHandlerContext和handler綁定
用戶定義的handler是如何綁定到一個(gè)DefaultChannelHandlerContext的呢?
我們使用pipleline.addLast()來(lái)解析,當(dāng)我使用pipeline.addLast去添加自定義的handler的時(shí)候,最終會(huì)調(diào)用下面所示的代碼
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//newContex通過(guò)創(chuàng)建DefaultChannelHandlerContext的方式實(shí)現(xiàn)了context和handler的綁定
newCtx = newContext(group, filterName(name, handler), handler);
//把新建的DefaultChannelHandlerContext添加到pipeline的尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
executionMask
在創(chuàng)建DefaultChannelHandlerContext的時(shí)候會(huì)初始化它的父類AbstractChannelHandlerContext,這個(gè)類的executionMask屬性得我們?nèi)パ芯肯隆?br> pipeline上會(huì)觸發(fā)很多類型的事件,每個(gè)handler處理的事件類型是不同的,executionMask的作用就是標(biāo)識(shí)不同handler能響應(yīng)的事件類型,下面是handler executionMask計(jì)算方式如下:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
可以看到executionMask的值是根據(jù)handle類實(shí)現(xiàn)的不同事件響應(yīng)方法計(jì)算得到
事件是如何在pipeline上傳播的
從上面我們知道pipeline是一條雙向鏈表,那么事件是如何在pipeline的節(jié)點(diǎn)之間傳播的呢?我們拿channelRegistered事件來(lái)分析
當(dāng)一個(gè)channel在NioEventLoop上注冊(cè)完成后會(huì)觸發(fā)channelRegistered事件,觸發(fā)事件的入口是DefaultPipeline.fireChannelRegistered()方法
public final ChannelPipeline fireChannelRegistered() {
//head是pipeline鏈表上的的第一節(jié)點(diǎn),表示從鏈表的頭部開(kāi)始依次向尾部去處理registered事件
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
我們看下AbstractChannelHandlerContext.invokeChannelRegistered源代碼
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//觸發(fā)pipeline上當(dāng)前節(jié)點(diǎn)的invokeChannelRegistered方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
我繼續(xù)分析AbstractChannelHandlerContext.invokeChannelRegistered源代碼
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
//調(diào)用當(dāng)前DefaultChannelHandlerContext綁定的handler的channelRegistered方法,
//用戶根據(jù)自己業(yè)務(wù)邏輯重寫(xiě)的channelRegistered方法會(huì)被調(diào)用
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
用戶自定義的handler在處理完事件之后如何繼續(xù)觸發(fā)下一個(gè)節(jié)點(diǎn)執(zhí)行呢?一般用戶自定義handler重寫(xiě)的channelRegistered方法會(huì)在處理完業(yè)務(wù)邏輯后調(diào)用AbstractChannelHandlerContext.fireChannelRegistered
我們看下 fireChannelRegistered源代碼
public ChannelHandlerContext fireChannelRegistered() {
//findContextInbound在pipeline上從本節(jié)點(diǎn)開(kāi)始向pipeline的尾部找到下一個(gè)能處理channel registered事件的handler,
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
我們給出上面執(zhí)行的邏輯圖,XXX代表的是對(duì)應(yīng)的事件,比如registered,add,YY代表的是事件的類型,YY有兩種值:in 和 out

ChannelInitializer
ChannelInitializer是netty內(nèi)置的一個(gè)特殊的Inboundhandler,這個(gè)handler是netty提供給開(kāi)發(fā)者實(shí)現(xiàn)向pipeline中添加自定義handler的工具類,它一般會(huì)首先被添加到pipeline中,開(kāi)發(fā)者通過(guò)實(shí)現(xiàn)ChannelInitializer的initChannel方法去添加業(yè)務(wù)handler到pipeline中。那么initChannel方法是如何被調(diào)用的呢?開(kāi)發(fā)者在向pipeline添加handler的時(shí)候會(huì)觸發(fā)handlerAdded事件,handlerAdded事件會(huì)觸發(fā)相應(yīng)handler的handlerAdded方法,我們看下ChannelInitializer handlerAdded方法的源碼
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//判斷channel是不是已經(jīng)注冊(cè)了,如果在channel還沒(méi)有注冊(cè)的情況下向pipeline添加了handler,
//這個(gè)時(shí)候觸發(fā)的handlerAdded事件會(huì)被保存在一個(gè)鏈表中,
//將來(lái)當(dāng)channel注冊(cè)完成的時(shí)候會(huì)去查看這個(gè)鏈表有沒(méi)有pending的handlerAdded事件需要觸發(fā),
//如果有pending的handlerAdded事件,那么就調(diào)用相應(yīng)handler的handlerAdded方法
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
//這里會(huì)執(zhí)行用戶重寫(xiě)的ChannelInitializer的initChannel方法,
//這樣就實(shí)現(xiàn)了把用戶定義的各種業(yè)務(wù)handler添加到pipeline中
//當(dāng)成功執(zhí)行用戶重寫(xiě)的initChannel方法后,ChannelInitializer會(huì)把自己從pipeline中刪除,
//這樣pipeline中包含handlers除了head和tail剩下的都是用戶自己定義的業(yè)務(wù)handler
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
上面就是對(duì)netty pipeline的解析