第十九節(jié) netty源碼分析之 pipleline和handler以及pipeline的數(shù)據(jù)流向01

pipleline和handler以及pipeline的數(shù)據(jù)流向

  • 先理一下思路。首先我們考慮到之前的文章分析,沒創(chuàng)建一個channel就會創(chuàng)建一個pipeline與之對應(yīng)。每個pipeline會有AbstractChannelHandlerContext屬性的tail和head從而組成要給雙向鏈表。那么pipeline的handler添加和數(shù)據(jù)流向其實(shí)都是基于HandlerContext和雙向鏈表的性質(zhì)。下面具體分析。

當(dāng)然我們?nèi)匀幌旅孢@段代碼分析,主要分析pipeline的添加

b.group(group)
            //初始化工廠ReflectiveChannelFactory為后續(xù)鏈接connect方法創(chuàng)建NioSocketChannel對象
             .channel(NioSocketChannel.class)
                    //將選項(xiàng)添加到AbstractBootstrap屬性options. 實(shí)現(xiàn)類中Bootstrap的init(Channel channel)方法設(shè)置channel的類型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

pipeline.addXXX 都有一個重載的方法, 例如 addLast, 它有一個重載的版本
直接查看DefaultChannelPipeline源碼:

 public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

我們一路查看下去,找到重載的方法,且記住我們?nèi)雲(yún)⒗飃roup、和name都是null

 @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //檢查是否重復(fù)添加
            checkMultiplicity(handler);
//創(chuàng)建DefaultChannelHandlerContext對象
            newCtx = newContext(group, filterName(name, handler), handler);
//            將生成的newCtx插入handlercontex鏈表中
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

從addLast0方法看到,這里是將我們的handler添加到了tail的前面

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    ```
* 還有一點(diǎn)就是上面addLast方法中newContext方法的入?yún)ilterName(name, handler),這里會生成handler名字并校驗(yàn)是否重復(fù)(有興趣可查看源碼類DefaultChannelPipeline中  generateName(ChannelHandler handler))

2、 那么inbond和outbond是決定pipleline的數(shù)據(jù)流向的關(guān)鍵。
記得我們上面的newContext方法中創(chuàng)建的DefaultChannelHandlerContext里的構(gòu)造器,有isInbound和isOutbound倆方法分別根據(jù)接口來判斷Inbound和Outbound
```java
 DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

一個Inbound事件通常由Inbound handler來處理。一個Inbound handler通常處理在IO線程產(chǎn)生的Inbound數(shù)據(jù)。Inbound數(shù)據(jù)通過真實(shí)的輸入操作如 SocketChannel#read(ByteBuffer)來獲取。如果一個inbound事件越過了最上面的inbound handler,該事件將會被拋棄到而不會通知你
一個outbound事件由outbound handler來處理。一個outbound handler通常由outbound流量如寫請求產(chǎn)生或者轉(zhuǎn)變的。如果一個outbound事件越過了底部的outbound handler,它將由channel關(guān)聯(lián)的IO線程處理。IO線程通常運(yùn)行的是真實(shí)的輸出操作如 SocketChannel#write(byteBuffer).

 inbound 事件傳播方法:
ChannelHandlerContext#fireChannelRegistered()
 ChannelHandlerContext#fireChannelActive()
  ChannelHandlerContext#fireChannelRead(Object) 
  ChannelHandlerContext#fireChannelReadComplete() 
  ChannelHandlerContext#fireExceptionCaught(Throwable) 
  ChannelHandlerContext#fireUserEventTriggered(Object) 
  ChannelHandlerContext#fireChannelWritabilityChanged() 
  ChannelHandlerContext#fireChannelInactive() 
  ChannelHandlerContext#fireChannelUnregistered()
outbound事件傳播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise) 
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) 
ChannelHandlerContext#write(Object, ChannelPromise) 
ChannelHandlerContext#flush() 
ChannelHandlerContext#read() 
ChannelHandlerContext#disconnect(ChannelPromise) 
ChannelHandlerContext#close(ChannelPromise) 
ChannelHandlerContext#deregister(ChannelPromise)

如果我們捕獲了一個事件, 并且想讓這個事件繼續(xù)傳遞下去, 那么需要調(diào)用 Context 相應(yīng)的傳播方法.
例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

