Bootstrap --- 服務(wù)端

  1. 服務(wù)器端
    在分析客戶端的代碼時(shí),我們已經(jīng)對(duì)Bootstrap啟動(dòng)Netty有了一個(gè)大致的認(rèn)識(shí),那么接下來(lái)分析服務(wù)器端時(shí),就會(huì)相對(duì)簡(jiǎn)單一些了。首先還是來(lái)看一下服務(wù)器端的啟動(dòng)代碼
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

和客戶端的代碼相比,沒(méi)有很大的差別,基本上也是進(jìn)行了如下幾個(gè)部分的初始化:

  • EventLoopGroup:不論是服務(wù)器端還是客戶端,都必須指定EventLoopGroup,在這個(gè)例子中,指定了NioEventLoopGroup,表示一個(gè)Nio的EventLoopGroup,不夠服務(wù)器端需要指定兩個(gè)EventLoopGroup,一個(gè)是bossGroup,用于處理客戶端的連接請(qǐng)求;另一個(gè)是workGroup,用于處理與各個(gè)客戶端連接的IO操作
  • ChannelType:指定Channel的類型,因?yàn)槭欠?wù)器端,因此使用NioServerSocketChannel
  • Handler:設(shè)置數(shù)據(jù)的處理器
  1. Channel的初始化過(guò)程
    我們?cè)诜治隹蛻舳说腃hannel初始化過(guò)程中,已經(jīng)提到,Channel是對(duì)Java底層Socket連接的抽象,并且知道了客戶端的Channel的具體類型是NioSocketChannel,那么自然地,服務(wù)端的Channel類型就是NioServerSocketChannel了
  2. Channel類型的確定
    同樣的分析套路,我們已經(jīng)知道了,在客戶端中,Channel的類型其實(shí)是在初始化時(shí),通過(guò)Bootstap.channel()方法設(shè)置的,服務(wù)端自然也不例外
    在服務(wù)器端,我們調(diào)用了ServerBootstrap.channel(NioServerSocketChannel.class),傳遞了一個(gè)NioServerSocketChannel Class對(duì)象,這樣的話,按照和分析客戶端代碼一樣的流程,我們就可以確定,NioServerSocketChannel的實(shí)例化是通過(guò)BoostrapChannelFactoru工廠類來(lái)完成,而BootstrapChannelFactory中的class字段被設(shè)置為NioServerSocketChannel.class,因此當(dāng)調(diào)用BootstrapChannelFactory.newChannel()時(shí):
@Override
public T newChannel() {
    // 刪除 try 塊
    return clazz.newInstance();
}

就獲取到了一個(gè)NioServerSocketChannel的實(shí)例
總結(jié):

  • ServerBootstrap中的ChannelFactory的實(shí)現(xiàn)是BootrapChannelFactory
  • 生成的Channel的具體類型是NioServerSocketChannel
    Channel的實(shí)例化過(guò)程,其實(shí)就是調(diào)用ChannelFactory.newChannel(),而實(shí)例化的Channel的具體類型又是和在初始化ServerBootstrap時(shí)傳入的channel()方法的參數(shù)相關(guān)。因此對(duì)于我們這個(gè)例子中的服務(wù)器端的ServerBootstrap而言,生成的Channel實(shí)例就是NioServerSocketChannel
  1. NioServerSocketChannel的實(shí)例化過(guò)程


    5.png

    首先,我們來(lái)看一下它的默認(rèn)的構(gòu)造器。和NioSocketChannel類似,構(gòu)造器都是調(diào)用了newSocket來(lái)打開一個(gè)java的Nio Socket,不過(guò)需要注意的是,客戶端的newSocket調(diào)用的是openSocketChannel,而服務(wù)器端的newSocket調(diào)用的是openServerSocketChannel。顧名思義,一個(gè)是客戶端的java SocketChannel,一個(gè)是服務(wù)器端的java ServerSocketChannel

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

接下來(lái)會(huì)調(diào)用重載的構(gòu)造器

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

這個(gè)構(gòu)造器中,調(diào)用弗雷構(gòu)造器,傳入的參數(shù)是SelectionKey.OP_ACCEPT.作為對(duì)比,我們回想一下,在客戶端的Channel初始化時(shí),傳入的參數(shù)是Selection.OP_READ.原因是Java NIO是一種Reactor模式,我們通過(guò)selector來(lái)實(shí)現(xiàn)I/O的多路復(fù)用,在一開始時(shí),服務(wù)器端需要監(jiān)聽客戶端的連接請(qǐng)求,因此在這里我們?cè)O(shè)置了SelectionKey.OP_ACCEPT,即通知selector我們對(duì)客戶端的連接請(qǐng)求感興趣
接著和客戶端的分析一樣,會(huì)逐級(jí)的調(diào)動(dòng)父類的構(gòu)造器NioServerSocketChannel->AbstractNioMessageChannel->AbstractNioChannel->AbstractChannel
同樣的,在AbstractChannel中會(huì)實(shí)例化一個(gè)unsafe和pipeline:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

