第十七節(jié) netty源碼分析之pipeline的來(lái)源

pipeline的來(lái)源

  • 繼續(xù)跟蹤源碼EchoClient中得Bootstrap類(lèi)。
    根據(jù)前面得分析總結(jié)如下過(guò)程:
Bootstrap中channel(NioSocketChannel.class)
-》NioSocketChannel的初始化時(shí)創(chuàng)建-》NioSocketChannel this(DEFAULT_SELECTOR_PROVIDER)
-》NioSocketChannel中this(newSocket(provider)) newSocket方法中的參數(shù)provider會(huì)根據(jù)操作系統(tǒng)選擇不同的SelectorProvider(SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();)。 創(chuàng)建socketchannel方法provider.openSocketChannel(),SelectorProvider的用法可參考nio中的
-》繼續(xù)調(diào)用構(gòu)造器this(null, socket);其中的socket為newSocket方法創(chuàng)建的
-》接下來(lái)調(diào)用父類(lèi)的構(gòu)造器和初始化cofnig

查看NioSocketChannel的構(gòu)造代碼:

 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

-》 轉(zhuǎn)到父類(lèi)的構(gòu)造器中

   /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
--》 繼續(xù)調(diào)用父類(lèi)構(gòu)造器會(huì)發(fā)現(xiàn)又一次掉用父類(lèi)的構(gòu)造器 super(parent);
--》 下面找到我們費(fèi)盡心機(jī)的pipeline的創(chuàng)建如下:
    /**
    * Creates a new instance.
    *
    * @param parent
    *        the parent of this channel. {@code null} if there's no parent.
    */
   protected AbstractChannel(Channel parent) {
       this.parent = parent;
       id = newId();
       unsafe = newUnsafe();
       pipeline = newChannelPipeline();
   }
--》 上面的newChannelPipeline方法其實(shí)調(diào)用DefaultChannelPipeline的構(gòu)造器我們跟蹤一下piple的創(chuàng)建
 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
* 從上面的構(gòu)造方法中可以推算Pipeline包含為一個(gè)雙向鏈表,且會(huì)有一個(gè)頭和尾,類(lèi)圖如下

總結(jié):每創(chuàng)建要給channel中會(huì)創(chuàng)建一個(gè)pipeline. pipeline為一個(gè)雙向鏈表,會(huì)有一個(gè)頭和尾
 頭尾,從兩者繼承關(guān)系來(lái)看基本一致。區(qū)別在于實(shí)現(xiàn)接口不同,head實(shí)現(xiàn)ChannelOutboundHandler, ChannelInboundHandler而tail實(shí)現(xiàn)ChannelInboundHandler
圖片.png
圖片.png

再看下他們的父類(lèi) AbstractChannelHandlerContext 的構(gòu)造器, 分別以參數(shù) inbound , outbound .來(lái)區(qū)分head和tail
結(jié)合他們實(shí)現(xiàn)的接口,header 是一個(gè) outboundHandler, 而 tail 是一個(gè)inboundHandler。

2 piple是如何將handler起到作用的呢?
1、 ChannelInitializer 的添加
初始化 Bootstrap, 會(huì)添加我們自定義的 ChannelHandler, 就以我們熟悉的 EchoClient 來(lái)舉例

Bootstrap b = new Bootstrap();
            b.group(group)
            //初始化工廠ReflectiveChannelFactory為后續(xù)鏈接connect方法創(chuàng)建NioSocketChannel對(duì)象
             .channel(NioSocketChannel.class)
                    //將選項(xiàng)添加到AbstractBootstrap屬性options. 實(shí)現(xiàn)類(lèi)中Bootstrap的init(Channel channel)方法設(shè)置channel的類(lèi)型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });
            // Start the client.
            //包含channel(注意channel可抽象為socket鏈接來(lái)理解)實(shí)例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最終channelFactory.newChannel();這里的factory就是前面設(shè)置的ReflectiveChannelFactory)
           //  NioSocketChannel最終newSocket 來(lái)打開(kāi)一個(gè)新的 Java NIO SocketChannel, 最后調(diào)用父類(lèi)AbstractChannel(Channel parent)
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();

在調(diào)用 handler 時(shí), 傳入了 ChannelInitializer 對(duì)象, 它提供了一個(gè) initChannel 方法供我們初始化 ChannelHandler. 那么這個(gè)初始化過(guò)程是怎樣的呢
源碼追蹤:首先 Bootstrap中調(diào)用handler方法 .handler(new ChannelInitializer<SocketChannel>(),將ChannelInitializer父類(lèi)AbstractBootstrap中方法

  /**
     * the {@link ChannelHandler} to use for serving the requests.
     */
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }

初始化后的使用過(guò)程: 其實(shí)再ChannelFuture f = b.connect(HOST, PORT).sync();中的connect方法中,一路看下去會(huì)在父類(lèi)AbstractBootstrap找到initAndRegister()初始化這樓里
是一個(gè)很重要的方法。暫時(shí)我們只關(guān)注 init(channel);這里是我們要關(guān)注的地方因?yàn)檫@里是channel和pipeline交互的關(guān)鍵,而且他的具體實(shí)現(xiàn)在Bootstrap中

  @Override
  @SuppressWarnings("unchecked")
  void init(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      // config使用bootstrap.handler(),就是最初ChannelInitializer,可參考上面分析
      p.addLast(config.handler());

      final Map<ChannelOption<?>, Object> options = options0();
      synchronized (options) {
          //設(shè)置channel類(lèi)型
          setChannelOptions(channel, options, logger);
      }

最終我們要關(guān)注 p.addLast(config.handler());這里將我們出事的handler添加到了pipleline的末端(pipleline的結(jié)構(gòu)借本介紹在前面已介紹)
為了添加一個(gè) handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 因此在上面的代碼中我們可以看到新實(shí)例化了一個(gè) newCtx 對(duì)象, 并將 handler 作為參數(shù)傳遞到構(gòu)造方法中
最后追蹤到DefaultChannelPipeline中方法

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //檢查是否重名
            checkMultiplicity(handler);
//創(chuàng)建DefaultChannelHandlerContext對(duì)象
            newCtx = newContext(group, filterName(name, handler), handler);
//            將生成的newCtx插入handlercontex鏈表中,addLast0方法會(huì)將newCtx插入tail之前
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
  • 我們來(lái)關(guān)注下newContext方法是如何構(gòu)造
 DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            //isInbound  isOutbound 方法返回true false 
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

ChannelInitializer的繼承關(guān)系如下圖:

圖片.png

下一節(jié)再具體分析pipeline和handler的具體實(shí)現(xiàn)細(xì)節(jié):

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容