零、 整體流程

1、用戶在main線程啟動執(zhí)行ServerSocketChannel的初始化
1)初始化一個NioServerSocketChannel channel
包括初始化Channel關(guān)聯(lián)的ch、pipeline、unsafe、config。
nio.ServerSocketChannel ch = nio.SelectorProvider.openServerSocketChannel();
ch.configBlocking(false);
2)將channel與ServerBootstrap的EventLoopGroup中的某個EventLoop child綁定
3)向child的任務(wù)執(zhí)行隊列中添加channel的register0事件
4)監(jiān)聽register0事件的完成狀態(tài),完成時向child的任務(wù)執(zhí)行隊列中添加channel的bind事件
2、EventLoop線程中執(zhí)行register0事件
1)將NioServerSocketChannel的ch注冊到NioEventLoop child的selector上
SelectionKey selectionKey = ch.register(eventLoop().selector, 0, this);
2)標識ChannelPromise狀態(tài)為success,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊列
3)ServerSocketChannel channel產(chǎn)生ChannelRegisted事件
pipeline.fireChannelRegistered();
會導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中。
3、EventLoop線程中執(zhí)行Channel.bind事件
1)ServerSocketChannel綁定對應(yīng)服務(wù)端口,監(jiān)聽新的客戶端連接
javaChannel().socket().bind(localAddress, config.getBacklog());
2)ServerSocketChannel channel產(chǎn)生ChannelActive事件
pipeline.fireChannelActive();
ChannelActive事件由HeadContext處理,向ch中添加OP_ACCEPT事件監(jiān)聽。
selectionKey.interestOps(OP_ACCEPT);
一、代碼入口
ServerBootstrap b = new ServerBootstrap();
// Start the server.
ChannelFuture f = b.bind(port).sync();
服務(wù)端啟動時都會執(zhí)行上面的代碼,用來啟動ServerSocketChannel監(jiān)聽對應(yīng)端口。
最終由AbstractBootstrap.doBind方法處理。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
二、用戶線程初始化NioServerSocketChannel
final Channel channel = channelFactory.newChannel();
從此行代碼開始,ReflectiveChannelFactory通過反射的方式調(diào)用NioServerSocketChannel的構(gòu)造方法進行channel的初始化。
1、NioServerSocketChannel的實例化
** 1)創(chuàng)建一個nio.ServerSocketChannel**
newSocket(SelectorProvider provider);
使用nio.SelectorProvider.openServerSocketChannel()創(chuàng)建一個nio.ServerSocketChannel ch。
** 2)通過構(gòu)造方法實例化NioServerSocketChannel,并初始化相關(guān)的field**
parent = null
unsafe = new NioMessageUnsafe()
pipeline = new DefaultChannelPipeline(this) ->此處初始化一個DefaultChannelPipeline,并將pipeline和channel互相綁定
ch = ch ->同時將ch設(shè)置為非阻塞模式
readInterestOp = OP_ACCEPT
config = new ServerSocketChannelConfig(this, javaChannel().socket())
3)向channel的pipeline中添加Inbound處理器ChannelInitializer
init(channel);
由其父類ServerBootstrap直接實現(xiàn)。
- 設(shè)置channel的attr和options**
- 添加ChannelInitializer到pipeline中**
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
}
該處理器的*channelRegisted事件*回調(diào)方法會將*ServerBootstrapAcceptor*加入channel的pipeline中,并將自己從pipeline中移除。
4)ServerBootstrapAcceptor
- ServerBootstrapAcceptor也是一個Inbound處理器,用于在Server端accept新的客戶端連接時,向新生成的socketChannel中添加用戶定義的業(yè)務(wù)處理器。
- 其channelRead事件回調(diào)方法會將業(yè)務(wù)方往ServerBootstrap中添加的childHandler添加到socketChannel對應(yīng)的pipeline中。
- 對于Server端,channelRead事件被定義為server端accept到了新的socket連接。
2、將NioServerSocketChannel注冊到ServerBootstrap的EventLoopGroup上
ChannelFuture regFuture = group().register(channel);
1)NioEventLoopGroup的chooser從其children中選出一個NioEventLoop child,調(diào)用其register()方法進行channel注冊;
2)實際將NioEventLoop child傳給NioServerSocketChannel的unsafe,調(diào)用其register(EventLoop eventLoop, final ChannelPromise promise)方法完成注冊
- 將channel.eventLoop綁定為當前NioEventLoop child;
- 將AbstractUnsafe.register0(DefaultChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊列;
3、給標識“register0任務(wù)”完成狀態(tài)ChannelFuture添加一個Listener
doBind0(regFuture, channel, localAddress, promise);
當ChannelFuture完成時,將NioServerSocketChannel.bind(SocketAddress localAddress, ChannelPromise promise)任務(wù)加入EventLoop child的執(zhí)行任務(wù)隊列
三、EventLoop線程中執(zhí)行register0事件
AbstractUnsafe.register0(ChannelPromise promise);
1、將NioServerSocketChannel的ch注冊到NioEventLoop child的selector上,同時將注冊得到的SelectionKey綁定為NioServerSocketChannel的selectionKey
doRegister();
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
netty的輪詢注冊機制其實是將AbstractNioChannel內(nèi)部的jdk類對象SelectableChannel ch注冊到j(luò)dk類對象Selector selector上去,并且將AbstractNioChannel channel作為SelectableChannel對象ch的一個attachment附屬上,這樣在jdk輪詢出某個SelectableChannel有IO事件發(fā)生時,就可以直接取出AbstractNioChannel進行后續(xù)操作。
2、標識ChannelPromise狀態(tài)為success,觸發(fā)Listener將bind任務(wù)添加到EventLoop的執(zhí)行任務(wù)隊列
safeSetSuccess(promise);
3、ServerSocketChannel channel產(chǎn)生ChannelRegisted事件
pipeline.fireChannelRegistered();
會導(dǎo)致ChannelInitialzer.channelRegisted()執(zhí)行,將ServerBootstrapAcceptor添加到channel的pipeline中。
四、EventLoop線程中執(zhí)行Channel.bind事件
AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise);
1、ServerSocketChannel綁定對應(yīng)服務(wù)端口,監(jiān)聽新的客戶端連接
javaChannel().socket().bind(localAddress, config.getBacklog());
2、ServerSocketChannel channel產(chǎn)生ChannelActive事件
pipeline.fireChannelActive();
ChannelActive事件由HeadContext處理,最終調(diào)用了AbstractNioUnsafe.doBeginRead()方法,向ch中添加OP_READ事件監(jiān)聽。
selectionKey.interestOps(interestOps | readInterestOp);