本系列Netty源碼解析文章基于 4.1.56.Final版本
1. 前文回顧
在前邊的系列文章中,筆者為大家詳細(xì)剖析了 Reactor 模型在 netty 中的創(chuàng)建,啟動,運行,接收連接,接收數(shù)據(jù),發(fā)送數(shù)據(jù)的完整流程,在詳細(xì)剖析整個 Reactor 模型如何在 netty 中實現(xiàn)的過程里,我們或多或少的見到了 pipeline 的身影。

比如在 Reactor 啟動的過程中首先需要創(chuàng)建 NioServerSocketChannel ,在創(chuàng)建的過程中會為 NioServerSocketChannel 創(chuàng)建分配一個 pipeline ,用于對 OP_ACCEPT 事件的編排。
當(dāng) NioServerSocketChannel 向 main reactor 注冊成功后,會在 pipeline 中觸發(fā) ChannelRegistered 事件的傳播。
當(dāng) NioServerSocketChannel 綁定端口成功后,會在 pipeline 中觸發(fā) ChannelActive 事件的傳播。

又比如在 Reactor 接收連接的過程中,當(dāng)客戶端發(fā)起一個連接并完成三次握手之后,連接對應(yīng)的 Socket 會存放在內(nèi)核中的全連接隊列中,隨后 JDK Selector 會通知 main reactor 此時 NioServerSocketChannel 上有 OP_ACCEPT 事件活躍,最后 main reactor 開始執(zhí)行 NioServerSocketChannel 的底層操作類 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRead 事件。

最終會在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中響應(yīng) ChannelRead 事件并創(chuàng)建初始化 NioSocketChannel ,隨后會為每一個新創(chuàng)建的 NioSocetChannel 創(chuàng)建分配一個獨立的 pipeline ,用于各自 NioSocketChannel 上的 IO 事件的編排。并向 sub reactor 注冊 NioSocketChannel ,隨后在 NioSocketChannel 的 pipeline 中傳播 ChannelRegistered 事件,最后傳播 ChannelActive 事件。

還有在《Netty如何高效接收網(wǎng)絡(luò)數(shù)據(jù)》一文中,我們也提過當(dāng) sub reactor 讀取 NioSocketChannel 中來自客戶端的請求數(shù)據(jù)時,會在 NioSocketChannel 的 pipeline 中傳播 ChannelRead 事件,在一個完整的 read loop 讀取完畢后會傳播 ChannelReadComplete 事件。
在《一文搞懂Netty發(fā)送數(shù)據(jù)全流程》一文中,我們講到了在用戶經(jīng)過業(yè)務(wù)處理后,通過 write 方法和 flush 方法分別在 NioSocketChannel 的 pipeline 中傳播 write 事件和 flush 事件的過程。
筆者帶大家又回顧了一下在前邊系列文章中關(guān)于 pipeline 的使用場景,但是在這些系列文章中并未對 pipeline 相關(guān)的細(xì)節(jié)進(jìn)行完整全面地描述,那么本文筆者將為大家詳細(xì)的剖析下 pipeline 在 IO 事件的編排和傳播場景下的完整實現(xiàn)原理。

2. pipeline的創(chuàng)建

Netty 會為每一個 Channel 分配一個獨立的 pipeline ,pipeline 伴隨著 channel 的創(chuàng)建而創(chuàng)建。
前邊介紹到 NioServerSocketChannel 是在 netty 服務(wù)端啟動的過程中創(chuàng)建的。而 NioSocketChannel 的創(chuàng)建是在當(dāng) NioServerSocketChannel 上的 OP_ACCEPT 事件活躍時,由 main reactor 線程在 NioServerSocketChannel 中創(chuàng)建,并在 NioServerSocketChannel 的 pipeline 中對 OP_ACCEPT 事件進(jìn)行編排時(圖中的 ServerBootstrapAcceptor 中)初始化的。
無論是創(chuàng)建 NioServerSocketChannel 里的 pipeline 還是創(chuàng)建 NioSocketChannel 里的 pipeline , 最終都會委托給它們的父類 AbstractChannel 。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用于底層socket的相關(guān)操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用于IO事件編排
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
....................
//pipeline中的頭結(jié)點
final AbstractChannelHandlerContext head;
//pipeline中的尾結(jié)點
final AbstractChannelHandlerContext tail;
//pipeline中持有對應(yīng)channel的引用
private final Channel channel;
....................
protected DefaultChannelPipeline(Channel channel) {
//pipeline中持有對應(yīng)channel的引用
this.channel = ObjectUtil.checkNotNull(channel, "channel");
............省略.......
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
....................
}
在前邊的系列文章中筆者多次提到過,pipeline 的結(jié)構(gòu)是由 ChannelHandlerContext 類型的節(jié)點構(gòu)成的雙向鏈表。其中頭結(jié)點為 HeadContext ,尾結(jié)點為 TailContext 。其初始結(jié)構(gòu)如下:

