逅弈 轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處,謝謝!
Netty高性能的原因除了他設(shè)計(jì)堪稱完美的IO模型外,另外一個(gè)原因就是他的線程模型。
有關(guān)netty線程模型的內(nèi)容不是本篇文章將要分享的,我只簡(jiǎn)單的描述下netty線程模型的幾個(gè)類別,具體的內(nèi)容可以查詢相關(guān)的資料,大體上netty的線程模型可以分成以下幾種:
單線程模型
所有I/O操作都由一個(gè)線程完成,即多路復(fù)用、事件分發(fā)和處理都是在一個(gè)Reactor線程上完成的多線程模型
一個(gè)Acceptor負(fù)責(zé)接收請(qǐng)求,一個(gè)Reactor Thread Pool負(fù)責(zé)處理I/O操作主從多線程模型
一個(gè)Acceptor負(fù)責(zé)接收請(qǐng)求,一個(gè)Main Reactor Thread Pool負(fù)責(zé)連接,一個(gè)Sub Reactor Thread Pool負(fù)責(zé)處理I/O操作
在網(wǎng)絡(luò)連接從開始到結(jié)束的這段生命周期里,我們需要處理連接中的事件,而這就需要為這些事件創(chuàng)建并執(zhí)行相應(yīng)的任務(wù)。而在netty中一個(gè)Channel通道從建立起來時(shí)就會(huì)為他分配一個(gè)EventLoop,用來處理該通道上需要執(zhí)行的事件,并且該EventLoop直到連接斷開的整個(gè)過程中都不會(huì)發(fā)生變化。
一個(gè)EventLoop的內(nèi)部是一個(gè)Thread在支撐,Netty線程模型的卓越性能之一取決于對(duì)當(dāng)前執(zhí)行的Thread的身份的確定,通過調(diào)用EventLoop的inEventLoop(Thread thread)方法來確定。就是在執(zhí)行一個(gè)任務(wù)時(shí),確定當(dāng)前線程是否是分配給當(dāng)前Channel以及Channel的EventLoop的那一個(gè)線程。如果當(dāng)前線程正好就是支撐EventLoop的那個(gè)線程,那么提交給EventLoop的任務(wù)將會(huì)被直接執(zhí)行,否則EventLoop會(huì)把該任務(wù)放入一個(gè)內(nèi)部的隊(duì)列中進(jìn)行調(diào)度,以便稍后在下一次處理事件時(shí)執(zhí)行。用一個(gè)簡(jiǎn)單的圖表示如下:

鑒于Netty線程模型的基石是建立在EventLoop上的,我們今天就來詳細(xì)的了解下EventLoop。
首先從設(shè)計(jì)上來看,EventLoop采用了一種協(xié)同設(shè)計(jì),它建立在兩個(gè)基本的API之上:Concurrent和Channel,也就是并發(fā)和網(wǎng)絡(luò)。并發(fā)是因?yàn)樗捎昧司€程池來管理大量的任務(wù),并且這些任務(wù)可以并發(fā)的執(zhí)行。其繼承了EventExecutor接口,而EventExecutor就是一個(gè)事件的執(zhí)行器。另外為了與Channel的事件進(jìn)行交互,EventLoop繼承了EventLoopGroup接口。一個(gè)詳細(xì)的EventLoop類繼承層次結(jié)構(gòu)如下:

一個(gè)Netty服務(wù)端啟動(dòng)時(shí),通常會(huì)有兩個(gè)NioEventLoopGroup:一個(gè)boss,一個(gè)worker。第一個(gè)NioEventLoopGroup正常只需要一個(gè)EventLoop,主要負(fù)責(zé)客戶端的連接請(qǐng)求,然后打開一個(gè)Channel,交給第二個(gè)NioEventLoopGroup中的一個(gè)EventLoop來處理這個(gè)Channel上的所有讀寫事件。一個(gè)Channel只會(huì)被一個(gè)EventLoop處理,而一個(gè)EventLoop可能會(huì)被分配給多個(gè)Channel。
我們知道每個(gè)EventLoop只要一個(gè)Thread來支撐并處理事件,可以在SingleThreadEventExecutor類中找到這個(gè)thread:
/**
* 用來執(zhí)行任務(wù)的線程
* 該線程其實(shí)就是用來支撐SingleThreadEventLoop的
*/
private volatile Thread thread;
通過debug,我們可以發(fā)現(xiàn)服務(wù)器啟動(dòng)過程中bootstrap.bind(PORT).sync()最終會(huì)執(zhí)行到AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise)的方法,而doBind0方法如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
最終會(huì)執(zhí)行到EventLoop的execute方法,而execute方法是在Executor接口中定義的,最終會(huì)由SingleThreadEventExecutor來執(zhí)行該execute方法,一個(gè)SingleThreadEventExecutor的調(diào)用鏈大致如下:
#1 SingleThreadEventExecutor.execute(Runnable task)
-> #2 SingleThreadEventExecutor.startThread()
-> #2 SingleThreadEventExecutor.doStartThread()
-> #2 SingleThreadEventExecutor.this.run()
-> #3 NioEventLoop.run() // for(;;)-loop
-> #4 NioEventLoop.select(boolean oldWakenUp)
-> #4 SingleThreadEventExecutor.runAllTasks(long timeoutNanos)
從代碼中可以知道下面一些結(jié)論:
1、SingleThreadEventExecutor中的Thread的初始化在doStartThread這個(gè)方法中
2、最后會(huì)調(diào)用到SingleThreadEventExecutor.this.run()方法,而該run方法是在NioEventLoop中實(shí)現(xiàn)的
3、NioEventLoop中的run方法通過一個(gè)for(;;)循環(huán)來處理Channel事件的類
讓我們來看一下NioEventLoop中的run方法:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
在這個(gè)for(;;)循環(huán)里面,首先會(huì)通過selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())判斷當(dāng)前有沒有可以執(zhí)行的任務(wù),沒有則繼續(xù)循環(huán),如果有任務(wù),那就開始處理任務(wù)。在處理事件的時(shí)候,不僅要處理IO事件,還要處理非I/O事件。其中ioRatio變量表示的是:處理I/O事件和非I/O事件的時(shí)間占比,默認(rèn)為50%。
當(dāng)且僅當(dāng)isShuttingDown()為true的時(shí)候,才開始停止循環(huán)。
我們可以通過在關(guān)鍵代碼里打印一些日志,來查看服務(wù)端啟動(dòng)過程中都執(zhí)行了哪些事情,例如,可以在每個(gè)方法的第一句增加一個(gè)System.out.println(),具體要在哪些方法中加,各位可以自己去嘗試,下面我把我增加后服務(wù)端啟動(dòng)時(shí)打印的日志信息列在下面:
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-startThread
gris-debug-SingleThreadEventExecutor-doStartThread
gris-debug-NioEventLoop-run
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-task poll from taskQueue is null,will break
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
可以看到每一步具體執(zhí)行所在的類和方法,可以發(fā)現(xiàn)最終程序停在NioEventLoop.run()方法中,循環(huán)著,等待事件的到來,然后進(jìn)行處理。
