前節(jié)我們分析了runAllTask()s和select(),前者是執(zhí)行NioEventLoop持有得任務(wù)隊列中所有得任務(wù),后者是輪詢檢測IO事件得發(fā)生。
本節(jié)研究processSelectedKeys():處理IO事件
我們直到處理IO事件需要輪詢selectedKeys,被NioEventLoop持有,查看原代碼,其底層維持了一個數(shù)組,而不是HashSet。這里netty對Selector做了事件復(fù)雜度的優(yōu)化
private SelectedSelectionKeySet selectedKeys;
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
/**
* 維持了一個數(shù)組,而不是HashSet
*/
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
/**
* 改方法直接操作數(shù)組,實現(xiàn)時間復(fù)雜度為O(1)
* @param o
* @return
*/
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
NioEventLoop實例化的時候,調(diào)用openSelector(),獲取了一個Selector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//調(diào)用jdk底層代碼,獲取一個IO事件輪詢器,并且優(yōu)化Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
在這里進行了優(yōu)化,需要了解的是 SelectorTuple 只是包裝了unwrappedSelector,最終獲取的是WindowsSelectorImpl,其繼承了jdk底層的SelectorImpl。
那么優(yōu)化的過程也不復(fù)雜。
- 通過反射把netty自己實現(xiàn)的這個
WindowsSelectorImpl其中兩個重要的屬性selectedKeys和publicSelectedKeys的實現(xiàn)替換成了SelectedSelectionKeySet,重寫了其一系列方法。實際上是將這兩個原本是用HashSet來實現(xiàn)的屬性,換成了實際上是一個數(shù)組的實現(xiàn)的SelectionKey[] keys;,并且這兩個屬性用的都是同一個對象。 - 將這個標記事件的對象,直接存儲再
NioEventLoop#selectedKeys,后續(xù)可直接操作。
優(yōu)化的結(jié)果是降低了時間復(fù)雜度,從O(n) 降低到O(1).
/**
* 用數(shù)組替換selectedKeys的HashSet的一個是實現(xiàn),做到add方法時間復(fù)雜度為O(1)
* @return
*/
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
////初始化一個unwrappedSelector 是WindowsSelectorImpl繼承了jdk的SelectorImpl
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//判斷是否要優(yōu)化
if (DISABLE_KEY_SET_OPTIMIZATION) {
//不需要優(yōu)化
return new SelectorTuple(unwrappedSelector);
}
//反射獲取SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
//判斷是否是SelectorImpl的Class對象
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
//是是SelectorImpl的Class對象
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
//實際上維持了一個SelectionKey[]數(shù)組
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//獲取兩個重要的屬性:selectedKeys和publicSelectedKeys,這兩個屬性都是HashSet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//將netty的unwrappedSelector的selectedKeys和publicSelectedKeys設(shè)置成經(jīng)過優(yōu)化后的selectedKeySet
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
回到具體調(diào)用processSelectedKeys();的地方,發(fā)現(xiàn)確實是輪詢selectedKeys來進行處理
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
//輪詢selectedKeys,
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
//獲取channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//處理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
具體的處理邏輯呢也比較簡單,就是判斷以下連接是否合法,然后讀取事件類型,并根據(jù)不同的事件類型具體處理不同的IO事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//判斷連接是否合法
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
//合法的連接進入事件處理邏輯
try {
//獲取key的IO事件并根據(jù)類型處理的
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//如果當前NioEventLoop是workGroup 則可能是OP_READ,bossGroup是OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}