Netty是基于Nio實現(xiàn)的,所以也離不開selector、serverSocketChannel、socketChannel和selectKey等,只不過Netty把這些實現(xiàn)都封裝在了底層。
來看一個標準的netty程序:
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).run();
}
}
EchoServerHandler 實現(xiàn)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
和客戶端的代碼相比, 沒有很大的差別, 基本上也是進行了如下幾個部分的初始化:
EventLoopGroup: 不論是服務器端還是客戶端, 都必須指定 EventLoopGroup. 在這個例子中, 指定了 NioEventLoopGroup, 表示一個 NIO 的EventLoopGroup, 不過服務器端需要指定兩個 EventLoopGroup, 一個是 bossGroup, 用于處理客戶端的連接請求; 另一個是 workerGroup, 用于處理與各個客戶端連接的 IO 操作.
- ChannelType: 指定 Channel 的類型. 因為是服務器端, 因此使用了NioServerSocketChannel.
- Handler: 設(shè)置數(shù)據(jù)的處理器.
在客戶端中, Channel 的類型其實是在初始化時, 通過 Bootstrap.channel() 方法設(shè)置的, 服務器端自然也不例外.
在服務器端, 我們調(diào)用了 ServerBootstarap.channel(NioServerSocketChannel.class), 傳遞了一個 NioServerSocketChannel Class 對象. 這樣的話, 按照和分析客戶端代碼一樣的流程, 我們就可以確定, NioServerSocketChannel 的實例化是通過 BootstrapChannelFactory 工廠類來完成的, 而 BootstrapChannelFactory 中的 clazz 字段被設(shè)置為了 NioServerSocketChannel.class, 因此當調(diào)用 BootstrapChannelFactory.newChannel() 時:
@Override
public T newChannel() {
// 刪除 try 塊
return clazz.newInstance();
}
就獲取到了一個 NioServerSocketChannel 的實例.
最后我們也來總結(jié)一下:
ServerBootstrap 中的 ChannelFactory 的實現(xiàn)是 BootstrapChannelFactory
生成的 Channel 的具體類型是 NioServerSocketChannel.
Channel 的實例化過程, 其實就是調(diào)用的 ChannelFactory.newChannel 方法, 而實例化的 Channel 的具體的類型又是和在初始化 ServerBootstrap 時傳入的 channel() 方法的參數(shù)相關(guān). 因此對于我們這個例子中的服務器端的 ServerBootstrap 而言, 生成的的 Channel 實例就是 NioServerSocketChannel.
NioServerSocketChannel 的實例化過程
下面是 NioServerSocketChannel 的類層次結(jié)構(gòu)圖:

