Netty源碼分析----pipeline

(*文章基于Netty4.1.22版本)

介紹

Netty中隨著一個(gè)Channel的創(chuàng)建,會(huì)連帶創(chuàng)建一個(gè)ChannelPipeline,這個(gè)ChannelPipeline就像一個(gè)處理各種事件的管道,負(fù)責(zé)去處理Channel上發(fā)生的事件,例如連接事件,讀事件,寫事件等。
更深入的說,處理的并不是ChannelPipeline,而是ChannelPipeline中一個(gè)個(gè)的ChannelHandler,其結(jié)構(gòu)如下


image.png

ChannelPipeline中有很多Handler(其實(shí)是Context類型,Context封裝了Handler),組成了一個(gè)雙向的鏈表,同時(shí)初始化的時(shí)候就會(huì)帶有一個(gè)頭結(jié)點(diǎn)和尾結(jié)點(diǎn),自定義的ChannelHandler都會(huì)添加到Head和Tail之間。
同時(shí)Netty定義了兩種事件:

  • inbound:事件從Head往Tail方向傳遞,實(shí)現(xiàn)ChannelInboundHandler的ChannelHandler為處理inbound事件的ChannelHandler
  • outbound:事件從Tail往Head方向傳遞,實(shí)現(xiàn)ChannelOutboundHandler的ChannelHandler為處理inbound事件的ChannelHandler

ChannelHandler和ChannelHandlerContext

ChannelHandler

先看下ChannelHandler接口的定義

public interface ChannelHandler {
    // handler被添加進(jìn)pipeline的時(shí)候的回調(diào)方法
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    // handler從pipeline中移除的時(shí)候的回調(diào)方法
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}

另外兩個(gè)常用的接口如下,這兩個(gè)接口定義了Netty的兩種事件流:Inbound和Outbound

public interface ChannelOutboundHandler extends ChannelHandler {

    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}

public interface ChannelInboundHandler extends ChannelHandler {

    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}

接口不做解釋,從方法名稱可以知道每個(gè)方法大概關(guān)聯(lián)的操作,另外有一點(diǎn)比較重要的是,ChannelOutboundHandler和ChannelInboundHandler兩個(gè)劃分兩種事件流,而每個(gè)方法就代表了每種事件流下的事件,舉個(gè)例子來說,用戶調(diào)用的write、flush或者connect等都屬于Outbound事件,而且Outbound一般都是用戶觸發(fā)

ChannelHandlerContext

接來下看下ChannelHandlerContext接口的核心方法

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // 返回對(duì)應(yīng)的Channel對(duì)象
    Channel channel();
    // 返回對(duì)應(yīng)的EventLoop對(duì)象
    EventExecutor executor();
    // 返回對(duì)應(yīng)的ChannelHandler對(duì)象
    ChannelHandler handler();
    // 該Context對(duì)應(yīng)的Handler是否從pipeline中移除
    boolean isRemoved();
    /*******************以下的fire方法都是觸發(fā)對(duì)應(yīng)事件在pipeline中傳播*********************/
    ChannelHandlerContext fireChannelRegistered();

    ChannelHandlerContext fireChannelUnregistered();

    ChannelHandlerContext fireChannelActive();

    ChannelHandlerContext fireChannelInactive();

    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    ChannelHandlerContext fireUserEventTriggered(Object evt);

    ChannelHandlerContext fireChannelRead(Object msg);

    ChannelHandlerContext fireChannelReadComplete();

    ChannelHandlerContext fireChannelWritabilityChanged();

    ChannelHandlerContext read();

    ChannelHandlerContext flush();
    // 返回對(duì)應(yīng)的pipeline
    ChannelPipeline pipeline();
}

從方法中可以看出,Context和Pipeline、Handler、Channel、EventLoop是一對(duì)一的關(guān)系。
方法定義中有很多fire方法,是觸發(fā)事件傳播的入口,那么也可以看出Context是一個(gè)觸發(fā)事件傳播的結(jié)構(gòu)

看下Pipeline里使用的DefaultChannelHandlerContext及其父類

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    // 返回封裝的Handler
    public ChannelHandler handler() {
        return handler;
    }
    // 通過handler繼承的類的類型判斷是Inbound還是Outbound
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
    // 通過handler繼承的類的類型判斷是Inbound還是Outbound
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