上面的例子中, MyInboundHandler 收到了一個 channelActive 事件, 它在處理后, 如果希望將事件繼續(xù)傳播下去, 那么需要接著調(diào)用 ctx.fireChannelActive().

Outbound 操作(outbound operations of a channel)
以connect為例

Bootstrap.connect -> Bootstrap.doResolveAndConnect -> Bootstrap.doResolveAndConnect0 ->Bootstrap.doConnect ->AbstractChannel.connect->pipeline.connect->tail.connect-> AbstractChannelHandlerContext.connect
最后我們以AbstractChannelHandlerContext.connect 的源碼分析:

@Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
// DefaultChannelPipeline 內(nèi)的雙向鏈表的 tail 開始, 不斷向前尋找第一個 outbound 為 true 的 AbstractChannelHandlerContext, 然后調(diào)用它的 invokeConnect
        final AbstractChannelHandlerContext next = findContextOutbound();
        //next其實(shí)是找到headContext  unsafe.connect(remoteAddress, localAddress, promise);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
當(dāng)我們找到了一個 outbound 的 Context 后, 就調(diào)用它的 invokeConnect 方法, 
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

invokeConnect這個方法中會調(diào)用 Context 所關(guān)聯(lián)著的 ChannelHandler 的 connect 方法。下面的handler()方法會返回一個handlerContex(根據(jù)上面next方法我們知道這里返回的為tailContext,但是tailContext并沒有實(shí)現(xiàn)connect方法,所以這里的connect為其父類AbstractChannelHandlerContext的connect方法。也就是說再次從上面哪個方法開始,知道執(zhí)行到headContext時,它實(shí)現(xiàn)了connect方法如下 方法三)。然后調(diào)用connect,包裝handler所以即為hanler的connect。

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                //pipeline.channel().unsafe();
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

方法三 headContext類找的connect方法

@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
所以最終是unsafe.connect,而這unsafe的由來我們前面也分析過,看HeadContext的構(gòu)造器unsafe = pipeline.channel().unsafe(); 所以它是來自channel。那么channel來自哪呢

  HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, true, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

還記得 .channel(NioSocketChannel.class)這里就是channel的來源
接下來我們找到unsafe是哪里創(chuàng)建的,查看NioSocketChannel構(gòu)造器

 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

構(gòu)造器跟蹤流程:NioSocketChannel->AbstractNioByteChannel->AbstractChannel

 protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

同時newUnsafe() 并沒有再AbstractChannel實(shí)現(xiàn),而是在NioSocketChannel實(shí)現(xiàn),這是為什么呢?

 protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

其實(shí)在子類實(shí)現(xiàn),是由于不同的協(xié)議用的Unsafe會不同,所以要根據(jù)子類區(qū)別對待
繼續(xù)跟蹤NioSocketChannelUnsafe但是該類并未實(shí)現(xiàn)connect方法,所以查找父類直到找到
AbstractNioUnsafe中的connect方法

 @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
           //省略
                //doConnect這里的實(shí)現(xiàn)在子類中
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                   //省略
        }

這段代碼的重點(diǎn)就在doConnect,而這個方法在該類中是沒有實(shí)現(xiàn)的,實(shí)現(xiàn)類在子類NioSocketChannel中

 @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

注意:上面這個方法就是和java的NIO聯(lián)系的地方了
重點(diǎn)分析:
1、首先doBind0方法 使用SocketUtils.bind(javaChannel(), localAddress);
其中的javaChannel()根據(jù)java版本選擇nio還是bio

private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);
        }
    }

在NIO中javaChannel其實(shí)獲取SelectableChannel,這里SelectableChannel在之前介紹過。所以bind方法
最后調(diào)用socketChannel.bind(address); nio的在前面已介紹這里不再贅述。至此connect方法就追蹤到這里。
總結(jié)connect事件在outbound中的順序,結(jié)合上面Bootstrap.connect最后到達(dá)handler的connect就形成了下面這個循環(huán)

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
直到head中的connect,這里我們上面分析過了。所以從connect事件來管中窺豹的話。就借用官網(wǎng)的數(shù)據(jù)流程圖吧
參考官網(wǎng)的事件流轉(zhuǎn)圖

                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+
  • 由于篇幅過長下篇繼續(xù)講解Inbound事件的源碼
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容