inbound 事件和 outbound 事件的流向是不一樣的, inbound 事件的流行是從下至上, 而 outbound 剛好相反, 是從上到下. 并且 inbound 的傳遞方式是通過(guò)調(diào)用相應(yīng)的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過(guò)調(diào)用 ChannelHandlerContext.OUT_EVT() 方法.
例如 ChannelHandlerContext.fireChannelRegistered() 調(diào)用會(huì)發(fā)送一個(gè) ChannelRegistered 的 inbound 給下一個(gè)ChannelHandlerContext, 而 ChannelHandlerContext.bind 調(diào)用會(huì)發(fā)送一個(gè) bind 的 outbound 事件給 下一個(gè) ChannelHandlerContext.
Inbound 事件傳播方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Oubound 事件傳輸方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
注意, 如果我們捕獲了一個(gè)事件, 并且想讓這個(gè)事件繼續(xù)傳遞下去, 那么需要調(diào)用 Context 相應(yīng)的傳播方法.
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
PipeLine中事件傳遞
Outbound 事件都是請(qǐng)求事件(request event), 即請(qǐng)求某件事情的發(fā)生, 然后通過(guò) Outbound 事件進(jìn)行通知.
Outbound 事件的傳播方向是 tail -> customContext -> head.
鏈接請(qǐng)求
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
這樣形成一個(gè)鏈條處理
Inbound 事件是一個(gè)通知事件, 即某件事已經(jīng)發(fā)生了, 然后通過(guò) Inbound 事件進(jìn)行通知. Inbound 通常發(fā)生在 Channel 的狀態(tài)的改變或 IO 事件就緒.
Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive
總結(jié)
對(duì)于 Outbound事件:
Outbound 事件是請(qǐng)求事件(由 Connect 發(fā)起一個(gè)請(qǐng)求, 并最終由 unsafe 處理這個(gè)請(qǐng)求)
Outbound 事件的發(fā)起者是 Channel
Outbound 事件的處理者是 unsafe
Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.
在 ChannelHandler 中處理事件時(shí), 如果這個(gè) Handler 不是最后一個(gè) Hnalder, 則需要調(diào)用 ctx.xxx (例如 ctx.connect) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會(huì)提前終止.
Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
對(duì)于 Inbound 事件:
Inbound 事件是通知事件, 當(dāng)某件事情已經(jīng)就緒后, 通知上層.
Inbound 事件發(fā)起者是 unsafe
Inbound 事件的處理者是 Channel, 如果用戶沒(méi)有實(shí)現(xiàn)自定義的處理方法, 那么Inbound 事件默認(rèn)的處理者是 TailContext, 并且其處理方法是空實(shí)現(xiàn).
Inbound 事件在 Pipeline 中傳輸方向是 head -> tail
在 ChannelHandler 中處理事件時(shí), 如果這個(gè) Handler 不是最后一個(gè) Hnalder, 則需要調(diào)用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會(huì)提前終止.
Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT
如何區(qū)分是In還是Out事件?
ch.pipeline().addLast→DefaultChannelHandlerContext newCtx =new DefaultChannelHandlerContext(this, invoker, name, handler);→ skipFlags = skipFlags(handler);
skipFlags字段標(biāo)識(shí)跳過(guò)/捕獲那個(gè)事件。
5.0版本中不分開(kāi)區(qū)分In/Out事件了,全部都繼承ChannelHandlerAdapter,需要處理什么方法,重寫什么方法就可以。
設(shè)置標(biāo)識(shí)的skipFlags0方法部分源碼。
/**
* Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
*/
private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
int flags = 0;
try {
if (handlerType.getMethod(
"handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_HANDLER_ADDED;
}
if (handlerType.getMethod(
"handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_HANDLER_REMOVED;
}
if (handlerType.getMethod(
"exceptionCaught", ChannelHandlerContext.class, Throwable.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_EXCEPTION_CAUGHT;
}
if (handlerType.getMethod(
"channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_REGISTERED;
}
if (handlerType.getMethod(
"channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_ACTIVE;
}
if (handlerType.getMethod(
"channelInactive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_INACTIVE;
}
if (handlerType.getMethod(
"channelRead", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_CHANNEL_READ;
}
.......
return flags;
}
在父類ChannelHandlerAdapter中所有的方法都添加了@Skip注解,子類繼承它,重寫方法后,該方法的@Skip注解即失效。
handlerType.getMethod( "channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)方法如果有@Skip注解的話,將flags對(duì)應(yīng)的位置1。即如果重寫了的話對(duì)應(yīng)位是0
最后有事件時(shí)fireChannelRead等函數(shù)會(huì)調(diào)用
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
不斷循環(huán),找到最近的一個(gè)Handler處理。
private DefaultChannelHandlerContext findContextInbound(int mask) {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.skipFlags & mask) != 0);
return ctx;
}
這個(gè)循環(huán)是當(dāng)對(duì)應(yīng)位是0時(shí),停止循環(huán)。需要處理。這樣處理一直到head/tail。