父類代碼如下,只貼了部分代碼

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    // Context是pipeline中的一個(gè)節(jié)點(diǎn),是雙向鏈表,這里保存了兩個(gè)指針類型便于向前和向后遍歷
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
    //當(dāng)handlerAdded方法被調(diào)用的方式不是馬上調(diào)用的時(shí)候,會(huì)設(shè)置為該狀態(tài)
    // 例如pipeline中調(diào)用callHandlerCallbackLater或者在EventLoop的execute方法中執(zhí)行
    // 這種情況是延遲執(zhí)行,或者說不是馬上執(zhí)行,需要有個(gè)中間狀態(tài)
    private static final int ADD_PENDING = 1;
    // handlerAdded調(diào)用前設(shè)置的狀態(tài)
    private static final int ADD_COMPLETE = 2;
    // 當(dāng)節(jié)點(diǎn)被移除之后設(shè)置的狀態(tài)
    private static final int REMOVE_COMPLETE = 3;
    // 初始狀態(tài)
    private static final int INIT = 0;
    // handler的類型
    private final boolean inbound;
    private final boolean outbound;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private final boolean ordered;
    // 當(dāng)channelReadComplete、read、channelWritableStateChanged、flush4個(gè)四個(gè)事件發(fā)生的時(shí)候
    // 如果不在EventLoop的線程中,那么會(huì)轉(zhuǎn)換成Runnable對(duì)象放到EventLoop線程中處理
    private Runnable invokeChannelReadCompleteTask;
    private Runnable invokeReadTask;
    private Runnable invokeChannelWritableStateChangedTask;
    private Runnable invokeFlushTask;

    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    @Override
    public Channel channel() {
        return pipeline.channel();
    }

    // 返回pipeline
    public ChannelPipeline pipeline() {
        return pipeline;
    }
    
    final EventExecutor executor;
    // 一般調(diào)用pipeline添加context的時(shí)候都沒傳這個(gè)參數(shù),所以為空
    // 如果為空,則獲取Channel的EventLoop
    // 而Channel和EventLoop綁定是在注冊(cè)時(shí)候,也就是說,在注冊(cè)完成前是該返回是空
    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }
}

ChannelPipeline如何與ChannelHandler關(guān)聯(lián)

先看下ChannelPipeline接口的部分定義

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    ChannelPipeline addFirst(String name, ChannelHandler handler);

    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addLast(String name, ChannelHandler handler);

    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addFirst(ChannelHandler... handlers);

    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline addLast(ChannelHandler... handlers);

    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
}

通過方法名稱,大概可以看出其工作原理以及每個(gè)方法是做什么的

再看下Channel啟動(dòng)初始化的時(shí)候,默認(rèn)是DefaultChannelPipeline(從這里可以看出,Channel總是對(duì)應(yīng)一個(gè)pipeline)

    protected AbstractChannel(Channel parent) {
        //....
        pipeline = newChannelPipeline();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

那么看下DefaultChannelPipeline的對(duì)方法的實(shí)現(xiàn)

    protected DefaultChannelPipeline(Channel channel) {
        // 
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

默認(rèn)初始化了Head和Tail,并且將ChannelPipeline對(duì)應(yīng)的Channel也保存了下來。
接下來,以文章Netty源碼分析----服務(wù)啟動(dòng)之Channel初始化中的Netty的demo中的這句代碼為例,分析一下其中實(shí)現(xiàn)

socketChannel.pipeline().addLast(new NettyServerHandler());

這里將一個(gè)自定義的ChannelHandler加入到了ChannelPipeline中

    public final ChannelPipeline addLast(ChannelHandler handler) {
        return addLast(null, handler);
    }
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);//判斷是否重復(fù)添加并設(shè)置add屬性為true
            // 將Handler封裝成AbstractChannelHandlerContext
            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);
            // 下面會(huì)觸發(fā)handlerAdded方法,根據(jù)不同情況判斷該以什么方式調(diào)用
            // 未注冊(cè)的話走Callback流程
            if (!registered) {
                // 將狀態(tài)設(shè)置成ADD_PENDING
                // 然后handlerAdded的調(diào)用將換成callback
                // 等注冊(cè)完的時(shí)候會(huì)調(diào)用callback,然后調(diào)用handlerAdded方法
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            // 如果和EventLoop不是同個(gè)線程
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                // 將handlerAdded的調(diào)用放到隊(duì)列,等待EventLoop線程執(zhí)行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        // 如果是在EventLoop線程中,那么直接執(zhí)行
        callHandlerAdded0(newCtx);
        return this;
    }

