前言
Netty源碼(一)Netty架構(gòu)解析分析了netty的基本原理和工作流程,其中EventLoop是netty中最核心的組件,本文將會(huì)分析EventLoop的主要設(shè)計(jì)思想,至于具體的代碼細(xì)節(jié)下一篇會(huì)具體分析。本文將主要分析NIOEventLoop基于4.1.49.Final版本。
核心思想
- 單線(xiàn)程
之前介紹過(guò)EventLoop在它的生命周期內(nèi)只和一個(gè) Thread 綁定,一個(gè)EventLoop管理一個(gè)或者多個(gè)Channel。為什么是單線(xiàn)程?首先多線(xiàn)程編程是很難很麻煩的事情,而且為了保證線(xiàn)程安全加鎖或者CAS都會(huì)影響執(zhí)行效率,某些場(chǎng)景下線(xiàn)程數(shù)量越多導(dǎo)致的效率還會(huì)更低。采用單線(xiàn)程模型第一不用考慮線(xiàn)程安全,第二沒(méi)有線(xiàn)程上下文切換帶來(lái)的損耗。 - 執(zhí)行流程
與EventLoop所綁定的Channel的事件都由EventLoop所擁有的Thread處理,實(shí)現(xiàn)的核心流程如下:
//當(dāng)前調(diào)用線(xiàn)程是否EventLoop所屬的Thread
if (executor.inEventLoop()) {
//do something
} else {
executor.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
如果當(dāng)前調(diào)用線(xiàn)程正是支撐 EventLoop 的線(xiàn)程,那么所提交的任務(wù)會(huì)直接執(zhí)行。否則,EventLoop 將調(diào)度該任務(wù)以便稍后執(zhí)行,并將它放入到內(nèi)部隊(duì)列中。EventLoop下次處理它的事件時(shí),它會(huì)執(zhí)行隊(duì)列中的那些任務(wù)。這樣的任務(wù)執(zhí)行流程保證,Channel上的所有任務(wù)都是由它所注冊(cè)的EventLoop所擁有的Thread執(zhí)行,不會(huì)出現(xiàn)并發(fā)問(wèn)題。
構(gòu)造參數(shù)
NioEventLoop構(gòu)造參數(shù):
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
- parent:當(dāng)前EventLoop所屬的NioEventLoopGroup;
- executor:為EventLoop創(chuàng)建線(xiàn)程,并且執(zhí)行管理該線(xiàn)程生命周期的任務(wù),默認(rèn)為T(mén)hreadPerTaskExecutor;
- selectorProvider:NIO中selector的幫助類(lèi);
- selectStrategy:對(duì)selector.select()提供選擇策略;
- queueFactory:創(chuàng)建存放任務(wù)隊(duì)列的工廠(chǎng)。
從構(gòu)造參數(shù)可以大概知道NioEventLoop的職責(zé),前面提過(guò)一個(gè)NioEventLoopGroup管理一個(gè)或者多個(gè)NioEventLoop,parent指向NioEventLoop所屬的NioEventLoopGroup。
NioEventLoop的Thread由executor去創(chuàng)建管理,這個(gè)thread只做三件事情:
- select: 返回EventLoop所持有的selector上已經(jīng)準(zhǔn)備就緒的Channel;
- processSelectedKeys: 處理準(zhǔn)備就緒的IO操作;
- ranTasks: 執(zhí)行隊(duì)列里的任務(wù);
由selectorProvider提供IO操作,queueFactory創(chuàng)建隊(duì)列存放待執(zhí)行的任務(wù)。
Executor

可見(jiàn)EventLoop繼承自Executor,Executor的核心思想是將任務(wù)調(diào)用,和任務(wù)自身的執(zhí)行邏輯給分離開(kāi),使用Executor去執(zhí)行任務(wù),調(diào)用方只需要關(guān)注自身任務(wù)的邏輯,把任務(wù)執(zhí)行交給Executor去處理,將職責(zé)分離開(kāi)。
JDK中自帶的線(xiàn)程池采用池化的思想,基本思路是從池中取一個(gè)空閑的線(xiàn)程去執(zhí)行任務(wù),執(zhí)行完之后將線(xiàn)程返回空閑列表,讓其可以重復(fù)使用。因?yàn)榫€(xiàn)程是很珍貴的資源,采用池化的技術(shù)可以減少線(xiàn)程創(chuàng)建、銷(xiāo)毀的損耗,但不能減少線(xiàn)程上下文切換所帶來(lái)的消耗,隨著線(xiàn)程數(shù)量的增加這種消耗會(huì)更多。所以EventLoop采用單線(xiàn)程的模型去實(shí)現(xiàn)Executor避免多線(xiàn)程的上下文切換,和多線(xiàn)程的并發(fā)編程問(wèn)題。
Feature
Netty中所有操作都是異步進(jìn)行的,實(shí)現(xiàn)異步編程很關(guān)鍵的對(duì)象就是Feature,因?yàn)楫惒讲僮鞫际橇⒓捶祷氐牟粫?huì)等待任務(wù)完成,所以需要有一種機(jī)制去表示異步執(zhí)行的結(jié)果 ,這種機(jī)制就是Feature。
JDK Feature
public interface Future<V> {
//取消任務(wù)
boolean cancel(boolean mayInterruptIfRunning);
//任務(wù)是否取消
boolean isCancelled();
//任務(wù)是否執(zhí)行完成
boolean isDone();
//阻塞獲取執(zhí)行結(jié)果
V get() throws InterruptedException, ExecutionException;
//指定時(shí)間內(nèi)獲取執(zhí)行結(jié)果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看出jdk的Feature接口提供了對(duì)異步執(zhí)行結(jié)果的幾個(gè)操作,取消、獲取結(jié)果、判斷是否完成等。但是這有個(gè)缺點(diǎn)就是調(diào)用方其實(shí)不知道任務(wù)什么時(shí)候結(jié)束,沒(méi)有一種機(jī)制去通知,只有自己不斷的去檢測(cè)或者阻塞等待執(zhí)行完成,這都不是很好的方法。所以netty自己實(shí)現(xiàn)了一套Feature,相比jdk的Feature對(duì)重要的就是提供了一種通知機(jī)制。
Netty Feature
public interface Future<V> extends java.util.concurrent.Future<V> {
//省略其他方法
......
//添加監(jiān)聽(tīng)器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//添加多個(gè)監(jiān)聽(tīng)器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//移除監(jiān)聽(tīng)器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//移除多個(gè)監(jiān)聽(tīng)器
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
}
Netty的Feature與jdk自帶的Feature,最大的不同就是Listener。使用是一種觀(guān)察者模式,將執(zhí)行結(jié)果通知出去。
GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
GenericFutureListener只有一個(gè)operationComplete方法,通過(guò)實(shí)現(xiàn)GenericFutureListener接口的operationComplete調(diào)用方就能知道異步執(zhí)行的結(jié)果。
MpscUnboundedArrayQueue
前面提到,NioEventLoop中有一個(gè)用于存放任務(wù)的隊(duì)列,如果沒(méi)有指定特定的queueFactory,默認(rèn)為MpscUnboundedArrayQueue。
MpscUnboundedArrayQueue是JCTools所提供的的一種隊(duì)列,JCTools提供了幾種目前jdk沒(méi)有實(shí)現(xiàn)的隊(duì)列MpscUnboundedArrayQueue就是其中一種。從名字可知(MPSC - Multi Producer Single Consumer )多生產(chǎn)者單消費(fèi)者,是一種使用于多生產(chǎn)者單消費(fèi)者場(chǎng)景的隊(duì)列。而EventLoop正符合這種多生產(chǎn)者單消費(fèi)者的場(chǎng)景,所以采用MpscUnboundedArrayQueue隊(duì)列來(lái)存放任務(wù)。
下面說(shuō)一下MpscUnboundedArrayQueue的幾個(gè)好處,具體細(xì)節(jié)就不深入研究了
1.緩存行填充
為什么要有緩存行填充?首先要知道“偽共享”的概念,偽共享和 CPU 內(nèi)部的 Cache 有關(guān),Cache 內(nèi)部是按照緩存行(Cache Line)管理的,緩存行的大小通常是 64 個(gè)字節(jié)。CPU加載數(shù)據(jù)是按照一個(gè)緩存行為單位加載的。
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
如上圖如果出隊(duì)索引和入隊(duì)索引用32位的int類(lèi)型,CPU會(huì)把takeIndex、putIndex加載到一個(gè)緩存行。當(dāng)一個(gè)入隊(duì)操作的時(shí)候會(huì)修改putIndex,導(dǎo)致整個(gè)緩存行失效,需要重新加載到緩存中,但是入隊(duì)操作并不會(huì)修改takeIndex,由于putIndex與takeIndex共享同一個(gè)緩存行,所以也會(huì)導(dǎo)致takeIndex失效,這種情況就是偽共享。MpscUnboundedArrayQueue采用讓變量獨(dú)占緩存行的形式解決偽共享的問(wèn)題。
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
}
-
無(wú)鎖
首先offer方法,這里只分析關(guān)鍵部分
while (true)
{
//省略代碼
......
//cas設(shè)置生產(chǎn)者索引
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}
// INDEX visible before ELEMENT
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
//放置元素
soRefElement(buffer, offset, e); // release element e
return true;
}
offer方法采用cas的方式去放入元素,而poll由于是單線(xiàn)程消費(fèi),無(wú)需加鎖和cas操作。
EventLoopGroup
EventLoopGroup是對(duì)EventLoop進(jìn)行管理,那么EventLoopGroup是如何對(duì)EventLoop進(jìn)行管理的呢?先看一個(gè)簡(jiǎn)單的execute方法
@Override
public void execute(Runnable command) {
next().execute(command);
}
是調(diào)用next返回一個(gè)對(duì)象后執(zhí)行的execute方法,跟進(jìn)去看一下next方法
/**
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();
返回一個(gè)EventExecutorGroup所管理的EventExecutor對(duì)象,對(duì)于EventLoopGroup來(lái)說(shuō)next返回的就是所管理的EventLoop對(duì)象,next方法就是EventLoopGroup管理的EventLoop關(guān)鍵方法。
最后找一下next的實(shí)現(xiàn),在MultithreadEventExecutorGroup類(lèi)中
@Override
public EventExecutor next() {
return chooser.next();
}
chooser變量
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
最后再看一下EventExecutorChooserFactory的實(shí)現(xiàn),只有一個(gè)實(shí)現(xiàn)類(lèi)EventExecutorChooserFactory
/**
* Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
*/
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
很簡(jiǎn)單的實(shí)現(xiàn),按照管理executors數(shù)量的奇偶數(shù)返回不同的策略,核心邏輯就是循環(huán)依次返回executors中的executor。
總結(jié)
本文主要分析了Netty的線(xiàn)程模型EventLoop,最重要的要關(guān)注的點(diǎn)就是EventLoop是單線(xiàn)程模型,所有提交給EventLoop都需要判斷調(diào)用線(xiàn)程是否為EventLoop所持有的thread,如果是直接執(zhí)行,否則添加到EventLoop的任務(wù)隊(duì)列里面。這一部分個(gè)人感覺(jué)是netty中設(shè)計(jì)很精妙的一點(diǎn),如果要使用多線(xiàn)程就用EventLoopGroup去管理EventLoop,EventLoopGroup中的EventLoop又互相隔離,互不影響也不會(huì)出現(xiàn)線(xiàn)程安全的問(wèn)題。
本文主要分析了EventLoop的繼承關(guān)系和一些設(shè)計(jì)思想,并沒(méi)有具體去分析一些代碼細(xì)節(jié),下篇文章會(huì)具體分析EventLoop的thread所負(fù)責(zé)的三件事情:
- select: 返回EventLoop所持有的selector上已經(jīng)準(zhǔn)備就緒的Channel;
- processSelectedKeys: 處理準(zhǔn)備就緒的IO操作;
- ranTasks: 執(zhí)行隊(duì)列里的任務(wù)。