首先來張網(wǎng)上盛傳的netty框架參考圖,以供跟蹤代碼參考:
netty框架參考圖.jpg
一段標準的Netty服務(wù)端啟動代碼如下:
public NettyTcpServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(4);
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5)
.childOption(ChannelOption.TCP_NODELAY, true);
}
public void bind(String ip, int port) {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new ProtoDecoder(upLimit))
.addLast("server-handler", new ServerHandler())
.addLast("encoder", new ProtoEncoder(downLimit));
}
});
InetSocketAddress address = new InetSocketAddress(ip, port);
try {
bootstrap.bind(address).sync();
} catch (InterruptedException e) {
log.error("bind "+ip+":"+port+" failed", e);
shutdown();
}
}
Netty的服務(wù)端,一般會啟動兩個NioEventLoopGroup線程組(個人感覺用組比用池更準確,這里組指數(shù)組),一個為bossGroup線程組,處理客戶端的連接請求;一個workerGroup線程組,用來處理IO事件。很多人都知道Netty服務(wù)端就是做這事的,今天我們就用源碼來揭示這是如何實現(xiàn)的。
先從NioEventLoopGroup的構(gòu)造方法著手:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
一路查看實現(xiàn),會來到這里:
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
可見,實例化NioEventLoopGroup時,如果在這里沒有設(shè)置參數(shù),也沒有在JVM參數(shù)里設(shè)置“-Dio.netty.eventLoopThreads=x”,那么這個線程組的默認線程數(shù)為CPUx2,否則為設(shè)置的參數(shù)值,最后,可以看到NioEventLoopGroup的具體實現(xiàn)為:
if (executor == null) {
//后續(xù)Netty對各種IO事件的處理就是通過此executor創(chuàng)建線程處理的
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//創(chuàng)建NIOEventLoop數(shù)組
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//實例化每個NIOEventLoop,每個NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//......
}
}
//選擇器,選擇由哪個NIOEventLoop處理,注意這里以children作為傳參
chooser = chooserFactory.newChooser(children);
在實例化NIOEventLoopGroup時,首先創(chuàng)建了一個Executor,而Executor的作用就是通常被用來代替顯示地創(chuàng)建線程的,Executor對象可以用來執(zhí)行Runnable任務(wù),該接口將“任務(wù)提交”從任務(wù)的運行機制中解耦出來,包括線程使用、調(diào)度等細節(jié)。我們看Netty中的它的實現(xiàn)如下,由它的execute方法中threadFactory.newThread(command).start();一句,證實了我們所說的Executor的作用,netty很有可能就是通過這句來創(chuàng)建它的那些IO線程的,我們不妨先猜猜,它是在什么時候執(zhí)行這句的呢?(彩蛋)
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();//創(chuàng)建NettyIO線程,重點代碼,將斷點打于此處,可查看整個Netty線程啟動步驟
}
再回溯到上面的NIOEventLoopGroup代碼段,由children = new EventExecutor[nThreads];一句,NIOEventLoopGroup實則創(chuàng)建了一個new NioEventLoopGroup(4)中參數(shù)數(shù)量的NIOEventLoop數(shù)組,NIOEventLoop的實例就是通過newChild(executor, args)方法添加的,由下面的幾段代碼可知,每個NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers。而newChild(executor, args)方法就是做NIOEventLoop實例的初始化工作,一路跟蹤我們還可發(fā)現(xiàn),NIOEventLoop的任務(wù)隊列用的是LinkedBlockingQueue,大小為Integer.MAX_VALUE,如果沒設(shè)置JVM參數(shù)“-Dio.netty.eventLoop.maxPendingTasks=x”的話:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NIOEventLoop構(gòu)造為:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
再看該構(gòu)造中的super方法為:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
由以上代碼我們大概知道了NIOEventLoopGroup和NIOEventLoop的實現(xiàn),NIOEventLoopGroup其實是一個NIOEventLoop的數(shù)組,每個NIOEventLoop都公用了一個Executor,后續(xù)創(chuàng)建線程的事都由Executor來創(chuàng)建,由threadFactory.newThread(command).start();一句可知。
但是我們還有一個疑問,那就是Netty什么時候開始啟動這些線程的呢?上述線程的啟動貌似和new NioEventLoopGroup(4)中的參數(shù)也沒有任何關(guān)系,那它怎么知道只創(chuàng)建new NioEventLoopGroup(4)中的參數(shù)個數(shù)的線程呢?我們只知道這個參數(shù)賦值給了children = new EventExecutor[nThreads];但在哪里限制了此參數(shù)的線程數(shù)量呢?即Netty怎么知道最大要啟動這么多個線程呢?線程的啟動數(shù)量和這個children數(shù)組有什么關(guān)系呢?
帶著這些疑問,我們不妨全局搜索這個children這個變量
children全局引用.png
發(fā)現(xiàn)它大多數(shù)時候用于關(guān)閉和終止判斷,這些看不出和啟動線程數(shù)有什么關(guān)系;只有在NioEventLoopGroup的實現(xiàn)中這里將children傳參給了一個字面意思叫選擇器的類,如下:
//選擇器,選擇由哪個NIOEventLoop處理,注意這里children的傳參
chooser = chooserFactory.newChooser(children);
啟動的線程數(shù)是否和這個選擇器有關(guān)呢?我們再跟進去看看
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
發(fā)現(xiàn),netty把這個NIOEventLoop數(shù)組賦值給了它的兩個內(nèi)部類Chooser之一,它們的唯一提供的使用方法是next()方法,看這方法實現(xiàn):executors[Math.abs(idx.getAndIncrement() % executors.length)]貌似這里告訴了netty當前應(yīng)使用哪個NIOEventLoop,看來可以懷疑啟動的線程數(shù)量和這個next()方法有關(guān),那只要全局搜下這個next()方法,就應(yīng)該知道了
next()全局引用.png
有這么多地方調(diào)用,要是第一次看Netty源碼,從next()反推過去看代碼直至看到跟ServerBootstrap或NioEventLoopGroup相關(guān)的next()調(diào)用,太難找了(我第一次看就是- -),而Netty服務(wù)端啟動代碼中間還有那么多的源碼沒看,究竟是在哪里限制了線程的啟動數(shù)量呢?
既然不知道在哪里限制線程數(shù)量的,也不知道何時啟動線程的,但是線程的創(chuàng)建地方我們是找到了的,即executor的初始化地方,如下
if (executor == null) {
//后續(xù)Netty對各種IO事件的處理就是通過此executor創(chuàng)建線程處理的
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
點進ThreadPerTaskExecutor,根據(jù)Executor的作用“就是通常被用來代替顯示地創(chuàng)建線程的”,我們懷疑是在這里創(chuàng)建線程的,那么我們何不在此打上斷點呢?
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();//創(chuàng)建NettyIO線程,重點代碼,將斷點打于此處,可查看整個Netty線程啟動步驟
}
打上斷點后,啟動ServerBootstrap,發(fā)現(xiàn)確實調(diào)到這里來了,由此證實了我們猜想的正確性,通過查看調(diào)用堆棧,如下圖所示:
服務(wù)端啟動啟動boss線程.png
一下就可以發(fā)現(xiàn),原來Netty第一個線程的啟動是在綁定地址和端口開始的,再看里面的NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) 行: 85 這句,
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel); //先選一個NIOEventLoop出來,然后再執(zhí)行它的register(channel)方法
}
它就是我們在查找next()全局引用圖中(見上一圖next()全局引用)的最小紅框標記的MultithreadEventLoopGroup類下的register(Channel)方法,我們開始不知道netty啟動線程是如何限制數(shù)量的,只懷疑這數(shù)量限制跟next()方法有關(guān),現(xiàn)在前后呼應(yīng)起來了,知道是這么個流程了,即Netty是在綁定服務(wù)器地址和ip時,先啟動一個線程去接受客戶端連接的,這個線程的啟動過程就如上圖所示(客戶端的啟動線程是在connect方法中開始的)。
再回到Netty的服務(wù)端啟動代碼,現(xiàn)在我們換種方式看源碼,之前我們只看了NioEventLoopGroup實現(xiàn),其余的還都沒看,現(xiàn)在我們跳過中間那些源碼,從bootstrap.bind(address).sync();中bind方法開始。
一直跟蹤方法堆棧,暫且先別管其他源碼,我們要先弄懂Netty是如何啟動boss線程組,去接受客戶端的連接,如何啟動worker線程組,去處理各類IO事件。點進看register實現(xiàn):
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
跟蹤源碼時可知NioEventLoop繼承自SingleThreadEventLoop,而SingleThreadEventLoop繼承自SingleThreadEventExecutor,SingleThreadEventExecutor最終又繼承自Executor,所以在這里最終會進入我們剛打的斷點,再看SingleThreadEventExecutor中的execute方法:
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
}
進入startThread()最終來到這里:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {//這里就是每個NIOEventLoop中的executor,它會執(zhí)行NIOEventLoop中的executor中的threadFactory.newThread(command).start();這句
@Override
public void run() { //注意這里是線程啟動后需執(zhí)行的代碼
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
}
});
}
這便創(chuàng)建并啟動了netty的處理客戶端連接的線程。線程啟動后需要做什么事呢?根據(jù)我們的了解,Netty的bossGroup線程組和workerGroup線程組啟動后,要分別時刻處理客戶端的連接和IO事件,那么這些線程應(yīng)該具備某種功能,需要時時刻刻知道是否有客戶端連接或IO事件,根據(jù)以往經(jīng)驗,這個實現(xiàn)往往是用無限for循環(huán)實現(xiàn)的,我們只要找到哪里有for死循環(huán)的地方或類似功能的地方即可。再點進SingleThreadEventExecutor.this.run(),來到NioEventLoop的run方法,我們要找到哪里具備如此功能:
@Override
protected void run() {
for (;;) {
try {
//可以在此處打上斷點,驗證啟動后是否會進這里無限循環(huán)
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} catch (Throwable t) {
}
}
}
一進來就發(fā)現(xiàn)是無限循環(huán)了,根據(jù)以往對無限循環(huán)的認知,在無限循環(huán)里往往是可以時時刻刻做某種事的,再點進processSelectedKeys()方法,查看哪里在做跟連接和處理IO相關(guān)的事了,最終找到這里:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
由SelectionKey.OP_CONNECT、SelectionKey.OP_ACCEPT、SelectionKey.OP_READ、SelectionKey.OP_WRITE字面猜想,很可能就是這里處理連接和IO事件的,我們先在上面run方法里的switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 這句打上斷點,然后再分別在這些if分支里再打上斷點,驗證我們的猜想,重新啟動netty服務(wù)端:
啟動netty服務(wù)端進入run方法無限循環(huán).png
果然進入了此斷點,驗證了我們猜想的正確性,但這里只是啟動了處理客戶端連接的線程,這里還無法進入后面打的那些if分支的,因為還沒有客戶端請求連接和發(fā)送數(shù)據(jù)操作,因此,先放開run里的斷點,我們再啟動一個netty客戶端(netty服務(wù)端和netty客戶端分別用的是《使用Netty+Protobuf實現(xiàn)游戲TCP通信》的源碼):
netty客戶端在connect方法中啟動了自己的線程.png
可見,客戶端在自己的connect()方法中啟動了自己的線程,請求后,客戶端進入了SelectionKey.OP_CONNECT分支,即發(fā)起請求連接
客戶端進入SelectionKey.OP_CONNECT分支發(fā)起請求連接.png
而服務(wù)端,則進入了SelectionKey.OP_ACCEPT分支,此時readyOps=16,接受連接請求
服務(wù)端進入SelectionKey.OP_ACCEPT分支接受連接.png
若我們提前在threadFactory.newThread(command).start()打上斷點的話,如下:
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
會發(fā)現(xiàn),服務(wù)端會再次(第二次)進入此斷點,第一次服務(wù)端進入此斷點,是啟動boss線程,第二次進入是啟動worker線程,若客戶端還有數(shù)據(jù)發(fā)給服務(wù)端,則服務(wù)端還會再次進入SelectionKey.OP_READ分支,此時readyOps=1:
服務(wù)端再次進入SelectionKey.OP_READ讀取IO數(shù)據(jù).png
最終來個簡單流程回顧:
1)首先服務(wù)端啟動netty -> 服務(wù)端netty會啟動boss線程;
2)客戶端啟動netty -> 客戶端netty會啟動自己請求連接線程,客戶端進入SelectionKey.OP_CONNECT分支;
3)服務(wù)端進入SelectionKey.OP_ACCEPT分支;
4)服務(wù)端netty -> 啟動worker線程,接受客戶端的連接;
如果客戶端再發(fā)送數(shù)據(jù)給服務(wù)端:
5)服務(wù)端進入SelectionKey.OP_READ,讀取客戶端發(fā)送的數(shù)據(jù);
如果服務(wù)端也發(fā)送數(shù)據(jù)給客戶端:
6)客戶端進入SelectionKey.OP_READ,讀取服務(wù)端發(fā)送的數(shù)據(jù);
我們現(xiàn)在已經(jīng)大致知道了是如何啟動boss線程的了,那么worker線程又是如何啟動的呢?超過worker線程上限,netty又如何知道不需要再啟動線程了呢?
限于篇幅,將在下篇講解,敬請期待...








