一個 NIO 服務(wù)端啟動需要哪些要素
一個典型的 NIO 服務(wù)端應(yīng)該有哪些東西來支撐他的服務(wù)呢?
ServerSocketChannel
首先要有一個 ServerSocketChannel,就像流操作都要基于 Stream 對象一樣, NIO 中的所有 I/O 操作都基于 Channel 對象。
一個 Channel 代表著和某一實體的連接,這個實體可以是硬件設(shè)備、文件或者是網(wǎng)絡(luò)套接字,通過 Channel 可以讀寫數(shù)據(jù)。
NIO 中的 ServerSocketChannel 相當于普通 IO 中的 ServerSocket,而客戶端的 SocketChannel 則相當于普通 IO 中的 Socket。
Selector
另外還需要有一個 Selector,用來獲取 Channel 的事件,Channel 有4種事件,分別是:
Accept:有可以接受的連接
Connect:已經(jīng)連接成功
Read:有數(shù)據(jù)可以讀取
Write:可以進行數(shù)據(jù)寫入
如果我們直接從 Channel 中讀取數(shù)據(jù)時,很可能會有問題,因為這時 Channel 中可能根本就沒有數(shù)據(jù)可以讀,而強行進行讀取的話,會使線程掛起一直等待著數(shù)據(jù)的到來,直到有數(shù)據(jù)到達才被喚醒,那該線程在數(shù)據(jù)到達這段時間內(nèi)將不做任何事情。
通過 Selector 來檢查 Channel 的狀態(tài)變化,當具體的狀態(tài)滿足條件時向外發(fā)出通知即可,就跟監(jiān)聽器一樣,我們將 Channel 注冊到 Selector 上,然后告訴 Selector 我這個 Channel 所感興趣的事件列表,接下來 Selector 就會負責(zé)把滿足條件的事件通知到 Channel,這樣的話 Channel 就不需要每次傻傻的來讀取數(shù)據(jù)了。
那 Channel 怎樣才能知道他感興趣的事件已經(jīng)發(fā)生了呢,當 Channel 把自己感興趣的事件注冊到 Selector 上之后,只需要通過 Selector 提供的 select 方法去查詢就好了。
所以一個典型的 NIO 服務(wù)端是這樣的:
// 初始化服務(wù)端TCP連接通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設(shè)置為非阻塞模式
serverChannel.configureBlocking(false);
// 創(chuàng)建一個selector
Selector selector = Selector.open();
// 把Channel注冊到selector上
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 綁定端口
serverChannel.bind(new InetSocketAddress(8864));
了解了 NIO 是怎么玩的之后,我們來分析下 Netty 服務(wù)端是怎么啟動的,首先看一個最簡單的 EchoServer 的啟動代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 2
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)) // 3
.childHandler(new ChannelInitializer<SocketChannel>() { // 4
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync(); // 5
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
整個過程主要有五個部分組成,如下圖所示:

簡單解釋一下每個步驟的作用:
1、初始化兩個 NioEventLoopGroup,這兩個對象可以看做是傳統(tǒng) IO 編程模型中的線程組,bossGroup 主要進行監(jiān)聽端口,accept 新連接,并將新連接轉(zhuǎn)交給 worker 線程去執(zhí)行,workerGroup 主要是處理客戶端請求的。
2、通過指定一個 Channel 的 Class 來創(chuàng)建一個 Channel 工廠,后續(xù)通過該工廠來創(chuàng)建 Channel,實際上這里就統(tǒng)一了服務(wù)端的 IO 模型了,通過統(tǒng)一的 api 就能輕松的指定服務(wù)端的 IO 模型是 NIO 還是 BIO,只需要指定不同的 Channel 類型即可。
3、添加一個 Server 端專屬的 ChannelHandler。
4、添加一個自定義的用來處理客戶端請求的 ChannelHandler,主要用來進行編解碼、數(shù)據(jù)讀寫、邏輯處理等。
5、綁定端口并啟動服務(wù)端。
下面就對這五個步驟進行詳細的分析,不過上面還有一個很重要的類沒有說到,就是 ServerBootstrap 引導(dǎo),他主要就是負責(zé)把所有的對象聚集在一起,然后把服務(wù)啟動起來,所以不進行具體的描述。
Netty 是怎樣啟動的
初始化 NioEventLoopGroup
NioEventLoopGroup 簡單點理解就是線程組,他會持有一組線程,這里的線程就是 EventLoop。
整個 NioEventLoopGroup 的初始化的過程如下圖所示,具體的流程可以查看 NioEventLoopGroup 的構(gòu)造方法的執(zhí)行過程,這里不貼具體的代碼了:

在 NioEventLoopGroup 初始化時,依次初始化了三個對象,分別是虛線框中黃色部分對應(yīng)的對象。
其中有一個對象 SelectorProvider 會在后續(xù)使用它來創(chuàng)建 Selector 對象。
在這些對象都初始化好之后,NioEventLoopGroup 會依次調(diào)用父類的構(gòu)造方法,最終調(diào)用到父類 MultithreadEventExecutorGroup 中,然后再父類的構(gòu)造方法中完成剩下的初始化工作,這些工作又可以分為四個部分:

我們來看這四個部分具體的實現(xiàn)。
初始化 Executor
首先是初始化一個 Executor 對象,代碼如下所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 省略部分代碼
}
executor 從 NioEventLoopGroup 中未進行初始化,一直到這里才進行初始化,這里默認是初始化的一個 ThreadPerTaskExecutor 對象,從類名中我們可以發(fā)現(xiàn),這個 Executor 在接收到新的任務(wù)時,會為每個任務(wù)都創(chuàng)建一個線程來執(zhí)行。
創(chuàng)建 EventExecutor 數(shù)組
第二步是創(chuàng)建一個 EventExecutor 數(shù)組,如下圖所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.創(chuàng)建 EventExecutor 數(shù)組
children = new EventExecutor[nThreads];
// 省略部分代碼
}
初始化 EventExecutor 數(shù)組
第三步就是對這個數(shù)組進行實例化,如下圖所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.創(chuàng)建 EventExecutor 數(shù)組
// 3.初始化每一個 EventExecutor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// 如果當前對象初始化失敗,則將之前初始化的所有對象都關(guān)閉
}
}
}
// 省略部分代碼
}
這里需要注意的是,初始化 EventExecutor 對象的方法是 newChild,而這個方法的實現(xiàn)是在 NioEventLoopGroup 類中,具體的代碼如下所示:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可以看到最終創(chuàng)建的是一個 NioEventLoop 對象,并且該對象持有了 SelectorProvider 和 Executor 以及其他對象。
但是我們的 children 數(shù)組中需要的 EventExecutor 對象,為什么這里 new 出來的是一個 NioEventLoop 對象呢?我們可以看一下 NioEventLoop 類的結(jié)構(gòu),如下圖所示:

從類的結(jié)構(gòu)圖中可以看到, NioEventLoop 類繼承自 SingleThreadEventLoop,而 SingleThreadEventLoop 類繼承自 SingleThreadEventExecutor 類,并且實現(xiàn)了 EventLoop 接口。然后 SingleThreadEventExecutor 又繼承自 AbstractScheduledEventExecutor 類。
AbstractScheduledEventExecutor 類又繼承自 AbstractEventExecutor,該類直接實現(xiàn)了 EvenExecutor 接口,所以 NioEventLoop 也就是成了一個 EventExecutor。
從圖中還可以看出,EventExecutor 還是繼承自 ScheduleExecutorService 接口,這就說明了,EventExecutor 除了有線程池的能力,還具備調(diào)度的能力。
所以 NioEventLoop 是一個具有線程池功能的事件循環(huán)器,并且還具有調(diào)度任務(wù)的功能,為什么要擁有調(diào)度的功能呢,因為 Netty 中有很多任務(wù),包括需要定時執(zhí)行的調(diào)度任務(wù),和一次性執(zhí)行的任務(wù)。
但是 NioEventLoop 中負責(zé)執(zhí)行具體任務(wù)的線程,是在 SingleThreadEventLoop 中創(chuàng)建的,并且只創(chuàng)建了一個線程,這一點從類名中就可以看出來。
那為什么只創(chuàng)建一個線程呢,線程池如果只有一個線程的話,那意義不就很小了嗎?其實這正是 Netty 設(shè)計的精美之處,通過一個線程來支撐所有的事件循環(huán),從而避免了多線程之間的并發(fā)問題,也減少了線程切換所帶來的性能損耗。Netty 通過充分壓榨這一個線程的能力,實現(xiàn)了一種無鎖化的高效的編程模型。
初始化 EventExecutorChooser
當上面的三個步驟都完成之后,最后一個步驟就是初始化一個 EventExecutorChooser,如下所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.創(chuàng)建 EventExecutor 數(shù)組
// 3.初始化每一個 EventExecutor
// 4.初始化 EventExecutorChooser
chooser = chooserFactory.newChooser(children);
// 省略部分代碼
}
這步主要是通過一個 chooserFactory 創(chuàng)建了一個 EventExecutorChooser,傳遞進去的參數(shù)則是 EventExecutor 數(shù)組,而 EventExecutorChooser 的作用就是從 EventExecutor 數(shù)組中選擇一個可用的 EventExecutor。
需要注意的是 newChooser 創(chuàng)建的 chooser 對象有兩種類型,這取決于 children 的個數(shù),如果個數(shù)是偶數(shù),則選擇 PowerOfTwoEventExecutorChooser,該類型的 chooser 在選擇 EventExecutor 時采用位運算,效率非常高;如果個數(shù)是奇數(shù)則選擇 GenericEventExecutorChooser,該類型的 chooser 在選擇 EventExecutor 時采用取余運算,效率較低,這充分體現(xiàn)了 Netty 在性能優(yōu)化上的考慮。
現(xiàn)在我們來總結(jié)一下初始化完 NioEventLoopGroup 之后一共創(chuàng)建了哪些對象:
Executor:創(chuàng)建了一個 Executor 對象,具體實例為:ThreadPerTaskExecutor
EventExecutor 數(shù)組:創(chuàng)建了一個大小為 nThread 的 EventExecutor 數(shù)組,每個實例都是一個 NioEventLoop
NioEventLoop:一個同時具備 execute 和 schedule 能力的 EventExecutor,并且每一個 NioEventLoop 只有一個 Thread 來支撐其運行
EventExecutorChooser:一個可以獲取一個可用的 EventExecutor 的選擇器
指定 Channel 類型
通過 .channel() 方法指定了一個 Channel 的 Class,該方法會創(chuàng)建一個 ChannelFactory,后續(xù)創(chuàng)建新的 Channel 則由該 ChannelFactory 來創(chuàng)建。
具體是如何創(chuàng)建 Channel 的,只需要看下 ChannelFactory 是如何實現(xiàn)的即可,實際的 ChannelFactory 實例,是一個 ReflectiveChannelFactory:
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
可以很清楚的知道,是通過反射獲取到該類的構(gòu)造方法,然后創(chuàng)建了一個實際。
這里的 Channel 都是服務(wù)端的 Channel,另外該方法除了是創(chuàng)建了一個 Channel 工廠之外,更重要的是他指定了 IO 模型。
我們使用 Netty 來進行編寫網(wǎng)絡(luò)應(yīng)用時,一般都是使用它的 NIO 模型,因為這樣才能夠發(fā)揮出他最大的價值。但是如果你想使用 BIO 模型的話,也是支持的,只需要指定 Class 為:OioServerSocketChannel 即可。
添加服務(wù)端 ChannelHandler
通過 .handler() 方法可以添加一個服務(wù)端的 ChannelHandler,該 ChannelHandler 是用來處理服務(wù)端的一些邏輯的,比如紀錄一些日志等等。
添加客戶端 ChannelHandler
通過 .childHandler() 方法就可以添加一個用來處理 Client 端請求的 ChannelHandler,該 ChannelHandler 就是所有實際業(yè)務(wù)邏輯處理的核心部分。
包括對客戶端發(fā)送過來的數(shù)據(jù)進行解碼,對數(shù)據(jù)進行邏輯處理,然后生成響應(yīng)數(shù)據(jù)后寫回客戶端。
綁定端口并啟動
當以上的所有準備工作都執(zhí)行完畢之后,服務(wù)端啟動過程中最重要的部分就要開始執(zhí)行了:那就是綁定端口,也就是執(zhí)行 ServerBootstrap 的 bind 方法。該方法也可以拆分成兩個獨立的部分:

