Netty 權(quán)威指南筆記(七):ChannelPipeline 和 ChannelHandler 源碼分析
文中源碼版本為 Netty4.1。
概述
Netty 的 ChannelPipeline 和 ChannelHandler 機(jī)制類似于 Servlet 和 Filter 過(guò)濾器,這類攔截器實(shí)際上是職責(zé)鏈模式的一種變形,主要是為了方便事件的攔截和用戶業(yè)務(wù)邏輯的定制。
Servlet Filter 過(guò)濾器提供了一種面向?qū)ο蟮哪K化機(jī)制,用來(lái)將公共人物封裝到可插入的組件中。這些組件通過(guò) Web 部署配置文件(web.xml)進(jìn)行聲明,無(wú)須改動(dòng)代碼即可添加和刪除過(guò)濾器??梢詫?duì)應(yīng)于程序 Servlet 提供的核心功能進(jìn)行補(bǔ)充,而不破壞原有的功能。
Netty 的 Channel 過(guò)濾器實(shí)現(xiàn)原理與 Servlet Filter 機(jī)制一致,它將 Channel 的數(shù)據(jù)管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動(dòng)和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來(lái)對(duì) I/O 事件進(jìn)行具體的攔截和處理,可以方便地通過(guò)新增和刪除 ChannelHandler 來(lái)實(shí)現(xiàn)不同業(yè)務(wù)邏輯的定制,能夠?qū)崿F(xiàn)對(duì)修改封閉和對(duì)擴(kuò)展到支持。
ChannelPipeline 源碼分析
ChannelHandler 雙向鏈表
在 TimeServer 程序 中,調(diào)用了 ChannelPipeline 的 addLast 方法來(lái)添加 ChannelHandler。那么 ChannelHandler 在其中是如何存儲(chǔ)的呢?
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 2014)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(16));
socketChannel.pipeline().addLast(new TimeServerHandler());
}
});
我們看一下 ChannelPipeline 的成員變量,前兩個(gè)就是 ChannelHandler 鏈表的首尾引用,其類型是 AbstractChannelHandlerContext,該類主要包含一個(gè)雙向鏈表節(jié)點(diǎn)的前置和后置節(jié)點(diǎn)引用 prev、next,以及數(shù)據(jù)引用 handler,相當(dāng)于鏈表數(shù)據(jù)結(jié)構(gòu)中的 Node 節(jié)點(diǎn)。
// ChannelHandler 首位指針
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
// pipeline 所屬 channel
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
AbstractChannelHandlerContext 類主要成員變量:
// in AbstractChannelHandlerContext 抽象類
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
// DefaultChannelHandlerContext 實(shí)現(xiàn)類
private final ChannelHandler handler;
TimeServer 程序中調(diào)用的 addLast 方法源碼如下:
- 首先進(jìn)行了能否被共享的檢查。
- 然后構(gòu)建了 AbstractChannelHandlerContext 節(jié)點(diǎn),并加入到了鏈表尾部。
- 如果 channel 尚未注冊(cè)到 EventLoop,就添加一個(gè)任務(wù)到 PendingHandlerCallback 上,后續(xù)注冊(cè)完畢,再調(diào)用 ChannelHandler.handlerAdded。
- 如果已經(jīng)注冊(cè),馬上調(diào)用 callHandlerAdded0 方法來(lái)執(zhí)行 ChannelHandler.handlerAdded。
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢查,若不是 Sharable,而且已經(jīng)被添加到其他 pipeline,則拋出異常
checkMultiplicity(handler);
// 構(gòu)建 AbstractChannelHandlerContext 節(jié)點(diǎn)
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到鏈表尾部
addLast0(newCtx);
// registered 為 false 表示 channel 尚未注冊(cè)到 EventLoop 上。
// 添加一個(gè)任務(wù)到 PendingHandlerCallback 上,后續(xù)注冊(cè)完畢,再調(diào)用 ChannelHandler.handlerAdded
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// registered 為 true,則立即調(diào)用 ChannelHandler.handlerAdded
EventExecutor executor = newCtx.executor();
// inEvent 用于判斷當(dāng)前線程是否是 EventLoop 線程。執(zhí)行 ChannelHandler 時(shí),必須在對(duì)應(yīng)的 EventLoop 線程池中執(zhí)行。
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
// 使用 AbstractChannelHandlerContext 包裹 ChannelHandler
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 將新節(jié)點(diǎn)插入鏈表尾部
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
事件處理流程
Netty 中的事件分為 inbound 事件和 outbound 事件。inbound 事件通常由 I/O 線程觸發(fā),比如:
- 注冊(cè)事件 fireChannelRegistered。
- 連接建立事件 fireChannelActive。
- 讀事件和讀完成事件 fireChannelRead、fireChannelReadComplete。
- 異常通知事件 fireExceptionCaught。
- 用戶自定義事件 fireUserEventTriggered。
- Channel 可寫(xiě)狀態(tài)變化事件 fireChannelWritabilityChanged。
- 連接關(guān)閉事件 fireChannelInactive。
outbound 事件通常是由用戶主動(dòng)出發(fā)的 I/O 事件,比如:
- 端口綁定 bind。
- 連接服務(wù)端 connect。
- 寫(xiě)事件 write。
- 刷新時(shí)間 flush。
- 讀事件 read。
- 主動(dòng)斷開(kāi)連接 disconnect。
- 關(guān)閉 channel 事件 close。
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
看代碼我們發(fā)現(xiàn),inbound 事件是從 HeadContext 開(kāi)始處理的,而 outbound 事件都是由 TailContext 首先處理的。其中的原因是,HeadContext 負(fù)責(zé)與 NIO 底層的 SocketChannel、ServerSocketChannel 進(jìn)行交互(通過(guò) Unsafe 類)。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
}
下面我們以讀事件 fireChannelRead 為例看一下其處理流程,在 DefaultChannelPipeline 中調(diào)用了 AbstractChannelHandlerContext 類的 invokeChannelRead 方法,其源碼如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// 如果 msg 實(shí)現(xiàn)了 ReferenceCounted 接口,進(jìn)行處理。
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 調(diào)用 invokeChannelRead 方法
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
// 先調(diào)用 channelRead 方法,再 fireChannelRead 觸發(fā)下一個(gè)節(jié)點(diǎn)的 channelRead 方法
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
// inbound 事件中的下一個(gè)節(jié)點(diǎn)是本節(jié)點(diǎn) next 引用所指節(jié)點(diǎn),而 outbound 事件相反。
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
ChannelHandler 源碼分析
ChannelHandler 類似于 Servlet 的 Filter 過(guò)濾器,負(fù)責(zé)對(duì) I/O 事件進(jìn)行攔截和處理,可以選擇性地處理,也可以透?jìng)骱徒K止事件的傳遞?;?ChannelHandler 接口,用戶可以方便地進(jìn)行業(yè)務(wù)邏輯定制,比如打印日志,統(tǒng)一封裝異常信息等。
類圖
ChannelHandler 類圖如下所示:
前面提到 Netty 事件分為 inbound 和 outbound 兩類,分別對(duì)應(yīng) ChannelInboundHandler 和 ChannelOutboundHandler,它們的公共父類就是 ChannelHandler。
ChannelHandler、ChannelInboundHandler 和 ChannelOutboundHandler 接口中,提供了許多方法。在實(shí)際使用中,用戶往往只需要其中的一兩個(gè)。為了方便用戶使用,有幾個(gè)抽象類(ChannelHandlerAdapter、ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter)提供了一些默認(rèn)實(shí)現(xiàn),如此用戶只需要實(shí)現(xiàn)自己關(guān)心的方法便可。
類圖倒數(shù)第二層提供了一些編解碼器的抽象類,用戶可以在此基礎(chǔ)上進(jìn)行擴(kuò)展。最后一層是幾種常見(jiàn)的編解碼器。
| 編解碼器 | 類型 | 功能 |
|---|---|---|
| MessageToMessageEncoder | outbound | 從一個(gè)對(duì)象到另一個(gè)對(duì)象的轉(zhuǎn)換 |
| MessageToByteEncoder | outbound | 從對(duì)象到 ByteBuf 的轉(zhuǎn)換 |
| LengthFieldPrepender | outbound | 在消息體前面追加消息長(zhǎng)度的編碼器 |
| ProtobufVarint32LengthFieldPrepender | outbound | 給 protobuf 字節(jié)流添加描述消息長(zhǎng)度的消息頭 |
| MessageToMessageDecoder | inbound | 從對(duì)象到對(duì)象的解碼器 |
| ByteToMessageDecoder | inbound | 從 ByteBuf 到對(duì)象的解碼器 |
| StringDecoder | inbound | 將 ByteBuf 轉(zhuǎn)化成指定編碼的 String |
| FixedLengthFrameDecoder | inbound | 定長(zhǎng)消息解碼器 |
| LengthFieldBasedFrameDecoder | inbound | 消息長(zhǎng)度在位于消息頭的解碼器 |
下面我們選擇幾個(gè)典型類來(lái)解讀其源碼。
ChannelHandler
ChannelHandler 只有少數(shù)幾個(gè)方法,用于處理 ChannelHandler 被添加時(shí)做一些初始化操作,被移除時(shí)做一些銷毀操作,以及異常處理。
除此之外,還有一個(gè)注解 Sharable,用于標(biāo)識(shí)一個(gè) ChannelHandler 實(shí)例可以被多個(gè) ChannelPipeline 共享。
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelInboundHandler 和 ChannelOutboundHandler 接口中的方法和 “事件處理流程” 那一節(jié)中介紹的 inbound 和 outbound 事件類型基本上可以一一對(duì)應(yīng),這里就不貼代碼分析了。
ChannelHandlerAdapter
ChannelHandlerAdapter 抽象類,提供了 ChannelHandler 接口方法的默認(rèn)實(shí)現(xiàn),以及根據(jù)注解判斷該類是否可共享的 isSharable 方法。
handlerAdded 和 handlerRemoved 的默認(rèn)實(shí)現(xiàn)都是空。
public abstract class ChannelHandlerAdapter implements ChannelHandler {
/**
* Return {@code true} if the implementation is {@link Sharable} and so can be added
* to different {@link ChannelPipeline}s.
*/
public boolean isSharable() {
/**
* Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
* {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
* {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
* {@link Thread}s are quite limited anyway.
*
* See <a >#2289</a>.
*/
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
}
ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter 分別提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的默認(rèn)實(shí)現(xiàn)。以 ChannelInboundHandlerAdapter 為例,其大多數(shù)方法的默認(rèn)實(shí)現(xiàn)都是調(diào)用 ChannelHandlerContext 的類似方法,作用為向后傳遞。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
}
ByteToMessageDecoder
該方法提供了將 ByteBuf 轉(zhuǎn)化為對(duì)象的解碼器處理流程,具體的解碼規(guī)則交由子類去實(shí)現(xiàn)。
我們以讀操作 channelRead 為例來(lái)研究一下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// out 是一個(gè)鏈表,存放解碼成功的對(duì)象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// cumulation 中存放的是上次未處理完的半包消息
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 本次處理,需要把上次遺留的半包和本次數(shù)據(jù)拼接后,一起處理
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 調(diào)用解碼器解碼消息
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 如果有解碼成功的數(shù)據(jù),需要向后傳遞,讓其他 ChannelHandler 繼續(xù)處理
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
// 如果有解碼成功的數(shù)據(jù),需要向后傳遞,讓其他 ChannelHandler 繼續(xù)處理
fireChannelRead(ctx, out, outSize);
out.clear();
// 如果當(dāng)前 ChannelHandler 所屬 ctx 被剔除 pipeline 上下文,就不需要繼續(xù)處理了
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 解碼
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 設(shè)置解碼器狀態(tài)為正在解碼,避免解碼過(guò)程中另一個(gè)線程調(diào)用了 handlerRemoved 把數(shù)據(jù)銷毀,造成混亂
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
// STATE_HANDLER_REMOVED_PENDING 表示在解碼過(guò)程中,ctx 被移除,需要由當(dāng)前線程來(lái)調(diào)用 handlerRemoved
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
// 具體的消息解碼算法,交給子類實(shí)現(xiàn)
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
FixedLengthFrameDecoder
上一節(jié)我們研究了 ByteToMessageDecoder,本節(jié)研究其最簡(jiǎn)單的一個(gè)實(shí)現(xiàn)類 FixedLengthFrameDecoder。
該類核心是 decode 方法,當(dāng)可讀字節(jié)數(shù)據(jù)大于 frameLength 時(shí),截取前 frameLength 個(gè)字節(jié)為一個(gè) ByteBuf,存入列表 out 中。
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
}