Netty源碼(五)服務端啟動流程分析

前言

在源碼分析的第一部分Netty源碼(一)Netty架構解析里面提到了netty的幾個關鍵組件

  • EventLoop
    EventLoop是Netty中最重要的組件,一個單線程事件循環(huán),監(jiān)聽IO事件、處理IO事件和任務隊列。
  • EventLoopGroup
    EventLoopGroup顧名思義,管理多個EventLoop。
  • Channel
    Channel是能夠進行IO操作組件的抽象,如讀、寫、連接和綁定。
  • ChannelHandler
    ChannelHandler是我們使用Netty開發(fā)最關心的一個組件,它是對IO事件的具體處理邏輯。
  • ChannelPipeline
    ChannelPipeline則是ChannelHandler的容器,本質是一個雙向鏈表,將所有的ChannelHandler按照順序連接起。

在本篇文章將會分析一個netty服務器的啟動流程,以及是如何將這個組件給串聯(lián)起來的。

服務端代碼

下面是一段服務端的啟動代碼,來自netty官網(wǎng)

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
  1. EventLoopGroup 之前以及詳細介紹過了,處理IO操作的多線程事件循環(huán),分為bossGroup和workGroup,分別處理連接事件和讀寫事件。可以通過構造參數(shù)指定EventLoopGroup 的線程數(shù)量,默認為當前cup數(shù)量的兩倍;
  2. ServerBootstrap 為了方便構建服務器的輔助類;
  3. channel:由于是服務器端,使用NioServerSocketChannel構建一個Channel用于接收傳入的連接;
  4. childHandler:添加用戶自定義的ChannelHandler,比如解碼、編碼處理器和業(yè)務處理的Handler。
  5. option:設置Channel的一些參數(shù),比如這里正在實現(xiàn)一個TCP/IP服務器,可以指定一些TCP參數(shù)比如tcpNoDelay和keepAlive。
  6. childOption:option與childOption不同在于,option指定的是傳入channel的參數(shù)如現(xiàn)在的NioServerSocketChannel,而childOption指定的是NioServerSocketChannel接受的Channel。
  7. 將服務綁定到指定的端口,并啟動服務。

可以看見netty構建一個服務器代碼是很簡單明了,大家如果看過用NIO構建一個服務器的代碼是非常復雜和抽象的,而netty用極短的代碼就能構建出一個服務器,可見netty框架對底層細節(jié)的抽象和對框架的設計的能力。

Channel初始化

接下來開始分析服務端的啟動流程,上面代碼構造的服務器最關鍵的一步就是調(diào)用bind(),其他的group、childHandler方法都是做一些簡單的賦值操作,這里就不分析了,而bind()是將上一步賦值的參數(shù)全部串起來,然后啟動服務。

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

跟到關鍵實現(xiàn)的地方doBind()

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // ......
       doBind0(regFuture, channel, localAddress, promise);
        // ......
    }

總結起來這個方法做了兩個事件:

  1. initAndRegister(): 初始化傳入的Channel并且注冊到EventLoopGroup
  2. doBind0():綁定端口

initAndRegister()

由名字可知初始化并且注冊,但是初始化什么、注冊到哪里?雖然已經(jīng)告訴了答案,
初始化傳入的Channel并且注冊到EventLoopGroup,還是要具體分析一下是怎么初始化和注冊的。
initAndRegister()

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //初始化Channel
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // ......
        }
        //注冊到EventLoopGroup
        ChannelFuture regFuture = config().group().register(channel);
        
        return regFuture;
    }

初始化

channel = channelFactory.newChannel();

看到這行代碼要channelFactory是什么,做什么作用?分析源碼的過程中很容易就迷失,找不到關鍵的地方,帶著問題去分析是最好的方法。比如這里當我們不知道這個channelFactory是什么的時候,首先看他的定義是什么

public interface ChannelFactory<T extends Channel> {
    /**
     * Creates a new channel.
     */
    T newChannel();
}

定義了一個接口,初始化Channel,可以知道channelFactory的職責是初始化一個Channel。那這里channelFactory的具體實現(xiàn)是什么呢?
最簡單的方法就是使用debug的方式找到channelFactory的實現(xiàn)類,或者使用編譯器看這個變量這哪里被調(diào)用,做的復制操作。通過這些方法我們定位到了代碼

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

上訴代碼可以知道我們在調(diào)用.channel(NioServerSocketChannel.class)的時候就對channelFactory做了初始化,默認為ReflectiveChannelFactory,顧名思義通過反射初始化Channel,下面代碼就展示了ReflectiveChannelFactory的實現(xiàn)。通過反射拿到Channel的構造器,然后調(diào)用初始化方法。
ReflectiveChannelFactory

    //.....
     this.constructor = clazz.getConstructor();
    //.....
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

所以分析到這里,我們就可以知道channel = channelFactory.newChannel();
這里的初始化的Channel就是第一步構造ServerBootstrap傳入的NioServerSocketChannel。

接下來分析第二步init(channel)

abstract void init(Channel channel) throws Exception;

