netty源碼分析 - ChannelHandler

系列

Netty源碼分析 - Bootstrap服務端
Netty源碼分析 - Bootstrap客戶端
netty源碼分析 - ChannelHandler
netty源碼分析 - EventLoop類關系
netty源碼分析 - register分析
Netty源碼分析 - NioEventLoop事件處理
netty源碼分析 - accept過程分析
Netty源碼分析 - ByteBuf
Netty源碼分析 - 粘包和拆包問題


開篇

  • 本文基于netty-4.1.8.Final版本進行分析,主要是分析Netty Server初始化過程。
  • 建議先參考Netty源碼分析 - Bootstrap客戶端文章。
  • 核心點在于能夠理解pipeline的串行調用的執(zhí)行過程。


基本概念

pipeline
  • channel、pipeline、context、handler的關系圖如上所示,handler由context進行封裝,由雙向鏈表的數據結構進行連接。

  • 當Channel對象在構造的時候會同時創(chuàng)建一個ChannelPipeline對象,兩個對象相互關聯,是一對一的關系,ChannelPipeline不會被多個Channel共享。

  • ChannelPipeline對象創(chuàng)建之后會調用它的各種添加handler的方法向鏈中加入ChannelHandler對象,而在加入ChannelHandler對象的同時,會自動給每個ChannelHandler包裝一個ChannelHandlerContext對象。

  • ChannelHandlerContext是ChannelHandler的上下文信息,它使得ChannelHandler可以和ChannelPipeline以及其它的ChannelHandler對象進行交互操作。通過ChannelHandlerContext對象,ChannelHandler可以通知同一個pipeline中的其他ChannelHandler,也可以在運行時動態(tài)改變ChannelPipeline中的內容。

輸入消息處理

  • 當輸入消息觸發(fā)的時候,例如registred,active,read或readComplete等輸入的消息觸發(fā)的時候,會通過Channel調用對應的ChannelPipeline的對應方法來處理,輸入消息會首先通過head找到下一個ChanneInbountHandler來處理輸入消息,然后逐一傳遞到下一個ChanneInbountHandler消息,直至到最后一個內置的tail處理器。

輸出消息處理

  • 當輸入消息觸發(fā)的時候,例如bind,connect,write等輸出消息觸發(fā)的時候,會通過Channel調用對應的ChannelPipeline的對應方法來處理,輸入消息會首先通過tail找到下一個ChanneOutbountHandler來處理輸入消息,然后逐一傳遞到下一個ChanneOutbountHandler消息,直至到最后一個內置的head處理器。

pipeline的handler添加過程

  • pipeline的hanndler添加過程分為兩個階段,第一個階段為添加ChannelInitializer對應的handler,在Netty Client/Server在初始化channel的時候會執(zhí)行;第二階段為執(zhí)行ChannelInitializer對象內部方法initChannel()的時候添加對應handler,在注冊channel的時候會執(zhí)行。


DiscardClient例子

public final class DiscardClient {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
              // 綁定handler對象
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     p.addLast(new DiscardClientHandler());
                 }
             });

            // Make the connection attempt.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}


handler添加過程

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. newChannel創(chuàng)建對應的channel對象
            channel = channelFactory.newChannel();
            // 2. init真正執(zhí)行的是Bootstrap的init()方法
            init(channel);
        } catch (Throwable t) {
          // 省略代碼
        }
        // 3. 執(zhí)行register()動作
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
}
  • AbstractBootstrap#initAndRegister執(zhí)行操作三部曲:創(chuàng)建channel,初始化channel,注冊channel。
  • channelFactory.newChannel()創(chuàng)建channel。
  • init(channel)初始化channel,init(channel)在子類Bootstrap被重寫。
  • config().group().register(channel)注冊channel。


init過程

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        // 綁定ChannelInitializer到ChannelPipeline對象當中
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
}
  • Bootstrap#init的核心操作在于將Bootstrap的handler添加到channel對應的pipeline當中。
  • config.handler()返回的handler是DiscardClient中handler(new ChannelInitializer<SocketChannel>())綁定的handler對象。
  • 劃重點,在這里將ChannelHandler對象ChannelInitializer添加對應的pipeline當中


