本文是Netty文集中“Netty 源碼解析”系列的文章。主要對Netty的重要流程以及類進(jìn)行源碼解析,以使得我們更好的去使用Netty。Netty是一個非常優(yōu)秀的網(wǎng)絡(luò)框架,對其源碼解讀的過程也是不斷學(xué)習(xí)的過程。
NioEventLoop
通過前面的學(xué)習(xí),我們對NioEventLoop做過如下幾點(diǎn)簡單的概述:
① NioEventLoop是一個基于JDK NIO的異步事件循環(huán)類,它負(fù)責(zé)處理一個Channel的所有事件在這個Channel的生命周期期間。
② NioEventLoop的整個生命周期只會依賴于一個單一的線程來完成。一個NioEventLoop可以分配給多個Channel,NioEventLoop通過JDK Selector來實(shí)現(xiàn)I/O多路復(fù)用,以對多個Channel進(jìn)行管理。
③ 如果調(diào)用Channel操作的線程是EventLoop所關(guān)聯(lián)的線程,那么該操作會被立即執(zhí)行。否則會將該操作封裝成任務(wù)放入EventLoop的任務(wù)隊列中。
④ 所有提交到NioEventLoop的任務(wù)都會先放入隊列中,然后在線程中以有序(FIFO)/連續(xù)的方式執(zhí)行所有提交的任務(wù)。
⑤ NioEventLoop的事件循環(huán)主要完成了:a)已經(jīng)注冊到Selector的Channel的監(jiān)控,并在感興趣的事件可執(zhí)行時對其進(jìn)行處理;b)完成任務(wù)隊列(taskQueue)中的任務(wù),以及對可執(zhí)行的定時任務(wù)和周期性任務(wù)的處理(scheduledTaskQueue中的可執(zhí)行的任務(wù)都會先放入taskQueue中后,再從taskQueue中依次取出執(zhí)行)。
其中幾點(diǎn)已經(jīng)在啟動流程的源碼分析中做了詳細(xì)的介紹。本文主要針對NioEventLoop事件循環(huán)的流程對NioEventLoop進(jìn)行更深一步的學(xué)習(xí)。
JCTools
JCTools:適用于JVM的Java并發(fā)工具。該項目旨在提供一些當(dāng)前JDK缺失的并發(fā)數(shù)據(jù)結(jié)構(gòu)。
SPSC/MPSC/SPMC/MPMC 變種的并發(fā)隊列:
- SPSC:用于單生產(chǎn)者單消費(fèi)者模式(無等待,有限長度 和 無限長度)
- MPSC:用于多生產(chǎn)者單消費(fèi)者模式(無鎖的,有限長度 和 無限長度)
- SPMC:用于單生產(chǎn)者多消費(fèi)者模式(無鎖的,有限長度)
- MPMC:用于多生產(chǎn)者多消費(fèi)模式(無鎖的,有限長度)
JCTools提供的隊列是一個無鎖隊列,也就是隊列的底層通過無鎖的方式實(shí)現(xiàn)了線程安全的訪問。
MpscUnboundedArrayQueue是由JCTools提供的一個多生產(chǎn)者單個消費(fèi)者的數(shù)組隊列。多個生產(chǎn)者同時并發(fā)的訪問隊列是線程安全的,但是同一時刻只允許一個消費(fèi)者訪問隊列,這是需要程序控制的,因?yàn)镸pscQueue的用途即為多個生成者可同時訪問隊列,但只有一個消費(fèi)者會訪問隊列的情況。如果是其他情況你可以使用JCTools提供的其他隊列。
Q:為什么說MpscUnboundedArrayQueue的性能高于LinkedBlockingQueue了?
A:① MpscUnboundedArrayQueue底層通過無鎖的方式實(shí)現(xiàn)了多生產(chǎn)者同時訪問隊列的線程安全性,而LinkedBlockingQueue是一個多生產(chǎn)者多消費(fèi)者的模式,它則是用過Lock鎖的方式來實(shí)現(xiàn)隊列的線程安全性。
② Netty的線程模型決定了taskQueue可以用多個生產(chǎn)者線程同時提交任務(wù),但只會有EventLoop所在線程來消費(fèi)taskQueue隊列中的任務(wù)。這樣JCTools提供的MpscQueue完全符合Netty線程模式的使用場景。而LinkedBlockingQueue會在生產(chǎn)者線程操作隊列時以及消費(fèi)者線程操作隊列時都對隊列加鎖以保證線程安全性。雖然,在Netty的線程模型中程序會控制訪問taskQueue的始終都會是EventLoop所在線程,這時會使用偏向鎖來降低線程獲得鎖的代價。
偏向鎖:HotSpot的作者經(jīng)過研究發(fā)現(xiàn),大多數(shù)情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得,為了讓線程獲得鎖的代價更低而引入了偏向鎖。當(dāng)一個線程訪問同步塊并獲取鎖時,會在對象頭和棧幀中的鎖記錄里存儲鎖偏向的線程ID,以后該線程再進(jìn)入和退出同步塊時不需要進(jìn)行CAS操作來加鎖和解鎖,只需要簡單地測試一下對象頭的Mark Word(Mark Word是Java對象頭的內(nèi)容,用于存儲對象的hashCode或鎖信息等)里是否存儲著指向當(dāng)前線程的偏向鎖。如果測試成功,表示線程已經(jīng)獲得了鎖。如果測試失敗,則需要再測試一下Mark Word中偏向鎖的標(biāo)識是否設(shè)置成1(表示當(dāng)前是偏向鎖):如果沒有設(shè)置,則使用CAS競爭鎖;如果設(shè)置了,則嘗試使用CAS將對象頭的偏向鎖指向當(dāng)前線程。
重要屬性
- taskQueue
// 用于存儲任務(wù)的隊列,是一個MpscUnboundedArrayQueue實(shí)例。
private final Queue<Runnable> taskQueue;
- tailTasks
// 是一個MpscUnboundedArrayQueue實(shí)例。用于存儲當(dāng)前或下一次事件循環(huán)(eventloop)迭代結(jié)束后需要執(zhí)行的任務(wù)。
private final Queue<Runnable> tailTasks;
- scheduledTaskQueue
// 定時或周期任務(wù)隊列,是一個PriorityQueue實(shí)例。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
- SELECTOR_AUTO_REBUILD_THRESHOLD
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
SELECTOR_AUTO_REBUILD_THRESHOLD用于標(biāo)識Selector空輪詢的閾值,當(dāng)超過這個閾值的話則需要重構(gòu)Selector。
如果有設(shè)置系統(tǒng)屬性”io.netty.selectorAutoRebuildThreshold”,并且該屬性值大于MIN_PREMATURE_SELECTOR_RETURNS(即,3),那么該屬性值就為閾值;如果該屬性值小于MIN_PREMATURE_SELECTOR_RETURNS(即,3),那么閾值為0。如果沒有設(shè)置系統(tǒng)屬性”io.netty.selectorAutoRebuildThreshold”,那么閾值為512,即,默認(rèn)情況下閾值為512。
- selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
selectNow提供器,在事件循環(huán)里用于選擇策略(selectStrategy)中。
- pendingTasksCallable
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return NioEventLoop.super.pendingTasks();
}
};
@Override
public int pendingTasks() {
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
// See https://github.com/netty/netty/issues/5297
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
}
}
因?yàn)閜endingTasks()方法的底層就是調(diào)用taskQueue.size()方法,而前面我們已經(jīng)說了taskQueue是一個MpscQueue,所以只能由EventLoop所在的線程來調(diào)用這個pendingTasks()方法,如果當(dāng)前線程不是EventLoop所在線程,那么就將pendingTasks()封裝在一個Callable(即,pendingTasksCallable)提交到taskQueue中去執(zhí)行,并同步的等待執(zhí)行的結(jié)果。
- 靜態(tài)代碼塊
// Workaround for JDK NIO bug.
//
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static {
final String key = "sun.nio.ch.bugLevel";
final String buglevel = SystemPropertyUtil.get(key);
if (buglevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}
這里的靜態(tài)代碼塊,主要做了兩件事:
① 解決在java6 中 NIO Selector.open()可能拋出NPE異常的問題。
問題:http://bugs.java.com/view_bug.do?bug_id=6427854
解決:https://github.com/netty/netty/issues/203
這里我們對問題和Netty的解決方案進(jìn)行一個簡單的講解。
問題描述:
sun.nio.ch.Util中包含??線程不安全的代碼,并可能拋出一個NullPointerException異常。


正是因?yàn)?p>
java.security.PrivilegedAction pa =
new GetPropertyAction("sun.nio.ch.bugLevel");
// the next line can reset bugLevel to null
bugLevel = (String)AccessController.doPrivileged(pa);
??的調(diào)用導(dǎo)致bugLevel又被重置為了null。導(dǎo)致了NPE bug的發(fā)生。
Netty的解決方案:在開始使用Selector.open()方法之前,先將"sun.nio.ch.bugLevel"系統(tǒng)屬性設(shè)置為non-null的。即,如果"sun.nio.ch.bugLevel”系統(tǒng)屬性值為null,則設(shè)置”sun.nio.ch.bugLevel”=“”
② 為了在事件循環(huán)時解決JDK NIO類庫的epoll bug,先設(shè)置好SELECTOR_AUTO_REBUILD_THRESHOLD,即selector空輪詢的閾值。具體的賦值流程上面已經(jīng)詳細(xì)說明過了。
- 喚醒select標(biāo)識符
// 一個原子類的Boolean標(biāo)識用于控制決定一個阻塞著的Selector.select是否應(yīng)該結(jié)束它的選擇操作。
private final AtomicBoolean wakenUp = new AtomicBoolean();
- ioRatio
// 在事件循環(huán)中期待用于處理I/O操作時間的百分比。默認(rèn)為50%。
// 也就是說,在事件循環(huán)中默認(rèn)情況下用于處理I/O操作的時間和用于處理任務(wù)的時間百分比都為50%,
// 即,用于處理I/O操作的時間和用于處理任務(wù)的時間時一樣的。用戶可以根據(jù)實(shí)際情況來修改這個比率。
private volatile int ioRatio = 50
方法
構(gòu)造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
a) 完成成員屬性taskQueue、tailQueue的構(gòu)建;
最終會調(diào)用newTaskQueue方法來完成構(gòu)建:
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
參數(shù)maxPendingTasks默認(rèn)為Integer.MAX_VALUE,則會通過“PlatformDependent.<Runnable>newMpscQueue()”返回來構(gòu)造一個MpscUnboundedArrayQueue實(shí)例,其初容量大小為1024,最大容量限制為2048。
b) 設(shè)置成員屬性addTaskWakesUp為false。
c) 設(shè)置成員屬性rejectedExecutionHandler值為RejectedExecutionHandlers.reject()方法將返回一個RejectedExecutionHandler實(shí)例。
/**
* Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}.
*/
public interface RejectedExecutionHandler {
/**
* Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
* restrictions.
*/
void rejected(Runnable task, SingleThreadEventExecutor executor);
}
RejectedExecutionHandler接口類似于JDK 的 java.util.concurrent.RejectedExecutionHandler,但是RejectedExecutionHandler只針對于SingleThreadEventExecutor。
該接口中有一個唯一的接口方法rejected,當(dāng)嘗試去添加一個任務(wù)到SingleThreadEventExecutor中,但是由于容量的限制添加失敗了,那么此時該方法就會被調(diào)用。
RejectedExecutionHandlers.reject()返回的是一個RejectedExecutionHandler常量REJECT
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
該RejectedExecutionHandler總是拋出一個RejectedExecutionException異常。
d) final SelectorTuple selectorTuple = openSelector();
開啟Selector,構(gòu)造SelectorTuple實(shí)例,SelectorTuple是一個封裝了原始selector對象和封裝后selector對象(即,SelectedSelectionKeySetSelector對象)的類:
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
這里,成員變量unwrappedSelector就是通過SelectorProvider.provider().openSelector()開啟的Selector;而成員變量selector則是一個SelectedSelectionKeySetSelector對象。
SelectedSelectionKeySetSelector中持有unwrappedSelector實(shí)例,并作為unwrappedSelector的代理類,提供Selector所需要的方法,而Selector相關(guān)的操作底層實(shí)際上都是由unwrappedSelector來完成的,只是在操作中增加了對selectionKeys進(jìn)行相應(yīng)的設(shè)置。SelectedSelectionKeySetSelector中除了持有unwrappedSelector實(shí)例外還持有一個SelectedSelectionKeySet對象。該對象是Netty提供的一個可以‘代替’Selector selectedKeys的對象。openSelector()方法中通過反射機(jī)制將程序構(gòu)建的SelectedSelectionKeySet對象給設(shè)置到了Selector內(nèi)部的selectedKeys、publicSelectedKeys屬性。這使Selector中所有對selectedKeys、publicSelectedKeys的操作實(shí)際上就是對SelectedSelectionKeySet的操作。
SelectedSelectionKeySet類主要通過成員變量SelectionKey[]數(shù)組來維護(hù)被選擇的SelectionKeys,并將擴(kuò)容操作簡單的簡化為了’newCapacity為oldCapacity的2倍’來實(shí)現(xiàn)。同時不在支持remove、contains、iterator方法。并添加了reset方法來對SelectionKey[]數(shù)組進(jìn)行重置。
SelectedSelectionKeySetSelector中主要是在每次select操作的時候,都會先將selectedKeys進(jìn)行清除(reset)操作。
e) 設(shè)置成員屬性selectStrategy的值為DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(),即一個DefaultSelectStrategy實(shí)例。
事件循環(huán)