2.1 HeadContext
private static final String HEAD_NAME = generateName0(HeadContext.class);
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
//headContext中持有對channel unsafe操作類的引用 用于執(zhí)行channel底層操作
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
//持有channel unsafe操作類的引用,后續(xù)用于執(zhí)行channel底層操作
unsafe = pipeline.channel().unsafe();
//設(shè)置channelHandler的狀態(tài)為ADD_COMPLETE
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
.......................
}
我們知道雙向鏈表結(jié)構(gòu)的 pipeline 中的節(jié)點元素為 ChannelHandlerContext ,既然 HeadContext 作為 pipeline 的頭結(jié)點,那么它一定是 ChannelHandlerContext 類型的,所以它需要繼承實現(xiàn) AbstractChannelHandlerContext ,相當(dāng)于一個哨兵的作用,因為用戶可以以任意順序向 pipeline 中添加 ChannelHandler ,需要用 HeadContext 來固定指向第一個 ChannelHandlerContext 。
在《一文搞懂Netty發(fā)送數(shù)據(jù)全流程》 一文中的《1. ChannelHandlerContext》小節(jié)中,筆者曾為大家詳細(xì)介紹過 ChannelHandlerContext 在 pipeline 中的作用,忘記的同學(xué)可以在回看下。
于此同時 HeadContext 又實現(xiàn)了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,說明 HeadContext 即是一個 ChannelHandlerContext 又是一個 ChannelHandler ,它可以同時處理 Inbound 事件和 Outbound 事件。
我們也注意到 HeadContext 中持有了對應(yīng) channel 的底層操作類 unsafe ,這也說明 IO 事件在 pipeline 中的傳播最終會落在 HeadContext 中進(jìn)行最后的 IO 處理。它是 Inbound 事件的處理起點,也是 Outbound 事件的處理終點。這里也可以看出 HeadContext 除了起到哨兵的作用,它還承擔(dān)了對 channel 底層相關(guān)的操作。
比如我們在《Reactor在Netty中的實現(xiàn)(啟動篇)》中介紹的 NioServerSocketChannel 在向 main reactor 注冊完成后會觸發(fā) ChannelRegistered 事件從 HeadContext 開始依次在 pipeline 中向后傳播。
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
//此時firstRegistration已經(jīng)變?yōu)閒alse,在pipeline.invokeHandlerAddedIfNeeded中已被調(diào)用過
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
以及 NioServerSocketChannel 在與端口綁定成功后會觸發(fā) ChannelActive 事件從 HeadContext 開始依次在 pipeline 中向后傳播,并在 HeadContext 中通過 unsafe.beginRead() 注冊 OP_ACCEPT 事件到 main reactor 中。
@Override
public void read(ChannelHandlerContext ctx) {
//觸發(fā)注冊O(shè)P_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
同理在 NioSocketChannel 在向 sub reactor 注冊成功后。會先后觸發(fā) ChannelRegistered 事件和 ChannelActive 事件從 HeadContext 開始在 pipeline 中向后傳播。并在 HeadContext 中通過 unsafe.beginRead() 注冊 OP_READ 事件到 sub reactor 中。
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中繼續(xù)向后傳播channelActive事件
ctx.fireChannelActive();
//如果是autoRead 則自動觸發(fā)read事件傳播
//在read回調(diào)函數(shù)中 觸發(fā)OP_ACCEPT或者OP_READ事件注冊
readIfIsAutoRead();
}
在《一文搞懂Netty發(fā)送數(shù)據(jù)全流程》中介紹的 write 事件和 flush 事件最終會在 pipeline 中從后向前一直傳播到 HeadContext ,并在 HeadContext 中相應(yīng)事件回調(diào)函數(shù)中調(diào)用 unsafe 類操作底層 channel 發(fā)送數(shù)據(jù)。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
//到headContext這里 msg的類型必須是ByteBuffer,也就是說必須經(jīng)過編碼器將業(yè)務(wù)層寫入的實體編碼為ByteBuffer
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
從本小節(jié)的內(nèi)容介紹中,我們可以看出在 Netty 中對于 Channel 的相關(guān)底層操作調(diào)用均是在 HeadContext 中觸發(fā)的。
2.2 TailContext
private static final String TAIL_NAME = generateName0(TailContext.class);
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
//設(shè)置channelHandler的狀態(tài)為ADD_COMPLETE
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
......................
}
同樣 TailContext 作為雙向鏈表結(jié)構(gòu)的 pipeline 中的尾結(jié)點,也需要繼承實現(xiàn) AbstractChannelHandlerContext 。但它同時又實現(xiàn)了 ChannelInboundHandler 。
這說明 TailContext 除了是一個 ChannelHandlerContext 同時也是一個 ChannelInboundHandler 。
2.2.1 TailContext 作為一個 ChannelHandlerContext 的作用
TailContext 作為一個 ChannelHandlerContext 的作用是負(fù)責(zé)將 outbound 事件從 pipeline 的末尾一直向前傳播直到 HeadContext 。當(dāng)然前提是用戶需要調(diào)用 channel 的相關(guān) outbound 方法。
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
@Override
public final ChannelPipeline flush() {
tail.flush();
return this;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
這里我們可以看到,當(dāng)我們在自定義 ChannelHandler 中調(diào)用 ctx.channel().write(msg) 時,會在 AbstractChannel 中觸發(fā) pipeline.write(msg) ,最終在 DefaultChannelPipeline 中調(diào)用 tail.write(msg) 。使得 write 事件可以從 pipeline 的末尾開始向前傳播,其他 outbound 事件的傳播也是一樣的道理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.channel().write(msg);
}
}
而我們自定義的 ChannelHandler 會被封裝在一個 ChannelHandlerContext 中從而加入到 pipeline 中,而這個用于裝載自定義 ChannelHandler 的 ChannelHandlerContext 與 TailContext 一樣本質(zhì)也都是 ChannelHandlerContext ,只不過在 pipeline 中的位置不同罷了。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
我們看到 ChannelHandlerContext 接口本身也會繼承 ChannelInboundInvoker
和 ChannelOutboundInvoker 接口,所以說 ContextHandlerContext 也可以觸發(fā) inbound 事件和 outbound 事件,只不過表達(dá)的語義是在 pipeline 中從當(dāng)前 ChannelHandler 開始向前或者向后傳播 outbound 事件或者 inbound 事件。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
}
這里表示 write 事件從當(dāng)前 EchoServerHandler 開始在 pipeline 中向前傳播直到 HeadContext 。