public class DefaultChannelPipeline implements ChannelPipeline {

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            // 1.channelHandler包裝成DefaultChannelHandlerContext對象
            newCtx = newContext(group, filterName(name, handler), handler);
            // 2.添加DefaultChannelHandlerContext對象到pipeline當中。
            addLast0(newCtx);
            // 3.針對未注冊的邏輯添加回調函數callHandlerCallbackLater
            if (!registered) {
                newCtx.setAddPending();
                // 添加回調函數callHandlerCallbackLater
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

        // 省略代碼
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
}
  • ChannelHandler添加到pipeline流程包含3個步驟:創(chuàng)建HandlerContext對象;添加HandlerContext對象到pipeline當中;注冊callHandlerCallbackLater的回調task。
  • 創(chuàng)建HandlerContext對象,newContext(group, filterName(name, handler), handler)。
  • 添加HandlerContext對象到pipeline中,addLast0(newCtx)。
  • 注冊callHandlerCallbackLater的回調task,callHandlerCallbackLater(newCtx, true),入參參數為newCtx(封裝channelHandler對象)。
  • Channel對象初始化后pipeline的狀態(tài)如上圖所示,增加了ChannelInitializer的這個handler對象。


register過程

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected abstract class AbstractUnsafe implements Unsafe {

        private void register0(ChannelPromise promise) {
            try {
                // 根據neverRegistered的標識判斷是否執(zhí)行invokeHandlerAddedIfNeeded。
                boolean firstRegistration = neverRegistered;
                // 1.執(zhí)行channel到eventLoop的綁定動作
                doRegister();
                neverRegistered = false;
                registered = true;
                // 2.執(zhí)行ChannelInitializer的初始化動作
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);

                // 3.執(zhí)行pipeline的fireChannelRegistered()方法
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        // 4.第一次注冊會執(zhí)行pipeline.fireChannelActive()的操作。
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
            }
        }
    }
}
  • channel的注冊過程包含4部曲,分別是綁定channel到eventLoop;執(zhí)行invokeHandlerAddedIfNeeded動作;執(zhí)行fireChannelRegistered動作;執(zhí)行fireChannelActive動作。
  • invokeHandlerAddedIfNeeded負責回調init過程中生成的PendingHandlerCallback對象,會執(zhí)行ChannelInitializer對應的handler的initChannel方法。
  • fireChannelRegistered動作觸發(fā)channel的狀態(tài)變?yōu)镃hannelRegistered。
  • fireChannelActive動作觸發(fā)channel的狀態(tài)變?yōu)镃hannelActive。


invokeHandlerAddedIfNeeded

public class DefaultChannelPipeline implements ChannelPipeline {

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            callHandlerAddedForAllHandlers();
        }
    }

    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;
        }

        // task是PendingHandlerAddedTask
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        // PendingHandlerAddedTask的入參是ChannelInitializer對應的ctx對象
        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }
    }

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // ctx.handler()返回的是ChannelInitializer對象
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
        }
    }
}
  • invokeHandlerAddedIfNeeded的執(zhí)行按照下面的順序進行執(zhí)行 invokeHandlerAddedIfNeeded => callHandlerAddedForAllHandlers => PendingHandlerAddedTask#execute => PendingHandlerAddedTask#callHandlerAdded0 => ChannelInitializer#handlerAdded。
  • ctx.handler()返回的是ChannelInitializer的handler對象。

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                // initChannel會添加新的handler
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
            } finally {
                // 移除ChannelInitializer對應的handler。
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
}
  • ChannelInitializer#handlerAdded執(zhí)行對應的initChannel()方法,完成ChannelInitializer的initChannel()方法并添加新的handler。
  • remove(ctx)會移除ChannelInitializer對應的handler。
  • 執(zhí)行ChannelInitializer#initChannel添加新handler同時移除ChannelInitializer自身handler的pipeline如上圖。


輸入消息處理流程

