(*文章基于Netty4.1.22版本)
介紹
Netty中隨著一個(gè)Channel的創(chuàng)建,會(huì)連帶創(chuàng)建一個(gè)ChannelPipeline,這個(gè)ChannelPipeline就像一個(gè)處理各種事件的管道,負(fù)責(zé)去處理Channel上發(fā)生的事件,例如連接事件,讀事件,寫事件等。
更深入的說,處理的并不是ChannelPipeline,而是ChannelPipeline中一個(gè)個(gè)的ChannelHandler,其結(jié)構(gòu)如下

ChannelPipeline中有很多Handler(其實(shí)是Context類型,Context封裝了Handler),組成了一個(gè)雙向的鏈表,同時(shí)初始化的時(shí)候就會(huì)帶有一個(gè)頭結(jié)點(diǎn)和尾結(jié)點(diǎn),自定義的ChannelHandler都會(huì)添加到Head和Tail之間。
同時(shí)Netty定義了兩種事件:
- inbound:事件從Head往Tail方向傳遞,實(shí)現(xiàn)ChannelInboundHandler的ChannelHandler為處理inbound事件的ChannelHandler
- outbound:事件從Tail往Head方向傳遞,實(shí)現(xiàn)ChannelOutboundHandler的ChannelHandler為處理inbound事件的ChannelHandler
ChannelHandler和ChannelHandlerContext
ChannelHandler
先看下ChannelHandler接口的定義
public interface ChannelHandler {
// handler被添加進(jìn)pipeline的時(shí)候的回調(diào)方法
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// handler從pipeline中移除的時(shí)候的回調(diào)方法
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}
另外兩個(gè)常用的接口如下,這兩個(gè)接口定義了Netty的兩種事件流:Inbound和Outbound
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}
接口不做解釋,從方法名稱可以知道每個(gè)方法大概關(guān)聯(lián)的操作,另外有一點(diǎn)比較重要的是,ChannelOutboundHandler和ChannelInboundHandler兩個(gè)劃分兩種事件流,而每個(gè)方法就代表了每種事件流下的事件,舉個(gè)例子來說,用戶調(diào)用的write、flush或者connect等都屬于Outbound事件,而且Outbound一般都是用戶觸發(fā)
ChannelHandlerContext
接來下看下ChannelHandlerContext接口的核心方法
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
// 返回對(duì)應(yīng)的Channel對(duì)象
Channel channel();
// 返回對(duì)應(yīng)的EventLoop對(duì)象
EventExecutor executor();
// 返回對(duì)應(yīng)的ChannelHandler對(duì)象
ChannelHandler handler();
// 該Context對(duì)應(yīng)的Handler是否從pipeline中移除
boolean isRemoved();
/*******************以下的fire方法都是觸發(fā)對(duì)應(yīng)事件在pipeline中傳播*********************/
ChannelHandlerContext fireChannelRegistered();
ChannelHandlerContext fireChannelUnregistered();
ChannelHandlerContext fireChannelActive();
ChannelHandlerContext fireChannelInactive();
ChannelHandlerContext fireExceptionCaught(Throwable cause);
ChannelHandlerContext fireUserEventTriggered(Object evt);
ChannelHandlerContext fireChannelRead(Object msg);
ChannelHandlerContext fireChannelReadComplete();
ChannelHandlerContext fireChannelWritabilityChanged();
ChannelHandlerContext read();
ChannelHandlerContext flush();
// 返回對(duì)應(yīng)的pipeline
ChannelPipeline pipeline();
}
從方法中可以看出,Context和Pipeline、Handler、Channel、EventLoop是一對(duì)一的關(guān)系。
方法定義中有很多fire方法,是觸發(fā)事件傳播的入口,那么也可以看出Context是一個(gè)觸發(fā)事件傳播的結(jié)構(gòu)
看下Pipeline里使用的DefaultChannelHandlerContext及其父類
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
// 返回封裝的Handler
public ChannelHandler handler() {
return handler;
}
// 通過handler繼承的類的類型判斷是Inbound還是Outbound
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
// 通過handler繼承的類的類型判斷是Inbound還是Outbound
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
父類代碼如下,只貼了部分代碼
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
// Context是pipeline中的一個(gè)節(jié)點(diǎn),是雙向鏈表,這里保存了兩個(gè)指針類型便于向前和向后遍歷
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
//當(dāng)handlerAdded方法被調(diào)用的方式不是馬上調(diào)用的時(shí)候,會(huì)設(shè)置為該狀態(tài)
// 例如pipeline中調(diào)用callHandlerCallbackLater或者在EventLoop的execute方法中執(zhí)行
// 這種情況是延遲執(zhí)行,或者說不是馬上執(zhí)行,需要有個(gè)中間狀態(tài)
private static final int ADD_PENDING = 1;
// handlerAdded調(diào)用前設(shè)置的狀態(tài)
private static final int ADD_COMPLETE = 2;
// 當(dāng)節(jié)點(diǎn)被移除之后設(shè)置的狀態(tài)
private static final int REMOVE_COMPLETE = 3;
// 初始狀態(tài)
private static final int INIT = 0;
// handler的類型
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
// 當(dāng)channelReadComplete、read、channelWritableStateChanged、flush4個(gè)四個(gè)事件發(fā)生的時(shí)候
// 如果不在EventLoop的線程中,那么會(huì)轉(zhuǎn)換成Runnable對(duì)象放到EventLoop線程中處理
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
@Override
public Channel channel() {
return pipeline.channel();
}
// 返回pipeline
public ChannelPipeline pipeline() {
return pipeline;
}
final EventExecutor executor;
// 一般調(diào)用pipeline添加context的時(shí)候都沒傳這個(gè)參數(shù),所以為空
// 如果為空,則獲取Channel的EventLoop
// 而Channel和EventLoop綁定是在注冊(cè)時(shí)候,也就是說,在注冊(cè)完成前是該返回是空
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
}
ChannelPipeline如何與ChannelHandler關(guān)聯(lián)
先看下ChannelPipeline接口的部分定義
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
}
通過方法名稱,大概可以看出其工作原理以及每個(gè)方法是做什么的
再看下Channel啟動(dòng)初始化的時(shí)候,默認(rèn)是DefaultChannelPipeline(從這里可以看出,Channel總是對(duì)應(yīng)一個(gè)pipeline)
protected AbstractChannel(Channel parent) {
//....
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
那么看下DefaultChannelPipeline的對(duì)方法的實(shí)現(xiàn)
protected DefaultChannelPipeline(Channel channel) {
//
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
默認(rèn)初始化了Head和Tail,并且將ChannelPipeline對(duì)應(yīng)的Channel也保存了下來。
接下來,以文章Netty源碼分析----服務(wù)啟動(dòng)之Channel初始化中的Netty的demo中的這句代碼為例,分析一下其中實(shí)現(xiàn)
socketChannel.pipeline().addLast(new NettyServerHandler());
這里將一個(gè)自定義的ChannelHandler加入到了ChannelPipeline中
public final ChannelPipeline addLast(ChannelHandler handler) {
return addLast(null, handler);
}
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);//判斷是否重復(fù)添加并設(shè)置add屬性為true
// 將Handler封裝成AbstractChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// 下面會(huì)觸發(fā)handlerAdded方法,根據(jù)不同情況判斷該以什么方式調(diào)用
// 未注冊(cè)的話走Callback流程
if (!registered) {
// 將狀態(tài)設(shè)置成ADD_PENDING
// 然后handlerAdded的調(diào)用將換成callback
// 等注冊(cè)完的時(shí)候會(huì)調(diào)用callback,然后調(diào)用handlerAdded方法
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
// 如果和EventLoop不是同個(gè)線程
if (!executor.inEventLoop()) {
newCtx.setAddPending();
// 將handlerAdded的調(diào)用放到隊(duì)列,等待EventLoop線程執(zhí)行
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// 如果是在EventLoop線程中,那么直接執(zhí)行
callHandlerAdded0(newCtx);
return this;
}
newContext主要?jiǎng)?chuàng)建了一個(gè)DefaultChannelHandlerContext對(duì)象,構(gòu)造方法之前已經(jīng)描述過了。
callHandlerAdded0方法主要是將Context的狀態(tài)設(shè)置為ADD_COMPLETE和調(diào)用handlerAdded方法
addLast0方法的實(shí)現(xiàn):
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
邏輯很簡(jiǎn)單,將Context加入到Tail前面,鏈表的相關(guān)知識(shí),不再分析引用的變化過程。
callHandlerCallbackLater方法如下:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
// PendingHandlerAddedTask是調(diào)用callHandlerAdded0方法
// PendingHandlerRemovedTask是調(diào)用callHandlerRemoved0方法
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
// 方法鏈表中等待后續(xù)遍歷調(diào)用
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
其他方法也是類似的過程,此處省略
InBound事件在Pipeline中傳播
以注冊(cè)事件為例,在之前講過,注冊(cè)調(diào)用的是AbstractUnsafe的register0方法,其中有句代碼如下:
pipeline.fireChannelRegistered();
在這里就觸發(fā)了事件的傳播,看下其實(shí)現(xiàn)
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
直接調(diào)用了靜態(tài)方法,傳入Head,表示從Head開始傳播
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
next是HeadContext,看下他的invokeChannelRegistered方法(實(shí)際在父類AbstractChannelHandlerContext中)
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
這里又調(diào)回了HeadContext的channelRegistered方法
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
invokeHandlerAddedIfNeeded方法這個(gè)在服務(wù)啟動(dòng)的文章中分析過,回調(diào)在Channel注冊(cè)前添加的Handler,該方法只會(huì)調(diào)用一次。
再看下fireChannelRegistered方法,這個(gè)實(shí)現(xiàn)在父類AbstractChannelHandlerContext中實(shí)現(xiàn),沒有傳入?yún)?shù),這個(gè)時(shí)候會(huì)去尋找下一個(gè)節(jié)點(diǎn),進(jìn)行調(diào)用
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
先通過findContextInbound方法,遍歷鏈表,找到當(dāng)前Context后面第一個(gè)inbound類型的Context(在addLast方法中,將Handler封裝成Context對(duì)象的時(shí)候,就已經(jīng)將inbound賦值,具體看上面),然后再又調(diào)用invokeChannelRegistered方法,這時(shí)的參數(shù)就不是Head了,而是Head后一個(gè)節(jié)點(diǎn),這樣通過遞歸的方式往后調(diào)用,就形成了事件在Pipeline中的傳播。
- 注意:傳播靠的是Handler中再調(diào)用一次fireChannelXX方法,這個(gè)方法會(huì)往后找合適的Handler進(jìn)行傳播
為了說明這個(gè)問題,我們看下Nettydemo中自定義的Handler
首先是先使用addLast方法添加一個(gè)自定義的Handler到pipeline中
ch.pipeline().addLast(new StringDecoder()).addLast(new ServerHandler())
private static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
String body = (String)msg;
System.out.println("receive body:"+body );
}
}
而Handler的邏輯也很簡(jiǎn)單,就是打印一下接收到的信息,結(jié)果很明顯,當(dāng)Client往Server發(fā)送了一條消息,控制臺(tái)就打印
receive body:XXXX
假設(shè),我們有多個(gè)自定義的Handler
ch.pipeline().addLast(new StringDecoder())
.addLast(new ServerHandler()).addLast(new ServerHandler())
如上,我們添加了兩個(gè)自定義的Handler,那么我們是想事件依次通過兩個(gè)Handler進(jìn)行不同的處理(這里兩個(gè)Handler同樣的功能,只為說明問題),那么結(jié)果是Client發(fā)送一條消息,而Server打印兩次
- 結(jié)果呢?
結(jié)果當(dāng)然肯定打印了一次啦,不然我寫那么多結(jié)果和預(yù)想一樣,不就是在湊字?jǐn)?shù)么=_=....
- 那么怎么樣才可以讓多個(gè)Handler都執(zhí)行呢?
只需要在Handler最后加一句代碼就OK了
ctx.fireChannelRead(msg);
這個(gè)上面有分析過,會(huì)找到下一個(gè)對(duì)應(yīng)類型的Context然后調(diào)用。
所以我覺得Netty的Pipeline的Inbound傳播過程和下圖更像

