Netty源碼死磕二(Netty的啟動流程)

引言

上一篇文章介紹了Netty的線程模型及EventLoop機(jī)制,相信大家對Netty已經(jīng)有一個基本的認(rèn)識。那么本篇文章我會根據(jù)Netty提供的Demo來分析一下Netty啟動流程。

啟動流程概覽

開始之前,我們先來分析下Netty服務(wù)端的啟動流程,下面是一個簡單的流程圖

image

啟動流程大致分為五步

  1. 創(chuàng)建ServerBootstrap實例,ServerBootstrap是Netty服務(wù)端的啟動輔助類,其存在意義在于其整合了Netty可以提供的所有能力,并且盡可能的進(jìn)行了封裝,以方便我們使用
  2. 設(shè)置并綁定EventLoopGroup,EventLoopGroup其實是一個包含了多個EventLoop的NIO線程池,在上一篇文章我們也有比較詳細(xì)的介紹過EventLoop事件循環(huán)機(jī)制,不過值得一提的是,Netty 中的EventLoop不僅僅只處理IO讀寫事件,還會處理用戶自定義或系統(tǒng)的Task任務(wù)
  3. 創(chuàng)建服務(wù)端Channel NioServerSocketChannel,并綁定至一個EventLoop上。在初始化NioServerSocketChannel的同時,會創(chuàng)建ChannelPipelineChannelPipeline其實是一個綁定了多個ChannelHandler的執(zhí)行鏈,后面我們會詳細(xì)介紹
  4. 為服務(wù)端Channel添加并綁定ChannelHandler,ChannelHandler是Netty開放給我們的一個非常重要的接口,在觸發(fā)網(wǎng)絡(luò)讀寫事件后,Netty都會調(diào)用對應(yīng)的ChannelHandler來處理,后面我們會詳細(xì)介紹
  5. 為服務(wù)端Channel綁定監(jiān)聽端口,完成綁定之后,Reactor線程(也就是第三步綁定的EventLoop線程)就開始執(zhí)行Selector輪詢網(wǎng)絡(luò)IO事件了,如果Selector輪詢到網(wǎng)絡(luò)IO事件了,則會調(diào)用Channel對應(yīng)的ChannelPipeline來依次執(zhí)行對應(yīng)的ChannelHandler

啟動流程源碼分析

下面我們就從啟動源碼來進(jìn)一步分析 Netty 服務(wù)端的啟動流程

入口

首先來看下常見的啟動代碼

// 配置bossEventLoopGroup 配置大小為1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 配置workEventLoopGroup 配置大小默認(rèn)為cpu數(shù)*2
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 自定義handler
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    // 啟動輔助類 配置各種參數(shù)(服務(wù)端Channel類,EventLoopGroup,childHandler等)
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     // 配置 channel通道,會反射實例化
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     });

    // 綁定監(jiān)聽端口 啟動服務(wù)器
    ChannelFuture f = b.bind(PORT).sync();

    // 等待服務(wù)器關(guān)閉
    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
  1. 可以看到上面的代碼首先創(chuàng)建了兩個EventLoopGroup,在上一篇文章我們有介紹過Netty的線程模型有三種,而不同的EventLoopGroup配置對應(yīng)了三種不同的線程模型。這里創(chuàng)建的兩個EventLoopGroup則是用了多線程Reactor模型,其中bossEventLoopGroup對應(yīng)的就是處理Accept事件的線程組,而workEventLoopGroup則負(fù)責(zé)處理IO讀寫事件。
  2. 然后就是創(chuàng)建了一個啟動輔助類ServerBootstrap,并且配置了如下幾個重要參數(shù)
    • group 兩個Reactor線程組(bossEventLoopGroup, workEventLoopGroup)
    • channel 服務(wù)端Channel
    • option 服務(wù)端socket參數(shù)配置 例如SO_BACKLOG指定內(nèi)核未連接的Socket連接排隊個數(shù)
    • handler 服務(wù)端Channel對應(yīng)的Handler
    • childHandler 客戶端請求Channel對應(yīng)的Handler
  3. 綁定服務(wù)端監(jiān)聽端口,啟動服務(wù) -> ChannelFuture f = b.bind(PORT).sync();

這篇文章主要是分析Netty的啟動流程。so我們直接看b.bind(PORT).sync()的源碼

bind 發(fā)現(xiàn)該方法內(nèi)部實際調(diào)用的是doBind(final SocketAddress localAddress)方法

doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化服務(wù)端Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
    // 初始化一個 promise(異步回調(diào))
        ChannelPromise promise = channel.newPromise();
        // 綁定監(jiān)聽端口
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } 
    .... // 省略其他代碼
}

doBind主要做了兩個事情

  1. initAndRegister() 初始化Channel
  2. doBind0 綁定監(jiān)聽端口

initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    // new一個新的服務(wù)端Channel
        channel = channelFactory.newChannel();
        // 初始化Channel
        init(channel);
    } catch (Throwable t) {
        ...
    }
    // 將Channel注冊到EventLoopGroup中一個EventLoop上
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
  1. channelFactory.newChannel()其實就是通過反射創(chuàng)建配置的服務(wù)端Channel類,在這里是NioServerSocketChannel

  2. 創(chuàng)建完成的NioServerSocketChannel進(jìn)行一些初始化操作,例如將我們配置的Handler加到服務(wù)端Channelpipeline

  3. Channel注冊到EventLoopGroup中一個EventLoop上

下面我們來看下NioServerSocketChannel類的構(gòu)造方法,看看它到底初始化了哪些東西,先看下其繼承結(jié)構(gòu)