2.2.2 TailContext 作為一個 ChannelInboundHandler 的作用
最后 TailContext 作為一個 ChannelInboundHandler 的作用就是為 inbound 事件在 pipeline 中的傳播做一個兜底的處理。
這里提到的兜底處理是什么意思呢?
比如我們前邊介紹到的,在 NioSocketChannel 向 sub reactor 注冊成功后之后觸發(fā)的 ChannelRegistered 事件和 ChannelActive 事件?;蛘咴?reactor 線程讀取 NioSocketChannel 中的請求數(shù)據(jù)時所觸發(fā)的 channelRead 事件和 ChannelReadComplete 事件。
這些 inbound 事件都會首先從 HeadContext 開始在 pipeline 中一個一個的向后傳遞。
極端的情況是如果 pipeline 中所有 ChannelInboundHandler 中相應(yīng)的 inbound 事件回調(diào)方法均不對事件作出處理,并繼續(xù)向后傳播。如下示例代碼所示:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
}
最終這些 inbound 事件在 pipeline 中得不到處理,最后會傳播到 TailContext 中。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
onUnhandledInboundChannelReadComplete();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
onUnhandledInboundChannelActive();
}
}
而在 TailContext 中需要對這些得不到任何處理的 inbound 事件做出最終處理。比如丟棄該 msg,并釋放所占用的 directByteBuffer,以免發(fā)生內(nèi)存泄露。
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
3. pipeline中的事件分類
在前邊的系列文章中,筆者多次介紹過,Netty 中的 IO 事件一共分為兩大類: inbound 類事件和 outbound 類事件。其實如果嚴(yán)格來分的話應(yīng)該分為三類。第三種事件類型為 exceptionCaught 異常事件類型。
而 exceptionCaught 事件在事件傳播角度上來說和 inbound 類事件一樣,都是從 pipeline 的 HeadContext 開始一直向后傳遞或者從當(dāng)前 ChannelHandler 開始一直向后傳遞直到 TailContext 。所以一般也會將 exceptionCaught 事件統(tǒng)一歸為 inbound 類事件。
而根據(jù)事件類型的分類,相應(yīng)負(fù)責(zé)處理事件回調(diào)的 ChannelHandler 也會被分為兩類:
ChannelInboundHandler:主要負(fù)責(zé)響應(yīng)處理 inbound 類事件回調(diào)和 exceptionCaught 事件回調(diào)。ChannelOutboundHandler:主要負(fù)責(zé)響應(yīng)處理 outbound 類事件回調(diào)。
那么我們常說的 inbound 類事件和 outbound 類事件具體都包含哪些事件呢?
3.1 inbound類事件
final class ChannelHandlerMask {
// inbound事件集合
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// inbound 類事件相關(guān)掩碼
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
}
netty 會將其支持的所有異步事件用掩碼來表示,定義在 ChannelHandlerMask 類中, netty 框架通過這些事件掩碼可以很方便的知道用戶自定義的 ChannelHandler 是屬于什么類型的(ChannelInboundHandler or ChannelOutboundHandler )。
除此之外,inbound 類事件如此之多,用戶也并不是對所有的 inbound 類事件感興趣,用戶可以在自定義的 ChannelInboundHandler 中覆蓋自己感興趣的 inbound 事件回調(diào),從而達(dá)到針對特定 inbound 事件的監(jiān)聽。
這些用戶感興趣的 inbound 事件集合同樣也會用掩碼的形式保存在自定義 ChannelHandler 對應(yīng)的 ChannelHandlerContext 中,這樣當(dāng)特定 inbound 事件在 pipeline 中開始傳播的時候,netty 可以根據(jù)對應(yīng) ChannelHandlerContext 中保存的 inbound 事件集合掩碼來判斷,用戶自定義的 ChannelHandler 是否對該 inbound 事件感興趣,從而決定是否執(zhí)行用戶自定義 ChannelHandler 中的相應(yīng)回調(diào)方法或者跳過對該 inbound 事件不感興趣的 ChannelHandler 繼續(xù)向后傳播。
從以上描述中,我們也可以窺探出,Netty 引入 ChannelHandlerContext 來封裝 ChannelHandler 的原因,在代碼設(shè)計上還是遵循單一職責(zé)的原則, ChannelHandler 是用戶接觸最頻繁的一個 netty 組件,netty 希望用戶能夠把全部注意力放在最核心的 IO 處理上,用戶只需要關(guān)心自己對哪些異步事件感興趣并考慮相應(yīng)的處理邏輯即可,而并不需要關(guān)心異步事件在 pipeline 中如何傳遞,如何選擇具有執(zhí)行條件的 ChannelHandler 去執(zhí)行或者跳過。這些切面性質(zhì)的邏輯,netty 將它們作為上下文信息全部封裝在 ChannelHandlerContext 中由netty框架本身負(fù)責(zé)處理。
以上這些內(nèi)容,筆者還會在事件傳播相關(guān)小節(jié)做詳細(xì)的介紹,之所以這里引出,還是為了讓大家感受下利用掩碼進(jìn)行集合操作的便利性,netty 中類似這樣的設(shè)計還有很多,比如前邊系列文章中多次提到過的,channel 再向 reactor 注冊 IO 事件時,netty 也是將 channel 感興趣的 IO 事件用掩碼的形式存儲于 SelectionKey 中的 int interestOps 中。
接下來筆者就為大家介紹下這些 inbound 事件,并梳理出這些 inbound 事件的觸發(fā)時機(jī)。方便大家根據(jù)各自業(yè)務(wù)需求靈活地進(jìn)行監(jiān)聽。
3.1.1 ExceptionCaught 事件
在本小節(jié)介紹的這些 inbound 類事件在 pipeline 中傳播的過程中,如果在相應(yīng)事件回調(diào)函數(shù)執(zhí)行的過程中發(fā)生異常,那么就會觸發(fā)對應(yīng) ChannelHandler 中的 exceptionCaught 事件回調(diào)。
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
}
} else {
fireExceptionCaught(cause);
}
}
當(dāng)然用戶可以選擇在 exceptionCaught 事件回調(diào)中是否執(zhí)行 ctx.fireExceptionCaught(cause) 從而決定是否將 exceptionCaught 事件繼續(xù)向后傳播。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
..........
ctx.fireExceptionCaught(cause);
}
當(dāng) netty 內(nèi)核處理連接的接收,以及數(shù)據(jù)的讀取過程中如果發(fā)生異常,會在整個 pipeline 中觸發(fā) exceptionCaught 事件的傳播。
這里筆者為什么要單獨強(qiáng)調(diào)在 inbound 事件傳播的過程中發(fā)生異常,才會回調(diào) exceptionCaught 呢 ?
因為 inbound 事件一般都是由 netty 內(nèi)核觸發(fā)傳播的,而 outbound 事件一般都是由用戶選擇觸發(fā)的,比如用戶在處理完業(yè)務(wù)邏輯觸發(fā)的 write 事件或者 flush 事件。
而在用戶觸發(fā) outbound 事件后,一般都會得到一個 ChannelPromise 。用戶可以向 ChannelPromise 添加各種 listener 。當(dāng) outbound 事件在傳播的過程中發(fā)生異常時,netty 會通知用戶持有的這個 ChannelPromise ,但不會觸發(fā) exceptionCaught 的回調(diào)。
比如我們在《一文搞懂Netty發(fā)送數(shù)據(jù)全流程》一文中介紹到的在 write 事件傳播的過程中就不會觸發(fā) exceptionCaught 事件回調(diào)。只是去通知用戶的 ChannelPromise 。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//調(diào)用當(dāng)前ChannelHandler中的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}
而 outbound 事件中只有 flush 事件的傳播是個例外,當(dāng) flush 事件在 pipeline 傳播的過程中發(fā)生異常時,會觸發(fā)對應(yīng)異常 ChannelHandler 的 exceptionCaught 事件回調(diào)。因為 flush 方法的簽名中不會給用戶返回 ChannelPromise 。
@Override
ChannelHandlerContext flush();
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
3.1.2 ChannelRegistered 事件
當(dāng) main reactor 在啟動的時候,NioServerSocketChannel 會被創(chuàng)建并初始化,隨后就會向main reactor注冊,當(dāng)注冊成功后就會在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRegistered 事件。
當(dāng) main reactor 接收客戶端發(fā)起的連接后,NioSocketChannel 會被創(chuàng)建并初始化,隨后會向 sub reactor 注冊,當(dāng)注冊成功后會在 NioSocketChannel 中的 pipeline 傳播 ChannelRegistered 事件。

