原文:https://wangwei.one/posts/netty-new-connection-and-niosocketchannel-analyse.html
前面的一些章節(jié),我們分析了Netty的三大組件 —— Channel 、EventLoop、Pipeline ,對Netty的工作原理有了深入的了解。在此基礎(chǔ)上,我們來分析一下當(dāng)Netty服務(wù)端啟動后,Netty是如何處理新連接接入的。
本文內(nèi)容主要分為以下四部分:
- 新連接檢測
- NioSocketChannel創(chuàng)建
- NioSocketChannel初始化與注冊
- NioSocketChannel注冊READ興趣集
新連接檢測
前面,我們在講 EventLoop的啟動過程源碼分析 時,解讀過下面這段代碼:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
try {
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 讀取read事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
...
}
...
}
我們還是以服務(wù)端 NioServerSocketChannel 為例,它綁定的unsafe實例為 NioMessageUnsafe 。上面的 unsafe.read() 接口,會向下調(diào)用到 NioMessageUnsafe.read() 接口,如下:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
// 用于保存新建立的 NioSocketChannel 的集合
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
// 確保在當(dāng)前線程與EventLoop中的一致
assert eventLoop().inEventLoop();
// 獲取 NioServerSocketChannel config配置
final ChannelConfig config = config();
// 獲取 NioServerSocketChannel 綁定的 pipeline
final ChannelPipeline pipeline = pipeline();
// 獲取RecvByteBuf 分配器 Handle
// 當(dāng)channel在接收數(shù)據(jù)時,allocHandle 會用于分配ByteBuf來保存數(shù)據(jù)
// 關(guān)于allocHandle后面再去做詳細介紹
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 重置已累積的所有計數(shù)器,并為下一個讀取循環(huán)讀取多少消息/字節(jié)數(shù)據(jù)提供建議
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 調(diào)用后面的 doReadMessages 接口,讀取到message則返回1
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 對當(dāng)前read循環(huán)所讀取到的message數(shù)量計數(shù)+1
allocHandle.incMessagesRead(localRead);
// 判斷是否繼續(xù)讀取message
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 調(diào)用pipeline傳播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
// 清空readBuf
readBuf.clear();
allocHandle.readComplete();
// 調(diào)用pipeline傳播 ChannelReadComplete 事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
...
}
對于 doReadMessages(...) 的分析:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
...
// 讀取消息
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 獲取 SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 使用SocketChannel創(chuàng)建NioSocketChannel,將其存入buf list中
// 關(guān)于NioSocketChannel的創(chuàng)建請看后面的分析
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
...
}
對于 continueReading() 接口的分析,至于結(jié)果為什么返回false,后面會單獨分析:
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
private volatile int maxMessagesPerRead;
private volatile boolean respectMaybeMoreData = true;
...
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
// 每次讀取最大的消息數(shù)
private int maxMessagePerRead;
private int totalMessages;
private int totalBytesRead;
private int attemptedBytesRead;
private int lastBytesRead;
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
...
// 判斷是否繼續(xù)讀取message
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
// 判斷是否繼續(xù)讀取message
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
// 默認(rèn)情況下 config.isAutoRead() 為true
// respectMaybeMoreData 默認(rèn)為 true
// maybeMoreDataSupplier.get() 為false
// totalMessages第一次循環(huán)則為1
// maxMessagePerRead為16
// 結(jié)果返回false
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
...
}
...
}
NioSocketChannel創(chuàng)建
上面分析新連接接入,提到了 NioSocketChannel 的創(chuàng)建,我們這里來詳細分析一下,NioSocketChannel的創(chuàng)建過程與此前我們分析 NioServerSocketChannel創(chuàng)建 大體類似。
構(gòu)造器
先來看看 NioSocketChannel 的構(gòu)造函數(shù):
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
...
public NioSocketChannel(Channel parent, SocketChannel socket) {
// 調(diào)用父類構(gòu)造器
super(parent, socket);
// 創(chuàng)建NioSocketChannelConfig
config = new NioSocketChannelConfig(this, socket.socket());
}
...
}
父類 AbstractNioByteChannel 構(gòu)造器:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 調(diào)用父類構(gòu)造器,并設(shè)置興趣集為SelectionKey.OP_READ,對read事件感興趣
super(parent, ch, SelectionKey.OP_READ);
}
...
}
父類 AbstractNioChannel 構(gòu)造器:
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 調(diào)用父類構(gòu)造器
super(parent);
// 設(shè)置channel
this.ch = ch;
// 設(shè)置興趣集
this.readInterestOp = readInterestOp;
try {
// 設(shè)置為非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
}
父類 AbstractChannel 構(gòu)造器:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected AbstractChannel(Channel parent) {
// 設(shè)置parent
this.parent = parent;
// 創(chuàng)建channelId
id = newId();
// 創(chuàng)建unsafe
unsafe = newUnsafe();
// 創(chuàng)建pipeline
pipeline = newChannelPipeline();
}
...
}
ChannelConfig創(chuàng)建
接著我們看看 NioSocketChannelConfig 的創(chuàng)建邏輯:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
...
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
// 調(diào)用父類構(gòu)造器
super(channel, javaSocket);
calculateMaxBytesPerGatheringWrite();
}
...
}
父類 DefaultSocketChannelConfig 構(gòu)造器:
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
...
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
// 調(diào)用父類構(gòu)造器,綁定socketchannel
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
// 綁定java socket
this.javaSocket = javaSocket;
// Enable TCP_NODELAY by default if possible.
// netty一般運行在服務(wù)器上,不在Android上,canEnableTcpNoDelayByDefault返回true
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
// 開啟 TCP_NODELAY ,開啟TCP的nagle算法
// 盡量不要等待,只要發(fā)送緩沖區(qū)中有數(shù)據(jù),并且發(fā)送窗口是打開的,就盡量把數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)上去。
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
...
}
NioSocketChannel初始化與注冊
上面小節(jié)分析了NioSocketChannel的創(chuàng)建邏輯,創(chuàng)建完成之后,我們來分析一下NioSocketChannel是如何注冊到NioEventLoop上去的。
在前面小節(jié)分析新連接檢測的有如下小段代碼:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
...
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 調(diào)用pipeline傳播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
...
}
調(diào)用pipeline傳播ChannelRead事件,這里的Pipeline是服務(wù)端Channel,也就是NioServerSocketChannel所綁定的Pipeline,此時的Pipeline的內(nèi)部結(jié)構(gòu)是怎么樣子的呢?