NioEventLoop的事件循環(huán)主要完成下面幾件事:
① 根據(jù)當(dāng)前NioEventLoop中是否有待完成的任務(wù)得出select策略,進(jìn)行相應(yīng)的select操作
② 處理select操作得到的已經(jīng)準(zhǔn)備好處理的I/O事件,以及處理提交到當(dāng)前EventLoop的任務(wù)(包括定時和周期任務(wù))。
③ 如果NioEventLoop所在線程執(zhí)行了關(guān)閉操作,則執(zhí)行相關(guān)的關(guān)閉操作處理。這一塊在之前Netty 源碼解析 ——— Netty 優(yōu)雅關(guān)閉流程的文章已經(jīng)做了詳細(xì)的說明,這里就不再贅述了。
下面我們詳細(xì)展開每一步
① 根據(jù)當(dāng)前NioEventLoop中是否有待完成的任務(wù)得出select策略,進(jìn)行相應(yīng)的select操作:
『selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())』
前面我們已經(jīng)說過selectNowSupplier是一個selectNow提供器:
// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// NioEventLoop#selectNow()
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
// SelectedSelectionKeySetSelector#selectNow()
public int selectNow() throws IOException {
selectionKeys.reset();
return delegate.selectNow();
}
// SelectedSelectionKeySetSelector#wakeup()
public Selector wakeup() {
return delegate.wakeup();
}
selectNowSupplier提供的selectNow()操作是通過封裝過的selector(即,SelectedSelectionKeySetSelector對象)來完成的。而SelectedSelectionKeySetSelector的selectorNow()方法處理委托真實(shí)的selector完成selectoNow()操作外,還會將selectionKeys清空。
hasTasks()方法用于判斷taskQueue或tailTasks中是否有任務(wù)。
前面我們也提到過selectStrategy就是一個DefaultSelectStrategy對象:
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
DefaultSelectStrategy的選擇策略就是:
如果當(dāng)前的EventLoop中有待處理的任務(wù),那么會調(diào)用selectSupplier.get()方法,也就是最終會調(diào)用Selector.selectNow()方法,并清空selectionKeys。Selector.selectNow()方法不會發(fā)生阻塞,如果沒有一個channel(即,該channel注冊的事件發(fā)生了)被選擇也會立即返回,否則返回就緒I/O事件的個數(shù)。
如果當(dāng)前的EventLoop中沒有待處理的任務(wù),那么返回’SelectStrategy.SELECT(即,-1)’。
如果‘selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())’操作返回的是一個>0的值,則說明有就緒的的I/O事件待處理,則直接進(jìn)入流程②。否則,如果返回的是’SelectStrategy.SELECT’則進(jìn)行select(wakenUp.getAndSet(false))操作:
首先先通過自旋鎖(自旋 + CAS)方式獲得wakenUp當(dāng)前的標(biāo)識,并再將wakenUp標(biāo)識設(shè)置為false。將wakenUp作為參數(shù)傳入select(boolean oldWakenUp)方法中,注意這個select方法不是JDK NIO的Selector.select方法,是NioEventLoop類自己實(shí)現(xiàn)的一個方法,只是方法名一樣而已。NioEventLoop的這個select方法還做了一件很重要的時,就是解決“JDK NIO類庫的epoll bug”問題。
- 解決 JDK NIO 類庫的 epool bug
下面我們來對這個“JDK NIO類庫的epoll bug”問題已經(jīng)Netty是如何解決這個問題進(jìn)行一個說明:
JDK NIO類庫最著名的就是 epoll bug了,它會導(dǎo)致Selector空輪詢,IO線程CPU 100%,嚴(yán)重影響系統(tǒng)的安全性和可靠性。
SUN在解決該BUG的問題上不給力,只能從NIO框架層面進(jìn)行問題規(guī)避,下面我們看下Netty是如何解決該問題的。
Netty的解決策略:
- 根據(jù)該BUG的特征,首先偵測該BUG是否發(fā)生;
- 將問題Selector上注冊的Channel轉(zhuǎn)移到新建的Selector上;
- 老的問題Selector關(guān)閉,使用新建的Selector替換。
下面具體看下代碼,首先檢測是否發(fā)生了該BUG:


紅色框中的代碼,主要完成了是否發(fā)生“epoll-bug”的檢測。
『if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)』返回false,即『time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < currentTimeNanos』 的意思是:int selectedKeys = selector.select(timeoutMillis)在timeoutMillis時間到期前就返回了,并且selectedKeys==0,則說明selector進(jìn)行了一次空輪詢,這違反了Javadoc中對Selector.select(timeout)方法的描述。epoll-bug會導(dǎo)致無效的狀態(tài)選擇和100%的CPU利用率。也就是Selector不管有無感興趣的事件發(fā)生,select總是不阻塞就返回。這會導(dǎo)致select方法總是無效的被調(diào)用然后立即返回,依次不斷的進(jìn)行空輪詢,導(dǎo)致CPU的利用率達(dá)到了100%。
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
SELECTOR_AUTO_REBUILD_THRESHOLD默認(rèn)為512,也就是當(dāng)Selector連續(xù)執(zhí)行了512次空輪詢后,Netty就會進(jìn)行Selector的重建操作,即rebuildSelector()操作。
綠色框中代碼主要說明了,當(dāng)有定時/周期性任務(wù)即將到達(dá)執(zhí)行時間(<0.5ms),或者NioEventLoop的線程收到了新提交的任務(wù)上來等待著被處理,或者有定時/周期性任務(wù)到達(dá)了可處理狀態(tài)等待被處理,那么則退出select方法轉(zhuǎn)而去執(zhí)行任務(wù)。這也說明Netty總是會盡最大努力去保證任務(wù)隊列中的任務(wù)以及定時/周期性任務(wù)能得到及時的處理。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
該段代碼會計算scheduledTaskQueue中是否有即將要執(zhí)行的任務(wù),即在0.5ms內(nèi)就可執(zhí)行的scheduledTask,如果有則退出select方法轉(zhuǎn)而去執(zhí)行任務(wù)。
‘selectDeadLineNanos’的初始值通過‘currentTimeNanos + delayNanos(currentTimeNanos);’而來。delayNanos方法會返回最近一個待執(zhí)行的定時/周期性任務(wù)還差多少納秒就可以執(zhí)行的時間差(若,scheduledTaskQueue為空,也就是沒有任務(wù)的定時/周期性任務(wù),則返回1秒)。因此selectDeadLineNanos就表示最近一個待執(zhí)行的定時/周期性任務(wù)的可執(zhí)行時間。
‘selectDeadLineNanos - currentTimeNanos’就表示:最近一個待執(zhí)行的定時/周期性任務(wù)還差多少納秒就可以執(zhí)行的時間差。我們用scheduledTaskDelayNanos來表示該差值。
‘(selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L’表示:(scheduledTaskDelayNanos + 0.5ms) / 1ms。如果該結(jié)果大于0,則說明scheduledTaskDelayNanos >= 0.5ms,否則scheduledTaskDelayNanos < 0.5ms。
因此,就有了上面所說的結(jié)論,scheduledTaskQueue中有在0.5ms內(nèi)就可執(zhí)行的任務(wù),則退出select方法轉(zhuǎn)而去執(zhí)行任務(wù)。
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
在了解??代碼的用意之前,我們先來說下,當(dāng)有任務(wù)提交至EventLoop時的一些細(xì)節(jié)補(bǔ)充

a) 成員變量addTaskWakesUp為false。
這里,在構(gòu)造NioEventLoop對象時,通過構(gòu)造方法傳進(jìn)的參數(shù)’addTaskWakesUp’正是false,它會賦值給成員變量addTaskWakesUp。因此該條件滿足。
b)當(dāng)提交上來的任務(wù)不是一個NonWakeupRunnable任務(wù)
// NioEventLoop#wakeup(boolean inEventLoop)
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
c) 執(zhí)行提交任務(wù)的線程不是EventLoop所在線程
d) 當(dāng)wakenUp成員變量當(dāng)前的值為false
// NioEventLoop#wakeup(boolean inEventLoop)
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
只有同時滿足上面4個條件的情況下,Selector的wakeup()方法才會的以調(diào)用。
現(xiàn)在,我們來說明這段代碼塊的用意
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
如果一個任務(wù)在wakenUp值為true的情況下被提交上來,那么這個任務(wù)將沒有機(jī)會去調(diào)用Selector.wakeup()(即,此時’d)’條件不滿足)。所以我們需要去再次檢測任務(wù)隊列中是否有待執(zhí)行的任務(wù),在執(zhí)行Selector.select操作之前。如果我們不這么做,那么任務(wù)隊列中的任務(wù)將等待直到Selector.select操作超時。如果ChannelPipeline中存在IdleStateHandler,那么IdleStateHandler處理器可能會被掛起直到空閑超時。
首先,這段代碼是在每次要執(zhí)行Selector.select(long timeout)之前我們會進(jìn)行一個判斷。我們能夠確定的事,如果hasTasks()為true,即發(fā)現(xiàn)當(dāng)前有任務(wù)待處理時。wakenUp.compareAndSet(false, true)會返回true,因?yàn)樵诿看握{(diào)用當(dāng)前這個select方法時,都會將wakenUp標(biāo)識設(shè)置為false(即,‘wakenUp.getAndSet(false)’這句代碼)。而此時,wakenUp已經(jīng)被置位true了,在此之后有任務(wù)提交至EventLoop,那么是無法觸發(fā)Selector.wakeup()的。所以如果當(dāng)前有待處理的任務(wù),就不會進(jìn)行下面的Selector.select(long timeout)操作,而是退出select方法,繼而去處理任務(wù)。
因?yàn)槿绻贿@么做的話,如果當(dāng)前NioEventLoop線程上已經(jīng)有任務(wù)提交上來,這會使得這些任務(wù)可能會需要等待Selector.select(long timeout)操作超時后才能得以執(zhí)行。再者,假設(shè)我們的ChannelPipeline中存在一個IdleStateHandler,那么就可能導(dǎo)致因?yàn)镾elector.select(long timeout)操作的timeout比IdleStateHandler設(shè)置的idle timeout長,而導(dǎo)致IdleStateHandler不能對空閑超時做出即使的處理。
同時,我們注意,在執(zhí)行‘break’退出select方法前,會執(zhí)行‘selector.selectNow()’,該方法不會阻塞,它會立即返回,同時它會抵消Selector.wakeup()操作帶來的影響(關(guān)于NIO 相關(guān)的知識點(diǎn),歡迎參閱關(guān)于 NIO 你不得不知道的一些“地雷”)。
所以,① 如有有非NioEventLoop線程提交了一個任務(wù)上來,那么這個線程會執(zhí)行『selector
.wakeup()』方法,那么NioEventLoop在『if (hasTasks() && wakenUp.compareAndSet(false, true))』的后半個條件會返回false,程序會執(zhí)行到『int selectedKeys = selector.select(timeoutMillis);』,但是此時select不會阻塞,而是直接返回,因?yàn)榍懊嬉呀?jīng)先執(zhí)行了『selector.wakeup()』;② 因?yàn)樘峤蝗蝿?wù)的線程是非NioEventLoop線程,所以也可能是由NioEventLoop線程成功執(zhí)行了『if (hasTasks() && wakenUp.compareAndSet(false, true))』,退出了select方法轉(zhuǎn)而去執(zhí)行任務(wù)隊列中的任務(wù)。注意,這是提交任務(wù)的非NioEventLoop線程就不會執(zhí)行『selector.wakeup()』。
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
同時,除了在每次Selector.select(long timeout)操作前進(jìn)行任務(wù)隊列的檢測外,在每次Selector.select(long timeout)操作后也會檢測任務(wù)隊列是否已經(jīng)有提交上來的任務(wù)待處理,以及是由有定時或周期性任務(wù)準(zhǔn)備好被執(zhí)行。如果有,也不會繼續(xù)“epoll-bug”的檢測,轉(zhuǎn)而去執(zhí)行待處理的任務(wù)。
好了,我們在來看下如果經(jīng)過檢測,我們已經(jīng)確認(rèn)發(fā)生了“epoll-bug”,這時我們就需要進(jìn)行Selector的重構(gòu)操作:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
重構(gòu)操作主要的流程:首先,通過openSelector()先構(gòu)造一個新的SelectorTuple。然后,遍歷oldSelector中的所有SelectionKey,依次判斷其有效性,如果有效則將其重新注冊到新的Selector上,并將舊的SelectionKey執(zhí)行cancel操作,進(jìn)行相關(guān)的數(shù)據(jù)清理,以便最后oldSelector好進(jìn)行關(guān)閉。在將所有的SelectionKey數(shù)據(jù)移至新的Selector后,將newSelectorTuple的selector和unwrappedSelector賦值給相應(yīng)的成員屬性。最后,調(diào)用oldSelector.close()關(guān)閉舊的Selector以進(jìn)行資源的釋放。
接下我們繼續(xù)討論NioEventLoop.select操作的流程②
② 處理select操作得到的已經(jīng)準(zhǔn)備好處理的I/O事件,以及處理提交到當(dāng)前EventLoop的任務(wù)(包括定時和周期任務(wù)):

