【相關(guān)源碼都是出自4.1.55.Final-SNAPSHOT版本】
在學(xué)習(xí)源碼之前,先看下官方的example是怎樣做的(以下代碼刪減了部分不必要代碼和添加部分中文注釋)
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 創(chuàng)建線程池組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 創(chuàng)建 ServerBootstrap 對(duì)象
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 綁定線程池組
.channel(NioServerSocketChannel.class) // 服務(wù)端channel類型
.option(ChannelOption.SO_BACKLOG, 100) // TCP配置
.handler(new LoggingHandler(LogLevel.INFO)) // 服務(wù)端Handler
.childHandler(new ChannelInitializer<SocketChannel>() { // 客戶端Handler
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 綁定端口并啟動(dòng)
ChannelFuture f = b.bind(PORT).sync();
// 等待直到服務(wù)端channel關(guān)閉
f.channel().closeFuture().sync();
} finally {
// 優(yōu)雅關(guān)閉線程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
簡(jiǎn)單來(lái)說(shuō),啟動(dòng)Netty服務(wù)端需要以下幾步:
- 創(chuàng)建線程池組
- 創(chuàng)建并初始化ServerBootstrap對(duì)象
- 綁定端口并啟動(dòng)
- 優(yōu)雅關(guān)閉線程組
接下來(lái)的服務(wù)端啟動(dòng)源碼剖析也是按照這四步逐一展開(kāi)...
創(chuàng)建線程池組
首先第一步需要?jiǎng)?chuàng)建線程組,一個(gè)線程池是專門(mén)用來(lái)負(fù)責(zé)接收客戶端連接,另外一個(gè)線程池是專門(mén)負(fù)責(zé)處理客戶端的請(qǐng)求,分別對(duì)應(yīng)以下bossGroup和workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

實(shí)際上兩種線程池的創(chuàng)建過(guò)程都是一樣的,無(wú)參構(gòu)造器創(chuàng)建出來(lái)的線程數(shù)默認(rèn)是當(dāng)前啟動(dòng)服務(wù)端的服務(wù)器CPU核數(shù) * 2,有參構(gòu)造器就是按照指定數(shù)值,創(chuàng)建對(duì)應(yīng)的線程。
從new NioEventLoopGroup()位置debug進(jìn)去,一直debug到MultithreadEventLoopGroup類可以發(fā)現(xiàn),如果沒(méi)有指定線程數(shù),則會(huì)默認(rèn)使用DEFAULT_EVENT_LOOP_THREADS參數(shù)的值創(chuàng)建線程。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
然后繼續(xù)debug進(jìn)去,直到最后是MultithreadEventExecutorGroup類的相關(guān)構(gòu)造器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 刪減參數(shù)判斷代碼
// 如果 executot 為空,則使用netty默認(rèn)的線程工廠——ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 初始化線程池
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 創(chuàng)建線程池?cái)?shù)組
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
// 優(yōu)雅關(guān)閉線程池中所有的線程
children[j].shutdownGracefully();
}
// 省略異常處理邏輯
}
}
}
}
// 初始化線程選擇器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 為每個(gè)線程添加關(guān)閉監(jiān)聽(tīng)器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 所有的單例線程添加到HashSet中
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
總體來(lái)說(shuō),該方法主要執(zhí)行以下幾個(gè)步驟:
- 初始化線程工廠(如果參數(shù)executor為空則默認(rèn)使用newDefaultThreadFactory線程工廠)
- 初始化指定數(shù)量的線程池
- 調(diào)用
newChild方法創(chuàng)建線程池內(nèi)的線程 - 初始化線程選擇器
- 為每個(gè)線程添加關(guān)閉監(jiān)聽(tīng)器
其中newChild是創(chuàng)建線程的核心方法,debug進(jìn)去看一下是如何創(chuàng)建線程的
由于線程池是由NioEventLoopGroup創(chuàng)建的,調(diào)用newChild方法實(shí)際上是調(diào)用NioEventLoopGroup類重寫(xiě)的方法
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 創(chuàng)建nioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
至于創(chuàng)建nioEventLoop的細(xì)節(jié),留到之后再討論。先簡(jiǎn)單理順一下前面創(chuàng)建線程池組的思路。
- 確定線程池的大?。ㄈ绻麤](méi)有指定線程池的大小,默認(rèn)使用
DEFAULT_EVENT_LOOP_THREADS大小) - 通過(guò)MultithreadEventExecutorGroup類初始化線程池,其中調(diào)用newChild方法創(chuàng)建線程池內(nèi)的每一個(gè)線程對(duì)象

以上是創(chuàng)建線程池組的大致流程,其中我省略了很多步驟和方法,這個(gè)就留給讀者一步一步的debug,因?yàn)殚喿x源碼還是需要靠自己動(dòng)手,純看別人的博客或者視頻是很難學(xué)下去的...
線程池組創(chuàng)建完成后,接下來(lái)就需要?jiǎng)?chuàng)建ServerBootstrap對(duì)象。欲知后事如何,請(qǐng)看下篇:【Netty源碼系列】服務(wù)端啟動(dòng)流程(二)創(chuàng)建并初始化ServerBootstrap對(duì)象
如果覺(jué)得文章不錯(cuò)的話,麻煩點(diǎn)個(gè)贊哈,你的鼓勵(lì)就是我的動(dòng)力!對(duì)于文章有哪里不清楚或者有誤的地方,歡迎在評(píng)論區(qū)留言~