ChannelInboundHandler
  • channelRegistered 注冊事件,channel注冊到EventLoop上后調用,例如服務崗啟動時,pipeline.fireChannelRegistered();
  • channelUnregistered 注銷事件,channel從EventLoop上注銷后調用,例如關閉連接成功后,pipeline.fireChannelUnregistered();
  • channelActive 激活事件,綁定端口成功后調用,pipeline.fireChannelActive();
  • channelInactive非激活事件,連接關閉后調用,pipeline.fireChannelInactive();
  • channelRead 讀事件,channel有數據時調用,pipeline.fireChannelRead();
  • channelReadComplete 讀完事件,channel讀完之后調用,pipeline.fireChannelReadComplete();
  • channelWritabilityChanged 可寫狀態(tài)變更事件,當一個Channel的可寫的狀態(tài)發(fā)生改變的時候執(zhí)行,可以保證寫的操作不要太快,防止OOM,pipeline.fireChannelWritabilityChanged();
  • userEventTriggered 用戶事件觸發(fā),例如心跳檢測,ctx.fireUserEventTriggered(evt);
  • exceptionCaught 異常事件說明:我們可以看出,Inbound事件都是由I/O線程觸發(fā),用戶實現部分關注的事件被動調用。
ChannelInboundInvoker
  • ChannelInboundInvoker負責fire上述的各類事件。


pipeline.fireChannelRead()讀事件流程

public final class NioEventLoop extends SingleThreadEventLoop {
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

        // 省略相關代碼
        try {
            // 省略相關代碼
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
  • NioEventLoop關注OP_READ|OP_ACCEPT兩類事件,都屬于輸入事件類型。
  • 進入unsafe.read()進入讀取操作,unsafe為AbstractNioByteChannel#NioByteUnsafe。


public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected class NioByteUnsafe extends AbstractNioUnsafe {
        public final void read() {
            // 省略相關代碼
            try {
                do {
                    pipeline.fireChannelRead(byteBuf);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
            } finally {
            }
        }
    }
}


public class DefaultChannelPipeline implements ChannelPipeline {

    public final ChannelPipeline fireChannelRead(Object msg) {
        // 從pipeline對象當中的head開始執(zhí)行遍歷
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
}


abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
        }
    }
}
  • AbstractNioByteChannel#NioByteUnsafe的read觸發(fā)pipeline.fireChannelRead()操作,進入pipeline的事件觸發(fā)流程,參數head表示從pipeline的head開始遍歷。
  • AbstractChannelHandlerContext#invokeChannelRead屬于靜態(tài)方法,個人認為是AbstractChannelHandlerContext提供的為ChannelContext執(zhí)行的入口。
  • AbstractChannelHandlerContext#invokeChannelRead方法內部執(zhí)行next.invokeChannelRead進行了Context的執(zhí)行域,第一次Context為HeadContext對象。


abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // handler()返回Context對象,第一次返回HeadContext對象。
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
}

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
}
  • ((ChannelInboundHandler) handler()).channelRead(this, msg)的handler()方法返回的是Context對象本身,第一次表示為HeadContext對象。
  • HeadContext#channelRead表示該handler的處理邏輯(只是這里沒有任何處理邏輯),然后通過ctx.fireChannelRead(msg)喚醒當前Context對象下的下一個Context對象。
  • 喚醒當前Context對象下的下一個Context對象的處理邏輯統(tǒng)一在AbstractChannelHandlerContext當中實現的。


abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // findContextInbound尋找當前Context的下一個Context對象
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    //  findContextInbound尋找當前Context的下一個Context對象
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    // 進入到轉為下一個Context的執(zhí)行邏輯的核心函數
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 下一個待執(zhí)行的Context對象
            next.invokeChannelRead(m);
        } else {
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // handler()返回Context對象
                // channelRead被重載執(zhí)行邏輯,需要在實現中再次執(zhí)行ctx.fireChannelRead(msg);
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
}
  • AbstractChannelHandlerContext作為Context的父類,其核心邏輯在于負責尋找下一個Context對象并開始下一個Context對象的執(zhí)行邏輯。
  • AbstractChannelHandlerContext#fireChannelRead負責查找下一個Context對象并通過AbstractChannelHandlerContext#invokeChannelRead實現當前Context到下一個Context執(zhí)行的交接。
  • AbstractChannelHandlerContext#invokeChannelRead內部執(zhí)行已經是下一個Context對象的channelRead方法。
    -AbstractChannelHandlerContext作為Context的父類,提供了通用的查詢Context、執(zhí)行Context的方法,本質這些邏輯都是在前一個Context對象的方法中執(zhí)行,然后通過查詢下一個Context對象進行切換。