可以看見是一個抽象方法,找到服務端的具體實現(xiàn)

    void init(Channel channel) {
        //設置屬性
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();
        // ......

        p.addLast(new ChannelInitializer<Channel>() {
            // 將啟動類傳入的ChannelHandler 加入pipeline
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // acceptor角色
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

這段代碼比較長,個人認為有兩個點需要重點分析

  1. ChannelPipeline p = channel.pipeline();
  2. ServerBootstrapAcceptor

ChannelPipeline netty中的一個關鍵的組件在這里出現(xiàn)了,ChannelPipeline是ChannelHandler 的容器,這里是調(diào)用channel的pipeline()返回的ChannelPipeline對象。所以需要找一下ChannelPipeline 是什么、在什么時候初始化的,按照之前的方法很輕松的在AbstractChannel找到代碼

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

可見是在Channel初始化的時候創(chuàng)建的ChannelPipeline默認為DefaultChannelPipeline,所以在這里也可以知道每個Channel都對應著一個ChannelPipeline。
接下來分析很關鍵的代碼

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });

在pipeline中添加了名為ServerBootstrapAcceptor的處理器,在第一篇文章中我們提到過Reactor模式,里面就有一個acceptor角色用于將mainReactor角色介紹的連接轉給subReactor,這里看一下ServerBootstrapAcceptor是不是也在做這個事情。

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter

繼承自ChannelInboundHandlerAdapter,可見是一個ChannelHandler而且處理的是入站事件,下面分析它是怎么處理的入站事件。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 父Channel監(jiān)聽的是連接事件,所有直接轉為Channel 
            final Channel child = (Channel) msg;
            //添加 啟動類傳入的childHandler
            child.pipeline().addLast(childHandler);
            //設置啟動類傳入的屬性
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                // 將boosWorup獲得的連接注冊到childGroup上
                childGroup.register(child).addListener(new ChannelFutureListener() {
                   // ......
                });
            } catch (Throwable t) {
                 // ......
            }
        }
  1. 將父Channel(NioServerSocketChannel)讀取的信息強轉為Channel
  2. 添加 啟動類傳入的childHandler
  3. 設置啟動類傳入的屬性
  4. 將boosWorup獲得的連接注冊到childGroup上

分析到這里可以知道netty是通過ChannelHandler的形式將bossGroup與workGroup聯(lián)系起來,當bossGroup接受到新的連接就將其注冊到childGroup上。

最后再回到initAndRegister()這個方法的最后一步

 ChannelFuture regFuture = config().group().register(channel);

這個方法就是將我們實體化的NioServerSocketChannel注冊到bossGroup上。

分析到這里對initAndRegister()做一個簡單的總結

  1. 通過channelFactory使用反射的形式將啟動類傳入的Channel(NioServerSocketChannel)實例化;
  2. 實例化Channel的時候會創(chuàng)建一個ChannelPipeline對象,以Channel一一對應;
  3. 通過ServerBootstrapAcceptor將bossGroup接入的連接轉給workGroup;
  4. 將實例化的Channel(NioServerSocketChannel)注冊到boosGroup上。

所以initAndRegister()實例化的是啟動類傳入的Channel對象并注冊在boosGroup上。

綁定地址

這里分析服務啟動第二步doBind0綁定端口地址

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

跟進去channel.bind(SocketAddress localAddress, ChannelPromise promise)找到AbstractChannel的bind方法

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

調(diào)用pipeline的bind方法,再跟進去

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

之前說過ChannelPipeline是管理ChannelHandler的容器,DefaultChannelPipeline的實現(xiàn)就是用一個雙向鏈表,初始化的是一個有一個頭節(jié)點HeadContext和尾節(jié)點TailContext,這里不具體分析。
這里分析需要打上斷點一步一步調(diào)用,最好找到實現(xiàn)在AbstractUnsafe中的bind方法

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            //......
            boolean wasActive = isActive();
            try {
              //綁定端口地址
                doBind(localAddress);
            } catch (Throwable t) {
               //......
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                       //觸發(fā)channelActive事件
                        pipeline.fireChannelActive();
                    }
                });
            }
            //綁定成功
            safeSetSuccess(promise);
        }

簡化之后做三件事情調(diào)用jdk底層綁定端口,觸發(fā)channelActive事件,將promise設置為成功。
doBind 調(diào)用jdk底層綁定端口

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

就此服務端啟動的流程結束,當然這里分析不是所有細節(jié)都涉及到了,只是對netty服務端啟動流程的一個簡要分析。

總結

  1. 通過ServerBootstrap將主要的組件先添加進去,比如NioEventLoopGroup、Channel、ChannelHandler、option、childOption等;
  2. 通過反射創(chuàng)建出Channel,并且在創(chuàng)建Channel的時候也會初始化ChannelPipeline;
  3. 將ChannelHandler添加到Channel所對應的ChannelPipeline中;
  4. 通過ServerBootstrapAcceptor將Channel所接受的連接轉交給workGroup處理;
  5. 將第2步初始化Channel注冊到bossGroup上;
  6. 調(diào)用jdk底層綁定端口地址;
  7. 觸發(fā)fireChannelActive事件;

netty服務端的啟動流程主要就是上面的流程,越看netty的源碼就越感覺netty設計的精妙。不管是底層細節(jié)、還是整體框架的設計,比如創(chuàng)建一個netty的客戶端如下

    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

對比服務器只有少數(shù)不同的地方?;蛘卟皇褂肗ioServerSocketChannel作為服務器的底層連接將NioServerSocketChannel替換就可以了比如EpollServerSocketChannel就可以,不需要調(diào)整其他地方。本篇的源碼分析就到此為止。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容