首先, 我們來看一下它的默認的構(gòu)造器. 和 NioSocketChannel 類似, 構(gòu)造器都是調(diào)用了 newSocket 來打開一個 Java 的 NIO Socket, 不過需要注意的是, 客戶端的 newSocket 調(diào)用的是 openSocketChannel, 而服務器端的 newSocket 調(diào)用的是 openServerSocketChannel. 顧名思義, 一個是客戶端的 Java SocketChannel, 一個是服務器端的 Java ServerSocketChannel.
private static ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
接下來會調(diào)用重載的構(gòu)造器:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
這個構(gòu)造其中, 調(diào)用父類構(gòu)造器時, 傳入的參數(shù)是 SelectionKey.OP_ACCEPT. 作為對比, 我們回想一下, 在客戶端的 Channel 初始化時, 傳入的參數(shù)是 SelectionKey.OP_READ. 有 Java NIO Socket 開發(fā)經(jīng)驗的朋友就知道了, Java NIO 是一種 Reactor 模式, 我們通過 selector 來實現(xiàn) I/O 的多路復用復用. 在一開始時, 服務器端需要監(jiān)聽客戶端的連接請求, 因此在這里我們設(shè)置了 SelectionKey.OP_ACCEPT, 即通知 selector 我們對客戶端的連接請求感興趣.
接著和客戶端的分析一下, 會逐級地調(diào)用父類的構(gòu)造器 NioServerSocketChannel <- AbstractNioMessageChannel <- AbstractNioChannel <- AbstractChannel.
同樣的, 在 AbstractChannel 中會實例化一個 unsafe 和 pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
不過, 這里有一點需要注意的是, 客戶端的 unsafe 是一個 AbstractNioByteChannel#NioByteUnsafe 的實例, 而在服務器端時, 因為 AbstractNioMessageChannel 重寫了newUnsafe 方法:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
因此在服務器端, unsafe 字段其實是一個 AbstractNioMessageChannel#AbstractNioUnsafe 的實例.
我們來總結(jié)一下, 在 NioServerSocketChannsl 實例化過程中, 所需要做的工作:
調(diào)用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打開一個新的 Java NIO ServerSocketChannel
AbstractChannel(Channel parent) 中初始化 AbstractChannel 的屬性:
parent 屬性置為 null
unsafe 通過newUnsafe() 實例化一個 unsafe 對象, 它的類型是 AbstractNioMessageChannel#AbstractNioUnsafe 內(nèi)部類
pipeline 是 new DefaultChannelPipeline(this) 新創(chuàng)建的實例.
AbstractNioChannel 中的屬性:
SelectableChannel ch 被設(shè)置為 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.
readInterestOp 被設(shè)置為 SelectionKey.OP_ACCEPT
SelectableChannel ch 被配置為非阻塞的 ch.configureBlocking(false)
NioServerSocketChannel 中的屬性:
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
Channel 的注冊
服務器端和客戶端的 Channel 的注冊過程一致, 因此就不再單獨分析了.
關(guān)于 bossGroup 與 workerGroup
可以看出一切從ServerBootstrap開始,ServerBootstrap實例中需要兩個NioEventLoopGroup實例,按照職責劃分成boss和work,有著不同的分工:
- boss負責請求的accept
- work負責請求的read、write
NioEventLoopGroup
NioEventLoopGroup主要管理eventLoop的生命周期。在客戶端的時候, 我們只提供了一個 EventLoopGroup 對象, 而在服務器端的初始化時, 我們設(shè)置了兩個 EventLoopGroup, 一個是 bossGroup, 另一個是 workerGroup. 那么這兩個 EventLoopGroup 都是干什么用的呢? bossGroup 是用于服務端 的 accept 的, 即用于處理客戶端的連接請求. 我們可以把 Netty 比作一個飯店, bossGroup 就像一個像一個前臺接待, 當客戶來到飯店吃時, 接待員就會引導顧客就坐, 為顧客端茶送水等. 而 workerGroup, 其實就是實際上干活的啦, 它們負責客戶端連接通道的 IO 操作: 當接待員 招待好顧客后, 就可以稍做休息, 而此時后廚里的廚師們(workerGroup)就開始忙碌地準備飯菜了.
關(guān)于 bossGroup 與 workerGroup 的關(guān)系, 我們可以用如下圖來展示:


