Netty新連接接入與NioSocketChannel分析

原文:https://wangwei.one/posts/netty-new-connection-and-niosocketchannel-analyse.html

前面的一些章節(jié),我們分析了Netty的三大組件 —— Channel 、EventLoop、Pipeline ,對Netty的工作原理有了深入的了解。在此基礎(chǔ)上,我們來分析一下當(dāng)Netty服務(wù)端啟動后,Netty是如何處理新連接接入的。

本文內(nèi)容主要分為以下四部分:

  • 新連接檢測
  • NioSocketChannel創(chuàng)建
  • NioSocketChannel初始化與注冊
  • NioSocketChannel注冊READ興趣集

新連接檢測

前面,我們在講 EventLoop的啟動過程源碼分析 時,解讀過下面這段代碼:

public final class NioEventLoop extends SingleThreadEventLoop {
    
    ...
        
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    
    ...
    
        try {

            ...

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 讀取read事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
        
        ...
    }
    
    ...
    
}    

我們還是以服務(wù)端 NioServerSocketChannel 為例,它綁定的unsafe實例為 NioMessageUnsafe 。上面的 unsafe.read() 接口,會向下調(diào)用到 NioMessageUnsafe.read() 接口,如下:

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    
    ...
    
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        
        // 用于保存新建立的 NioSocketChannel 的集合
        private final List<Object> readBuf = new ArrayList<Object>();
        
        @Override
        public void read() {
            // 確保在當(dāng)前線程與EventLoop中的一致
            assert eventLoop().inEventLoop();
            // 獲取 NioServerSocketChannel config配置
            final ChannelConfig config = config();
            // 獲取 NioServerSocketChannel 綁定的 pipeline
            final ChannelPipeline pipeline = pipeline();
            // 獲取RecvByteBuf 分配器 Handle
            // 當(dāng)channel在接收數(shù)據(jù)時,allocHandle 會用于分配ByteBuf來保存數(shù)據(jù)
            // 關(guān)于allocHandle后面再去做詳細介紹
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            // 重置已累積的所有計數(shù)器,并為下一個讀取循環(huán)讀取多少消息/字節(jié)數(shù)據(jù)提供建議
            allocHandle.reset(config);
            
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 調(diào)用后面的 doReadMessages 接口,讀取到message則返回1
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // 對當(dāng)前read循環(huán)所讀取到的message數(shù)量計數(shù)+1
                        allocHandle.incMessagesRead(localRead);
                        // 判斷是否繼續(xù)讀取message
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 調(diào)用pipeline傳播ChannelRead事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // 清空readBuf
                readBuf.clear();
                allocHandle.readComplete();
                // 調(diào)用pipeline傳播 ChannelReadComplete 事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    
    ...
    
}    

對于 doReadMessages(...) 的分析:

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    
    ...
    
    // 讀取消息
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 獲取 SocketChannel 
        SocketChannel ch = SocketUtils.accept(javaChannel());
        
        try {
            if (ch != null) {
                // 使用SocketChannel創(chuàng)建NioSocketChannel,將其存入buf list中
                // 關(guān)于NioSocketChannel的創(chuàng)建請看后面的分析
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
    
    ...
    
}

對于 continueReading() 接口的分析,至于結(jié)果為什么返回false,后面會單獨分析:

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
    
    private volatile int maxMessagesPerRead;
    private volatile boolean respectMaybeMoreData = true;
    
    ...
    
    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        // 每次讀取最大的消息數(shù)
        private int maxMessagePerRead;
        private int totalMessages;
        private int totalBytesRead;
        private int attemptedBytesRead;
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };
        
        ...
        
        // 判斷是否繼續(xù)讀取message    
        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }
        
        // 判斷是否繼續(xù)讀取message
        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            // 默認(rèn)情況下 config.isAutoRead() 為true
            // respectMaybeMoreData 默認(rèn)為 true
            // maybeMoreDataSupplier.get() 為false
            // totalMessages第一次循環(huán)則為1
            // maxMessagePerRead為16
            // 結(jié)果返回false
            return config.isAutoRead() &&
                (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                totalMessages < maxMessagePerRead &&
                totalBytesRead > 0;
        }
        
        ...
    
    }
    
    ...
        
}

NioSocketChannel創(chuàng)建

上面分析新連接接入,提到了 NioSocketChannel 的創(chuàng)建,我們這里來詳細分析一下,NioSocketChannel的創(chuàng)建過程與此前我們分析 NioServerSocketChannel創(chuàng)建 大體類似。

構(gòu)造器

先來看看 NioSocketChannel 的構(gòu)造函數(shù):

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    ...
    
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        // 調(diào)用父類構(gòu)造器
        super(parent, socket);
        // 創(chuàng)建NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    
    ...
    
}

父類 AbstractNioByteChannel 構(gòu)造器:

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    ...
    
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        // 調(diào)用父類構(gòu)造器,并設(shè)置興趣集為SelectionKey.OP_READ,對read事件感興趣
        super(parent, ch, SelectionKey.OP_READ);
    }
    
    ...
    
}

