Netty 分享之 EventLoop

逅弈 轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處,謝謝!

Netty高性能的原因除了他設(shè)計(jì)堪稱完美的IO模型外,另外一個(gè)原因就是他的線程模型。

有關(guān)netty線程模型的內(nèi)容不是本篇文章將要分享的,我只簡(jiǎn)單的描述下netty線程模型的幾個(gè)類別,具體的內(nèi)容可以查詢相關(guān)的資料,大體上netty的線程模型可以分成以下幾種:

  • 單線程模型
    所有I/O操作都由一個(gè)線程完成,即多路復(fù)用、事件分發(fā)和處理都是在一個(gè)Reactor線程上完成的

  • 多線程模型
    一個(gè)Acceptor負(fù)責(zé)接收請(qǐng)求,一個(gè)Reactor Thread Pool負(fù)責(zé)處理I/O操作

  • 主從多線程模型
    一個(gè)Acceptor負(fù)責(zé)接收請(qǐng)求,一個(gè)Main Reactor Thread Pool負(fù)責(zé)連接,一個(gè)Sub Reactor Thread Pool負(fù)責(zé)處理I/O操作

在網(wǎng)絡(luò)連接從開始到結(jié)束的這段生命周期里,我們需要處理連接中的事件,而這就需要為這些事件創(chuàng)建并執(zhí)行相應(yīng)的任務(wù)。而在netty中一個(gè)Channel通道從建立起來時(shí)就會(huì)為他分配一個(gè)EventLoop,用來處理該通道上需要執(zhí)行的事件,并且該EventLoop直到連接斷開的整個(gè)過程中都不會(huì)發(fā)生變化。
一個(gè)EventLoop的內(nèi)部是一個(gè)Thread在支撐,Netty線程模型的卓越性能之一取決于對(duì)當(dāng)前執(zhí)行的Thread的身份的確定,通過調(diào)用EventLoop的inEventLoop(Thread thread)方法來確定。就是在執(zhí)行一個(gè)任務(wù)時(shí),確定當(dāng)前線程是否是分配給當(dāng)前Channel以及Channel的EventLoop的那一個(gè)線程。如果當(dāng)前線程正好就是支撐EventLoop的那個(gè)線程,那么提交給EventLoop的任務(wù)將會(huì)被直接執(zhí)行,否則EventLoop會(huì)把該任務(wù)放入一個(gè)內(nèi)部的隊(duì)列中進(jìn)行調(diào)度,以便稍后在下一次處理事件時(shí)執(zhí)行。用一個(gè)簡(jiǎn)單的圖表示如下:

Executor_logic.png

鑒于Netty線程模型的基石是建立在EventLoop上的,我們今天就來詳細(xì)的了解下EventLoop。
首先從設(shè)計(jì)上來看,EventLoop采用了一種協(xié)同設(shè)計(jì),它建立在兩個(gè)基本的API之上:Concurrent和Channel,也就是并發(fā)和網(wǎng)絡(luò)。并發(fā)是因?yàn)樗捎昧司€程池來管理大量的任務(wù),并且這些任務(wù)可以并發(fā)的執(zhí)行。其繼承了EventExecutor接口,而EventExecutor就是一個(gè)事件的執(zhí)行器。另外為了與Channel的事件進(jìn)行交互,EventLoop繼承了EventLoopGroup接口。一個(gè)詳細(xì)的EventLoop類繼承層次結(jié)構(gòu)如下:

EventLoop_Class_Structure.png

一個(gè)Netty服務(wù)端啟動(dòng)時(shí),通常會(huì)有兩個(gè)NioEventLoopGroup:一個(gè)boss,一個(gè)worker。第一個(gè)NioEventLoopGroup正常只需要一個(gè)EventLoop,主要負(fù)責(zé)客戶端的連接請(qǐng)求,然后打開一個(gè)Channel,交給第二個(gè)NioEventLoopGroup中的一個(gè)EventLoop來處理這個(gè)Channel上的所有讀寫事件。一個(gè)Channel只會(huì)被一個(gè)EventLoop處理,而一個(gè)EventLoop可能會(huì)被分配給多個(gè)Channel。

我們知道每個(gè)EventLoop只要一個(gè)Thread來支撐并處理事件,可以在SingleThreadEventExecutor類中找到這個(gè)thread:

/**
 * 用來執(zhí)行任務(wù)的線程
 * 該線程其實(shí)就是用來支撐SingleThreadEventLoop的
 */
private volatile Thread thread;

通過debug,我們可以發(fā)現(xiàn)服務(wù)器啟動(dòng)過程中bootstrap.bind(PORT).sync()最終會(huì)執(zhí)行到AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise)的方法,而doBind0方法如下:

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}  

最終會(huì)執(zhí)行到EventLoop的execute方法,而execute方法是在Executor接口中定義的,最終會(huì)由SingleThreadEventExecutor來執(zhí)行該execute方法,一個(gè)SingleThreadEventExecutor的調(diào)用鏈大致如下:

#1 SingleThreadEventExecutor.execute(Runnable task)
    -> #2 SingleThreadEventExecutor.startThread()
    -> #2 SingleThreadEventExecutor.doStartThread()
    -> #2 SingleThreadEventExecutor.this.run()
        -> #3 NioEventLoop.run() // for(;;)-loop
            -> #4 NioEventLoop.select(boolean oldWakenUp)
            -> #4 SingleThreadEventExecutor.runAllTasks(long timeoutNanos)

從代碼中可以知道下面一些結(jié)論:
1、SingleThreadEventExecutor中的Thread的初始化在doStartThread這個(gè)方法中
2、最后會(huì)調(diào)用到SingleThreadEventExecutor.this.run()方法,而該run方法是在NioEventLoop中實(shí)現(xiàn)的
3、NioEventLoop中的run方法通過一個(gè)for(;;)循環(huán)來處理Channel事件的類

讓我們來看一下NioEventLoop中的run方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

在這個(gè)for(;;)循環(huán)里面,首先會(huì)通過selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())判斷當(dāng)前有沒有可以執(zhí)行的任務(wù),沒有則繼續(xù)循環(huán),如果有任務(wù),那就開始處理任務(wù)。在處理事件的時(shí)候,不僅要處理IO事件,還要處理非I/O事件。其中ioRatio變量表示的是:處理I/O事件和非I/O事件的時(shí)間占比,默認(rèn)為50%。
當(dāng)且僅當(dāng)isShuttingDown()為true的時(shí)候,才開始停止循環(huán)。

我們可以通過在關(guān)鍵代碼里打印一些日志,來查看服務(wù)端啟動(dòng)過程中都執(zhí)行了哪些事情,例如,可以在每個(gè)方法的第一句增加一個(gè)System.out.println(),具體要在哪些方法中加,各位可以自己去嘗試,下面我把我增加后服務(wù)端啟動(dòng)時(shí)打印的日志信息列在下面:

gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-startThread
gris-debug-SingleThreadEventExecutor-doStartThread
gris-debug-NioEventLoop-run
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-task poll from taskQueue is null,will break
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom

可以看到每一步具體執(zhí)行所在的類和方法,可以發(fā)現(xiàn)最終程序停在NioEventLoop.run()方法中,循環(huán)著,等待事件的到來,然后進(jìn)行處理。

更多原創(chuàng)好文,請(qǐng)關(guān)注「逅弈逐碼」
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容