NioServerSocketChannel初始化
image

下面是它的構(gòu)造方法的調(diào)用順序,依次分為了四步

// 1
public NioServerSocketChannel() {
// 通過 SelectProvider來初始化一個Java NioServerChannel
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

// 2.
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 創(chuàng)建一個配置類,持有Java Channel
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// 3.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
    // 設(shè)置Channel為非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

// 4
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 生成一個channel Id
    id = newId();
    // 創(chuàng)建一個 unSafe 類,unsafe封裝了Netty底層的IO讀寫操作
    unsafe = newUnsafe();
    // 創(chuàng)建一個 pipeline類
    pipeline = newChannelPipeline();
}

可以看到NioServerSocketChannel的構(gòu)造函數(shù)主要是初始化并綁定了以下3類

  1. 綁定一個Java ServerSocketChannel
  2. 綁定一個unsafe類,unsafe封裝了Netty底層的IO讀寫操作
  3. 綁定一個pipeline,每個Channel都會唯一綁定一個pipeline
init(Channel channel)
void init(Channel channel) {
// 設(shè)置Socket參數(shù)
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    ChannelPipeline p = channel.pipeline();
    // 子EventLoopGroup用于完成Nio讀寫操作
    final EventLoopGroup currentChildGroup = childGroup;
    // 為workEventLoop配置的自定義Handler
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    // 設(shè)置附加參數(shù)
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
      // 為服務(wù)端Channel pipeline 配置 對應(yīng)的Handler
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

這里主要是為服務(wù)端Channel配置一些參數(shù),以及對應(yīng)的處理器ChannelHandler,注意這里不僅僅會把我們自定義配置的ChannelHandler加上去,同時還會自動幫我們加入一個系統(tǒng)Handler(ServerBootstrapAcceptor),這就是Netty用來接收客戶端請求的Handler,在ServerBootstrapAcceptor內(nèi)部會完成SocketChannel的連接,EventLoop的綁定等操作,之后我們會著重分析這個類

Channel的注冊
// MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
// next()會選擇一個EventLoop來完成Channel的注冊
    return next().register(channel);
}

// SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
// AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    // 注冊邏輯
        register0(promise);
    } 
    ....
}

完成注冊流程

  1. 完成實際的Java ServerSocketChannelSelect選擇器的綁定
  2. 并觸發(fā)channelRegistered以及channelActive事件

到這里為止,其實Netty服務(wù)端已經(jīng)基本啟動完成了,就差綁定一個監(jiān)聽端口了。可能讀者會很詫異,怎么沒有看到Nio線程輪詢 IO事件的循環(huán)呢,講道理肯定應(yīng)該有一個死循環(huán)才對?那我們下面就把這段代碼找出來

在之前的代碼中,我們經(jīng)常會看到這樣一段代碼

// 往EventLoop中丟了一個異步任務(wù)(其實是同步的,因為只有一個Nio線程,不過因為是事件循環(huán)機(jī)制(丟到一個任務(wù)隊列中),看起來像是異步的)
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        ...
    }
});

eventLoop.execute到底做了什么事情?

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 把當(dāng)前任務(wù)添加到任務(wù)隊列中
    addTask(task);
    // 不是Nio線程自己調(diào)用的話,則表明是初次啟動
    if (!inEventLoop) {
    // 啟動EventLoop的Nio線程
        startThread();
        ...
    }
    ...
}

/**
 * 啟動EventLoop的Nio線程
 */
private void doStartThread() {
    assert thread == null;
    // 啟動Nio線程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            ... 
}

通過上面的代碼可以知道這里主要做了兩件事情

  1. 創(chuàng)建的任務(wù)被丟入了一個隊列中等待執(zhí)行
  2. 如果是初次創(chuàng)建,則啟動Nio線程
  3. SingleThreadEventExecutor.this.run(); 調(diào)用子類的Run實現(xiàn)(執(zhí)行IO事件的輪詢)
    看下 NioEventLoopRun方法實現(xiàn)
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
            // 獲取IO事件類型
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                ... 
                default:
                }
            } catch (IOException e) {
                // 出現(xiàn)異常 重建Selector
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                    // 處理對應(yīng)事件,激活對應(yīng)的ChannelHandler事件
                        processSelectedKeys();
                    }
                } finally {
                    // 處理完事件了才執(zhí)行全部Task
                    ranTasks = runAllTasks();
                }
            }
            ...
        }   
    }
}

到這里的代碼是不是就非常熟悉了,熟悉的死循環(huán)輪詢事件

  1. 通過Selector來輪詢IO事件
  2. 觸發(fā)Channel所綁定的Handler處理對應(yīng)的事件
  3. 處理完IO事件了 會執(zhí)行系統(tǒng)或用戶自定義加入的Task

doBind0

實際的Bind邏輯在 NioServerSocketChannel中執(zhí)行,我們直接省略前面一些冗長的調(diào)用,來看下最底層的調(diào)用代碼,發(fā)現(xiàn)其實就是調(diào)用其綁定的Java Channel來執(zhí)行對應(yīng)的監(jiān)聽端口綁定邏輯

protected void doBind(SocketAddress localAddress) throws Exception {
// 如果JDK版本大于7
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

尾言

本篇文章把Netty的啟動流程粗略的捋了一遍,目的不是為了摳細(xì)節(jié),而是大致能夠清楚Netty服務(wù)端啟動時主要做了哪些事情,所以有些地方難免會比較粗略一筆帶過。在后面的文章我會把一些細(xì)節(jié)的源碼單獨拎出來深入分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容