上面例子中,那個(gè)問題就是"往后傳播"這個(gè)步驟漏了。而一些自帶的Handler,都會(huì)觸發(fā)這樣的步驟,所以添加多個(gè)也是可以一路處理到達(dá)你的Hadnler
注意:這里有一個(gè)問題,假設(shè)Context2處理完后繼續(xù)往后傳播,那么就會(huì)到了Tail,這會(huì)出現(xiàn)一個(gè)問題,以ChannelRead為例,看下Tail的實(shí)現(xiàn)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
Netty任務(wù)一個(gè)事件到達(dá)Tail,表示前面Pipeline中沒有正確的處理事件,并讓其無奈傳播到Tail,所以這里打印了一個(gè)日志,大概意思大家也懂
writeAndFlush/write與OutBound事件傳播的關(guān)系
上面講了InBound的事件傳播,InBound事件是IO線程觸發(fā)的事件,例如read,active等讀事件,而OutBound事件是用戶自己觸發(fā)的事件,例如Netty應(yīng)用中,最常用的就是
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ....
ctx.writeAndFlush(writeBuf);
}
或者
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ....
Channel channel = ctx.channel();
channel.writeAndFlush(writeBuf);
}
這種write的方式,會(huì)觸發(fā)OutBound事件的傳播,下面來說一下是如何傳播的,且上面兩者write方式觸發(fā)的事件傳播的區(qū)別
ctx.writeAndFlush
該方法會(huì)調(diào)用到AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise)方法,前面的和事件傳播無關(guān),暫時(shí)不看
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
在講Inbound事件傳播機(jī)制的時(shí)候,說過每次傳播會(huì)遍歷Pipeline中的Handler然后找到Inbound類型進(jìn)行調(diào)用,對(duì)于Outbound事件也是類似的,通過findContextOutbound去找到Outbound類型的事件
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
注意這里,從當(dāng)前的節(jié)點(diǎn)開始往前找,這個(gè)this是我們自定義的Handler,而之前的例子中,prev只有一個(gè)Head。
為了更能說明傳播流程,我在Demo中多加了一個(gè)Outbound類型的Handler
socketChannel.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new NettyServerHandler());
這時(shí)候,findContextOutbound找到的就是StringEncoder這個(gè)Outbound類型的Handler,
然后調(diào)用無論走哪個(gè)分支,調(diào)用AbstractChannelHandlerContext.invokeWrite0(Object, ChannelPromise)方法
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
由于第一個(gè)找到的Outbound類型的Handler是StringEncoder,那么看下其write方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//....
ctx.write(msg, promise);
//....
}
這個(gè)時(shí)候,又調(diào)用了write方法,然后又回到AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise)方法,然后繼續(xù)調(diào)用findContextOutbound方法,而這時(shí)候的this是StringEncoder,所以找到的是HeadContext,然后再調(diào)用HeadContext的write方法,這樣形成一個(gè)遞歸的調(diào)用,Outbound事件就是這樣傳播的
Channel.writeAndFlush
看下Channel的writeAndFlush方法,其調(diào)用的是AbstractChannel方法
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
pipiline的writeAndFlush方法如下:
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
總結(jié):直接使用Channel.writeAndFlush會(huì)從Tail開始傳播,而使用ctx.writeAndFlush則是從當(dāng)前Handler開始往前傳播
服務(wù)啟動(dòng)中涉及到的Pipeline的相關(guān)知識(shí)
看到服務(wù)啟動(dòng)分析的文章中,會(huì)有一些操作pipeline的代碼,可能一開始看的時(shí)候不太清楚流程,當(dāng)分析完pipeline后,這部分的內(nèi)容也可以充分的了解了
總結(jié)
一個(gè)Channel在創(chuàng)建的時(shí)候就會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的ChannelPipeline,
通過上面的分析,可以看到ChannelPipeline的設(shè)計(jì)是線程安全的,有很多地方的操作就是為了這個(gè)線程安全做了很多的操作,例如addLast調(diào)用handlerAdded會(huì)轉(zhuǎn)換成EventLoop隊(duì)列任務(wù),Netty中很多地方都是類似的,為了避免這種多線程操作的問題都是先轉(zhuǎn)成隊(duì)列的任務(wù),從而轉(zhuǎn)換成單線程的操作,這種設(shè)計(jì)需要好好琢磨