原文鏈接:https://wangwei.one/posts/netty-pipeline-source-analyse-1.html
前面,我們分析了Netty EventLoop的 創(chuàng)建 與 啟動 原理,接下里我們來分析Netty中另外兩個重要組件—— ChannelHandler 與 Pipeline。Netty中I/O事件的傳播機制均由它負(fù)責(zé),下面我們來看看它是如何實現(xiàn)的。
Netty版本:4.1.30
我們前面在講 Channel創(chuàng)建 時,在AbstractChannel的構(gòu)造函數(shù)中, 一筆帶過地提到了Pipeline,現(xiàn)在我們來深入分析一下它的原理。
概述
Netty channel lifecycle
前面,我們在分析 Netty channel 源碼時,分析了Channel的創(chuàng)建、初始化、注冊、綁定過程。在Netty中,channel的生命周期如下所示:

- ChannelRegistered:Channel注冊到了EventLoop上
- ChannelActive:Channel激活,連接到了遠(yuǎn)程某一個節(jié)點上,可以收發(fā)數(shù)據(jù)了
- ChannelInactive:斷開連接
- ChannelUnregistered:Channel從EventLoop上取消注冊
Netty channelHandler
Channel 每一次狀態(tài)的變化,都會產(chǎn)生一個事件,調(diào)用 ChannelHandler 中對應(yīng)的方法進行處理,我們看下 ChannelHandler的UML,其中最為重要的兩個ChannelHandler:
- ChannelInboundHandler:處理入站數(shù)據(jù)以及channel的各種狀態(tài)變化
- ChannelOutboundHandler:處理出站數(shù)據(jù)并允許攔截所有操作

Netty ChannelPipeline
前面我們在分析Channel創(chuàng)建過程時,每一個新創(chuàng)建的Channel都將會被分配一個新的ChannelPipeline。ChannelPipeline是一個攔截流經(jīng)Channel的入站和出站事件的ChannelHandler實例鏈,如圖所示:

一個 Channel 包含了一個 ChannelPipeline,ChannelPipeline內(nèi)部是一個雙向的鏈表結(jié)構(gòu),內(nèi)部由一個個的ChannelHandlerContext節(jié)點組成,ChannelPipeline有頭尾兩個固定的節(jié)點HeadContext與TailContext。用戶自定的ChannelHandler就是由ChannelHandlerContext包裝成Pipeline的節(jié)點,參與Channel整個生命周期中所觸發(fā)的入站事件與出站事件以及相應(yīng)數(shù)據(jù)流的攔截處理。
根據(jù)事件的起源,事件將會被ChannelInboundHandler(入站處理器)或者ChannelOutboundHandler(出站處理器)處理。隨后,通過調(diào)用ChannelHandlerContext實現(xiàn),它將被轉(zhuǎn)發(fā)給同一超類型的下一個ChannelHandler,如圖所示:

Pipeline UML
我們先來看下 ChannelPipeline 以及 ChannelHandlerContext 的類圖結(jié)構(gòu),它們都實現(xiàn)了ChannelInboundInvoker與ChannelOutboundInvoker接口。

Pipeline初始化
AbstractChannel構(gòu)造函數(shù)如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 創(chuàng)建默認(rèn)Pipeline
pipeline = newChannelPipeline();
}
// 創(chuàng)建默認(rèn)Pipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline 構(gòu)造函數(shù)如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 設(shè)置尾部節(jié)點
tail = new TailContext(this);
// 設(shè)置頭部節(jié)點
head = new HeadContext(this);
// 將tail與head串聯(lián)起來
head.next = tail;
tail.prev = head;
}
我們可以看到Pipeline其實是一個雙向鏈表的結(jié)構(gòu),剛剛初始化的時候,Pipeline(管道)中只有兩個節(jié)點,如圖:

