7.2.2 ChannelHandlerContext
7.2.2.1 AbstractChannelHandlerContext
AbstractChannelHandlerContext的類簽名如下:
abstract class AbstractChannelHandlerContext extends
DefaultAttributeMap implements ChannelHandlerContext
該類作為其他ChannelHandlerContext的基類,Netty4.0中繼承自DefaultAttributeMap實(shí)現(xiàn)屬性鍵值對(duì)的存儲(chǔ)和獲取,由于此種用法與channel.attr()存在混淆,故不建議使用。確有需要時(shí),請(qǐng)直接使用channel.attr()。
下面介紹其中的關(guān)鍵字段:
// Context形成雙向鏈表,next和prev分別是后繼節(jié)點(diǎn)和前驅(qū)節(jié)點(diǎn)
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private final boolean inbound; // 對(duì)應(yīng)處理器為InboudHandler
private final boolean outbound; // 對(duì)應(yīng)處理器為OutboudHandler
private final DefaultChannelPipeline pipeline;
private final String name; // Context的名稱
private final boolean ordered; // 事件順序標(biāo)記
final EventExecutor executor; // 事件執(zhí)行線程
private volatile int handlerState = INIT; // 狀態(tài)
其中handlerState的字面意思容易使人誤解,為此列出四種狀態(tài)并加以解釋:
// 初始狀態(tài)
private static final int INIT = 0;
// 對(duì)應(yīng)Handler的handlerAdded方法將要被調(diào)用但還未調(diào)用
private static final int ADD_PENDING = 1;
// 對(duì)應(yīng)Handler的handlerAdded方法被調(diào)用
private static final int ADD_COMPLETE = 2;
// 對(duì)應(yīng)Handler的handlerRemoved方法被調(diào)用
private static final int REMOVE_COMPLETE = 3;
AbstractChannelHandlerContext只有一個(gè)構(gòu)造方法:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// 只有執(zhí)行線程為EventLoop或者標(biāo)記為OrderedEventExecutor才是順序的
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
由于Netty將事件抽象為入站事件和出站事件,AbstractChannelHandlerContext對(duì)事件的處理也分為兩類,分別以ChannelRead和read事件為例分析。首先分析ChannelRead事件:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg){
invokeChannelRead(findContextInbound(), msg);
return this;
}
回顧之前的分析,事件在ChannelPipeline中不會(huì)自動(dòng)流動(dòng),入站事件需要調(diào)用fireXXX方法將事件傳遞到下一個(gè)處理器,出站事件需要調(diào)用諸如write方法傳遞給下一個(gè)處理器處理。查找入站處理器的代碼如下:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next; // 入站事件向后查找
} while (!ctx.inbound);
return ctx;
}
代碼十分簡(jiǎn)潔,向雙向鏈表前進(jìn)方向查找到最近的入站處理器即可;可推知,出站事件則向雙向鏈表后退方向查找最近的出站處理器。這也是出站\入站事件傳播方向不同的原因。
找到事件傳播的下一個(gè)處理器后,在處理器中執(zhí)行處理過程的代碼如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next,
final Object msg) {
ObjectUtil.checkNotNull(msg, "msg");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 使用當(dāng)前IO線程執(zhí)行用戶自定義處理
next.invokeChannelRead(msg);
} else {
// 使用用戶定義的線程執(zhí)行處理過程
executor.execute(() -> {next.invokeChannelRead(msg);});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 處理器執(zhí)行用戶自定義的處理過程
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
Netty為用戶提供了方便的線程切換處理,特別是在版本4.1后,可以直接使用JDK提供的線程池。最佳實(shí)踐:在一個(gè)處理器中如果有耗時(shí)較大的業(yè)務(wù)邏輯,可以指定該處理器的處理線程Executor,以避免占用IO線程造成性能損失。
之后再看一下invokeHandler()和notifyHandlerException(t)方法,invokeHandler決定是否調(diào)用處理器,進(jìn)行業(yè)務(wù)邏輯處理:
private boolean invokeHandler() {
// handlerState為volatile變量,存儲(chǔ)為本地變量,以便減少volatile讀
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
可知,當(dāng)一個(gè)處理器還沒有調(diào)用HandlerAdded方法時(shí),只有處理器的執(zhí)行線程是非順序線程池的實(shí)例才能執(zhí)行業(yè)務(wù)處理邏輯;否則必須等待已調(diào)用handlerAdded方法,才能處理業(yè)務(wù)邏輯。這部分的處理,保證了ChannelPipeline的線程安全性,由此用戶可以隨意增加刪除Handler。
notifyHandlerException方法對(duì)處理過程中出現(xiàn)的異常進(jìn)行處理:
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
// 處理異常的過程中出現(xiàn)異常
logger.warn("...)
return;
}
invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
logger.warn("...")
}
} else {
fireExceptionCaught(cause);
}
}
invokeExceptionCaught方法可類比invokeChannelRead方法,其形式一致。由代碼可知,當(dāng)一個(gè)Handler的處理過程出現(xiàn)異常時(shí),會(huì)調(diào)用該Handler的exceptionCaught()方法進(jìn)行處理。注意:異常事件的傳播也是由用戶使用fireExceptionCaught方法控制。
至此,ChannelRead入站事件的處理已分析完畢,read出站事件的處理也可類比,不同的是:出站事件需要反向找出站處理器Handler,不再贅述。
7.2.2.2 DefaultChannelHandlerContext
DefaultChannelHandlerContext是使用Netty時(shí)經(jīng)常使用的事實(shí)Context類,其中的大部分功能已在AbstractChannelHandlerContext中完成,所以該類十分簡(jiǎn)單,其構(gòu)造方法如下:
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
對(duì)出站\入站Handler的判別使用粗暴的instanceof:
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
實(shí)現(xiàn)中引入了一個(gè)新的字段:
private final ChannelHandler handler;
回憶ChannelPipeline,pipeline其中形成Context的雙向鏈表,而處理邏輯則在Handler中。設(shè)計(jì)模式中指出,關(guān)聯(lián)Handler與Context的方法有兩種:組合和繼承。本例中,使用的是組合,下面分析繼承的方法。
7.2.2.3 HeadContext和TailContext
HeadContext和TailContext使用繼承的方式關(guān)聯(lián)Handler,作為ChannelPipeline雙向鏈表的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)。首先分析HeadContext,類簽名如下:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler
由于既繼承OutboundHandler又繼承InboundHandler,可知HeadContext需要同時(shí)處理出站事件和入站事件。以read和ChannelRead事件為例,當(dāng)用戶調(diào)用read出站事件時(shí),是在告訴IO線程:我需要向網(wǎng)絡(luò)讀數(shù)據(jù)做處理;當(dāng)IO線程讀到數(shù)據(jù)后,則使用ChannelRead事件通知用戶:已讀取到數(shù)據(jù)。read()方法代碼如下:
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
其中unsafe是Netty內(nèi)部實(shí)現(xiàn)底層IO細(xì)節(jié)的類,beginRead()方法設(shè)定底層Selector關(guān)心read事件,如果read事件就緒,則會(huì)調(diào)用unsafe.read()方法讀取數(shù)據(jù),然后調(diào)用channelPipe.fireChannelRead()方法通知用戶已讀取到數(shù)據(jù),可進(jìn)行業(yè)務(wù)處理。 HeadContext的ChannelRead()方法代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
可知HeadContext只是簡(jiǎn)單的將事件傳播到下一個(gè)入站處理器。
也許你會(huì)有疑問:自己根本沒使用過read出站事件,為什么數(shù)據(jù)自動(dòng)讀取了呢?這是因?yàn)槟J(rèn)設(shè)置自動(dòng)讀取autoRead。通過HeadContext的channelActive代碼分析:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead(); // 自動(dòng)讀取
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
當(dāng)channel激活后,如果配置了自動(dòng)讀取,則會(huì)調(diào)用channel.read(),注意這將在ChannelPipeline中傳播出站事件read,最終傳播到HeadContext的read()方法,最后調(diào)用unsafe.beginRead()設(shè)置關(guān)心底層read事件,從而實(shí)現(xiàn)激活后自動(dòng)讀取數(shù)據(jù)。而當(dāng)讀取完一組數(shù)據(jù)后,在channelReadComplete()方法中繼續(xù)下一組數(shù)據(jù)的自動(dòng)讀取。
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead(); // 自動(dòng)讀取下一組數(shù)據(jù)
}
分析完HeadContext,再分析TailContext。首先看方法簽名:
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler
TailContext只繼承入站事件處理器,作為雙向鏈表的尾節(jié)點(diǎn),它對(duì)入站事件的處理極其簡(jiǎn)單,那就是:什么也不做。但有兩個(gè)方法除外:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
onUnhandledInboundException(cause);
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
而這也僅僅是做個(gè)簡(jiǎn)單提示,告知用戶這兩個(gè)方法必須在ChannelPipeline中有處理器處理,否則很有可能是代碼出了問題:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
如果開啟了日志的Debug級(jí)別,不難相信,你已經(jīng)接觸過這樣的提示。