private void register0(ChannelPromise promise) {
................
//執(zhí)行真正的注冊操作
doRegister();
...........
//觸發(fā)channelRegister事件
pipeline.fireChannelRegistered();
.......
}
注意:此時對應(yīng)的 channel 還沒有注冊 IO 事件到相應(yīng)的 reactor 中。
3.1.3 ChannelActive 事件
當(dāng) NioServerSocketChannel 再向 main reactor 注冊成功并觸發(fā) ChannelRegistered 事件傳播之后,隨后就會在 pipeline 中觸發(fā) bind 事件,而 bind 事件是一個 outbound 事件,會從 pipeline 中的尾結(jié)點 TailContext 一直向前傳播最終在 HeadContext 中執(zhí)行真正的綁定操作。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//觸發(fā)AbstractChannel->bind方法 執(zhí)行JDK NIO SelectableChannel 執(zhí)行底層綁定操作
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
..............
doBind(localAddress);
...............
//綁定成功后 channel激活 觸發(fā)channelActive事件傳播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//HeadContext->channelActive回調(diào)方法 執(zhí)行注冊O(shè)P_ACCEPT事件
pipeline.fireChannelActive();
}
});
}
...............
}
當(dāng) netty 服務(wù)端 NioServerSocketChannel 綁定端口成功之后,才算是真正的 Active ,隨后觸發(fā) ChannelActive 事件在 pipeline 中的傳播。
之前我們也提到過判斷 NioServerSocketChannel 是否 Active 的標(biāo)準(zhǔn)就是 : 底層 JDK Nio ServerSocketChannel 是否 open 并且 ServerSocket 是否已經(jīng)完成綁定。
@Override
public boolean isActive() {
return isOpen() && javaChannel().socket().isBound();
}
而客戶端 NioSocketChannel 中觸發(fā) ChannelActive 事件就會比較簡單,當(dāng) NioSocketChannel 再向 sub reactor 注冊成功并觸發(fā) ChannelRegistered 之后,緊接著就會觸發(fā) ChannelActive 事件在 pipeline 中傳播。

