netty注冊流程分析一

在基于nio的編程中,一般是聲明一個ServerSocketChannel對象,然后注冊到selector中,監(jiān)聽accept事件,當有新的連接請求時,select方法返回,通過ServerSocketChannel的accept方法獲取到新建立的SocketChannel連接,并注冊到selector,同時監(jiān)聽read事件,那么netty是如何處理這一過程的呢

我們先來看下一個netty的常規(guī)編程模式,然后再跟蹤源碼,來了解這個注冊監(jiān)聽流程

//1. 啟動器,負責組裝netty組件,啟動服務器
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

new ServerBootstrap()
        .group(bossGroup, workerGroup)
        //選擇服務器ServerSocketChannel的實現(xiàn)
        .channel(NioServerSocketChannel.class)
        //4 boss 負責處理連接 worker(child)負責處理讀寫,決定worker能執(zhí)行哪些操作
        .childHandler(
                //5 代表和客戶端進行數(shù)據(jù)讀寫的通道
                new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
        })
        .bind(8899);

這里面除了ServerBootstrap組件沒有講過,其他的在前面的幾篇文章都說過了,可以簡單的理解為ServerBootstrap就是負責連接各個組件,啟動服務器。

這里直接看下bind方法的邏輯

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

doBind方法,大概可以分為這三個大步驟init、register、doBind0
其中,init就是初始化一個Channel對象,生成關聯(lián)的pipeline對象,并且添加一個入站處理器ChannelInitializer,這個入站處理器的handlerAdd方法會在channel注冊到selector后才會被調用,handlerAdd方法

public void initChannel(final Channel ch) {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = config.handler();
    if (handler != null) {
        pipeline.addLast(handler);
    }

    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

這里面有一個很重要是,當handlerAdd方法觸發(fā)時,又會往pipeline添加一個入站處理器ServerBootstrapAcceptor,并且將當前的處理器從pipeline移除。這個處理器就是用來注冊新連接的讀事件。

初始化channel對象后,接著就調用register方法,就會從NioEventLoopGroup(boss group)中挑選出一個NioEventLoop對象來注冊channel,而NioEventLoop對象對channel的注冊邏輯,跟進代碼可以看到如下

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            //省略....
        }
    }
}

這里最重要的有一個判斷eventLoop.inEventLoop(),這個是判斷currentThread是否為eventLoop的執(zhí)行線程(EventLoop第一次執(zhí)行任務時會關聯(lián)到一個thread對象),若是的話,直接執(zhí)行register0方法;不是的話,將其作為runnable對象提交給eventLoop的任務隊列,在事件循環(huán)中處理。

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

這個注冊方法里面第一步先執(zhí)行doRegister,這個就是將channel注冊到selector上,但此時并沒有監(jiān)控accept事件。然后再調用pipeline上ChannelHandler的handlerAdd方法,這里會執(zhí)行上面提到的ChannelInitializer的initChannel方法,添加一個ServerBootstrapAcceptor入站處理器。

接著,調用safeSetSuccess方法,表明這個channel的注冊完成,回調promise的監(jiān)聽器,然后調用pipeline上的ChannelHandler的channelRegistered方法。方法后面會判斷isActive方法,因為netty中調用register方法是異步的,當方法register返回后,便會緊接著調bind方法,也就是剛剛說的三大步驟的第三部doBind0方法。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容