父類 AbstractNioChannel 構(gòu)造器:

public abstract class AbstractNioChannel extends AbstractChannel {
    
    ...
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // 調(diào)用父類構(gòu)造器
        super(parent);
        // 設(shè)置channel
        this.ch = ch;
        // 設(shè)置興趣集
        this.readInterestOp = readInterestOp;
        try {
            // 設(shè)置為非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...
        }
    }


}

父類 AbstractChannel 構(gòu)造器:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    ...
    
    protected AbstractChannel(Channel parent) {
        // 設(shè)置parent
        this.parent = parent;
        // 創(chuàng)建channelId
        id = newId();
        // 創(chuàng)建unsafe
        unsafe = newUnsafe();
        // 創(chuàng)建pipeline
        pipeline = newChannelPipeline();
    }
    
    ...
}

ChannelConfig創(chuàng)建

接著我們看看 NioSocketChannelConfig 的創(chuàng)建邏輯:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    
    ...
       
    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        // 調(diào)用父類構(gòu)造器
        super(channel, javaSocket);
        calculateMaxBytesPerGatheringWrite();
    }
    
    ...
        
}

父類 DefaultSocketChannelConfig 構(gòu)造器:

public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
   
   ...
      
   public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
       // 調(diào)用父類構(gòu)造器,綁定socketchannel 
       super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        // 綁定java socket
        this.javaSocket = javaSocket;
        
        // Enable TCP_NODELAY by default if possible.
        // netty一般運行在服務(wù)器上,不在Android上,canEnableTcpNoDelayByDefault返回true
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                // 開啟 TCP_NODELAY ,開啟TCP的nagle算法
                // 盡量不要等待,只要發(fā)送緩沖區(qū)中有數(shù)據(jù),并且發(fā)送窗口是打開的,就盡量把數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)上去。
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }                                  
     
    ... 
                                        
}                                        

NioSocketChannel初始化與注冊

上面小節(jié)分析了NioSocketChannel的創(chuàng)建邏輯,創(chuàng)建完成之后,我們來分析一下NioSocketChannel是如何注冊到NioEventLoop上去的。

在前面小節(jié)分析新連接檢測的有如下小段代碼:

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
    ...

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        // 調(diào)用pipeline傳播ChannelRead事件
        pipeline.fireChannelRead(readBuf.get(i));
    }

    ...

}    

調(diào)用pipeline傳播ChannelRead事件,這里的Pipeline是服務(wù)端Channel,也就是NioServerSocketChannel所綁定的Pipeline,此時的Pipeline的內(nèi)部結(jié)構(gòu)是怎么樣子的呢?

Pipeline-ServerBootstrapAcceptor

那這個 ServerBootstrapAcceptor 是從哪里來的呢?

在此前,我們分析 NioServerSocketChannel初始化 時,有過下面這段代碼:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    
    ...
    
    // NioServerSocketChannel初始化    
    void init(Channel channel) throws Exception {
        // 獲取啟動器 啟動時配置的option參數(shù),主要是TCP的一些屬性
        final Map<ChannelOption<?>, Object> options = options0();
        // 將獲得到 options 配置到 ChannelConfig 中去
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        // 獲取 ServerBootstrap 啟動時配置的 attr 參數(shù)
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        // 配置 Channel attr,主要是設(shè)置用戶自定義的一些參數(shù)
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        // 獲取channel中的 pipeline,這個pipeline使我們前面在channel創(chuàng)建過程中設(shè)置的 pipeline
        ChannelPipeline p = channel.pipeline();

        // 將啟動器中配置的 childGroup 保存到局部變量 currentChildGroup
        final EventLoopGroup currentChildGroup = childGroup;
        // 將啟動器中配置的 childHandler 保存到局部變量 currentChildHandler
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        // 保存用戶設(shè)置的 childOptions 到局部變量 currentChildOptions
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        // 保存用戶設(shè)置的 childAttrs 到局部變量 currentChildAttrs
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // 獲取啟動器上配置的handler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // 添加 handler 到 pipeline 中
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 用child相關(guān)的參數(shù)創(chuàng)建出一個新連接接入器ServerBootstrapAcceptor
                        // 通過 ServerBootstrapAcceptor 可以將一個新連接綁定到一個線程上去
                        // 每次有新的連接進來 ServerBootstrapAcceptor 都會用child相關(guān)的屬性對它們進行配置,并注冊到ChaildGroup上去
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
        
    ...    
       
}

ServerBootstrapAcceptor

