When we studied the NioEventLoop run loop, we saw three phases: detecting I/O events (which includes new connections), processing those I/O events, and running all queued tasks. In the detection phase the event loop polls for events through the Selector it holds, and new connections are detected there; the same code path is reused.
The new-connection accept flow we examine today is roughly:
- Detect the new connection.
- Create a NioSocketChannel for it, i.e. the client-side channel.
- Assign the channel a NioEventLoop and register the channel on that NioEventLoop's selector. From this point on, all reads and writes on this channel are managed by that NioEventLoop.
- Finally, register read/write interest with the selector; this reuses the same logic the server used at startup to register the accept event.
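The steps above map directly onto plain JDK NIO, which Netty builds on. The following standalone sketch (hypothetical demo code, not Netty source; all class and method names are my own) walks through them with a raw Selector and ServerSocketChannel: detect OP_ACCEPT, accept the new channel, then register it for read events.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptFlowSketch {
    public static boolean runOnce() throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        // like the server channel registering OP_ACCEPT during bind()
        server.register(selector, SelectionKey.OP_ACCEPT);

        // simulate a client connecting so the selector sees a new connection
        SocketChannel client = SocketChannel.open(
                (InetSocketAddress) server.getLocalAddress());

        boolean accepted = false;
        if (selector.select(5000) > 0) {
            for (SelectionKey k : selector.selectedKeys()) {
                if (k.isAcceptable()) {
                    // step 2: accept the JDK channel (Netty wraps it in NioSocketChannel)
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    // steps 3-4: register the accepted channel for read events
                    ch.register(selector, SelectionKey.OP_READ);
                    accepted = true;
                }
            }
        }
        client.close();
        server.close();
        selector.close();
        return accepted;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted=" + runOnce());
    }
}
```

Netty layers its channel, pipeline, and event-loop abstractions on top of exactly this sequence.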
Netty's connection multiplexing means that multiple connections share the one thread held by a NioEventLoop.
When a Netty server starts, it binds a bossGroup (a group of NioEventLoops); during bind(), the server channel registers the accept (new-connection) event. Once the selector detects that event, it is processed. So the entry point is NioEventLoop#processSelectedKeys():
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // the real processing happens here
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
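The `k.attachment()` call above works because when Netty registers a channel with the selector, it attaches the channel object to the SelectionKey. A minimal standalone demo of that JDK mechanism (the names here are mine, not Netty's):

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class AttachmentDemo {
    public static boolean attachmentRoundTrips() throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(0));

        Object channelWrapper = new Object(); // stands in for Netty's AbstractNioChannel
        // the third register() argument is stored on the key as its attachment
        SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT, channelWrapper);

        // this is exactly what processSelectedKeysOptimized() reads back
        boolean same = key.attachment() == channelWrapper;
        server.close();
        selector.close();
        return same;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(attachmentRoundTrips()); // prints "true"
    }
}
```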
The real entry point is NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel). We analyzed this method's overall logic earlier, in the chapter on NioEventLoop's processSelectedKeys(); here we go straight to the new-connection handling:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ... (code omitted)
    int readyOps = k.readyOps();
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    // on a workerGroup event loop this is typically OP_READ; on the bossGroup it is OP_ACCEPT
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        // entry point for both new-connection accept and read events
        unsafe.read();
    }
}
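The readyOps check is a plain bitmask test on an int bit set, so a single mask covers both the worker-loop and boss-loop cases. A tiny standalone demo (my own names, not Netty code):

```java
import java.nio.channels.SelectionKey;

public class ReadyOpsDemo {
    // mirrors the condition in processSelectedKey: read, accept, or the
    // readyOps == 0 workaround for the JDK spin-loop bug
    public static boolean isReadOrAccept(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(isReadOrAccept(SelectionKey.OP_ACCEPT)); // true on the boss loop
        System.out.println(isReadOrAccept(SelectionKey.OP_READ));   // true on a worker loop
        System.out.println(isReadOrAccept(SelectionKey.OP_WRITE));  // false: handled elsewhere
    }
}
```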
The unsafe here was initialized, together with the pipeline, back in the chapter on channel creation, when the constructor of the parent class AbstractChannel#AbstractChannel() ran:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
That unsafe is created by AbstractNioMessageChannel#newUnsafe(), which NioServerSocketChannel inherits from its parent class. As you can see, the concrete type is the inner class AbstractNioMessageChannel.NioMessageUnsafe:
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}
Looking at this class's read() method, the rough flow is:

- Loop, calling the underlying JDK code to create a channel and wrapping it in Netty's NioSocketChannel; each one represents a newly accepted connection.
- Store every channel obtained into a container, counting the accepted connections; by default at most 16 connections are accepted in one pass.
- Iterate over the channels in the container, calling fireChannelRead, fireChannelReadComplete, and fireExceptionCaught in turn to trigger the corresponding pipeline events.
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    // temporary storage for the connections read in this pass
    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        // the server-side accept-rate handle
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                // loop, calling doReadMessages() to create new connection objects
                do {
                    // obtain the underlying JDK channel and add it to the readBuf container
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    // accumulate the count in totalMessages; by default the loop
                    // ends once 16 connections have been read
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            // fire a ChannelRead event for every connection in the readBuf container
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            // clear the container
            readBuf.clear();
            allocHandle.readComplete();
            // fire ChannelReadComplete: all reads in this pass are done
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);
                // fire exceptionCaught to propagate the error
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}
To obtain the underlying JDK channel, the loop calls NioServerSocketChannel#doReadMessages(): it creates the JDK channel, wraps it in a NioSocketChannel, adds that channel to the container passed in, and returns a count.
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // accept the underlying JDK channel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // wrap the JDK channel in a Netty channel and store it in the container passed in
            buf.add(new NioSocketChannel(this, ch));
            // one client connection was successfully accepted; report it
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
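The behavior of doReadMessages() can be imitated with plain JDK NIO. In the sketch below (hypothetical demo code, not Netty source; names are my own), a non-blocking accept() returns null when no connection is pending, which corresponds to the localRead == 0 branch that breaks the read loop:

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class DoReadMessagesSketch {
    public static int doReadMessages(ServerSocketChannel server, List<Object> buf)
            throws Exception {
        SocketChannel ch = server.accept(); // non-blocking: null if nothing is pending
        if (ch != null) {
            buf.add(ch); // Netty would wrap it: new NioSocketChannel(this, ch)
            return 1;
        }
        return 0;
    }

    public static int[] runDemo() throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        List<Object> readBuf = new ArrayList<>();

        int before = doReadMessages(server, readBuf); // no client yet -> 0
        SocketChannel client = SocketChannel.open(
                (InetSocketAddress) server.getLocalAddress());
        Thread.sleep(100); // let the loopback connection reach the accept queue
        int after = doReadMessages(server, readBuf);  // a connection is pending -> 1

        for (Object o : readBuf) {
            ((SocketChannel) o).close();
        }
        client.close();
        server.close();
        return new int[] { before, after };
    }

    public static void main(String[] args) throws Exception {
        int[] r = runDemo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```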
The allocHandle in the code below is the server-side accept-rate handle; concretely it is DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle. Its incMessagesRead() method maintains the member variable totalMessages which, together with continueReading(), caps the number of connections accepted in one while loop. A batch of connections is collected first and then processed together.
// the server-side accept-rate handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

@Override
public final void incMessagesRead(int amt) {
    totalMessages += amt;
}

@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // check whether the total number of connections read has reached the
           // maximum; maxMessagePerRead defaults to 16
           totalMessages < maxMessagePerRead &&
           totalBytesRead > 0;
}