那這個 ServerBootstrapAcceptor 是從哪里來的呢?
在此前,我們分析 NioServerSocketChannel初始化 時,有過下面這段代碼:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
// NioServerSocketChannel初始化
void init(Channel channel) throws Exception {
// 獲取啟動器 啟動時配置的option參數(shù),主要是TCP的一些屬性
final Map<ChannelOption<?>, Object> options = options0();
// 將獲得到 options 配置到 ChannelConfig 中去
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 獲取 ServerBootstrap 啟動時配置的 attr 參數(shù)
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 配置 Channel attr,主要是設(shè)置用戶自定義的一些參數(shù)
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 獲取channel中的 pipeline,這個pipeline使我們前面在channel創(chuàng)建過程中設(shè)置的 pipeline
ChannelPipeline p = channel.pipeline();
// 將啟動器中配置的 childGroup 保存到局部變量 currentChildGroup
final EventLoopGroup currentChildGroup = childGroup;
// 將啟動器中配置的 childHandler 保存到局部變量 currentChildHandler
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 保存用戶設(shè)置的 childOptions 到局部變量 currentChildOptions
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
// 保存用戶設(shè)置的 childAttrs 到局部變量 currentChildAttrs
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 獲取啟動器上配置的handler
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 handler 到 pipeline 中
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 用child相關(guān)的參數(shù)創(chuàng)建出一個新連接接入器ServerBootstrapAcceptor
// 通過 ServerBootstrapAcceptor 可以將一個新連接綁定到一個線程上去
// 每次有新的連接進來 ServerBootstrapAcceptor 都會用child相關(guān)的屬性對它們進行配置,并注冊到ChaildGroup上去
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
...
}
ServerBootstrapAcceptor
NioServerSocketChannel初始化時,向NioServerSocketChannel所綁定的Pipeline添加了一個InboundHandler節(jié)點 —— ServerBootstrapAcceptor ,其代碼如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 子EventLoopGroup,即為workGroup
private final EventLoopGroup childGroup;
// ServerBootstrap啟動時配置的 childHandler
private final ChannelHandler childHandler;
// ServerBootstrap啟動時配置的 childOptions
private final Entry<ChannelOption<?>, Object>[] childOptions;
// ServerBootstrap啟動時配置的 childAttrs
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
// 構(gòu)造函數(shù)
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
// 處理Pipeline所傳播的channelRead事件
// 也就是前面新連接檢測時看到的那段代碼
// pipeline.fireChannelRead(readBuf.get(i));
// ServerBootstrapAcceptor的channelRead接口將會被調(diào)用,用于處理channelRead事件
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 獲取傳播事件的對象數(shù)據(jù),即為前面的readBuf.get(i)
// readBuf.get(i)取出的對象為 NioSocketChannel
final Channel child = (Channel) msg;
// 向 NioSocketChannel 添加childHandler,也就是我們??吹降? // ServerBootstrap在啟動時配置的代碼:
// ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )
// 最終的結(jié)果就是向NioSocketChannel的Pipeline添加用戶自定義的ChannelHandler
// 用于處理客戶端的channel連接
child.pipeline().addLast(childHandler);
// 配置 NioSocketChannel的TCP屬性
setChannelOptions(child, childOptions, logger);
// 配置 NioSocketChannel 一些用戶自定義數(shù)據(jù)
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 將NioSocketChannel注冊到childGroup,也就是Netty的WorkerGroup當(dāng)中去
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
...
}
...
}
關(guān)于 ChannelInitializer 的講解,可以看此前 Pipeline源碼分析 文章。
后面的register邏輯,就與我們前面講解 NioServerSocketChannel注冊 大體類似了,這里簡單介紹一下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
// 注冊NioSocketChannel
// eventLoop為childGroup
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 綁定eventLoop到NioSocketChannel上
AbstractChannel.this.eventLoop = eventLoop;
// 現(xiàn)在分析的邏輯是在服務(wù)端的線程上,eventLoop與主線程不同,返回false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 這里來調(diào)用register0方法
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 注冊
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
// 調(diào)用 doRegister()
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 服務(wù)端的NioServerSocketChannel已經(jīng)與客戶端的NioSocketChannel建立了連接
// 所以,NioSocketChannel是處于激活狀態(tài),isActive()返回ture
if (isActive()) {
// 對于新連接,是第一次注冊
if (firstRegistration) {
// 傳播ChannelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
...
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
...
}
調(diào)用到NioSocketChannel中的doRegister()方法:
public abstract class AbstractNioChannel extends AbstractChannel {
...
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 將selector注冊到底層JDK channel上,并附加了NioSocketChannel對象
// 興趣集設(shè)置為0,表示不關(guān)心任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
...
}
NioSocketChannel 注冊O(shè)P_READ興趣集
緊接著上面的分析,傳播ChannelActive事件之后的邏輯,主要就是向客戶端的NioSocketChannel注冊一個Read興趣集
if (isActive()) {
// 對于新連接,是第一次注冊
if (firstRegistration) {
// 傳播ChannelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
通過 Pipeline的傳播機制 ,最終會調(diào)用到doBeginRead()接口,如下:
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
...
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// 保存selectionKey到局部變量
final SelectionKey selectionKey = this.selectionKey;
// 判斷有效性
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 獲取selectionKey的興趣集
// 前面小結(jié)分析doRegister()接口提到,selectionKey的興趣集設(shè)置為0
final int interestOps = selectionKey.interestOps();
// 這里的 readInterestOp 是前面講NioSocketChannel創(chuàng)建時設(shè)置的值
// 為 SelectionKey.OP_READ,也就是1
if ((interestOps & readInterestOp) == 0) {
// 這樣,selectionKey最終設(shè)置的興趣集為SelectionKey.OP_READ
// 表示對讀事件感興趣
selectionKey.interestOps(interestOps | readInterestOp);
}
}
...
}
...
}
小結(jié)
- Netty是在哪里檢測有新連接接入的?
- 新連接是怎樣注冊到NioEventLoop線程上的?
- NioSocketChannel是怎樣初始化及注冊的?