a) 首先先將成員變量cancelledKeys和needsToSelectAgain重置,即,cancelledKeys置為0,needsToSelectAgain置為false;
b) 成員變量ioRatio的默認(rèn)值為50
private volatile int ioRatio = 50;ioRatio在事件循環(huán)中期待用于處理I/O操作時間的百分比。默認(rèn)為50%。也就是說,在事件循環(huán)中默認(rèn)情況下用于處理I/O操作的時間和用于處理任務(wù)的時間百分比都為50%,即,用于處理I/O操作的時間和用于處理任務(wù)的時間時一樣的。
這里做個簡單的證明吧:
當(dāng)ioRatio不為100%時,我們假設(shè)在事件循環(huán)中用于處理任務(wù)時間的百分比為taskRatio,I/O操作的時間為ioTime,處理任務(wù)的時間為taskTime,求taskTime:
ioTime/taskTime = ioRatio/taskRatio; 并且 ioRatio + taskRatio = 100;
帶入,ioTime/taskTime = ioRatio/(100-ioRatio); ==> taskTime = ioTime*(100 - ioRatio) / ioRatio;
所以runAllTasks(ioTime * (100 - ioRatio) / ioRatio);傳入的參數(shù)就為可用于運(yùn)行任務(wù)的時間。
c) processSelectedKeys():處理Selector.select操作返回的待處理的I/O事件。

