前邊一遍文章分析了NioEventLoop的實現(xiàn)原理,可以知道NioEventLoop主要跑兩類任務(wù):I/O任務(wù)和非I/O任務(wù)。其中I/O任務(wù)主要是進行Select選擇出已注冊的I/O事件并對這些I/O事件進行處理,執(zhí)行的具體方法是processSelectedKeys()。下面我們就對這段代碼進行具體分析,可參見其中的一個分支processSelectedKeysPlain(Set<SelectionKey> selectedKeys)。
具體代碼如下:
遍歷喚醒的SelectionKey,并取出對應(yīng)key注冊的attachment。判斷attachment的類型:
1)是AbstractNioChannel,對應(yīng)確定的一個Netty對應(yīng)的NioChannel,對應(yīng)執(zhí)行processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。
2)是NioTask<SelectableChannel>,一般不會是這種類型,多數(shù)為是用戶自定義的一個task。
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
我們可以著重來看下processSelectedKey(SelectionKey k, AbstractNioChannel ch)的實現(xiàn),根據(jù)readyOps確定感興趣的事件類型,執(zhí)行不同的操作:
1)SelectionKey.OP_CONNECT 連接事件,此處是對于client端而言的,該事件標志著三次握手階段,client端收到了server端的ack報文。
2)SelectionKey.OP_WRITE 寫事件,說明有數(shù)據(jù)要寫。
3)SelectionKey.OP_READ | SelectionKey.OP_ACCEPT 讀事件或接收連接事件。SelectionKey.OP_ACCEPT是對于server端而言的,標志有新的連接請求到達。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
1. OP_CONNECT事件分析
OP_CONNECT事件說明Server端已經(jīng)回復(fù)了客戶端建立連接的請求,下一步需要執(zhí)行三次握手的第三步。具體執(zhí)行的代碼如下。首先將Selector上將OP_CONNECT從注冊事件中去掉,然后會調(diào)用unsafe.finishConnect()。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
重點看一下unsafe.finishConnect()方法。doFinishConnect()會直接調(diào)用對應(yīng)javaChannel的finishConnect()方法,此處不再做復(fù)雜介紹。在fulfillConnectPromise方法中,會嘗試去判斷該channel是否是active狀態(tài),是否被人cancell掉。如果當(dāng)期狀態(tài)是active的,會調(diào)用pipeline的fireChannelActive()方法。
public final void finishConnect() {
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
2. OP_WRITE事件分析
OP_WRITE事件說明該端有數(shù)據(jù)需要寫出。通過代碼可以看到write事件會調(diào)用ch.unsafe().forceFlush()方法,接下來直接調(diào)用AbstractChannel類的flush0()方法。在這個方法中,最核心的就是調(diào)用doWrite(outboundBuffer),這是一個abstract方法,具體得由對應(yīng)的Channel子類去實現(xiàn)。舉例來說,對于NioSocketChannel類,outboundBuffer中的數(shù)據(jù)類型是byte,直接調(diào)用javaChannel的write方法即可;而對于AbstractNioMessageChannel類,outboundBuffer中的數(shù)據(jù)類型是Object,最終在調(diào)用javaChannel的write方法進行寫操作之前需要進行轉(zhuǎn)換將Object轉(zhuǎn)化至byte類型。
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
.......
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
3. SelectionKey.OP_READ | SelectionKey.OP_ACCEPT事件分析
Netty將Accept事件和READ事件進行了封裝,統(tǒng)一調(diào)用unsafe.read()方法進行處理。我們以AbstractNioMessageChannel為例,看一下read()方法進行了哪些操作。
核心過程大致分為3步:
1)doReadMessages(readBuf)將數(shù)據(jù)讀到readBuf。
2)遍歷readBuf,對其中的每一個元素調(diào)用pipeline的fireChannelRead方法。
3)調(diào)用pipeline的fireChannelReadComplete方法。
4)removeReadOp()。
前邊我們討論過說Netty將Accept和Read進行了統(tǒng)一封裝,而具體拆分的細節(jié)也是通過分別實現(xiàn)abstract方法進行不同的處理。比如,對于NioServerSocketChannel中,doReadMessages(readBuf)會調(diào)用SocketUtils.accept方法建立子連接并將該子連接放置到readBuf中,后續(xù)的pipeline在處理readBuf時也會有不同的處理,可以可到ServerBootstrapAcceptor中channelRead方法的實現(xiàn);對于NioUdtMessageConnectorChannel,doReadMessages(readBuf)方法會從javaChannel中讀取數(shù)據(jù)到readBuf
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}