首先, 服務器端 bossGroup 不斷地監(jiān)聽是否有客戶端的連接, 當發(fā)現(xiàn)有一個新的客戶端連接到來時, bossGroup 就會為此連接初始化各項資源, 然后從 workerGroup 中選出一個 EventLoop 綁定到此客戶端連接中. 那么接下來的服務器與客戶端的交互過程就全部在此分配的 EventLoop 中了.
首先在ServerBootstrap 初始化時, 調(diào)用了 b.group(bossGroup, workerGroup) 設(shè)置了兩個 EventLoopGroup, 我們跟蹤進去看一下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
...
this.childGroup = childGroup;
return this;
}
顯然, 這個方法初始化了兩個字段, 一個是 group = parentGroup, 它是在 super.group(parentGroup) 中初始化的, 另一個是 childGroup = childGroup. 接著我們啟動程序調(diào)用了 b.bind 方法來監(jiān)聽一個本地端口. bind 方法會觸發(fā)如下的調(diào)用鏈:
AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
... 省略異常判斷
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
這里 group() 方法返回的是上面我們提到的 bossGroup, 而這里的 channel 我們也已經(jīng)分析過了, 它是一個是一個 NioServerSocketChannsl 實例, 因此我們可以知道, group().register(channel) 將 bossGroup 和 NioServerSocketChannsl 關(guān)聯(lián)起來了.
那么 workerGroup 是在哪里與 NioSocketChannel 關(guān)聯(lián)的呢?
我們繼續(xù)看 init(channel) 方法:
@Override
void init(Channel channel) throws Exception {
...
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
init 方法在 ServerBootstrap 中重寫了, 從上面的代碼片段中我們看到, 它為 pipeline 中添加了一個 ChannelInitializer, 而這個 ChannelInitializer 中添加了一個關(guān)鍵的 ServerBootstrapAcceptor handler. 我們看一下 ServerBootstrapAcceptor 類.
ServerBootstrapAcceptor 中重寫了 channelRead 方法, 其主要代碼如下:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...
childGroup.register(child).addListener(...);
}
ServerBootstrapAcceptor 中的 childGroup 是構(gòu)造此對象是傳入的 currentChildGroup, 即我們的 workerGroup, 而 Channel 是一個 NioSocketChannel 的實例, 因此這里的 childGroup.register 就是將 workerGroup 中的 EventLoop 和 NioSocketChannel 關(guān)聯(lián)了. 既然這樣, 那么現(xiàn)在的問題是, ServerBootstrapAcceptor.channelRead 方法是怎么被調(diào)用的呢? 其實當一個 client 連接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個 SelectionKey.OP_ACCEPT 就緒事件, 接著就會調(diào)用到 NioServerSocketChannel.doReadMessages:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
... 省略異常處理
buf.add(new NioSocketChannel(this, ch));
return 1;
}
在 doReadMessages 中, 通過 javaChannel().accept() 獲取到客戶端新連接的 SocketChannel, 接著就實例化一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 我們創(chuàng)建的這個 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .
接下來就經(jīng)由 Netty 的 ChannelPipeline 機制, 將讀取事件逐級發(fā)送到各個 handler 中, 于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦.
我們再來看看NioEventLoopGroup構(gòu)造方法:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
MultithreadEventLoopGroup是NioEventLoopGroup的父類,構(gòu)造方法:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
其中 DEFAULT_EVENT_LOOP_THREADS 為處理器數(shù)量的兩倍。
MultithreadEventExecutorGroup是核心,管理eventLoop的生命周期,先看看其中幾個變量。
1、children:EventExecutor數(shù)組,保存eventLoop。
2、chooser:從children中選取一個eventLoop的策略。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
// 根據(jù)數(shù)組的大小,采用不同策略初始化chooser,如果大小為2的冪次方,則采用PowerOfTwoEventExecutorChooser,否則使用GenericEventExecutorChooser。
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
1、根據(jù)數(shù)組的大小,采用不同策略初始化chooser,如果大小為2的冪次方,則采用PowerOfTwoEventExecutorChooser,否則使用GenericEventExecutorChooser。
其中判斷一個數(shù)是否是2的冪次方的方法很有技巧:
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
2、newChild方法重載,初始化EventExecutor時,實際執(zhí)行的是NioEventLoopGroup中的newChild方法,所以children元素的實際類型為NioEventLoop。
接下去看看NioEventLoop類。
每個eventLoop會維護一個selector和taskQueue,負責處理客戶端請求和內(nèi)部任務,如ServerSocketChannel注冊和ServerSocket綁定等。

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
當看到 selector = openSelector() 時,有沒有覺得親切了許多??纯碨ingleThreadEventLoop類。它是NioEventLoop的父類,
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
繼續(xù)看SingleThreadEventLoop的父類SingleThreadEventExecutor
從類名上可以看出,這是一個只有一個線程的線程池, 先看看其中的幾個變量:
1、state:線程池當前的狀態(tài)
2、taskQueue:存放任務的隊列
3、thread:線程池維護的唯一線程
4、scheduledTaskQueue:定義在其父類AbstractScheduledEventExecutor中,用以保存延遲執(zhí)行的任務。
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
threadProperties = new DefaultThreadProperties(thread);
taskQueue = newTaskQueue();
}
代碼很長,內(nèi)容很簡單:
1、初始化一個線程,并在線程內(nèi)部執(zhí)行NioEventLoop類的run方法,當然這個線程不會立刻執(zhí)行。
2、使用LinkedBlockingQueue類初始化taskQueue。
ServerBootstrap
通過serverBootstrap.bind(port)啟動服務,過程如下:
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}