不過(guò),這里有一點(diǎn)需要注意的是,客戶端的unsafe是一個(gè)AbstractNioByteChannel#NioByteUnsafe的實(shí)例,而在服務(wù)器端,因?yàn)锳bstractNioMessageChannel重寫了newUnsafe方法

 @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

因此在服務(wù)器端,unsafe字段其實(shí)是一個(gè)AbstractNioMessageChannel#AbstractNioUnsafe實(shí)例
總結(jié)在NioServerSocketChannel實(shí)例化過(guò)程中,所需要做的工作

  • 調(diào)用NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打開一個(gè)新的Java NIO ServerSocketChannel
  • AbstractChannel(Channel parent)中初始化AbstractChannel的屬性
    • parent屬性置為null
    • unsafe通過(guò)newUnsafe()實(shí)例化一個(gè)unsafe對(duì)象,它的類型是AbstractNioMessageChannel#AbstractNioUnsafe內(nèi)部類
    • pipeline是new DefaultChannelPipeline(this)新創(chuàng)建的實(shí)例
  • AbstractNioChannel 中的屬性
    • SelectableChannel ch 被設(shè)置為 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.
    • readInterestOp 被設(shè)置為 SelectionKey.OP_ACCEPT
    • SelectableChannel ch 被配置為非阻塞的 ch.configureBlocking(false)
  • NioServerSocketChannel中的屬性:
    • ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this,javaChannel().socket)
  1. 關(guān)于bossGroup與workerGroup
    在客戶端的時(shí)候,我們只提供了一個(gè)EventLoopGroup對(duì)象,而在服務(wù)器端的初始化時(shí),我們?cè)O(shè)置了兩個(gè)EventLoopGroup,一個(gè)是bossGroup,另一個(gè)是workerGroup,那么這兩個(gè)EventLoopGroup都是干什么用的呢?其實(shí)呢,bossGroup是用于服務(wù)端的accept的,即用于處理客戶端的連接請(qǐng)求。首先,客戶端bossGroup不斷地監(jiān)聽是否有客戶端的連接,當(dāng)發(fā)現(xiàn)有一個(gè)新的客戶端連接到來(lái)時(shí),bossGroup就會(huì)為此連接初始化各項(xiàng)資源,然后從workerGroup中選出一個(gè)EventLoop綁定到此客戶點(diǎn)連接中,那么接下來(lái)的服務(wù)器與客戶端的交互過(guò)程就全部在此分配的EventLoop中了
    源碼如下
    首先在ServerBoostrap初始化時(shí),調(diào)用了b.group(bossGroup,workerGroup)設(shè)置了兩個(gè)EventLoopGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    ...
    this.childGroup = childGroup;
    return this;
}

顯然,這個(gè)方法初始化了兩個(gè)字段,一個(gè)是group=parentGroup,它是在super.group(parentGroup)中初始化的,兩一個(gè)是childGroup = childGroup。接下來(lái)我們啟動(dòng)程序調(diào)用了b.bind方法來(lái)監(jiān)聽一個(gè)本地端口。

AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

AbstractBootstrap.initAndRegister在分析客戶端程序時(shí),也有

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    ... 省略異常判斷
    init(channel);
    ChannelFuture regFuture = group().register(channel);
    return regFuture;
}

這里group()方法返回的是上面我們提到的bossGroup,而這里的channel我們也已經(jīng)分析過(guò)了,他是一個(gè)NioServerSocketChannel實(shí)例,因此我們可以知道,group().register(channel)將bossGroup和NioServerSocketChannel關(guān)聯(lián)起來(lái)了。那么workerGroup實(shí)在哪里與NioSocketChannel關(guān)聯(lián)起來(lái)呢?
我們繼續(xù)看init(channel)方法

@Override
void init(Channel channel) throws Exception {
    ...
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

init方法在ServerBootstrap中重寫了,從上面的代碼片段中我們出道,它為pipeline中添加了一個(gè)ChannelInitializer,而這個(gè)ChannelInitializer中添加了一個(gè)關(guān)鍵的ServerBootstrapAcceptor handler。關(guān)于handler的添加與初始化的過(guò)程,我們下面介紹,現(xiàn)在關(guān)注一下ServerBoostrapAcceptor類。
ServerBootstrapAcceptor中重寫了channelRead方法,其代碼如下:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    ...
    childGroup.register(child).addListener(...);
}