public class LoggingHandler extends ChannelDuplexHandler {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "RECEIVED", msg));
        }
        ctx.fireChannelRead(msg);
    }
}
  • 以LoggingHandler為例,channelRead()內部負責執(zhí)行本職的log功能,同時通過ctx.fireChannelRead(msg)繼續(xù)觸發(fā)下一個Handler的操作。
  • 所有的Handler操作本職都是方法的串行。


輸出消息處理流程

ChannelOutboundHandler
  • bind 事件,綁定端口。
  • close事件,關閉channel。
  • connect事件,用于客戶端,連接一個遠程機器。
  • disconnect事件,用于客戶端,關閉遠程連接。
  • deregister事件,用于客戶端,在執(zhí)行斷開連接disconnect操作后調用,將channel從EventLoop中注銷。
  • read事件,用于新接入連接時,注冊成功多路復用器上后,修改監(jiān)聽為OP_READ操作位。
  • write事件,向通道寫數據。
  • flush事件,將通道排隊的數據刷新到遠程機器上。
ChannelOutboundInvoker
  • ChannelOutboundInvoker負責處理上述各類事件。


DefaultChannelPipeline#write

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    public ChannelFuture write(Object msg) {
        // 進入pipeline的處理流程
        return pipeline.write(msg);
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    public final ChannelFuture write(Object msg) {
        // 從tail開始執(zhí)行
        return tail.write(msg);
    }
}
  • DefaultChannelPipeline#write從pipeline的tail開始進行訪問。
  • tail.write()會調用父類AbstractChannelHandlerContext#write開始pipeline的handler的調用。


abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    // 負責從tail開始往head進行遍歷,查找輸出事件的handler
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        // 省略代碼
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 從tail開始遍歷查找下一個Context進行操作
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                // 執(zhí)行Context對象的invokeWrite方法
                next.invokeWrite(m, promise);
            }
        } else {
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            // 執(zhí)行handler自己重載的write方法
            invokeWrite0(msg, promise);
        } else {
            // 執(zhí)行handler自己重載的write方法或者直接使用父類的write方法
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            // 參考LoggingHandler為例進行說明
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
}
  • AbstractChannelHandlerContext#write通過findContextOutbound()查找tail的前一個輸出事件的handler。
  • AbstractChannelHandlerContext按照write => invokeWrite => invokeWrite0的順序進行調用,在invokeWrite0的內部執(zhí)行實際業(yè)務handler的write()方法。
  • ((ChannelOutboundHandler) handler())返回實際的handler對象,如LoggingHandler。
  • 實際handler對象如LoggingHandler內部會通過ctx.write(msg, promise)重新開始pipeline的handler對象的尋找,完成一次傳遞調用。


public class LoggingHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "WRITE", msg));
        }
        ctx.write(msg, promise);
    }
}


ChannelPipeline順序

ChannelPipeline
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+



 * For example, let us assume that we created the following pipeline:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("1", new InboundHandlerA());
 * p.addLast("2", new InboundHandlerB());
 * p.addLast("3", new OutboundHandlerA());
 * p.addLast("4", new OutboundHandlerB());
 * p.addLast("5", new InboundOutboundHandlerX());
 * </pre>
 * In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler.
 * The class whose name starts with {@code Outbound} means it is a outbound handler.
 * <p>
 * In the given example configuration, the handler evaluation 
 * order is 1, 2, 3, 4, 5 when an event goes inbound.
 * When an event goes outbound, the order is 5, 4, 3, 2, 1. 


參考

Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端)
netty源碼分析系列——ChannelHandler系列

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容