final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
1、負責創(chuàng)建服務端的NioServerSocketChannel實例
2、為NioServerSocketChannel的pipeline添加handler
3、注冊NioServerSocketChannel到selector
NioServerSocketChannel
對Nio的ServerSocketChannel和SelectionKey進行了封裝。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
1、方法newSocket利用 provider.openServerSocketChannel() 生成Nio中的ServerSocketChannel對象。
2、設(shè)置SelectionKey.OP_ACCEPT事件。
父類AbstractNioMessageChannel構(gòu)造方法
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
設(shè)置當前ServerSocketChannel為非阻塞通道。
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
1、初始化unsafe,這里的Unsafe并非是jdk中底層Unsafe類,用來負責底層的connect、register、read和write等操作。
2、初始化pipeline,每個Channel都有自己的pipeline,當有請求事件發(fā)生時,pipeline負責調(diào)用相應的hander進行處理。
回到ServerBootstrap的init(Channel channel)方法,添加handler到channel的pipeline中。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
1、設(shè)置channel的options和attrs。
2、在pipeline中添加一個ChannelInitializer對象。
init執(zhí)行完,需要把當前channel注冊到EventLoopGroup。其實最終目的是為了實現(xiàn)Nio中把ServerSocket注冊到selector上,這樣就可以實現(xiàn)client請求的監(jiān)聽了
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
public EventLoop next() {
return (EventLoop) super.next();
}
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
因為EventLoopGroup中維護了多個eventLoop,next方法會調(diào)用chooser策略找到下一個eventLoop,并執(zhí)行eventLoop的register方法進行注冊。
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
channel.unsafe()是什么?
NioServerSocketChannel初始化時,會創(chuàng)建一個NioMessageUnsafe實例,用于實現(xiàn)底層的register、read、write等操作。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
private void register0(ChannelPromise promise) {
try {
if (!ensureOpen(promise)) {
return;
}
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
if (!promise.tryFailure(t)) {
}
closeFuture.setClosed();
}
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
1、register0方法提交到eventLoop線程池中執(zhí)行,這個時候會啟動eventLoop中的線程。
2、方法doRegister()才是最終Nio中的注冊方法,方法javaChannel()獲取ServerSocketChannel。
protected Runnable doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
ServerSocketChannel注冊完之后,通知pipeline執(zhí)行fireChannelRegistered方法,pipeline中維護了handler鏈表,通過遍歷鏈表,執(zhí)行InBound類型handler的channelRegistered方法,最終執(zhí)行init中添加的ChannelInitializer handler。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
1、initChannel方法最終把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,負責accept客戶端請求。
2、在pipeline中刪除對應的handler。
3、觸發(fā)fireChannelRegistered方法,可以自定義handler的channelRegistered方法。
到目前為止,ServerSocketChannel完成了初始化并注冊到seletor上,啟動線程執(zhí)行selector.select()方法準備接受客戶端請求。
,ServerSocketChannel的socket還未綁定到指定端口,那么這一塊Netty是如何實現(xiàn)的? Netty把注冊操作放到eventLoop中執(zhí)行。
private static void doBind0(
final ChannelFuture regFuture,
final Channel channel,
final SocketAddress localAddress,
final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(promise, false);
return findContextOutbound().invokeBind(localAddress, promise);
}
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
invokeBind0(localAddress, promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeBind0(localAddress, promise);
}
});
}
return promise;
}
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
最終由unsafe實現(xiàn)端口的bind操作。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
...
doBind(localAddress);
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
}
}
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
bind完成后,且ServerSocketChannel也已經(jīng)注冊完成,則觸發(fā)pipeline的fireChannelActive方法,所以在這里可以自定義fireChannelActive方法,默認執(zhí)行tail的fireChannelActive。
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
channel.read()方法會觸發(fā)pipeline的行為:
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
findContextOutbound().invokeRead();
return this;
}
private void invokeRead() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
invokeRead0();
} else {
Runnable task = invokeRead0Task;
if (task == null) {
invokeRead0Task = task = new Runnable() {
@Override
public void run() {
invokeRead0();
}
};
}
executor.execute(task);
}
}
private void invokeRead0() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
handler 的添加過程
和 EventLoopGroup 一樣, 服務器端的 handler 也有兩個, 一個是通過 handler() 方法設(shè)置 handler 字段, 另一個是通過 childHandler() 設(shè)置 childHandler 字段. 通過前面的 bossGroup 和 workerGroup 的分析, 其實我們在這里可以大膽地猜測: handler 字段與 accept 過程有關(guān), 即這個 handler 負責處理客戶端的連接請求; 而 childHandler 就是負責和客戶端的連接的 IO 交互.
在 上面 bossGroup 與 workerGroup , 我們提到, ServerBootstrap 重寫了 init 方法, 在這個方法中添加了 handler:
@Override
void init(Channel channel) throws Exception {
...
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
上面代碼的 initChannel 方法中, 首先通過 handler() 方法獲取一個 handler, 如果獲取的 handler 不為空,則添加到 pipeline 中. 然后接著, 添加了一個 ServerBootstrapAcceptor 實例. 那么這里 handler() 方法返回的是哪個對象呢? 其實它返回的是 handler 字段, 而這個字段就是我們在服務器端的啟動代碼中設(shè)置的:
b.group(bossGroup, workerGroup)
...
.handler(new LoggingHandler(LogLevel.INFO))
那么這個時候, pipeline 中的 handler 情況如下:

根據(jù)我們原來分析客戶端的經(jīng)驗, 我們指定, 當 channel 綁定到 eventLoop 后(在這里是 NioServerSocketChannel 綁定到 bossGroup)中時, 會在 pipeline 中發(fā)出 fireChannelRegistered 事件, 接著就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用.
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // 這里觸發(fā)
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
當 channel 綁定到 eventLoop 后(在這里是 NioServerSocketChannel 綁定到 bossGroup)中時, 會在 pipeline 中發(fā)出 fireChannelRegistered 事件, 接著就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用.
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
因此在綁定完成后, 此時的 pipeline 的內(nèi)如如下:

前面我們在分析 bossGroup 和 workerGroup 時, 已經(jīng)知道了在 ServerBootstrapAcceptor.channelRead 中會為新建的 Channel 設(shè)置 handler 并注冊到一個 eventLoop 中, 即:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...
childGroup.register(child).addListener(...);
}
而這里的 childHandler 就是我們在服務器端啟動代碼中設(shè)置的 handler:
b.group(bossGroup, workerGroup)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
, 當這個客戶端連接 Channel 注冊后, 就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用, 此后的客戶端連接的 ChannelPipeline 狀態(tài)如下:

最后我們來總結(jié)一下服務器端的 handler 與 childHandler 的區(qū)別與聯(lián)系:
- 在服務器 NioServerSocketChannel 的 pipeline 中添加的是 handler 與 ServerBootstrapAcceptor.
- 當有新的客戶端連接請求時, ServerBootstrapAcceptor.channelRead 中負責新建此連接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 對應的 pipeline 中, 并將此 channel 綁定到 workerGroup 中的某個 eventLoop 中.
- handler 是在 accept 階段起作用, 它處理客戶端的連接請求.
-
childHandler 是在客戶端連接建立以后起作用, 它負責客戶端連接的 IO 交互.
image.png
