在眾多的編程語言和網(wǎng)絡(luò)庫中,拿來介紹網(wǎng)絡(luò)編程的例子,echo服務(wù)器和客戶端恐怕是最多的一個例子。netty作為一個在java語言中應(yīng)用非常廣泛、非常優(yōu)秀的網(wǎng)絡(luò)編程框架,echo服務(wù)器和客戶端程序往往是大家第一個接觸的實例程序。很多工程師正是通過echo服務(wù)器和客戶端跨入netty的大門。
我們先看一個echo服務(wù)器的完整實例代碼:
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
// 把ServerBootstrap的channelfactory設(shè)置為ReflectiveChannelFactory
// 當(dāng)執(zhí)行channelfactory的newChannel方法時,會創(chuàng)建NioServerSocketChannel實例
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
// 處理server Socket事件
.handler(new LoggingHandler(LogLevel.INFO))
// 處理client socket事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
代碼中前面一段跟SSL相關(guān)的代碼我們先跳過,從這兩行代碼開始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
這里創(chuàng)建了兩個EventLoopGroup對象bossGroup和workerGroup,bossGroup主要用來處理server socket監(jiān)聽client socket的連接請求,server socket接收了新建連接后,會把connection socket放到workerGroup中進(jìn)行處理,也就是說workerGroup主要用來處理connection socket的網(wǎng)絡(luò)IO和相關(guān)的業(yè)務(wù)邏輯。netty支持很多種線程模型,這種bossGroup和workerGroup的組合是比較典型的。
netty作為一個異步的事件驅(qū)動(event drivern)的網(wǎng)絡(luò)應(yīng)用程序框架,采用非阻塞的多路復(fù)用的網(wǎng)絡(luò)io模型。連接建立、連接關(guān)閉、socket可讀、socket可寫這些狀態(tài)的改變稱為一個個“事件(event)”,當(dāng)然事件也可以是應(yīng)用程序添加的一個任務(wù)。而處理這些event的線程就是一個無限循環(huán)(for or while loop),沒有事件發(fā)生時,線程一直等待事件的產(chǎn)生。當(dāng)有事件發(fā)生時,則處理所有的事件,處理完后則開始下一個循環(huán),繼續(xù)等待新的事件的發(fā)生。所以這種處理邏輯叫做eventloop,eventloop與線程是一一對應(yīng)的,一個eventloop對應(yīng)一個線程,一個線程也只有一個eventloop。而eventloopgroup主要包含了一組eventloop(線程池),當(dāng)然還有許多線程任務(wù)分發(fā)等管理功能。
eventloopgroup/eventloop是netty的核心部件之一,可以說是netty三大核心部件之一,另外兩個是channel和pipeline。eventloopgroup/eventloop定義了netty的線程模型,包括channel對socket的操作、pipeline的業(yè)務(wù)處理、用戶提交的任務(wù)在內(nèi)的所有任務(wù)的分發(fā)調(diào)度和執(zhí)行都是在eventloopgroup/eventloop中進(jìn)行。
這里為eventloopgroup接口實例化的是類NioEventLoopGroup,NioEventLoopGroup基于java nio進(jìn)行網(wǎng)絡(luò)操作。
我們跟蹤一下NioEventLoopGroup的構(gòu)造函數(shù),
NioEventLoopGroup
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
然后進(jìn)入他的父類:
MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
繼續(xù)進(jìn)入上一級父類:
MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
}
這里進(jìn)入了一個有實質(zhì)內(nèi)容的構(gòu)造函數(shù),在分析這個函數(shù)之前,我們先根據(jù)構(gòu)造函數(shù)的調(diào)用鏈?zhǔn)崂硪幌聵?gòu)造函數(shù)的入?yún)⒌闹怠?/p>
nThreads = (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads);
executor = null;
chooserFactory = DefaultEventExecutorChooserFactory.INSTANCE;
args = [SelectorProvider SelectorProvider.provider(),
SelectStrategyFactory DefaultSelectStrategyFactory.INSTANCE,
RejectedExecutionHandler RejectedExecutionHandlers.reject()]
可見如果NioEventLoopGroup的構(gòu)造函數(shù)如果nThreads為非0值,則為傳入的實際值,如果為0或沒有參數(shù),則為DEFAULT_EVENT_LOOP_THREADS,我們看下DEFAULT_EVENT_LOOP_THREADS的定義,
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
繼續(xù)跟下去,可以知道NettyRuntime.availableProcessors()默認(rèn)值為Runtime.getRuntime().availableProcessors(),也就是說nThreads值默認(rèn)為CPU核心數(shù)的2倍
接下來我們分析構(gòu)造函數(shù):
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
首先為executor重新賦值:
if (executor == null) {
// DefaultThreadFactory設(shè)置: 非daemon、線程優(yōu)先級5、線程名字、線程組為當(dāng)前線程所屬組
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
因為executor入?yún)⒅禐閚ull,所以executor重新賦值為ThreadPerTaskExecutor類的一個新建實例,ThreadPerTaskExecutor就是eventloop的執(zhí)行器,eventloop的所有任務(wù)都是通過調(diào)用ThreadPerTaskExecutor的execute()函數(shù),從而創(chuàng)建一個線程,進(jìn)而執(zhí)行提交的任務(wù)的。ThreadPerTaskExecutor只有一個成員,線程工廠DefaultThreadFactory,這個線程工廠創(chuàng)建的線程實例設(shè)置的線程屬性為:非daemon、線程優(yōu)先級5、線程名的前綴、所屬的線程組與當(dāng)前線程相同,一句話,創(chuàng)建的是一個普通線程,其中線程名的前綴為:
nioEventLoopGroup-poolId-, poolId初始值為0,每創(chuàng)建一個DefaultThreadFactory實例值加1。
然后初始化EventLoopGroup的線程池,具體代碼如下:
// 創(chuàng)建一個數(shù)組,數(shù)組大小為線程的個數(shù)
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 初始化線程executor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果初始化一個線程executor時拋出異常,則執(zhí)行這一段代碼:關(guān)閉前面創(chuàng)建的所有executor,并等待所有線程退出運行
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
初始化過程,在代碼注釋中已經(jīng)說明了,關(guān)鍵語句children[i] = newChild(executor, args);我們先放下,待會再詳細(xì)分析。
線程選擇器初始化:
chooser = chooserFactory.newChooser(children);
chooser實現(xiàn)了eventloopgroup的任務(wù)分派,當(dāng)需要向線程池提交一個新的任務(wù)時,如注冊一個新的服務(wù)端socket到eventloop,則通過chooser選擇一個具體的executor。從前面分析得知,chooserFactory是DefaultEventExecutorChooserFactory的一個實例,它的executor選擇邏輯代碼如下:
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
// executor數(shù)是2的n次方,用與來計算,改善性能
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
// executor數(shù)不是2的n次方,用除法求余的方式計算
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
可見,線程池中的個數(shù)是2的n次方與不是2的n次方,具體的計算方法不一樣,但效果都是round-robin的選擇方法。
最后,為線程池的每一個EventExcutor注冊一個結(jié)束后的回調(diào)函數(shù),并把所有的EventExcutor保存到一個不可修改的Set中。具體代碼如下:
// 線程結(jié)束時的回調(diào)函數(shù)
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 注冊回調(diào)函數(shù)
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
這個構(gòu)造函數(shù)分析完了,下面分析eventloop以及它的初始化,也就是前面遺留沒有分析的children[i] = newChild(executor, args); newChild是一個抽象函數(shù):
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
我們先看看類NioEventLoopGroup的繼承體系:

然后從NioEventLoopGroup開始往上找哪個最底層的類實現(xiàn)了newChild()函數(shù)。
然后在類NioEventLoopGroup中找到了它的實現(xiàn),
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可見EventExecutor是NioEventLoop的一個實例,按照分析NioEventLoopGroup一樣的分析過程,這里我不在粘貼具體代碼了,只列出NioEventLoop的關(guān)鍵域的初始值,
NioEventLoop的繼承體系:

關(guān)鍵域的初始值及說明:
SelectorProvider provider = SelectorProvider.provider()
Selector selector = provider.openSelector()
SelectStrategy selectStrategy = DefaultSelectStrategyFactory.INSTANCE
boolean addTaskWakesUp = false // 添加任務(wù)時不wakeup線程eventloop
int maxPendingTasks = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); // 最小值為16,默認(rèn)值為Integer.MAX_VALUE
Queue<Runnable> tailTasks = new LinkedBlockingQueue<Runnable>(maxPendingTasks)
Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks)
EventExecutorGroup parent = 執(zhí)行newChild函數(shù)的NioEventLoopGroup實例
Executor executor = ThreadPerTaskExecutor的實例
這里特別說明一下provider和selector。
provider在不同的操作系統(tǒng)平臺下有不同的實現(xiàn),windowns平臺是WindowsSelectorProvider。在linux平臺下,如果內(nèi)核版本>=2.6則,具體的實現(xiàn)為EPollSelectorProvider,否則為默認(rèn)的PollSelectorProvider。MAC平臺下則是KQueueSelectorProvider。
selector是netty的一個核心概念,封裝了不同平臺的多路復(fù)用網(wǎng)絡(luò)io模式,eventloop正是通過向selector注冊感興趣的事件(event),然后等待事件發(fā)生,從而實現(xiàn)非阻塞異步處理事件的目的。在linux平臺下,底層通過epoll實現(xiàn),執(zhí)行provider.openSelector()可以理解為執(zhí)行了epoll_init()系統(tǒng)調(diào)用,會返回一個epoll句柄給selector,然后通過selector可以向epoll注冊感興趣的事件,并在事件發(fā)生時獲取發(fā)生的事件,然后對事件進(jìn)行相應(yīng)的處理。