NioServerSocketChannel初始化時,向NioServerSocketChannel所綁定的Pipeline添加了一個InboundHandler節(jié)點 —— ServerBootstrapAcceptor ,其代碼如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    
    ...
    
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        
        // 子EventLoopGroup,即為workGroup
        private final EventLoopGroup childGroup;
        // ServerBootstrap啟動時配置的 childHandler
        private final ChannelHandler childHandler;
        // ServerBootstrap啟動時配置的 childOptions
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        // ServerBootstrap啟動時配置的 childAttrs
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;
        
        // 構(gòu)造函數(shù)
        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            // Task which is scheduled to re-enable auto-read.
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
            // not be able to load the class because of the file limit it already reached.
            //
            // See https://github.com/netty/netty/issues/1328
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }

        // 處理Pipeline所傳播的channelRead事件
        // 也就是前面新連接檢測時看到的那段代碼
        // pipeline.fireChannelRead(readBuf.get(i));
        // ServerBootstrapAcceptor的channelRead接口將會被調(diào)用,用于處理channelRead事件
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 獲取傳播事件的對象數(shù)據(jù),即為前面的readBuf.get(i)
            // readBuf.get(i)取出的對象為 NioSocketChannel
            final Channel child = (Channel) msg;
            // 向 NioSocketChannel 添加childHandler,也就是我們??吹降?            // ServerBootstrap在啟動時配置的代碼:
            // ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )
            // 最終的結(jié)果就是向NioSocketChannel的Pipeline添加用戶自定義的ChannelHandler
            // 用于處理客戶端的channel連接
            child.pipeline().addLast(childHandler);
            // 配置 NioSocketChannel的TCP屬性
            setChannelOptions(child, childOptions, logger);
            // 配置 NioSocketChannel 一些用戶自定義數(shù)據(jù)
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            // 將NioSocketChannel注冊到childGroup,也就是Netty的WorkerGroup當(dāng)中去
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        
        ...
    }
    
    ...
    
}

關(guān)于 ChannelInitializer 的講解,可以看此前 Pipeline源碼分析 文章。

后面的register邏輯,就與我們前面講解 NioServerSocketChannel注冊 大體類似了,這里簡單介紹一下。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    ...
    
    // 注冊NioSocketChannel
    // eventLoop為childGroup    
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        
        ...
        // 綁定eventLoop到NioSocketChannel上
        AbstractChannel.this.eventLoop = eventLoop;
        // 現(xiàn)在分析的邏輯是在服務(wù)端的線程上,eventLoop與主線程不同,返回false
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 這里來調(diào)用register0方法
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    
    // 注冊
    private void register0(ChannelPromise promise) {
        try {
            
            ...
                
            boolean firstRegistration = neverRegistered;
            // 調(diào)用 doRegister()
            doRegister();
            neverRegistered = false;
            registered = true;
            
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            
            // 服務(wù)端的NioServerSocketChannel已經(jīng)與客戶端的NioSocketChannel建立了連接
            // 所以,NioSocketChannel是處于激活狀態(tài),isActive()返回ture
            if (isActive()) {
                // 對于新連接,是第一次注冊
                if (firstRegistration) {
                    // 傳播ChannelActive事件
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
            ...
                
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    
    ...
            
}        

調(diào)用到NioSocketChannel中的doRegister()方法:

public abstract class AbstractNioChannel extends AbstractChannel {

    ...
        
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 將selector注冊到底層JDK channel上,并附加了NioSocketChannel對象
                // 興趣集設(shè)置為0,表示不關(guān)心任何事件
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
    
    ...
    
}    

NioSocketChannel 注冊O(shè)P_READ興趣集

緊接著上面的分析,傳播ChannelActive事件之后的邏輯,主要就是向客戶端的NioSocketChannel注冊一個Read興趣集

if (isActive()) {
    // 對于新連接,是第一次注冊
    if (firstRegistration) {
        // 傳播ChannelActive事件
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}

通過 Pipeline的傳播機制 ,最終會調(diào)用到doBeginRead()接口,如下:

public abstract class AbstractNioChannel extends AbstractChannel {
    
    ...

    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {

        ...
        
        @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            // 保存selectionKey到局部變量
            final SelectionKey selectionKey = this.selectionKey;
            // 判斷有效性
            if (!selectionKey.isValid()) {
                return;
            }
        
            readPending = true;
            
            // 獲取selectionKey的興趣集
            // 前面小結(jié)分析doRegister()接口提到,selectionKey的興趣集設(shè)置為0
            final int interestOps = selectionKey.interestOps();
            // 這里的 readInterestOp 是前面講NioSocketChannel創(chuàng)建時設(shè)置的值
            // 為 SelectionKey.OP_READ,也就是1
            if ((interestOps & readInterestOp) == 0) {
                // 這樣,selectionKey最終設(shè)置的興趣集為SelectionKey.OP_READ
                // 表示對讀事件感興趣
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }    
            
        ...    
        
    }    
    
    ...
        
}        

小結(jié)

  • Netty是在哪里檢測有新連接接入的?
  • 新連接是怎樣注冊到NioEventLoop線程上的?
  • NioSocketChannel是怎樣初始化及注冊的?

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容