newContext主要?jiǎng)?chuàng)建了一個(gè)DefaultChannelHandlerContext對(duì)象,構(gòu)造方法之前已經(jīng)描述過了。
callHandlerAdded0方法主要是將Context的狀態(tài)設(shè)置為ADD_COMPLETE和調(diào)用handlerAdded方法

addLast0方法的實(shí)現(xiàn):

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

邏輯很簡(jiǎn)單,將Context加入到Tail前面,鏈表的相關(guān)知識(shí),不再分析引用的變化過程。

callHandlerCallbackLater方法如下:

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        // PendingHandlerAddedTask是調(diào)用callHandlerAdded0方法
        // PendingHandlerRemovedTask是調(diào)用callHandlerRemoved0方法
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        // 方法鏈表中等待后續(xù)遍歷調(diào)用
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

其他方法也是類似的過程,此處省略

InBound事件在Pipeline中傳播

以注冊(cè)事件為例,在之前講過,注冊(cè)調(diào)用的是AbstractUnsafe的register0方法,其中有句代碼如下:

pipeline.fireChannelRegistered();

在這里就觸發(fā)了事件的傳播,看下其實(shí)現(xiàn)

    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

直接調(diào)用了靜態(tài)方法,傳入Head,表示從Head開始傳播

   static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
       EventExecutor executor = next.executor();
       if (executor.inEventLoop()) {
           next.invokeChannelRegistered();
       } else {
           executor.execute(new Runnable() {
               @Override
               public void run() {
                   next.invokeChannelRegistered();
               }
           });
       }
   }

next是HeadContext,看下他的invokeChannelRegistered方法(實(shí)際在父類AbstractChannelHandlerContext中)

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

這里又調(diào)回了HeadContext的channelRegistered方法

        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

invokeHandlerAddedIfNeeded方法這個(gè)在服務(wù)啟動(dòng)的文章中分析過,回調(diào)在Channel注冊(cè)前添加的Handler,該方法只會(huì)調(diào)用一次。
再看下fireChannelRegistered方法,這個(gè)實(shí)現(xiàn)在父類AbstractChannelHandlerContext中實(shí)現(xiàn),沒有傳入?yún)?shù),這個(gè)時(shí)候會(huì)去尋找下一個(gè)節(jié)點(diǎn),進(jìn)行調(diào)用

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

先通過findContextInbound方法,遍歷鏈表,找到當(dāng)前Context后面第一個(gè)inbound類型的Context(在addLast方法中,將Handler封裝成Context對(duì)象的時(shí)候,就已經(jīng)將inbound賦值,具體看上面),然后再又調(diào)用invokeChannelRegistered方法,這時(shí)的參數(shù)就不是Head了,而是Head后一個(gè)節(jié)點(diǎn),這樣通過遞歸的方式往后調(diào)用,就形成了事件在Pipeline中的傳播。

  • 注意:傳播靠的是Handler中再調(diào)用一次fireChannelXX方法,這個(gè)方法會(huì)往后找合適的Handler進(jìn)行傳播

為了說明這個(gè)問題,我們看下Nettydemo中自定義的Handler
首先是先使用addLast方法添加一個(gè)自定義的Handler到pipeline中

    ch.pipeline().addLast(new StringDecoder()).addLast(new ServerHandler())

    private static class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String body = (String)msg;
            System.out.println("receive body:"+body );
        }
    }

而Handler的邏輯也很簡(jiǎn)單,就是打印一下接收到的信息,結(jié)果很明顯,當(dāng)Client往Server發(fā)送了一條消息,控制臺(tái)就打印

receive body:XXXX

假設(shè),我們有多個(gè)自定義的Handler

ch.pipeline().addLast(new StringDecoder())
    .addLast(new ServerHandler()).addLast(new ServerHandler())

如上,我們添加了兩個(gè)自定義的Handler,那么我們是想事件依次通過兩個(gè)Handler進(jìn)行不同的處理(這里兩個(gè)Handler同樣的功能,只為說明問題),那么結(jié)果是Client發(fā)送一條消息,而Server打印兩次

  • 結(jié)果呢?

結(jié)果當(dāng)然肯定打印了一次啦,不然我寫那么多結(jié)果和預(yù)想一樣,不就是在湊字?jǐn)?shù)么=_=....

  • 那么怎么樣才可以讓多個(gè)Handler都執(zhí)行呢?

只需要在Handler最后加一句代碼就OK了

ctx.fireChannelRead(msg);

這個(gè)上面有分析過,會(huì)找到下一個(gè)對(duì)應(yīng)類型的Context然后調(diào)用。
所以我覺得Netty的Pipeline的Inbound傳播過程和下圖更像