接下來我們看看組成Pipeline節(jié)點的對象—— ChannelHandlerContext。
ChannelHandlerContext
ChannelHandlerContext 實現(xiàn)了AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker接口。Pipeline中的事件傳播,都是由ChannelHandlerContext負(fù)責(zé),將發(fā)生的事件從一個節(jié)點傳到下一個節(jié)點。
ChannelHandlerContext接口
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
// 返回ChannelHandlerContext中綁定的Channel
Channel channel();
// 返回專用于執(zhí)行任務(wù)的 EventExecutor
EventExecutor executor();
// 返回ChannelHandlerContext的唯一名稱。該名字將在ChannelHandler被添加到ChannelPipeline時會被用到,從ChannelPipeline中訪問注冊的ChannelHandler時,也會被用到。
String name();
// 返回ChannelHandlerContext中綁定的ChannelHandler
ChannelHandler handler();
// 屬于這個ChannelHandlerContext的ChannelHandler從ChannelPipeline移除了,返回true
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
// 返回分配的ChannelPipeline
ChannelPipeline pipeline();
// 返回用于分配ByteBuf的ByteBufAllocator
ByteBufAllocator alloc();
}
AttributeMap接口
實現(xiàn) AttributeMap 接口,表示ChannelHandlerContext節(jié)點可以存儲自定義的屬性。
// 屬性Map接口
public interface AttributeMap {
// 通過Key獲取屬性
<T> Attribute<T> attr(AttributeKey<T> key);
// 判斷屬性是否存在
<T> boolean hasAttr(AttributeKey<T> key);
}
ChannelInboundInvoker接口
實現(xiàn)ChannelInboundInvoker接口,表示節(jié)點可以用于傳播入站相關(guān)的事件。
public interface ChannelInboundInvoker {
// 當(dāng)Channel注冊到EventLoop上時
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRegistered();
// 當(dāng)Channel從EventLoop上取消注冊
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelUnregistered();
// 當(dāng)Channel處理激活狀態(tài),意味著連接已經(jīng)建立
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelActive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelActive();
// 當(dāng)Channel處理失效狀態(tài),意味著連接已經(jīng)斷開
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelInactive();
// 在pipeline中某個一個入站(inbound)操作出現(xiàn)了異常
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的exceptionCaught(ChannelHandlerContext)方法
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
// 收到用戶自定義的事件
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的userEventTriggered(ChannelHandlerContext)方法
ChannelInboundInvoker fireUserEventTriggered(Object event);
// Channel接收到了消息
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelRead(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRead(Object msg);
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelReadComplete();
// 調(diào)用ChannelPipeline中下一個ChannelInboundHandler的channelWritabilityChanged(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker接口
實現(xiàn)ChannelOutboundInvoker接口,意味著節(jié)點可以用來處理出站相關(guān)的事件。
public interface ChannelOutboundInvoker {
// 將Channel綁定到一個本地地址,這將調(diào)用ChannelPipeline中的下一個ChannelOutboundHandler的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
// 將Channel連接到一個遠(yuǎn)程地址,這將調(diào)用ChannelPipeline中的下一個ChannelOutboundHandler的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
// 將Channel斷開連接。這將調(diào)用ChannelPipeline中的下一個ChannelOutbound- Handler的disconnect(ChannelHandlerContext, Channel Promise)方法
ChannelFuture disconnect();
ChannelFuture disconnect(ChannelPromise promise);
// 將Channel關(guān)閉。這將調(diào)用ChannelPipeline中的下一個ChannelOutbound- Handler的close(ChannelHandlerContext, ChannelPromise)方法
ChannelFuture close();
ChannelFuture close(ChannelPromise promise);
// 將Channel從它先前所分配的EventExecutor(即EventLoop)中注銷。這將調(diào)用ChannelPipeline中的下一個ChannelOutboundHandler的deregister (ChannelHandlerContext, ChannelPromise)方法
ChannelFuture deregister();
ChannelFuture deregister(ChannelPromise promise);
// 請求從Channel中讀取更多的數(shù)據(jù)。這將調(diào)用ChannelPipeline中的下一個ChannelOutboundHandler的read(ChannelHandlerContext)方法
ChannelOutboundInvoker read();
// 將消息寫入Channel。這將調(diào)用ChannelPipeline中的下一個Channel- OutboundHandler的write(ChannelHandlerContext, Object msg, Channel- Promise)方法。注意:這并不會將消息寫入底層的Socket,而只會將它放入隊列中。要將它寫入Socket,需要調(diào)用flush()或者writeAndFlush()方法
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
// 沖刷Channel所有掛起的寫入。這將調(diào)用ChannelPipeline中的下一個Channel- OutboundHandler的flush(ChannelHandlerContext)方法
ChannelOutboundInvoker flush();
// 這是一個先調(diào)用write()方法再接著調(diào)用flush()方法的便利方法
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
}
TailContext & HeadContext
接下來,我們看看Pipeline中的頭部與尾部節(jié)點。
TailContext節(jié)點
TailContext是尾部節(jié)點,inbound類型,主要處理Pipeline中數(shù)據(jù)流的收尾工作。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 調(diào)用AbstractChannelHandlerContext構(gòu)造器
// TailContext是一個inbound(入站)節(jié)點
super(pipeline, null, TAIL_NAME, true, false);
// 設(shè)置添加完成
setAddComplete();
}
// 返回Handler,就是它自身
@Override
public ChannelHandler handler() {
return this;
}
...
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
...
}
// 如果pipeline中有異常沒做處理,最終會由TailContext打贏一個警告日志
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
// 釋放對象
ReferenceCountUtil.release(cause);
}
}
// 如果pipeline中有read消息沒有處理,最終會由TailContext打贏一個警告日志
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
// 設(shè)置 ChannelHandlerContext 狀態(tài)為添加完成,狀態(tài)=2
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
AbstractChannelHandlerContext
AbstractChannelHandlerContext 是 ChannelHandlerContext 的抽象實現(xiàn):
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
// 下一個節(jié)點
volatile AbstractChannelHandlerContext next;
// 上一個節(jié)點
volatile AbstractChannelHandlerContext prev;
// 是否為inBound類型
private final boolean inbound;
// 是否為outbound類型
private final boolean outbound;
// 綁定的默認(rèn)pipeline
private final DefaultChannelPipeline pipeline;
// 節(jié)點名
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
...
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
// 設(shè)置HandlerContext名稱
this.name = ObjectUtil.checkNotNull(name, "name");
// 綁定pipeline
this.pipeline = pipeline;
// 綁定executor(這里為null)
this.executor = executor;
// 如果節(jié)點為inbound類型就設(shè)置為true
this.inbound = inbound;
// 如果節(jié)點為outbound類型就設(shè)置為true
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
...
}
DefaultChannelHandlerContext
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 調(diào)用 AbstractChannelHandlerContext 構(gòu)造函數(shù)
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
// 是否為inBound類型
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
// 是否為outBound類型
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
HeadContext
HeadContext是頭部節(jié)點,outbound類型,用于傳播事件和進行一些底層socket操作。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
// 調(diào)用AbstractChannelHandlerContext構(gòu)造器
// HeadContext是一個outbound(出站)節(jié)點
super(pipeline, null, HEAD_NAME, false, true);
// 設(shè)置Unsafe對象
unsafe = pipeline.channel().unsafe();
// 設(shè)置添加完成
setAddComplete();
}
// 返回ChannelHandler,就只它自身
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// 調(diào)用 unsafe 進行bind操作
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進行 connect 操作
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進行 disconnect 操作
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進行 close 操作
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進行 deregister 操作
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
// 調(diào)用 unsafe 進行 read 操作
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進行 write 操作
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// 調(diào)用 unsafe 進行 flush 操作
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 傳播ExceptionCaught事件
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
// 傳播channelRegistered事件
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// 傳播channelUnregistered事件
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelActive 事件
ctx.fireChannelActive();
// 在 https://wangwei.one/posts/netty-channel-source-analyse.html 中分析過了
// 主要是在channel激活之后,向底層的selector注冊一個SelectionKey.OP_ACCEPT監(jiān)聽事件
// 這樣channel在連接之后,就可以監(jiān)聽到一個read事件
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelInactive 事件
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 傳播 channelRead 事件
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelReadComplete 事件
ctx.fireChannelReadComplete();
//
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 傳播 userEventTriggered 事件
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelWritabilityChanged 事件
ctx.fireChannelWritabilityChanged();
}
}
Pipeline 節(jié)點添加
上面我們分析了Pipeline的基本結(jié)構(gòu),接下來我們看看Pipeline添加節(jié)點(也就是Handler處理器)的過程。該過程主要分為三步:
- 判斷是否重復(fù)添加
- 創(chuàng)建節(jié)點并添加至鏈表
- 回調(diào)添加完成事件
以這段常見的代碼為例:
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 添加 serverHandler
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
我們從 ChannelPipeline.addLast() 方法進去:
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 循環(huán)處理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢查是否重復(fù)
checkMultiplicity(handler);
// 創(chuàng)建新節(jié)點
newCtx = newContext(group, filterName(name, handler), handler);
// 添加新節(jié)點
addLast0(newCtx);
// 如果 registered 為 false,則表示這個channel還未注冊到EventLoop上.
// 在這種情況下,我們添加一個Task到PendingHandlerCallback中,
// 等到這個channel注冊成功之后,將會調(diào)用立即調(diào)用 ChannelHandler.handlerAdded(...) 方法,已達(dá)到channel添加的目的
if (!registered) {
// 設(shè)置為待添加狀態(tài)
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 獲取executor
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 設(shè)置為待添加狀態(tài)
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
// 回調(diào)添加完成事件
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// 回調(diào)添加完成事件
callHandlerAdded0(newCtx);
return this;
}
// 檢查是否重復(fù)
private static void checkMultiplicity(ChannelHandler handler) {
// handler是否為ChannelHandlerAdapter類型,不是則不做處理
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 判斷handler是否添加了Sharable注解 && 是否添加過了
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
// 創(chuàng)建新的節(jié)點
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// 調(diào)用DefaultChannelHandlerContext的構(gòu)造函數(shù)
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 在tail節(jié)點之前添加新節(jié)點
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// 回調(diào)ChannelHandler中的handlerAdded方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 我們必須在handlerAdded方法之前調(diào)用setAddComplete方法。否則的話,一旦handlerAdded方法產(chǎn)生了任何pipeline事件,由于狀態(tài)的緣故,ctx.handler()將會丟失這些事件的處理。
// 設(shè)置新節(jié)點的狀態(tài)為添加完成狀態(tài)
ctx.setAddComplete();
// 調(diào)用handlerAdded接口
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
...
// 如果添加失敗,則刪除新節(jié)點
remove0(ctx);
...
}
}
...
}
我們來看下setAddComplete()方法:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
// 通過自旋操作,設(shè)置狀態(tài)為ADD_COMPLETE
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
...
// 設(shè)置為 ADD_PENDING 狀態(tài)
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated;
// This should always be true as it MUST be called before setAddComplete() or setRemoved().
}
...
}
回調(diào)用戶自定義Handler中的handlerAdded方法:
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.printf("ServerHandler added ....");
}
...
}
ChannelInitializer
關(guān)于回調(diào)ChannelHandler中的handlerAdded()方法,最常見的一個場景就是,使用 ChannelInitializer 來添加我們自定義的ChannelHandler。ChannelInitializer被添加完成之后,會回調(diào)到它的 initChannel 方法。
接下來,我們看看 ChannelInitializer 這個類,它是一個特殊的ChannelInboundHandler,它提供了一種在Channel注冊到EventLoop后初始化Channel的簡便方法。
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
/**
* 當(dāng) ch 注冊成功之后,該方法就會被調(diào)用,該方法結(jié)束返回之后,此ChannelInitializer實例將會從Channel所綁定的ChannelPipeline中移除
*
* @param ch 所注冊的Channel
*
*/
protected abstract void initChannel(C ch) throws Exception;
...
// ChannelInitializer 添加成功后,會回調(diào)到handlerAdded()接口
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 標(biāo)記ctx為true,且之前沒有標(biāo)記過。防止重復(fù)執(zhí)行
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
// 調(diào)用initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 最終會刪除 ChannelInitializer 實例
remove(ctx);
}
return true;
}
return false;
}
// 刪除 ChannelInitializer 實例
private void remove(ChannelHandlerContext ctx) {
try {
// 獲取 Pipeline
ChannelPipeline pipeline = ctx.pipeline();
// 從 Pipeline 中返回 ChannelInitializer 實例
if (pipeline.context(this) != null) {
// 刪除 ChannelInitializer 實例
// 刪除邏輯請看下一小節(jié)
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
}
遍歷 ChannelHandlerContext 節(jié)點查詢出ChannelHandler實例
public class DefaultChannelPipeline implements ChannelPipeline {
...
// 通過handler獲取ChannelHandlerContext
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
...
}
Pipeline中除了addLast方法外, 還有addFirst、addBefore、addAfter等方法,邏輯類似,可以自行研究學(xué)習(xí)。
Pipeline 節(jié)點刪除
上面,我們講了Pipeline節(jié)點的添加,這小結(jié)我們看看Pipeline節(jié)點的刪除功能。
netty 有個最大的特性之一就是Handler可插拔,做到動態(tài)編織pipeline,比如在首次建立連接的時候,需要通過進行權(quán)限認(rèn)證,在認(rèn)證通過之后,就可以將此context移除,下次pipeline在傳播事件的時候就就不會調(diào)用到權(quán)限認(rèn)證處理器。
下面是權(quán)限認(rèn)證Handler最簡單的實現(xiàn),第一個數(shù)據(jù)包傳來的是認(rèn)證信息,如果校驗通過,就刪除此Handler,否則,直接關(guān)閉連接
// 鑒權(quán)Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("AuthHandler has been removed ! ");
}
}
我們來看看 DefaultChannelPipeline 中的 remove 方法:
public class DefaultChannelPipeline implements ChannelPipeline {
...
// 從Pipeline中刪除ChannelHandler
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
...
// 獲取 ChannelHandler ,獲取不到就拋出異常
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
...
// 刪除
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// ctx不能為heand與tail
assert ctx != head && ctx != tail;
synchronized (this) {
// 從pipeline中刪除ChannelHandlerContext節(jié)點
remove0(ctx);
// 如果為false,則表明channel還沒有注冊到eventloop上
// 在刪除這種場景下,我們先添加一個Task,一旦channel注冊成功就會調(diào)用這個Task,這個Task就會立即調(diào)用ChannelHandler.handlerRemoved(...)方法,來從pipeline中刪除context。
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
return ctx;
}
...
// 刪除節(jié)點 ChannelHandlerContext
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
...
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
// 回調(diào) handlerRemoved 方法
// 也就是我們前面例子 AuthHandler 中的 handlerRemoved() 方法
ctx.handler().handlerRemoved(ctx);
} finally {
// 設(shè)置為ctx 狀態(tài)為 REMOVE_COMPLETE
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
...
}
好了, 刪除的邏輯就分析到這里了。
小結(jié)
這一講我們分析了Pipeline的創(chuàng)建過程,了解Pipeline中的鏈表結(jié)構(gòu)以及每個節(jié)點的數(shù)據(jù)結(jié)構(gòu)。還分析了Pipeline是如何添加節(jié)點的,又是如何刪除節(jié)點的。接下來 ,我們會分析Pipeline如何進行事件傳播的。