初始化 Channel
initAndRegister 方法,主要做的就是初始化并注冊 Channel 對象,這個 Channel 的類型就是開始的時候通過 .channel() 方法指定的類型。
首先通過 ChannelFactory 創(chuàng)建一個我們所需類型的 Channel 對象,然后對這個 Channel 進行初始化,具體的代碼如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創(chuàng)建一個 channel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// 省略部分代碼
}
}
創(chuàng)建 Channel
我們先看下創(chuàng)建 Channel 的過程,上面我們已經(jīng)分析過了 channelFactory.newChannel() 是通過反射創(chuàng)建了一個 Channel 的實例,而該 Channel 的 Class 是我們指定的 NioServerSocketChannel,現(xiàn)在我們需要知道該 Channel 是如何被創(chuàng)建的,整個過程也分為四個部分:

第一步,創(chuàng)建一個 ServerSocketChannel:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
這里就是通過 JDK 底層的 SelectorProvider 創(chuàng)建了一個 ServerSocketChannel,后續(xù)都是通過該 Channel 對客戶端請求進行 accept 操作。
PS:在創(chuàng)建 NioEventLoopGroup 時也創(chuàng)建了一個 SelectorProvider,不過該 provider 是為了在某個條件下重新創(chuàng)建一個新的 Selector ,通過 rebuildSelector 的方式來解決 JDK 的 epoll 空轉(zhuǎn)的bug。
第二步,將第一步創(chuàng)建的 ServerSocketChannel 以及一個感興趣的事件傳入并調(diào)用父類構(gòu)造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
這里的第二行代碼,是將當前 NioServerSocketChannel 和 一個 ServerSocket 封裝成一個 NioServerSocketChannelConfig。
第三步,調(diào)用父類構(gòu)造方法初始化一些變量:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
// 省略部分代碼
}
}
這一步主要是將一些變量保存起來,并設(shè)置了 channel 的阻塞模式為非阻塞模式。
第四步,創(chuàng)建核心對象,包括 channelId,unsafe,pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
因為我們本篇文章是對服務(wù)器啟動流程的分析,所以這里不對每一個對象的創(chuàng)建過程進行分析了,只需要知道是在創(chuàng)建 NioServerSocketChannel 的時候,創(chuàng)建了 channelId,unsafe 和 pipeline 對象。后面的操作中會用到這些對象。
初始化 Channel
初始化 Channel 的 init 方法 在 AbstractBootstrap 中是一個抽象方法,具體的實現(xiàn)在 ServerBootstrap 中,我們來看下具體的實現(xiàn):
void init(Channel channel) throws Exception {
// 獲取Channel的pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
// 該childHandler對象是在創(chuàng)建ServerBootstrap對象時,通過childHandler方法創(chuàng)建的
final ChannelHandler currentChildHandler = childHandler;
// 省略部分代碼
// 把在創(chuàng)建ServerBootstrap對象時,創(chuàng)建的channelHandler和childHandler對象都添加到
// channel的pipeline中去
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
這里初始化最主要的工作就是為 NioServerSocketChannel 的 pipeline 添加了一個 ChannelHandler。
該 ChannelHandler 就是來處理客戶端連接的接入的,通過 ServerBootstrapAcceptor 來實現(xiàn)。
其中的 currentChildGroup 就是我們創(chuàng)建的 workerGroup,currentChildHandler 則是我們通過 childHandler 方法指定的處理客戶端請求的 ChannelHandler。
所以 ServerBootstrapAcceptor 主要的工作就是接受客戶端的請求,并將請求轉(zhuǎn)發(fā)給 workGroup 去處理,具體的處理邏輯由用戶自定義的 ChannelHandler 確定。
注冊 Channel
對 Channel 初始化完了之后,現(xiàn)在就需要對 Channel 進行注冊了,如下所示:
final ChannelFuture initAndRegister() {
Channel channel = null;
// 創(chuàng)建并初始化 channel
ChannelFuture regFuture = config().group().register(channel);
}
config().group() 返回的是 bootstrap 的 group 屬性,也就是我們創(chuàng)建 ServerBootstrap 時傳入的 bossGroup,他是一個 NioEventLoopGroup 實例。
整個注冊的過程也分為四個部分:

第一步,調(diào)用 NioEventLoopGroup 的注冊方法,如下所示:
public abstract class MultithreadEventLoopGroup
extends MultithreadEventExecutorGroup
implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
其中 next() 的方法實現(xiàn)是在父類 MultithreadEventExecutorGroup 中,也就是第二步的操作。
第二步,獲取一個可用的 EventExecutor,如下所示:
public abstract class MultithreadEventExecutorGroup
extends AbstractEventExecutorGroup {
@Override
public EventExecutor next() {
return chooser.next();
}
}
看到 chooser 我們應(yīng)該馬上反應(yīng)出來,這個對象是在初始化 NioEventLoopGroup 的時候創(chuàng)建的,同時還通過 newChild 方法創(chuàng)建了 nThreads 個 EventExecutor,并將這些 EventExecutor 都保存在了一個叫 children 的數(shù)組中,這里的 EventExecutor 實例是一個 NioEventLoop 。
chooser 選擇器的 next 方法其實就是從 children 數(shù)組中獲取一個可用的 EventExecutor,也就是獲取一個可用的 NioEventLoop,所以 next().register(channel) 的方法,實際上是執(zhí)行的 NioEventLoop 的 register(channel) 方法。
第三步,執(zhí)行 NioEventLoop 的 register 方法,該方法是在 NioEventLoop 的父類 SingleThreadEventLoop 中實現(xiàn)的,如下所示:
public abstract class SingleThreadEventLoop
extends SingleThreadEventExecutor
implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
第四步,執(zhí)行 unsafe 的 register 方法,看到 unsafe 我們也應(yīng)該馬上就想到,該對象是在初始化 NioServerSocketChannel 的時候,在父類 AbstractChannel 中通過 newUnsafe() 方法初始化的,所以我們回到 AbstractChannel 中查看 newUnsafe 方法創(chuàng)建的 unsafe 對象到底是什么。
但是很可惜,該方法是一個抽象方法,如下所示:
public abstract class AbstractChannel
extends DefaultAttributeMap
implements Channel {
protected abstract AbstractUnsafe newUnsafe();
}
所以需要到 AbstractChannel 的子類中尋找該方法的實現(xiàn),并且該子類也要滿足是 NIOServerSocketChannel 的父類,所以很容易找到,該實現(xiàn)類是:AbstractNioMessageChannel,現(xiàn)在看下 newUnsafe 方法創(chuàng)建的 unsafe 對象是什么,如下所示:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
private final class NioMessageUnsafe
extends AbstractNioUnsafe {
}
所以現(xiàn)在我們得到了 unsafe 對象的實例是 NioMessageUnsafe,接下來就看 NioMessageUnsafe 的 register 方法了,該方法是在父類 AbstractUnsafe 中實現(xiàn)的,如下所示:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略部分代碼
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 省略部分代碼
}
}
}
但是該方法仍然不是最終的注冊方法,又調(diào)用了一個叫 register0 的方法,但是該方法調(diào)用在兩個分支內(nèi):
如果 eventLoop.inEventLoop 為 true 則直接調(diào)用
否則將該方法封裝成一個 Runnable 交給 eventLoop 去執(zhí)行
這里我們需要注意的是 Netty 的 EventLoop 底層是使用的一個單線程來支撐他的工作的,很多操作都會看到對于當前線程的判斷,這正是 Netty 線程模型高效的原因所在。
讓該線程執(zhí)行某個方法之前,要先判斷,當前是否在 EventLoop 線程之中,如果在的話就直接執(zhí)行,否則將需要執(zhí)行的方法封裝成一個 Runnable 交給 EventLoop 去調(diào)度,EventLoop 會在下個時間點來執(zhí)行該任務(wù),并且是在 EventLoop 線程中執(zhí)行。
該方法可以拆分成兩個部分,如下圖所示:

第一步調(diào)用的是 doRegister 方法,如下所示:
private void register0(ChannelPromise promise) {
try {
// 省略部分代碼
// 執(zhí)行具體的注冊
doRegister();
} catch (Throwable t) {
// 省略部分的代碼
}
}
doRegister 方法在 AbstractChannel 中是一個空實現(xiàn),所以我們需要到他的子類中去尋找具體的實現(xiàn),很容易我們在 AbstractNioChannel 中找到了該方法的實現(xiàn),如下所示:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
到這里就是最底層的調(diào)用了,這里的 JavaChannel() 方法獲取到的就是我們創(chuàng)建 NioServerSocketChannel 時通過 provider.openServerSocketChannel() 方法創(chuàng)建的 ServerSocketChannel。
而 eventLoop().unwrappedSelector() 方法獲取到的則是通過 provider.openSelector() 創(chuàng)建的一個原始的 Selector 對象。
換一個寫法這步操作實際上就是:
ServerSocketChannel.register(Selector, interest set);
就是通過這一步操作把 ServerSocketChannel 感興趣的事件注冊到 JDK 底層的一個 Selector 上。
但是有可能這個這個時候,注冊事件有可能失敗,所以需要立即執(zhí)行一次 selector 的 selectNow 方法,因為這個被“取消”的SelectionKey 可能還緩存著沒有被移除。然后嘗試進行第二次注冊,如果成功的話就直接返回,如果還是失敗的話就拋出異常。
第二步,通過 pipeline 觸發(fā) handler 中的某些回調(diào)方法,如下所示:
private void register0(ChannelPromise promise) {
try {
// 省略部分代碼
// 執(zhí)行具體的注冊
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 省略部分代碼
}
}
首先通過 pipeline 的 invokeHandlerAddedIfNeeded 方法來觸發(fā)調(diào)用那些通過 pipeline.addLast() 方法添加的 ChannelHandler 的 channelAdded() 方法。
然后通知 promise 已經(jīng)成功了,現(xiàn)在可以執(zhí)行監(jiān)聽器的 operationComplete 方法了。
最后 isActive() 方法默認是返回的 false,因為到現(xiàn)在我們還沒有綁定端口呢。
綁定端口
當 Channel 已經(jīng)創(chuàng)建、初始化、并成功注冊好之后,最后就需要執(zhí)行端口綁定了,現(xiàn)在回到 ServerBootstrap 的 doBind 方法中,如下所示:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化Channel并進行注冊
// 省略部分代碼
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
當注冊的結(jié)果 regFuture 已經(jīng)完成了,那么就可以直接執(zhí)行綁定操作了,否則需要在 regFuture 上增加一個監(jiān)聽器,當注冊完成時再執(zhí)行綁定操作,不管怎么樣,具體的綁定操作都是在 doBind0 方法中,如下所示:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0 的方法可以拆分成以下幾個部分:

中間的幾步調(diào)用過程這里就不再贅述了,大家可以從源碼中跟一下,我們直接跳到最后一步調(diào)用 unsafe 的 bind 方法,不出意外就是在這一步進行了 JDK 底層 Channel 的端口綁定了,最終代碼如下:
public class NioServerSocketChannel
extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
}
看到這里一切就都清楚了,終于將端口綁定到了 ServerSocketChannel 上去了。
Netty 啟動過程總結(jié)
下面我們總結(jié)下 Netty 是如何在整個啟動過程中,把 NIO 那一套啟動步驟完美的分散到各個地方去的。
ServerSocketChannel 在哪創(chuàng)建的
首先需要知道 Netty 是在哪里創(chuàng)建的 ServerSocketChannel,是在初始化 NioServerSocketChannel 對象的時候在 newSocket 方法中,通過 provider.openServerSocketChannel() 創(chuàng)建的。
非阻塞模式在哪設(shè)置的
同樣是在初始化 NioServerSocketChannel 對象的時候,在調(diào)用到父類 AbstractNioChannel 的構(gòu)造方法的時候,執(zhí)行的 ch.configureBlocking(false)。
Selector 在哪創(chuàng)建的
在 NioEventLoop 中通過 provider.openSelector() 創(chuàng)建的,并將該 Selector 對象存放在一個叫 unwrappedSelector 的變量中。
而 NioEventLoop 中的 provider,是在初始化 NioEventLoopGroup 時,通過 SelectorProvider.provider() 創(chuàng)建的,并最終傳遞給了 NioEventLoop。
Channel 是什么時候注冊到 Selector 上去的
在 initAndRegister 方法中最終執(zhí)行到了 AbstractNioChannel 中的 doRegister 方法,在該方法中將 Channel 注冊到 Selector 上去的。
端口是什么時候綁定的
在 ServerBootstrap 的 doBind 方法中會先執(zhí)行 initAndRegister 方法,執(zhí)行完之后就會執(zhí)行 doBind0 來進行端口綁定,最終會執(zhí)行到 NioServerSocketChannel 類中的 doBind 方法,在該方法中完成了 JDK 底層端口的綁定。