ServerBoostrapAccept中的childGroup是構(gòu)造此對(duì)象時(shí)傳入的currentChildGroup,即我們的workerGroup,而Channel是一個(gè)NioSocketChannel的實(shí)例,因此這里的childGroup.register就是將workerGroup中某個(gè)EventLoop和NioSocketChannel關(guān)聯(lián)了。既然這樣,那么現(xiàn)在的問(wèn)題是,ServerBoostrapAcceptor.channelRead方法是怎么被調(diào)用的呢?其實(shí)當(dāng)一個(gè)client連接到server時(shí),Java底層的Nio ServerSocketChannel會(huì)有一個(gè)SelectionKey.OP_ACCEPT就緒時(shí)間,接著就會(huì)調(diào)用到NioServerSocketChannel.doReadMessage:


@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ... 省略異常處理
    buf.add(new NioSocketChannel(this, ch));
    return 1;
}

在doReadMessage()中,通過(guò)javaChannel().accept()獲取到客戶端新連接的SocketChannel,接著就實(shí)例化一個(gè)NioSocketChannel并且傳入NioServerSocketChannel對(duì)象(即this),由此可知,我們創(chuàng)建的而這個(gè)NioSocketChannel的父Channel就是NioServerSocketChannel實(shí)例。
接下來(lái)就經(jīng)由Netty的ChannelPipeline機(jī)制,將讀時(shí)間逐級(jí)發(fā)送到各個(gè)handler中,于是就會(huì)觸發(fā)前面我們提到的ServerBoostrapAcceptor.channelRead方法啦

  1. handler的添加過(guò)程
    服務(wù)器端的handler的添加過(guò)程和客戶端的有點(diǎn)區(qū)別,和EventLoopGroup一樣,服務(wù)器端的handler也有連個(gè),一個(gè)是通過(guò)handler()方法設(shè)置handler字段,另一個(gè)通過(guò)childHandler()設(shè)置childHandler字段,通過(guò)前面的bossGroup和workerGroup的分析,其實(shí)我們?cè)谶@里可以大膽地猜測(cè):handler子彈與accept過(guò)程有關(guān),即這個(gè)handler負(fù)責(zé)處理客戶端的連接請(qǐng)求;而childHandler就是負(fù)責(zé)和客戶端的連接的IO交互。
    在關(guān)于bossGroupworkerGroup小節(jié)中,我們提到,serverBootstrap重寫了init方法,在這個(gè)方法中添加了handler;
@Override
void init(Channel channel) throws Exception {
    ...
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

上面代碼的initChannel方法中,首先通過(guò)handler()方法獲取一個(gè)handler,如果獲取的handle不為空,則添加到pipeline中,然后接著,添加了一個(gè)ServerBootstrapAcceptor實(shí)例,那么這里handler()方法返回的是那個(gè)對(duì)象呢?其實(shí)它返回的是handler字段,而這個(gè)字段就是我們?cè)诜?wù)器端的啟動(dòng)代碼中設(shè)置的:

b.group(bossGroup, workerGroup)
 ...
 .handler(new LoggingHandler(LogLevel.INFO))

那么這個(gè)時(shí)候,pipeline中的handler情況如下:


1.png

根據(jù)我們?cè)瓉?lái)分析客戶端的經(jīng)驗(yàn),我們制定,當(dāng)channel綁定到eventLoop后(在這里是NioServerSocketChannel綁定到bossGroup)中時(shí),會(huì)在pipeline中發(fā)出fireChannelRegister時(shí)間,接著就會(huì)觸發(fā)ChannelInitializer.initChannel方法的調(diào)用。因此在綁定完成后,此時(shí)的pipeline的內(nèi)容如下


2.png

前面我們?cè)诜治鯾ossGroup和workerGroup時(shí),已經(jīng)知道了在ServerBoostrapAcceptor.channelRead中會(huì)為新建的Channel設(shè)置handler并注冊(cè)到一個(gè)eventLoop中,即

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    ...
    childGroup.register(child).addListener(...);
}

而這里的childHandler就是我們?cè)诜?wù)器端啟動(dòng)代碼中設(shè)置的handler

b.group(bossGroup, workerGroup)
 ...
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc()));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

當(dāng)這個(gè)客戶端連接Channel注冊(cè)后,就會(huì)觸發(fā)ChannelInitializer.initChannel方法的調(diào)用,此后的客戶端連接的ChannelPipeline狀態(tài)如下:


3.png

最后我們來(lái)總結(jié)一下服務(wù)器端的handler與childHandler的區(qū)別與聯(lián)系

  • 在服務(wù)器NioServerSocketChannel的pipeline中添加的handler與ServerBoostrapAeecptor。
  • 當(dāng)有新的客戶端連接請(qǐng)求時(shí),ServerBoostrapAcceptor.channelRead中負(fù)責(zé)新建此連接的NioSocketChannel并添加childHandler到NioSocketChannel對(duì)應(yīng)的pipeline中,并將此channel綁定到workerGroup中的某個(gè)eventLoop中。
  • handler是在accept階段起作用,他處理客戶端的連接請(qǐng)求
  • childHandler是在客戶端連接建立以后起作用的,它負(fù)責(zé)客戶端連接的IO交互。


    4.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容