image.png

上面例子中,那個(gè)問題就是"往后傳播"這個(gè)步驟漏了。而一些自帶的Handler,都會(huì)觸發(fā)這樣的步驟,所以添加多個(gè)也是可以一路處理到達(dá)你的Hadnler

注意:這里有一個(gè)問題,假設(shè)Context2處理完后繼續(xù)往后傳播,那么就會(huì)到了Tail,這會(huì)出現(xiàn)一個(gè)問題,以ChannelRead為例,看下Tail的實(shí)現(xiàn)

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            onUnhandledInboundMessage(msg);
        }
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

Netty任務(wù)一個(gè)事件到達(dá)Tail,表示前面Pipeline中沒有正確的處理事件,并讓其無奈傳播到Tail,所以這里打印了一個(gè)日志,大概意思大家也懂

writeAndFlush/write與OutBound事件傳播的關(guān)系

上面講了InBound的事件傳播,InBound事件是IO線程觸發(fā)的事件,例如read,active等讀事件,而OutBound事件是用戶自己觸發(fā)的事件,例如Netty應(yīng)用中,最常用的就是

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // ....
            ctx.writeAndFlush(writeBuf);
        }

或者

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // ....
            Channel channel = ctx.channel();
            channel.writeAndFlush(writeBuf);
        }

這種write的方式,會(huì)觸發(fā)OutBound事件的傳播,下面來說一下是如何傳播的,且上面兩者write方式觸發(fā)的事件傳播的區(qū)別

ctx.writeAndFlush

該方法會(huì)調(diào)用到AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise)方法,前面的和事件傳播無關(guān),暫時(shí)不看

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

在講Inbound事件傳播機(jī)制的時(shí)候,說過每次傳播會(huì)遍歷Pipeline中的Handler然后找到Inbound類型進(jìn)行調(diào)用,對(duì)于Outbound事件也是類似的,通過findContextOutbound去找到Outbound類型的事件

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

注意這里,從當(dāng)前的節(jié)點(diǎn)開始往前找,這個(gè)this是我們自定義的Handler,而之前的例子中,prev只有一個(gè)Head。
為了更能說明傳播流程,我在Demo中多加了一個(gè)Outbound類型的Handler

socketChannel.pipeline()
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder())
                            .addLast(new NettyServerHandler());

這時(shí)候,findContextOutbound找到的就是StringEncoder這個(gè)Outbound類型的Handler,
然后調(diào)用無論走哪個(gè)分支,調(diào)用AbstractChannelHandlerContext.invokeWrite0(Object, ChannelPromise)方法

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

由于第一個(gè)找到的Outbound類型的Handler是StringEncoder,那么看下其write方法

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        //....
                ctx.write(msg, promise);
        //....
    }

這個(gè)時(shí)候,又調(diào)用了write方法,然后又回到AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise)方法,然后繼續(xù)調(diào)用findContextOutbound方法,而這時(shí)候的this是StringEncoder,所以找到的是HeadContext,然后再調(diào)用HeadContext的write方法,這樣形成一個(gè)遞歸的調(diào)用,Outbound事件就是這樣傳播的

Channel.writeAndFlush

看下Channel的writeAndFlush方法,其調(diào)用的是AbstractChannel方法

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }

pipiline的writeAndFlush方法如下:

    @Override 
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

總結(jié):直接使用Channel.writeAndFlush會(huì)從Tail開始傳播,而使用ctx.writeAndFlush則是從當(dāng)前Handler開始往前傳播

服務(wù)啟動(dòng)中涉及到的Pipeline的相關(guān)知識(shí)

看到服務(wù)啟動(dòng)分析的文章中,會(huì)有一些操作pipeline的代碼,可能一開始看的時(shí)候不太清楚流程,當(dāng)分析完pipeline后,這部分的內(nèi)容也可以充分的了解了

總結(jié)

一個(gè)Channel在創(chuàng)建的時(shí)候就會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的ChannelPipeline,
通過上面的分析,可以看到ChannelPipeline的設(shè)計(jì)是線程安全的,有很多地方的操作就是為了這個(gè)線程安全做了很多的操作,例如addLast調(diào)用handlerAdded會(huì)轉(zhuǎn)換成EventLoop隊(duì)列任務(wù),Netty中很多地方都是類似的,為了避免這種多線程操作的問題都是先轉(zhuǎn)成隊(duì)列的任務(wù),從而轉(zhuǎn)換成單線程的操作,這種設(shè)計(jì)需要好好琢磨

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容