private void register0(ChannelPromise promise) {
................
//執(zhí)行真正的注冊操作
doRegister();
...........
//觸發(fā)channelRegister事件
pipeline.fireChannelRegistered();
.......
if (isActive()) {
if (firstRegistration) {
//觸發(fā)channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
而客戶端 NioSocketChannel 是否 Active 的標(biāo)識是:底層 JDK NIO
SocketChannel 是否 open 并且底層 socket 是否連接。毫無疑問,這里的 socket 一定是 connected 。所以直接觸發(fā) ChannelActive 事件。
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
注意:此時 channel 才會到相應(yīng)的 reactor 中去注冊感興趣的 IO 事件。當(dāng)用戶自定義的 ChannelHandler 接收到 ChannelActive 事件時,表明 IO 事件已經(jīng)注冊到 reactor 中了。
3.1.4 ChannelRead 和 ChannelReadComplete 事件

當(dāng)客戶端有新連接請求的時候,服務(wù)端的 NioServerSocketChannel 上的 OP_ACCEPT 事件會活躍,隨后 main reactor 會在一個 read loop 中不斷的調(diào)用 serverSocketChannel.accept() 接收新的連接直到全部接收完畢或者達(dá)到 read loop 最大次數(shù) 16 次。
在 NioServerSocketChannel 中,每 accept 一個新的連接,就會在 pipeline 中觸發(fā) ChannelRead 事件。一個完整的 read loop 結(jié)束之后,會觸發(fā) ChannelReadComplete 事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
......................
try {
do {
//底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶端SocketChannel
int localRead = doReadMessages(readBuf);
.................
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
pipeline.fireChannelReadComplete();
.................
}
}
當(dāng)客戶端 NioSocketChannel 上有請求數(shù)據(jù)到來時,NioSocketChannel 上的 OP_READ 事件活躍,隨后 sub reactor 也會在一個 read loop 中對 NioSocketChannel 中的請求數(shù)據(jù)進(jìn)行讀取直到讀取完畢或者達(dá)到 read loop 的最大次數(shù) 16 次。
在 read loop 的讀取過程中,每讀取一次就會在 pipeline 中觸發(fā) ChannelRead 事件。當(dāng)一個完整的 read loop 結(jié)束之后,會在 pipeline 中觸發(fā) ChannelReadComplete 事件。

這里需要注意的是當(dāng) ChannelReadComplete 事件觸發(fā)時,此時并不代表 NioSocketChannel 中的請求數(shù)據(jù)已經(jīng)讀取完畢,可能的情況是發(fā)送的請求數(shù)據(jù)太多,在一個 read loop 中讀取不完達(dá)到了最大限制次數(shù) 16 次,還沒全部讀取完畢就退出了 read loop 。一旦退出 read loop 就會觸發(fā) ChannelReadComplete 事件。詳細(xì)內(nèi)容可以查看筆者的這篇文章《Netty如何高效接收網(wǎng)絡(luò)數(shù)據(jù)》。
3.1.5 ChannelWritabilityChanged 事件
當(dāng)我們處理完業(yè)務(wù)邏輯得到業(yè)務(wù)處理結(jié)果后,會調(diào)用 ctx.write(msg) 觸發(fā) write 事件在 pipeline 中的傳播。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
最終 netty 會將發(fā)送數(shù)據(jù) msg 寫入 NioSocketChannel 中的待發(fā)送緩沖隊列 ChannelOutboundBuffer 中。并等待用戶調(diào)用 flush 操作從 ChannelOutboundBuffer 中將待發(fā)送數(shù)據(jù) msg ,寫入到底層 Socket 的發(fā)送緩沖區(qū)中。

當(dāng)對端的接收處理速度非常慢或者網(wǎng)絡(luò)狀況極度擁塞時,使得 TCP 滑動窗口不斷的縮小,這就導(dǎo)致發(fā)送端的發(fā)送速度也變得越來越小,而此時用戶還在不斷的調(diào)用 ctx.write(msg) ,這就會導(dǎo)致 ChannelOutboundBuffer 會急劇增大,從而可能導(dǎo)致 OOM 。netty 引入了高低水位線來控制 ChannelOutboundBuffer 的內(nèi)存占用。
public final class WriteBufferWaterMark {
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}
當(dāng) ChanneOutboundBuffer 中的內(nèi)存占用量超過高水位線時,netty 就會將對應(yīng)的 channel 置為不可寫狀態(tài),并在 pipeline 中觸發(fā) ChannelWritabilityChanged 事件。
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
//觸發(fā)fireChannelWritabilityChanged事件 表示當(dāng)前channel變?yōu)椴豢蓪? fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
當(dāng) ChannelOutboundBuffer 中的內(nèi)存占用量低于低水位線時,netty 又會將對應(yīng)的 NioSocketChannel 設(shè)置為可寫狀態(tài),并再次觸發(fā) ChannelWritabilityChanged 事件。

private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
用戶可在自定義 ChannelHandler 中通過 ctx.channel().isWritable() 判斷當(dāng)前 channel 是否可寫。
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
...........當(dāng)前channel可寫.........
} else {
...........當(dāng)前channel不可寫.........
}
}
3.1.6 UserEventTriggered 事件
netty 提供了一種事件擴(kuò)展機(jī)制可以允許用戶自定義異步事件,這樣可以使得用戶能夠靈活的定義各種復(fù)雜場景的處理機(jī)制。
下面我們來看下如何在 Netty 中自定義異步事件。
- 定義異步事件。
public final class OurOwnDefinedEvent {
public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();
private OurOwnDefinedEvent() { }
}
- 觸發(fā)自定義事件的傳播
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
......省略.......
//事件在pipeline中從當(dāng)前ChannelHandlerContext開始向后傳播
ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
//事件從pipeline的頭結(jié)點headContext開始向后傳播
ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
}
}
- 自定義事件的響應(yīng)和處理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (OurOwnDefinedEvent.INSTANCE == evt) {
.....自定義事件處理......
}
}
}
后續(xù)隨著我們源碼解讀的深入,我們還會看到 Netty 自己本身也定義了許多 UserEvent 事件,我們后面還會在介紹,大家這里只是稍微了解一下相關(guān)的用法即可。
3.1.7 ChannelInactive和ChannelUnregistered事件
當(dāng) Channel 被關(guān)閉之后會在 pipeline 中先觸發(fā) ChannelInactive 事件的傳播然后在觸發(fā) ChannelUnregistered 事件的傳播。
我們可以在 Inbound 類型的 ChannelHandler 中響應(yīng) ChannelInactive 和 ChannelUnregistered 事件。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
......響應(yīng)inActive事件...
//繼續(xù)向后傳播inActive事件
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
......響應(yīng)Unregistered事件...
//繼續(xù)向后傳播Unregistered事件
super.channelUnregistered(ctx);
}
這里和連接建立之后的事件觸發(fā)順序正好相反,連接建立之后是先觸發(fā) ChannelRegistered 事件然后在觸發(fā) ChannelActive 事件。
3.2 Outbound 類事件
final class ChannelHandlerMask {
// outbound 事件的集合
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
// outbound 事件掩碼
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
}
和 Inbound 類事件一樣,Outbound 類事件也有對應(yīng)的掩碼表示。下面我們來看下 Outbound類事件的觸發(fā)時機(jī):
3.2.1 read 事件
大家這里需要注意區(qū)分 read 事件和 ChannelRead 事件的不同。
ChannelRead 事件前邊我們已經(jīng)介紹了,當(dāng) NioServerSocketChannel 接收到新連接時,會觸發(fā) ChannelRead 事件在其 pipeline 上傳播。
當(dāng) NioSocketChannel 上有請求數(shù)據(jù)時,在 read loop 中讀取請求數(shù)據(jù)時會觸發(fā) ChannelRead 事件在其 pipeline 上傳播。
而 read 事件則和 ChannelRead 事件完全不同,read 事件特指使 Channel 具備感知 IO 事件的能力。NioServerSocketChannel 對應(yīng)的 OP_ACCEPT 事件的感知能力,NioSocketChannel 對應(yīng)的是 OP_READ 事件的感知能力。
read 事件的觸發(fā)是在當(dāng) channel 需要向其對應(yīng)的 reactor 注冊讀類型事件時(比如 OP_ACCEPT 事件 和 OP_READ 事件)才會觸發(fā)。read 事件的響應(yīng)就是將 channel 感興趣的 IO 事件注冊到對應(yīng)的 reactor 上。
比如 NioServerSocketChannel 感興趣的是 OP_ACCEPT 事件, NioSocketChannel 感興趣的是 OP_READ 事件。
在前邊介紹 ChannelActive 事件時我們提到,當(dāng) channel 處于 active 狀態(tài)后會在 pipeline 中傳播 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回調(diào)中會觸發(fā) Read 事件的傳播。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//如果是autoRead 則觸發(fā)read事件傳播
channel.read();
}
}
@Override
public void read(ChannelHandlerContext ctx) {
//觸發(fā)注冊O(shè)P_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
}
而在 HeadContext 中的 read 事件回調(diào)中會調(diào)用 Channel 的底層操作類 unsafe 的 beginRead 方法,在該方法中會向 reactor 注冊 channel 感興趣的 IO 事件。對于 NioServerSocketChannel 來說這里注冊的就是 OP_ACCEPT 事件,對于 NioSocketChannel 來說這里注冊的則是 OP_READ 事件。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//注冊監(jiān)聽OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
細(xì)心的同學(xué)可能注意到了 channel 對應(yīng)的配置類中包含了一個 autoRead 屬性,那么這個 autoRead 到底是干什么的呢?
其實這是 netty 為大家提供的一種背壓機(jī)制,用來防止 OOM ,想象一下當(dāng)對端發(fā)送數(shù)據(jù)非常多并且發(fā)送速度非??欤?wù)端處理速度非常慢,一時間消費不過來。而對端又在不停的大量發(fā)送數(shù)據(jù),服務(wù)端的 reactor 線程不得不在 read loop 中不停的讀取,并且為讀取到的數(shù)據(jù)分配 ByteBuffer 。而服務(wù)端業(yè)務(wù)線程又處理不過來,這就導(dǎo)致了大量來不及處理的數(shù)據(jù)占用了大量的內(nèi)存空間,從而導(dǎo)致 OOM 。
面對這種情況,我們可以通過 channelHandlerContext.channel().config().setAutoRead(false) 將 autoRead 屬性設(shè)置為 false 。隨后 netty 就會將 channel 中感興趣的讀類型事件從 reactor 中注銷,從此 reactor 不會再對相應(yīng)事件進(jìn)行監(jiān)聽。這樣 channel 就不會在讀取數(shù)據(jù)了。
這里 NioServerSocketChannel 對應(yīng)的是 OP_ACCEPT 事件, NioSocketChannel 對應(yīng)的是 OP_READ 事件。
protected final void removeReadOp() {
SelectionKey key = selectionKey();
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
key.interestOps(interestOps & ~readInterestOp);
}
}
而當(dāng)服務(wù)端的處理速度恢復(fù)正常,我們又可以通過 channelHandlerContext.channel().config().setAutoRead(true) 將 autoRead 屬性設(shè)置為 true 。這樣 netty 會在 pipeline 中觸發(fā) read 事件,最終在 HeadContext 中的 read 事件回調(diào)方法中通過調(diào)用 unsafe#beginRead 方法將 channel 感興趣的讀類型事件重新注冊到對應(yīng)的 reactor 中。
@Override
public ChannelConfig setAutoRead(boolean autoRead) {
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
//autoRead從false變?yōu)閠rue
channel.read();
} else if (!autoRead && oldAutoRead) {
//autoRead從true變?yōu)閒alse
autoReadCleared();
}
return this;
}
read 事件可以理解為使 channel 擁有讀的能力,當(dāng)有了讀的能力后, channelRead 就可以讀取具體的數(shù)據(jù)了。
3.2.2 write 和 flush 事件
write 事件和 flush 事件我們在《一文搞懂Netty發(fā)送數(shù)據(jù)全流程》一文中已經(jīng)非常詳盡的介紹過了,這里筆者在帶大家簡單回顧一下。
write 事件和 flush 事件均由用戶在處理完業(yè)務(wù)請求得到業(yè)務(wù)結(jié)果后在業(yè)務(wù)線程中主動觸發(fā)。
用戶既可以通過 ChannelHandlerContext 觸發(fā)也可以通過 Channel 來觸發(fā)。
不同之處在于如果通過 ChannelHandlerContext 觸發(fā),那么 write 事件或者 flush 事件就會在 pipeline 中從當(dāng)前 ChannelHandler 開始一直向前傳播直到 HeadContext 。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
如果通過 Channel 觸發(fā),那么 write 事件和 flush 事件就會從 pipeline 的尾部節(jié)點 TailContext 開始一直向前傳播直到 HeadContext 。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.channel().write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.channel().flush();
}
當(dāng)然還有一個 writeAndFlush 方法,也會分為 ChannelHandlerContext 觸發(fā)和 Channel 的觸發(fā)。觸發(fā) writeAndFlush 后,write 事件首先會在 pipeline 中傳播,最后 flush 事件在 pipeline 中傳播。
netty 對 write 事件的處理最終會將發(fā)送數(shù)據(jù)寫入 Channel 對應(yīng)的寫緩沖隊列 ChannelOutboundBuffer 中。此時數(shù)據(jù)并沒有發(fā)送出去而是在寫緩沖隊列中緩存,這也是 netty 實現(xiàn)異步寫的核心設(shè)計。
最終通過 flush 操作從 Channel 中的寫緩沖隊列 ChannelOutboundBuffer 中獲取到待發(fā)送數(shù)據(jù),并寫入到 Socket 的發(fā)送緩沖區(qū)中。
3.2.3 close 事件
當(dāng)用戶在 ChannelHandler 中調(diào)用如下方法對 Channel 進(jìn)行關(guān)閉時,會觸發(fā) Close 事件在 pipeline 中從后向前傳播。
//close事件從當(dāng)前ChannelHandlerContext開始在pipeline中向前傳播
ctx.close();
//close事件從pipeline的尾結(jié)點tailContext開始向前傳播
ctx.channel().close();
我們可以在Outbound類型的ChannelHandler中響應(yīng)close事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....客戶端channel關(guān)閉之前的處理回調(diào).....
//繼續(xù)向前傳播close事件
super.close(ctx, promise);
}
}
最終 close 事件會在 pipeline 中一直向前傳播直到頭結(jié)點 HeadConnect 中,并在 HeadContext 中完成連接關(guān)閉的操作,當(dāng)連接完成關(guān)閉之后,會在 pipeline中先后觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件。
3.2.4 deRegister 事件
用戶可調(diào)用如下代碼將當(dāng)前 Channel 從 Reactor 中注銷掉。
//deregister事件從當(dāng)前ChannelHandlerContext開始在pipeline中向前傳播
ctx.deregister();
//deregister事件從pipeline的尾結(jié)點tailContext開始向前傳播
ctx.channel().deregister();
我們可以在 Outbound 類型的 ChannelHandler 中響應(yīng) deregister 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....客戶端channel取消注冊之前的處理回調(diào).....
//繼續(xù)向前傳播connect事件
super.deregister(ctx, promise);
}
}
最終 deRegister 事件會傳播至 pipeline 中的頭結(jié)點 HeadContext 中,并在 HeadContext 中完成底層 channel 取消注冊的操作。當(dāng) Channel 從 Reactor 上注銷之后,從此 Reactor 將不會在監(jiān)聽 Channel 上的 IO 事件,并觸發(fā) ChannelUnregistered 事件在 pipeline 中傳播。
3.2.5 connect 事件
在 Netty 的客戶端中我們可以利用 NioSocketChannel 的 connect 方法觸發(fā) connect 事件在 pipeline 中傳播。
//connect事件從當(dāng)前ChannelHandlerContext開始在pipeline中向前傳播
ctx.connect(remoteAddress);
//connect事件從pipeline的尾結(jié)點tailContext開始向前傳播
ctx.channel().connect(remoteAddress);
我們可以在 Outbound 類型的 ChannelHandler 中響應(yīng) connect 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
.....客戶端channel連接成功之前的處理回調(diào).....
//繼續(xù)向前傳播connect事件
super.connect(ctx, remoteAddress, localAddress, promise);
}
}
最終 connect 事件會在 pipeline 中的頭結(jié)點 headContext 中觸發(fā)底層的連接建立請求。當(dāng)客戶端成功連接到服務(wù)端之后,會在客戶端 NioSocketChannel 的 pipeline 中傳播 channelActive 事件。
3.2.6 disConnect 事件
在 Netty 的客戶端中我們也可以調(diào)用 NioSocketChannel 的 disconnect 方法在 pipeline 中觸發(fā) disconnect 事件,這會導(dǎo)致 NioSocketChannel 的關(guān)閉。
//disconnect事件從當(dāng)前ChannelHandlerContext開始在pipeline中向前傳播
ctx.disconnect();
//disconnect事件從pipeline的尾結(jié)點tailContext開始向前傳播
ctx.channel().disconnect();
我們可以在 Outbound 類型的 ChannelHandler 中響應(yīng) disconnect 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....客戶端channel即將關(guān)閉前的處理回調(diào).....
//繼續(xù)向前傳播disconnect事件
super.disconnect(ctx, promise);
}
}
最終 disconnect 事件會傳播到 HeadContext 中,并在 HeadContext 中完成底層的斷開連接操作,當(dāng)客戶端斷開連接成功關(guān)閉之后,會在 pipeline 中先后觸發(fā) ChannelInactive 事件和 ChannelUnregistered 事件。
本文是 pipeline 的上半部分內(nèi)容,主要講述了 pipeline 的結(jié)構(gòu)以及 各種異步 IO 事件。在下篇文章中,筆者會對 pipeline 的源碼實現(xiàn)做進(jìn)一步的剖析~~