netty學習系列六:服務(wù)端ServerSocketChannel綁定

零、 整體流程

整體執(zhí)行流程圖

1、用戶在main線程啟動執(zhí)行ServerSocketChannel的初始化

1)初始化一個NioServerSocketChannel channel
包括初始化Channel關(guān)聯(lián)的ch、pipeline、unsafe、config。

nio.ServerSocketChannel ch = nio.SelectorProvider.openServerSocketChannel();
ch.configBlocking(false);

2)將channel與ServerBootstrap的EventLoopGroup中的某個EventLoop child綁定
3)向child的任務(wù)執(zhí)行隊列中添加channel的register0事件
4)監(jiān)聽register0事件的完成狀態(tài),完成時向child的任務(wù)執(zhí)行隊列中添加channel的bind事件

2、EventLoop線程中執(zhí)行register0事件

1)將NioServerSocketChannel的ch注冊到NioEventLoop child的selector上

SelectionKey selectionKey = ch.register(eventLoop().selector, 0, this);

2)標識ChannelPromise狀態(tài)為success,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊列
3)ServerSocketChannel channel產(chǎn)生ChannelRegisted事件

pipeline.fireChannelRegistered();

會導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中。

3、EventLoop線程中執(zhí)行Channel.bind事件

1)ServerSocketChannel綁定對應(yīng)服務(wù)端口,監(jiān)聽新的客戶端連接

javaChannel().socket().bind(localAddress, config.getBacklog());

2)ServerSocketChannel channel產(chǎn)生ChannelActive事件

pipeline.fireChannelActive();

ChannelActive事件由HeadContext處理,向ch中添加OP_ACCEPT事件監(jiān)聽。

selectionKey.interestOps(OP_ACCEPT);


一、代碼入口

ServerBootstrap b = new ServerBootstrap();
// Start the server.
ChannelFuture f = b.bind(port).sync();

服務(wù)端啟動時都會執(zhí)行上面的代碼,用來啟動ServerSocketChannel監(jiān)聽對應(yīng)端口。
最終由AbstractBootstrap.doBind方法處理。

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

二、用戶線程初始化NioServerSocketChannel

final Channel channel = channelFactory.newChannel();

從此行代碼開始,ReflectiveChannelFactory通過反射的方式調(diào)用NioServerSocketChannel的構(gòu)造方法進行channel的初始化。

1、NioServerSocketChannel的實例化

** 1)創(chuàng)建一個nio.ServerSocketChannel**

newSocket(SelectorProvider provider);

使用nio.SelectorProvider.openServerSocketChannel()創(chuàng)建一個nio.ServerSocketChannel ch。
** 2)通過構(gòu)造方法實例化NioServerSocketChannel,并初始化相關(guān)的field**
parent = null
unsafe = new NioMessageUnsafe()
pipeline = new DefaultChannelPipeline(this) ->此處初始化一個DefaultChannelPipeline,并將pipeline和channel互相綁定
ch = ch ->同時將ch設(shè)置為非阻塞模式
readInterestOp = OP_ACCEPT
config = new ServerSocketChannelConfig(this, javaChannel().socket())
3)向channel的pipeline中添加Inbound處理器ChannelInitializer

init(channel);

由其父類ServerBootstrap直接實現(xiàn)。

  • 設(shè)置channel的attr和options**
  • 添加ChannelInitializer到pipeline中**
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        }
該處理器的*channelRegisted事件*回調(diào)方法會將*ServerBootstrapAcceptor*加入channel的pipeline中,并將自己從pipeline中移除。

4)ServerBootstrapAcceptor

  • ServerBootstrapAcceptor也是一個Inbound處理器,用于在Server端accept新的客戶端連接時,向新生成的socketChannel中添加用戶定義的業(yè)務(wù)處理器。
  • channelRead事件回調(diào)方法會將業(yè)務(wù)方往ServerBootstrap中添加的childHandler添加到socketChannel對應(yīng)的pipeline中。
  • 對于Server端,channelRead事件被定義為server端accept到了新的socket連接

2、將NioServerSocketChannel注冊到ServerBootstrap的EventLoopGroup上

ChannelFuture regFuture = group().register(channel);

1)NioEventLoopGroup的chooser從其children中選出一個NioEventLoop child,調(diào)用其register()方法進行channel注冊;
2)實際將NioEventLoop child傳給NioServerSocketChannel的unsafe,調(diào)用其register(EventLoop eventLoop, final ChannelPromise promise)方法完成注冊

  • 將channel.eventLoop綁定為當前NioEventLoop child;
  • 將AbstractUnsafe.register0(DefaultChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊列;

3、給標識“register0任務(wù)”完成狀態(tài)ChannelFuture添加一個Listener

doBind0(regFuture, channel, localAddress, promise);

當ChannelFuture完成時,將NioServerSocketChannel.bind(SocketAddress localAddress, ChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊列

三、EventLoop線程中執(zhí)行register0事件

AbstractUnsafe.register0(ChannelPromise promise);

1、將NioServerSocketChannel的ch注冊到NioEventLoop child的selector上,同時將注冊得到的SelectionKey綁定為NioServerSocketChannel的selectionKey

doRegister();

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

netty的輪詢注冊機制其實是將AbstractNioChannel內(nèi)部的jdk類對象SelectableChannel ch注冊到j(luò)dk類對象Selector selector上去,并且將AbstractNioChannel channel作為SelectableChannel對象ch的一個attachment附屬上,這樣在jdk輪詢出某個SelectableChannel有IO事件發(fā)生時,就可以直接取出AbstractNioChannel進行后續(xù)操作。

2、標識ChannelPromise狀態(tài)為success,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊列

safeSetSuccess(promise);

3、ServerSocketChannel channel產(chǎn)生ChannelRegisted事件

pipeline.fireChannelRegistered();

會導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中。

四、EventLoop線程中執(zhí)行Channel.bind事件

AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise);

1、ServerSocketChannel綁定對應(yīng)服務(wù)端口,監(jiān)聽新的客戶端連接

javaChannel().socket().bind(localAddress, config.getBacklog());

2、ServerSocketChannel channel產(chǎn)生ChannelActive事件

pipeline.fireChannelActive();

ChannelActive事件由HeadContext處理,最終調(diào)用了AbstractNioUnsafe.doBeginRead()方法,向ch中添加OP_READ事件監(jiān)聽。

selectionKey.interestOps(interestOps | readInterestOp);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容