上下文ChannelHandlerContext 的最大作用就是向它所屬管道ChannelPipeline 的上游或下游傳遞事件。
那么它是如何實(shí)現(xiàn)的呢?
這就要看
ChannelHandlerContext接口的實(shí)現(xiàn)類(lèi)AbstractChannelHandlerContext。
一. 成員屬性
1.1 雙向鏈表
// 組成雙向鏈表
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
通過(guò)
next和prev組成一個(gè)雙向鏈表,這樣就可以向上或者向下查找管道中的其他處理器上下文。
1.2 上下文狀態(tài)
/**
* ChannelHandler.handlerAdded(ChannelHandlerContext) 即將被調(diào)用。
*/
private static final int ADD_PENDING = 1;
/**
* ChannelHandler.handlerAdded(ChannelHandlerContext) 已經(jīng)被調(diào)用。
*/
private static final int ADD_COMPLETE = 2;
/**
* ChannelHandler.handlerRemoved(ChannelHandlerContext) 已經(jīng)被調(diào)用。
*/
private static final int REMOVE_COMPLETE = 3;
/**
* 初始狀態(tài),
* ChannelHandler.handlerAdded(ChannelHandlerContext) 和
* ChannelHandler.handlerRemoved(ChannelHandlerContext) 都沒(méi)有被調(diào)用。
*
*/
private static final int INIT = 0;
private volatile int handlerState = INIT;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
- 狀態(tài)一共分為
4種:INIT,ADD_PENDING,ADD_COMPLETE和REMOVE_COMPLETE。- 通過(guò)
handlerState和HANDLER_STATE_UPDATER, 采用CAS的方式原子化更新屬性,這樣就不用加鎖處理并發(fā)問(wèn)題。
1.3 不可變的屬性
即被
final修飾的屬性,在創(chuàng)建ChannelHandlerContext對(duì)象時(shí)就需要賦值。
-
DefaultChannelPipeline pipeline當(dāng)前上下文所屬的管道
pipeline, 而且它的類(lèi)型就定死了是DefaultChannelPipeline類(lèi)。 -
name表示上下文的名稱(chēng) -
ordered表示上下文的執(zhí)行器是不是有序的 -
executor上下文的執(zhí)行器如果這個(gè)值是
null,那么上下文的執(zhí)行器用的就是所屬通道Channel的事件輪詢器。 -
executionMask表示事件執(zhí)行器ChannelHandler的執(zhí)行標(biāo)記用來(lái)判斷是否跳過(guò)執(zhí)行器
ChannelHandler的某些事件處理方法。
二. 重要方法
2.1 構(gòu)造方法
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
// 調(diào)用 ChannelHandlerMask.mask(handlerClass) 方法,獲取執(zhí)行標(biāo)記
this.executionMask = mask(handlerClass);
// 表示上下文的事件執(zhí)行器是不是有序的,即以有序/串行的方式處理所有提交的任務(wù)。
// executor == null,說(shuō)明當(dāng)前上下文用的是通道Channel的 channel().eventLoop(),這個(gè)肯定是有序的
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
這個(gè)是
AbstractChannelHandlerContext唯一的構(gòu)造方法,基本上賦值了它所有的final屬性。
2.2 狀態(tài)相關(guān)方法
-
等待添加
final void setAddPending() { boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); // 這應(yīng)該總是為真,因?yàn)樗仨氃?setAddComplete()或 setRemoved()之前調(diào)用。 assert updated; }將上下文的狀態(tài)變成等待添加狀態(tài)
ADD_PENDING。 -
已添加
final boolean setAddComplete() { for (;;) { int oldState = handlerState; if (oldState == REMOVE_COMPLETE) { return false; } // 確保當(dāng) handlerState 已經(jīng)是REMOVE_COMPLETE時(shí),我們永遠(yuǎn)不會(huì)更新。 // oldState 通常是 ADD_PENDING,但當(dāng)使用不公開(kāi)排序保證的 EventExecutor 時(shí),也可能是 REMOVE_COMPLETE。 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return true; } } }通過(guò)
for (;;)死循環(huán),采用CAS的方法,將上下文的狀態(tài)變成已添加ADD_COMPLETE。
只有已添加狀態(tài)上下文的事件處理器ChannelHandler才能處理事件。final void callHandlerAdded() throws Exception { // 我們必須在調(diào)用 handlerAdded 之前調(diào)用 setAddComplete,將狀態(tài)改成 REMOVE_COMPLETE, // 否則這個(gè)上下文對(duì)應(yīng)的事件處理器將不會(huì)處理任何事件,因?yàn)闋顟B(tài)不允許。 if (setAddComplete()) { handler().handlerAdded(this); } }將狀態(tài)變成已添加,如果設(shè)置成功就調(diào)用
handler().handlerAdded(this)方法,通知事件處理器已經(jīng)被添加到管道上了。 -
已刪除
final void setRemoved() { handlerState = REMOVE_COMPLETE; } final void callHandlerRemoved() throws Exception { try { // 只有 handlerState 狀態(tài)變成 ADD_COMPLETE 時(shí),才會(huì)調(diào)用 handler().handlerRemoved(this); // 也就是說(shuō) 只有之前調(diào)用過(guò) handlerAdded(…)方法,之后才會(huì)調(diào)用handlerRemoved(…) 方法。 if (handlerState == ADD_COMPLETE) { handler().handlerRemoved(this); } } finally { // 在任何情況下都要將該上下文標(biāo)記為已刪除 setRemoved(); } }- 將上下文狀態(tài)變成已刪除。
- 如果上下文狀態(tài)之前的狀態(tài)是已添加,那么就會(huì)調(diào)用
handler().handlerRemoved(this)方法。 - 也就是說(shuō),只有之前調(diào)用過(guò)
handlerAdded(…)方法,之后才會(huì)調(diào)用handlerRemoved(…)方法。
2.3 發(fā)送IO事件
2.3.1 發(fā)送入站 IO事件
/**
* 發(fā)送注冊(cè)的IO事件
*/
@Override
public ChannelHandlerContext fireChannelRegistered() {
// 通過(guò) findContextInbound 方法找到下一個(gè)入站處理器上下文
// 通過(guò) invokeChannelRegistered 方法,調(diào)用下一個(gè)入站處理器的對(duì)應(yīng)事件處理方法
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
/**
* 保證在上下文 next 的事件執(zhí)行器線程中調(diào)用對(duì)應(yīng)方法
*/
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 如果當(dāng)前線程是 next 事件執(zhí)行器EventExecutor線程,直接調(diào)用
next.invokeChannelRegistered();
} else {
// 如果當(dāng)前線程不是 next 事件執(zhí)行器線程,
// 那么就通過(guò)事件執(zhí)行器EventExecutor 的execute方法,
// 保證在執(zhí)行器線程調(diào)用
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
/**
* 調(diào)用該上下文擁有的事件處理器 ChannelHandler 的對(duì)應(yīng)方法
*/
private void invokeChannelRegistered() {
// 判斷當(dāng)前上下文有沒(méi)有已經(jīng)添加到管道上了
if (invokeHandler()) {
// 如果已經(jīng)添加完成了,就調(diào)用對(duì)應(yīng)事件處理器方法
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 如果沒(méi)有添加完成,即狀態(tài)不是 ADD_COMPLETE,
// 就繼續(xù)調(diào)用 fire 的方法,讓管道下一個(gè)處理器處理
fireChannelRegistered();
}
}
我們以注冊(cè)的 IO 事件為例,發(fā)現(xiàn)調(diào)用過(guò)程:
- 先通過(guò)
findContextInbound方法,找到下一個(gè)入站處理器上下文。 - 再調(diào)用
invokeChannel...系列靜態(tài)方法,保證處理器方法的調(diào)用是在它的上下文事件執(zhí)行器EventExecutor線程中。 - 最后通過(guò)
invokeChannel...系列成員方法,調(diào)用該上下文擁有的事件處理器ChannelHandler的對(duì)應(yīng)方法。要通過(guò)
invokeHandler()方法判斷該上下文是否已經(jīng)添加到管道上,只有已經(jīng)完全添加的上下文才能處理事件。
2.3.2 發(fā)送出站IO操作
/**
* 發(fā)送綁定 IO 操作
*/
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
// 檢查 promise 是否有效
if (isNotValidPromise(promise, false)) {
// 返回 true, 說(shuō)明已取消,直接返回,不做下面的處理
return promise;
}
// 通過(guò) findContextOutbound 方法找到上一個(gè)出站處理器上下文
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
// 保證在上下文 next 的事件執(zhí)行器線程中調(diào)用對(duì)應(yīng)方法
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
// 判斷當(dāng)前上下文有沒(méi)有已經(jīng)添加到管道上了
if (invokeHandler()) {
// 如果已經(jīng)添加完成了,就調(diào)用對(duì)應(yīng)事件處理器方法
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
// 如果沒(méi)有添加完成,即狀態(tài)不是 ADD_COMPLETE,
// 就繼續(xù)調(diào)用 fire 的方法,讓管道下一個(gè)處理器處理
bind(localAddress, promise);
}
}
我們以綁定 IO 事件為例,你會(huì)發(fā)現(xiàn)調(diào)用流程和入站事件差不多,只不過(guò)出站事件沒(méi)有中間那個(gè)靜態(tài)方法。
2.3.3 invokeTasks 作用
你會(huì)發(fā)現(xiàn)有的入站和出站事件的處理,與上面的流程不一樣,有四個(gè)事件:
-
channelReadComplete讀完成的入站事件 -
channelWritabilityChanged可讀狀態(tài)改變的入站事件 -
read設(shè)置讀的出站事件 -
flush刷新數(shù)據(jù)的出站事件
public ChannelHandlerContext fireChannelReadComplete() {
invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
return this;
}
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeChannelReadCompleteTask);
}
}
private void invokeChannelReadComplete() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelReadComplete();
}
}
你會(huì)發(fā)現(xiàn)發(fā)送讀完成事件的處理過(guò)程和上面有區(qū)別,不同的是:
- 上面是通過(guò)
executor.execute(new Runnable()),每次都創(chuàng)建新的Runnable對(duì)象。- 而這里是通過(guò)一個(gè)
invokeTasks對(duì)象,不用每次都創(chuàng)建新的Runnable對(duì)象,減少對(duì)象創(chuàng)建的實(shí)例。
private static final class Tasks {
private final AbstractChannelHandlerContext next;
// `channelReadComplete` 讀完成的入站事件
private final Runnable invokeChannelReadCompleteTask = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
// `read` 設(shè)置讀的出站事件
private final Runnable invokeReadTask = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
// `channelWritabilityChanged` 可讀狀態(tài)改變的入站事件
private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
@Override
public void run() {
next.invokeChannelWritabilityChanged();
}
};
// `flush` 刷新數(shù)據(jù)的出站事件
private final Runnable invokeFlushTask = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
Tasks(AbstractChannelHandlerContext next) {
this.next = next;
}
}
為什么這四個(gè)事件可以呢?
- 你仔細(xì)觀察,這四個(gè)事件都沒(méi)有參數(shù),也就是說(shuō)每次調(diào)用的時(shí)候,沒(méi)有變化。
- 也許你會(huì)說(shuō)
channelRegistered,channelUnregistered,channelActive和channelInactive這幾個(gè)事件也沒(méi)有參數(shù)啊,為什么它們不這么照著上面的處理呢?主要是因?yàn)檫@幾個(gè)比較特殊,它們只會(huì)調(diào)用一次,所以沒(méi)有必要那么處理。
2.3.4 寫(xiě)操作
寫(xiě)操作的處理過(guò)程也和上面的不是太一樣,多了點(diǎn)特別處理。
我們知道寫(xiě)操作
write和 刷新操作flush是一對(duì)的,不調(diào)用刷新的話,寫(xiě)入的數(shù)據(jù)永遠(yuǎn)不會(huì)發(fā)送到遠(yuǎn)端。
2.3.4.1 刷新操作
/**
* 在管道中尋找下一個(gè)事件處理器 進(jìn)行刷新的IO操作
*/
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
// 因?yàn)樗⑿虏僮鳑](méi)有參數(shù),不用每次都創(chuàng)建新的 Runnable 實(shí)例,
// 直接復(fù)用
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
}
return this;
}
/**
* 刷新
*/
private void invokeFlush() {
if (invokeHandler()) {
// 調(diào)用當(dāng)前上下文對(duì)應(yīng)處理器的 flush 處理方法
invokeFlush0();
} else {
// 當(dāng)前上下文對(duì)應(yīng)的處理器不處理,
// 繼續(xù)在管道中尋找下一個(gè)事件處理器處理
flush();
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
- 尋找下一個(gè)事件處理器
- 保證
invokeFlush方法調(diào)用在下一個(gè)事件處理器的執(zhí)行器線程中;又因?yàn)樗⑿虏僮鳑](méi)有額外參數(shù),不用每次都創(chuàng)建新的Runnable實(shí)例,直接復(fù)用invokeTasks- 通過(guò)
invokeHandler()判斷是否跳過(guò)當(dāng)前事件處理器的處理方法。
2.3.4.2 寫(xiě)入操作
/**
* 在管道中尋找下一個(gè)事件處理器 進(jìn)行寫(xiě)入的IO操作
*/
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// 寫(xiě)入消息,不刷新
write(msg, false, promise);
return promise;
}
/**
* 在管道中尋找下一個(gè)事件處理器 進(jìn)行寫(xiě)入并刷新的IO操作
*/
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
// 寫(xiě)入消息并且刷新
write(msg, true, promise);
return promise;
}
提供寫(xiě)入和寫(xiě)入并刷新兩個(gè)方法。它們都調(diào)用了
write(Object, boolean, ChannelPromise)方法。
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
// 檢查 promise 是否有效
if (isNotValidPromise(promise, true)) {
// 回收引用
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
// 回收引用
ReferenceCountUtil.release(msg);
throw e;
}
// 通過(guò) findContextOutbound 方法找到上一個(gè)出站處理器上下文
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
// 只是添加附加信息,在內(nèi)存泄露的時(shí)候,可以獲取到這個(gè)附加信息
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 在當(dāng)前上下文線程中
if (flush) {
// 如果包括刷新,就調(diào)用 invokeWriteAndFlush 方法
next.invokeWriteAndFlush(m, promise);
} else {
// 如果不包括刷新,就調(diào)用 invokeWrite 方法
next.invokeWrite(m, promise);
}
} else {
// 將寫(xiě)操作封裝成一個(gè) WriteTask,也是一個(gè) Runnable 子類(lèi)。
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
- 這個(gè)方法流程和之前的流程沒(méi)有區(qū)別,唯一不同的就是它創(chuàng)建了一個(gè)
WriteTask對(duì)象,而不是一個(gè)簡(jiǎn)單的Runnable實(shí)例。- 因?yàn)閷?xiě)操作比較特殊,我們需要控制等待寫(xiě)入數(shù)據(jù)的大小,當(dāng)?shù)却龑?xiě)入數(shù)據(jù)太多,那么發(fā)送
channelWritabilityChanged入站IO事件,告訴用戶當(dāng)前通道不可寫(xiě)了,先將緩沖區(qū)數(shù)據(jù)發(fā)送到遠(yuǎn)端。- 如何知道等待寫(xiě)入數(shù)據(jù)的大?。烤褪峭ㄟ^(guò)這個(gè)
WriteTask類(lèi)事件,計(jì)算寫(xiě)入對(duì)象m的大小,累加加入到寫(xiě)入數(shù)據(jù)的大小中。
2.3.5 查找下一個(gè)處理器上下文
-
findContextInbound(int mask)private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; EventExecutor currentExecutor = executor(); do { ctx = ctx.next; // 通過(guò) MASK_ONLY_INBOUND 表示查找的是入站事件 // mask 代表處理的方法,是否需要被跳過(guò) } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); return ctx; } -
findContextOutboundprivate AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; EventExecutor currentExecutor = executor(); do { ctx = ctx.prev; // 通過(guò) MASK_ONLY_OUTBOUND 表示查找的是出站事件 // mask 代表處理的方法,是否需要被跳過(guò) } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND)); return ctx; } -
skipContextprivate static boolean skipContext( AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) { // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT // 這個(gè)方法返回 true,表示跳過(guò)這個(gè) ctx,繼續(xù)從管道中查找下一個(gè)。 // 因?yàn)槭褂玫氖?|| 或邏輯符,兩個(gè)條件只要有一個(gè)為 true,就返回 true。 // (ctx.executionMask & (onlyMask | mask)) == 0 表示這個(gè) ctx 屬于入站事件還是出站事件 // (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0) // 只有當(dāng) EventExecutor 相同的時(shí)候,才會(huì)考慮是否跳過(guò) ctx,因?yàn)槲覀円WC事件處理的順序。 return (ctx.executionMask & (onlyMask | mask)) == 0 || // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload // everything to preserve ordering. // See https://github.com/netty/netty/issues/10067 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0); }這個(gè)方法代碼很簡(jiǎn)單,但是邏輯很有意思。
- 方法的返回值表示是否需要跳過(guò)這個(gè)上下文
ctx,返回true則跳過(guò)。 -
(ctx.executionMask & (onlyMask | mask)) == 0,如果等于true,那就表明當(dāng)前這個(gè)上下文的執(zhí)行標(biāo)記executionMask就沒(méi)有(onlyMask | mask)中的任何方法啊,很容易就判斷它不屬于入站事件或者出站事件。 - 第二個(gè)條件是判斷上下文的執(zhí)行標(biāo)記是否包含這個(gè)方法
ctx.executionMask & mask,如果包含結(jié)果就不是1, 不包含就是0(表示跳過(guò),返回true),所以當(dāng)(ctx.executionMask & mask) == 0的時(shí)候,返回true, 跳過(guò)這個(gè)上下文ctx,尋找下一個(gè)。 - 不過(guò)這里多了一個(gè)
ctx.executor() == currentExecutor判斷, 為了事件處理的順序性,如果事件執(zhí)行器線程不一樣,那么不允許跳過(guò)處理器方法,即使這個(gè)方法被@Skip注解也沒(méi)用。
- 方法的返回值表示是否需要跳過(guò)這個(gè)上下文
2.3.6 invokeHandler
/**
* 盡最大努力檢測(cè) ChannelHandler.handlerAdded(ChannelHandlerContext) 是否被調(diào)用。
* 如果沒(méi)有被調(diào)用則返回false,如果調(diào)用或無(wú)法檢測(cè)返回true。
*
* 如果這個(gè)方法返回false,我們將不調(diào)用ChannelHandler,而只是轉(zhuǎn)發(fā)事件,調(diào)用管道中下一個(gè) ChannelHandler 處理。
*
* 因?yàn)榭赡芄艿繢efaultChannelPipeline已經(jīng)將這個(gè) ChannelHandler放在鏈接列表中,
* 但沒(méi)有調(diào)用 ChannelHandler.handlerAdded(ChannelHandlerContext) 方法,
* 有可能用戶在 ChannelHandler.handlerAdded 中做了一些初始化操作,當(dāng)它沒(méi)有被調(diào)用時(shí),
* 不能將 IO 事件交個(gè)這個(gè) ChannelHandler 處理。
*/
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
一般情況下,必須當(dāng)上下文狀態(tài)是
ADD_COMPLETE才返回true。
但是如果上下文的事件執(zhí)行器是順序的,那么當(dāng)上下文狀態(tài)是ADD_PENDING就可以返回true了。
三. WriteTask 類(lèi)
static final class WriteTask implements Runnable {
// 使用一個(gè)對(duì)象池,復(fù)用 WriteTask 實(shí)例
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
@Override
public WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
});
// 通過(guò)靜態(tài)方法得到 WriteTask 實(shí)例
static WriteTask newInstance(AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
// 從對(duì)象池中獲取 WriteTask 實(shí)例
WriteTask task = RECYCLER.get();
// 初始化
init(task, ctx, msg, promise, flush);
return task;
}
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
private final Handle<WriteTask> handle;
// 當(dāng)前上下文對(duì)象
private AbstractChannelHandlerContext ctx;
// 寫(xiě)入的數(shù)據(jù)對(duì)象
private Object msg;
private ChannelPromise promise;
// 寫(xiě)入數(shù)據(jù)的大小
private int size; // sign bit controls flush
@SuppressWarnings("unchecked")
private WriteTask(Handle<? extends WriteTask> handle) {
this.handle = (Handle<WriteTask>) handle;
}
protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise, boolean flush) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
// 是否需要估算寫(xiě)入數(shù)據(jù)大小
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
// 估算數(shù)據(jù)大小
task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
// 增加等待寫(xiě)入數(shù)據(jù)的大小
ctx.pipeline.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
if (flush) {
// 將size 大小變成負(fù)數(shù),那么就會(huì)調(diào)用 寫(xiě)入并刷新的方法
task.size |= Integer.MIN_VALUE;
}
}
@Override
public void run() {
try {
// 減小等待寫(xiě)入數(shù)據(jù)大小
decrementPendingOutboundBytes();
if (size >= 0) {
// 只調(diào)用寫(xiě)入操作
ctx.invokeWrite(msg, promise);
} else {
// 當(dāng) size < 0 ,寫(xiě)入并刷新操作
ctx.invokeWriteAndFlush(msg, promise);
}
} finally {
recycle();
}
}
void cancel() {
try {
// 取消的話,也需要減小等待寫(xiě)入數(shù)據(jù)大小
decrementPendingOutboundBytes();
} finally {
recycle();
}
}
/**
* 減小等待寫(xiě)入數(shù)據(jù)大小
*/
private void decrementPendingOutboundBytes() {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
}
}
private void recycle() {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
handle.recycle(this);
}
}
這個(gè)類(lèi)的實(shí)現(xiàn)很簡(jiǎn)單
- 使用靜態(tài)方法從對(duì)象池中獲取
WriteTask實(shí)例(這樣會(huì)復(fù)用WriteTask)- 每次調(diào)用初始化
init方法,如果配置項(xiàng)ESTIMATE_TASK_SIZE_ON_SUBMIT為true,都會(huì)增加等待寫(xiě)入數(shù)據(jù)的大小。- 真正運(yùn)行(
run被調(diào)用),或者取消的時(shí)候,都會(huì)減少等待寫(xiě)入數(shù)據(jù)的大小。
在 DefaultChannelPipeline 中
@UnstableApi
protected void incrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
@UnstableApi
protected void decrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
都是調(diào)用
ChannelOutboundBuffer類(lèi)的對(duì)應(yīng)方法。
在 ChannelOutboundBuffer 中
void incrementPendingOutboundBytes(long size) {
incrementPendingOutboundBytes(size, true);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
void decrementPendingOutboundBytes(long size) {
decrementPendingOutboundBytes(size, true, true);
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
在
ChannelOutboundBuffer中有一個(gè)totalPendingSize變量表示寫(xiě)緩沖區(qū)等待數(shù)據(jù)大小。
- 當(dāng)它的值大于
channel.config().getWriteBufferHighWaterMark()時(shí),表示不能寫(xiě)了,通過(guò)setUnwritable(invokeLater)發(fā)送當(dāng)前通道可寫(xiě)狀態(tài)改變的 入站IO事件。- 當(dāng)它的值小于
channel.config().getWriteBufferLowWaterMark()時(shí),表示又可以寫(xiě)了,通過(guò)setWritable(invokeLater)發(fā)送當(dāng)前通道可寫(xiě)狀態(tài)改變的 入站IO事件。
四. 總結(jié)
ChannelHandlerContext 的主要實(shí)現(xiàn)原理已經(jīng)介紹完畢了,你也明白了,它是如何向上游或下游傳遞事件了。