一、EventLoopGroup功能概述
EventLoopGroup是netty中一個比較核心的組件,想要知道EventLoopGroup的功能,我們先看一下EventLoopGroup的類圖關(guān)系:

Exector是java的JUC包中定義的一個接口,我們可以看一下具體定義:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
從代碼的注釋中我們可以看到,Exector定義的execute方法的作用。即執(zhí)行提交的任務(wù)。執(zhí)行可以是在一個的線程中、或者在線程池中、也可以在方法調(diào)用線程中。總的來說,Exector開放了提交任務(wù)執(zhí)行的能力。
接下來是ExecutorService接口。它繼承自Exector,新定義了以下方法:
public interface ExecutorService extends Executor {
/**
* 停止接受新提交的任務(wù),已經(jīng)提交的等待中的任務(wù)會執(zhí)行完成
*/
void shutdown();
/**
* 停止提交新的任務(wù),已經(jīng)提交的等待中的任務(wù)
* 也會停止等待,返回等待中的任務(wù)表
*/
List<Runnable> shutdownNow();
/**
* 返回執(zhí)行器是否被停止
*/
boolean isShutdown();
/**
* 返回執(zhí)行器在shutdown之后,是否所有的任務(wù)都被執(zhí)行完。
*shutdown()或者shutdownNow()被調(diào)用后才會終止。
*/
boolean isTerminated();
/**
* 阻塞直到所有的任務(wù)都執(zhí)行完,或者超時,或者線程受到interrupted信號
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交有返回值的任務(wù)
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一個任務(wù),任務(wù)成功Future返回給定的result
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個任務(wù),任務(wù)成功Future返回null
*/
Future<?> submit(Runnable task);
/**
* 執(zhí)行給定的任務(wù)列表,當(dāng)執(zhí)行完成后返回持有執(zhí)行狀態(tài)和執(zhí)行結(jié)果的Future列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 執(zhí)行給定的任務(wù)列表,當(dāng)執(zhí)行完或者超時后返回
*持有執(zhí)行狀態(tài)和執(zhí)行結(jié)果的Future列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 執(zhí)行給定的任務(wù)列表,如果所有任務(wù)都執(zhí)行成功,返回其中的一個結(jié)果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 執(zhí)行給定的任務(wù)列表,如果所有任務(wù)都在給定的時
* 間內(nèi)執(zhí)行成功,返回其中的一個結(jié)果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看到ExecutorService這個接口開放了提交有返回結(jié)果的任務(wù)的能力,同時開放了停止執(zhí)行器的能力。
接下來是ScheduledExecutorService:
public interface ScheduledExecutorService extends ExecutorService {
/**
* 創(chuàng)建并且執(zhí)行一個一次性的任務(wù),這個任務(wù)在給定的延遲事件之后才可以被執(zhí)行
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* 創(chuàng)建并且執(zhí)行一個帶有返回值的一次性的任務(wù),
* 這個任務(wù)在給定的延遲事件之后才可以被執(zhí)行
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* 創(chuàng)建并且執(zhí)行一個周期性的任務(wù),任務(wù)的開始時間是固定間隔的
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 創(chuàng)建并且執(zhí)行一個周期性的任務(wù),一個任務(wù)執(zhí)行完成到
* 下個任務(wù)執(zhí)行開始之間的時間間隔是固定的
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
我們可以看到,ScheduledExecutorService具有延遲執(zhí)行和周期執(zhí)行任務(wù)的能力。
接下來是EventExecutorGroup接口,EventExecutorGroup的方法比較多,我們這里只列出部分關(guān)鍵方法:
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/**
* Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
* {@link EventExecutorGroup} have been terminated.
*/
Future<?> terminationFuture();
/**
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();
@Override
Iterator<EventExecutor> iterator();
}
我們可以看到EventExecutorGroup主要提供了兩個能力:一是優(yōu)雅停機的能力,優(yōu)雅停機這塊我們本篇先不去分析,放到接下來的筆記中去分析;第二個就是執(zhí)行器調(diào)度的能力,通過next()方法來返回下一個要執(zhí)行任務(wù)的EventExecutor。
最后就是EventLoopGroup了。我們看一下這個接口的代碼:
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
*/
@Override
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(ChannelPromise promise);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*
* @deprecated Use {@link #register(ChannelPromise)} instead.
*/
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
從上面的代碼可以看到,EventLoopGroup又新增了注冊Channel的方法。到這里為止,我們可以給EventLoopGroup下個定義:有優(yōu)雅停機功能、可以注冊Channel的事件執(zhí)行器。到現(xiàn)在我們只看了EventLoopGroup這個接口,也許會對這個接口的理解比較模糊,接下來我們就結(jié)合EventLoopGroup這個接口的一個實現(xiàn)類來看看EventLoopGroup這個接口在netty中究竟是扮演者一個什么角色。我們下面拿我們開發(fā)中比較常用的NioEventLoopGroup來具體分析一下。
從NioEventLoopGroup來看EventLoopGroup在netty中扮演的角色
我們看一下NioEventLoopGroup的部分內(nèi)容:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
NioEventLoopGroup的內(nèi)容其實并沒有多少,邏輯大部分都在其父類之中。除了構(gòu)造方法之外,只有三個方法:
1、setIoRatio(int ioRatio)設(shè)置IO運行率,方法內(nèi)容也很簡單,就是遍歷自身持有的NioEventLoop對象,并且設(shè)置NioEventLoop的ioRatio參數(shù)。
2、rebuildSelectors()遍歷自身持有的NioEventLoop對象,調(diào)用NioEventLoop對象的rebuildSelector()方法。
3、EventLoop newChild(Executor executor, Object... args)創(chuàng)建一個NioEventLoop對象。這是一個父類的模板方法。
父類MultithreadEventLoopGroup是一個抽象類,這個抽象類實現(xiàn)了幾個重載的register方法。方法內(nèi)容都是調(diào)用父類的next()方法獲取自身的一個EventLoop對象,然后把需要注冊的Channel、ChannelPromise參數(shù)注冊到EventLoop。
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
next()方法就是實現(xiàn)EventLoopGroup對EventLoop調(diào)度的方法(EventLoop是真正執(zhí)行任務(wù)的執(zhí)行器,后面我們會說到)。next()方法是怎么實現(xiàn)對EventLoop調(diào)度的呢?我們可以看到這個方法調(diào)用了EventExecutorChooser的實現(xiàn)類的next()方法。netty默認(rèn)提供了兩個實現(xiàn)類,一個是GenericEventExecutorChooser,另一個是PowerOfTwoEventExecutorChooser,這兩個選擇器都是輪詢EventLoopGroup持有的EventLoop。這兩個選擇器都是輪詢EventLoop,有什么區(qū)別呢?我們從這兩個實現(xiàn)類可以看出netty對性能追求的極致之處:
// GenericEventExecutorChooser的選擇方法
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
// PowerOfTwoEventExecutorChooser的選擇方法
return executors[idx.getAndIncrement() & executors.length - 1];
GenericEventExecutorChooser是通用的輪詢方法;而PowerOfTwoEventExecutorChooser是專門用來處理當(dāng)EventLoop數(shù)量是2的次方數(shù)時的情況,用位運算取idx的低位(低log2 (executors.length) 位)。netty不會放過哪怕這一點點對性能的優(yōu)化!??!
我們繼續(xù)按圖索驥看一下MultithreadEventLoopGroup的父類MultithreadEventExecutorGroup。我們可以看到,MultithreadEventLoopGroup實現(xiàn)了停機的功能,不過都是調(diào)用的持有的EventExecutor的對應(yīng)方法,我們這里就不詳細分析了。MultithreadEventExecutorGroup最主要的邏輯在它的構(gòu)造方法中,構(gòu)造方法實現(xiàn)了對EventExecutor的初始化等工作,我們詳細看一下這塊內(nèi)容:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
首先,判斷參數(shù)傳入的Executor執(zhí)行器對象是否是空,如果是空,則初始化一個默認(rèn)的實現(xiàn)類ThreadPerTaskExecutor,我們從這個類的名字可以看出它的能力,每個任務(wù)一個線程,我們看它的實現(xiàn)也確實如此:
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
每次提交任務(wù)的時候,都會調(diào)用線程工廠創(chuàng)建一個線程來執(zhí)行任務(wù)!這個Executor對象是最終用來執(zhí)行任務(wù)的,不過卻不是EventLoopGroup調(diào)度的內(nèi)容。我們繼續(xù)往下看MultithreadEventExecutorGroup的構(gòu)造方法。初始化EventExecutor[] children這個成員變量,新建一個大小為nThreads的數(shù)組,然后循環(huán)為數(shù)組元素賦值。這個EventExecutor[] children才是EventLoopGroup真正調(diào)度的內(nèi)容。在填充數(shù)組內(nèi)容的時候,調(diào)用了子類的模板方法newChild(),在之前我們在NioEventLoopGroup中看到的newChild()方法就是這個模板方法的一個實現(xiàn)。
children數(shù)組填充完成之后,是初始化成員變量chooser的操作,chooser就是用來調(diào)度所有執(zhí)行器,我們上面已經(jīng)分析過了,這里不再贅述。
接下來是對EventExecutor[] children中的每個執(zhí)行器添加終止監(jiān)聽器,確保EventExecutor[] children中的所有執(zhí)行器都終止后,會調(diào)用設(shè)置成員變量terminationFuture的狀態(tài)。
最后是對readonlyChildren這個成員變量賦值,看這個變量的名字我們也能猜出來這個成員變量是children變量的只讀版本,事實也的確如此。
復(fù)盤
我們分析了NioEventLoop的核心源碼,整個內(nèi)容也許有點繞,我們這里再把NioEventLoop的類圖貼出來幫助大家理解:

我們可以看到,EventExecutorGroup以及其父接口,開放了任務(wù)提交、任務(wù)執(zhí)行、優(yōu)雅停機等能力。而實現(xiàn)類AbstractEventExecutorGroup這個抽象類實現(xiàn)了任務(wù)提交的基本方法,MultithreadEventExecutorGroup則實現(xiàn)了多線程任務(wù)調(diào)度的基本方法。子類MultithreadEventLoopGroup不僅繼承了MultithreadEventExecutorGroup這個抽象了,具備了多線程執(zhí)行調(diào)度的基本能力,而且還實現(xiàn)了EventLoopGroup接口,具備注冊Channel的能力。其對外開放的變化點僅僅有創(chuàng)建執(zhí)行器這個功能。我們繼承MultithreadEventExecutorGroup僅僅可以通過實現(xiàn)
newChild這個抽象方法來定制自己的EventLoop事件處理器(不考慮方法重寫)。最后,NioEventLoopGroup繼承了MultithreadEventExecutorGroup,通過實現(xiàn)newChild方法,來指定事件處理器是NioEventLoop,下篇我們會分析NioEventLoop的功能。
到這里我們基于NioEventLoopGroup分析了EventLoopGroup的基本能力。EventLoopGroup可以注冊Channel,然后具有任務(wù)執(zhí)行、選擇執(zhí)行器去執(zhí)行任務(wù),也就是執(zhí)行器調(diào)度的能力。也許現(xiàn)在大家對EventLoopGroup的功能還是比較模糊,不過沒關(guān)系,加下來我們會繼續(xù)分析EventLoop、BootStrap等關(guān)鍵組件。等分析完這些內(nèi)容,大家對netty的理解就會更清晰。