注意,『selectedKeys.keys[i] = null;』操作相當(dāng)于我們在NIO編程中在處理已經(jīng)觸發(fā)的感興趣的事件時,要將處理過的事件充selectedKeys集合中移除的步驟。
該方法會從selectedKeys中依次取出準(zhǔn)備好被處理的SelectionKey,并對相應(yīng)的待處理的I/O事件進(jìn)行處理。
在Netty 源碼解析 ——— 服務(wù)端啟動流程 (下)說到過,再將ServerSocketChannel注冊到Selector的時候,是會將其對應(yīng)的NioServerSocketChannel作為附加屬性設(shè)置到SelectionKey中。所有這里從k.attachment()獲取到的Object對象實(shí)際就是NioServerSocketChannel,而NioServerSocketChannel就是一個AbstractNioChannel的實(shí)現(xiàn)類。

當(dāng)SelectionKey.OP_CONNECT(連接事件)準(zhǔn)備就緒時,我們執(zhí)行如下操作:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
將SelectionKey.OP_CONNECT事件從SelectionKey所感興趣的事件中移除,這樣Selector就不會再去監(jiān)聽該連接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT連接事件是只需要處理一次的事件,一旦連接建立完成,就可以進(jìn)行讀、寫操作了。
unsafe.finishConnect():該方法會調(diào)用SocketChannel.finishConnect()來標(biāo)識連接的完成,如果我們不調(diào)用該方法,就去調(diào)用read/write方法,則會拋出一個NotYetConnectedException異常。在此之后,觸發(fā)ChannelActive事件,該事件會在該Channel的ChannelPipeline中傳播處理。
具體的關(guān)于SelectionKey.OP_CONNECT、SelectionKey.OP_WRITE、SelectionKey.OP_READ、SelectionKey.OP_ACCEPT的處理流程可以參閱Netty 源碼解析 ——— 基于 NIO 網(wǎng)絡(luò)傳輸模式的 OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE 事件處理流程
d) 處理任務(wù)隊列中的任務(wù)以及定時/周期性任務(wù)。
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
首先,這里將執(zhí)行任務(wù)的語句寫在了finally塊中,這是為了確保即便處理SelectedKeys出現(xiàn)了異常,也要確保任務(wù)中的隊列總能得到執(zhí)行的機(jī)會。

