關(guān)于ChannelRead事件的傳播
在自定義handler的時(shí)候,通常要重寫channelRead函數(shù),如果想要將該事件向后傳播(注意,傳播順序與handler添加順序相同),需要調(diào)用fireChannelRead函數(shù),ChannelRead事件便在這里中斷
通常在重寫的channelRead函數(shù)里,有兩種傳播ChannelRead事件的方式
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//第一種
ctx.fireChannelRead(msg);
//第二種
ctx.channel().pipeline().fireChannelRead(msg);
}
這兩種方式的主要區(qū)別在于接下來(lái)傳播的起始位置,非常重要
- 使用第一種方式,事件會(huì)從該節(jié)點(diǎn)開(kāi)始繼續(xù)向后傳播
- 使用第二種方式,事件會(huì)從head節(jié)點(diǎn)開(kāi)始傳播
下面分析源碼來(lái)做說(shuō)明
第一種傳播方式
跟進(jìn)到AbstractChannelHandlerContext#fireChannelRead方法
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
findContextInbound方法就是在尋找下一個(gè)節(jié)點(diǎn),看看這個(gè)方法的代碼
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
繼續(xù)看看skipContext方法
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
可以看到,這里判斷的關(guān)鍵就在executionMask這個(gè)成員變量,而這個(gè)成員變量就在AbstractChannelHandlerContext里被賦值
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor,
String name,
Class<? extends ChannelHandler> handlerClass) {
//省略其他代碼
this.executionMask = mask(handlerClass);
}
mask方法最終調(diào)用到mask0方法
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
//省略部分代碼
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
//省略部分代碼
}
//省略部分代碼
} catch (Exception e) {
//省略部分代碼
}
return mask;
}
isSkippable函數(shù)只有在找不到函數(shù)或者函數(shù)被@Skip注解時(shí)才返回false
可以看到,實(shí)際上executionMask就是用來(lái)記錄handler的類型信息和方法注解信息
skipContext方法實(shí)際上就是在尋找下一個(gè)沒(méi)有用@Skip注解了ChannelRead方法的inbound節(jié)點(diǎn)
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
繼續(xù)看fireChannelRead函數(shù)里調(diào)用到的invokeChannelRead函數(shù)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
//不在當(dāng)前eventloop,放到異步任務(wù)隊(duì)列里
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//省略
}
} else {
fireChannelRead(msg);
}
}
可以看到,這里就是在調(diào)用下一個(gè)節(jié)點(diǎn)的channelRead方法
第二種傳播方式
跟進(jìn)到DefaultChannelPipeline#fireChannelRead方法
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
繼續(xù)跟進(jìn)invokeChannelRead方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
可以看到,這里就是傳入head節(jié)點(diǎn),從head節(jié)點(diǎn)開(kāi)始向后傳播channelRead事件
資源釋放相關(guān)問(wèn)題
若自定義的handler繼承自ChannelInboundHandlerAdapter,并且在ChannelRead函數(shù)里沒(méi)有將事件向后傳播,那么需要自行調(diào)用函數(shù)處理資源釋放,如下
ReferenceCountUtil.release(msg);
Write事件傳播
write事件的傳播順序與handler的添加順序相反(即最后添加的outboundHandler最先處理write事件)
類似的,在用戶代碼里傳播write事件也有兩種方式
第一種方式,從當(dāng)前節(jié)點(diǎn),往前尋找outbound,繼續(xù)傳播
ctx.write(msg);
第二種方式,從tail節(jié)點(diǎn)開(kāi)始,往前尋找outbound傳播
ctx.channel().write(msg);
如果沒(méi)有中斷,最終write事件會(huì)傳播到head節(jié)點(diǎn),然后head節(jié)點(diǎn)會(huì)調(diào)用unsafe的write方法
異常的傳播
異常的產(chǎn)生
首先,異常是在ChannelRead、ChannelRegister等這些函數(shù)中拋出的,然后在形如invokeChannelXXX(例如invokeChannelRead)中捕獲,例如
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//捕獲異常
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
看看invokeExceptionCaught方法
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
//省略
}
} else {
fireExceptionCaught(cause);
}
}
可以看到,這里調(diào)用exceptionCaught方法處理異常
傳播異常
異常的傳播方向與handler的添加方向一致,并且不區(qū)分是inboundHandler還是outboundHandler(即異??梢詮膇nboundHandler傳播到outboundHandler,反之亦可)
默認(rèn)情況下,如果不重寫exceptionCaught方法,那么會(huì)把該異常繼續(xù)向后傳播,最終會(huì)傳播到tail節(jié)點(diǎn),tail節(jié)點(diǎn)會(huì)打印一條日志表明該異常未被處理
如果重寫了exceptionCaught方法,并且想將該異常繼續(xù)向后傳播,那么需要調(diào)用fireExceptionCaught方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//其他處理代碼
ctx.fireExceptionCaught(cause);
}
關(guān)聯(lián):Netty4中Handler的執(zhí)行順序以及ctx.close() 與 ctx.channel().close()的區(qū)別