① 獲取系統(tǒng)啟動到當(dāng)前的時間內(nèi)已經(jīng)過去的定時任務(wù)(即,延遲的時間已經(jīng)滿足或者定時執(zhí)行任務(wù)的時間已經(jīng)滿足的任務(wù))放入到taskQueue中。
從taskQueue中獲取任務(wù),如果taskQueue已經(jīng)沒有任務(wù)了,則依次執(zhí)行tailTasks隊列里的所有任務(wù)。
a) 『fetchFromScheduledTaskQueue()』
// SingleThreadEventExecutor#fetchFromScheduledTaskQueue()
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
獲取從系統(tǒng)啟動到當(dāng)前系統(tǒng)的時間間隔。從scheduledTaskQueue中獲取在該時間間隔內(nèi)已經(jīng)過期的任務(wù)(即延遲周期或定時周期已經(jīng)到時間的任務(wù)),將這些任務(wù)放入到taskQueue中,如果taskQueue滿了無法進(jìn)入添加新的任務(wù)(taskQueue隊列的容量限制最大為2048),則將其重新放回到scheduledTaskQueue。
默認(rèn)情況下,taskQueue是一個MpscUnboundedArrayQueue實(shí)例。
// AbstractScheduledEventExecutor#pollScheduledTask(long nanoTime)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
根據(jù)給定的nanoTime返回已經(jīng)準(zhǔn)備好被執(zhí)行的Runnable。你必須是用AbstractScheduledEventExecutor的nanoTime()方法來檢索正確的nanoTime。
scheduledTaskQueue是一個PriorityQueue實(shí)例,它根據(jù)任務(wù)的deadlineNanos屬性的升序來維護(hù)一個任務(wù)隊列,每次peek能返回最先該被執(zhí)行的定時任務(wù)。deadlineNanos表示系統(tǒng)啟動到該任務(wù)應(yīng)該被執(zhí)行的時間點(diǎn)的時間差,如果“scheduledTask.deadlineNanos() <= nanoTime”則說明該任務(wù)的執(zhí)行時間已經(jīng)到了,因此將其從scheduledTaskQueue移除,然后通過該方法返回后放入到taskQueue中等待被執(zhí)行。
因此,可知每次執(zhí)行taskQueue前,taskQueue中除了有用戶自定義提交的任務(wù),系統(tǒng)邏輯流程提交至該NioEventLoop的任務(wù),還有用戶自定義或者系統(tǒng)設(shè)置的已經(jīng)達(dá)到運(yùn)行時間點(diǎn)的定時/周期性任務(wù)會一并放入到taskQueue中,而taskQueue的初始化容量為1024,最大長度限制為2048,也就是一次事件循環(huán)最多只能處理2048個任務(wù)。
b) 然后從taskQueue中獲取一個待執(zhí)行的任務(wù),如果獲取的task為null,說明本次事件循環(huán)中沒有任何待執(zhí)行的任何,那么就執(zhí)行“afterRunningAllTasks()”后返回。afterRunningAllTasks()方法會依次執(zhí)行tailQueue中的任務(wù),tailTasks中是用戶自定義的一些列在本次事件循環(huán)遍歷結(jié)束后會執(zhí)行的任務(wù),你可以通過類似如下的方式來添加tailTask:
((NioEventLoop)ctx.channel().eventLoop()).executeAfterEventLoopIteration(() -> {
// add some task to execute after eventLoop iteration
});
② 通過“系統(tǒng)啟動到當(dāng)前的時間差”+“可用于執(zhí)行任務(wù)的時間”=“系統(tǒng)啟動到可用于執(zhí)行任務(wù)時間的時間段(deadline)”。從taskQueue中依次出去任務(wù),如果task為null則說明已經(jīng)沒有待執(zhí)行的任務(wù),那么退出for循環(huán)。否則,同步的執(zhí)行task,每執(zhí)行64個任務(wù)后,就計算“系統(tǒng)啟動到當(dāng)前的時間”是否大于等于了deadline,如果是則說明已經(jīng)超過了分配給任務(wù)執(zhí)行的時間,此時就不會繼續(xù)執(zhí)行taskQueue中的任務(wù)了。
a)『safeExecute(task)』
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
通過直接調(diào)用task的run方法來同步的執(zhí)行任務(wù)。
b) 『runTasks & 0x3f』:
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
63的16進(jìn)制表示為0x3f(二進(jìn)制表示為’0011 1111’),當(dāng)已經(jīng)執(zhí)行的任務(wù)數(shù)量小于64時,其與0x3f的位與操作會大于0,當(dāng)其等于64(64的16進(jìn)制表示為0x40,二進(jìn)制表示為’0100 0000’)時,runTasks & 0x3f的結(jié)果為0。所以是每執(zhí)行64個任務(wù)后就進(jìn)行一次時間的判斷,以保證執(zhí)行任務(wù)隊列的任務(wù)不會嚴(yán)重的超過我們所設(shè)定的時間。
③ 則依次執(zhí)行tailTasks隊列里的所有任務(wù)。賦值全局屬性lastExecutionTime為最后一個任務(wù)執(zhí)行完后的時間。
到此為止,整個事件循環(huán)的流程就已經(jīng)分析完了。
后記
若文章有任何錯誤,望大家不吝指教:)
參考
http://www.infoq.com/cn/articles/netty-reliability
《Java 并發(fā)編程的藝術(shù)》