最近一直在看Netty方面的資料,發(fā)現(xiàn)在某寶上面賣得最好的關(guān)于Netty的書是《Netty實(shí)戰(zhàn)》,恕我直言,這本翻譯自《Netty in Action》的中文版對(duì)于想入門的同學(xué)來(lái)說(shuō)真的不太好??赐曛蠖疾恢涝谥v些什么,讓人摸不著頭腦。還是建議大家去看英文原版。我個(gè)人推薦通過(guò)幾個(gè)簡(jiǎn)單的Demo,模仿別人的代碼多造“輪子”,分析Netty的源碼,最后結(jié)合《Netty精髓》,才能更好地入門,當(dāng)然如果想精通還是要運(yùn)用到實(shí)際項(xiàng)目中。
什么是Netty?為什么要用Netty?怎么安裝Netty?這些問(wèn)題大家可以在網(wǎng)上搜索到答案,這邊不再多說(shuō)了。這篇文章主要介紹如何寫一個(gè)支持Http協(xié)議的Server端并對(duì)源碼進(jìn)行分析。
服務(wù)端代碼
public class TestServer {
public static void main(String[] args) {
//(1)創(chuàng)建兩個(gè)NIO線程組。bossGroup負(fù)責(zé)接收客戶端的連接,workerGroup負(fù)責(zé)網(wǎng)絡(luò)讀寫
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//(2) serverBootstrap 是NIO服務(wù)端的輔助啟動(dòng)類,可以降低服務(wù)端的開(kāi)發(fā)難度
ServerBootstrap serverBootstrap = new ServerBootstrap();
//(3) group將線程組作為參數(shù)加入到serverBootstrap,設(shè)置channel類似JDK中的ServerSocketChannel類
// 綁定channel初始化類TestServerInitializer
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
childHandler(new TestServerInitializer());
try {
//(4) server啟動(dòng)類配置完成之后 調(diào)用bind綁定監(jiān)聽(tīng)端口,sync同步等待綁定操作完成
// 綁定成功之后返回ChannelFuture類似JDK中的Future,用于異步操作回調(diào)通知
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
//(5) 等待服務(wù)端鏈路關(guān)閉之后main函數(shù)退出
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//(6) 優(yōu)雅的關(guān)閉兩個(gè)線程池,釋放線程資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
上面是一個(gè)簡(jiǎn)單的Server代碼,每行代碼都有注釋,如果不理解這些注釋,沒(méi)關(guān)系下面對(duì)代碼中的主要類源碼進(jìn)行分析。
EventLoopGroup源碼分析
EventLoopGroup 主要有兩個(gè)作用:注冊(cè)channel 以及 執(zhí)行Runnable任務(wù)。
EventLoopGroup的源代碼如下

下面是EventLoopGroup的整體類圖,可以看到NioEventLoopGroup繼承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup實(shí)現(xiàn)了EventLoopGroup。

繼續(xù)看MultithreadEventLoopGroup源碼發(fā)現(xiàn)他繼承MultithreadEventExecutorGroup。該類的構(gòu)造方法如下

可以看到MultithreadEventLoopGroup 主要負(fù)責(zé)封裝線程數(shù)組,子類中newChild負(fù)責(zé)具體的初始化。
再看一下子類NioEventLoopGroup中的newChild方法如下。

經(jīng)過(guò)上述EventLoopGroup源代碼分析我們可以得下面的EventLoopGroup與EventLoop的關(guān)系。

EventLoop源碼分析
EventLoop負(fù)責(zé)監(jiān)聽(tīng)Channel讀寫事件以及注冊(cè)Channel。我們知道Linux下有三種事件監(jiān)控方法:select、poll、epoll。對(duì)應(yīng)到Netty中有兩種類:NioEventLoop 和 EpollEventLoop。前者使用的是JDK Selector接口實(shí)現(xiàn)Channel事件檢測(cè)(poll方式),而后者是Netty自己實(shí)現(xiàn)epoll對(duì)Channel事件檢測(cè)。
下面重點(diǎn)介紹介紹一個(gè)NioEventLoop。
NioEventLoop繼承自SingleThreadEventExecutor。SingleThreadEventExecutor有如下代碼:
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
可以看到SingleThreadEventExecutor包含了一個(gè)線程和一個(gè)隊(duì)列。NioEventLoop 既要執(zhí)行Selector 帶過(guò)來(lái)IO事件,還要執(zhí)行隊(duì)列中的非IO事件。
下面看一下NioEventLoop中線程執(zhí)行邏輯。
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
1、 首先switch判斷是否有需要執(zhí)行select過(guò)程,select過(guò)程之后討論。
2、根據(jù)ioRatio(比較IO任務(wù)的時(shí)間占比)來(lái)執(zhí)行IO任務(wù)和非IO任務(wù)。如果ioRatio=100,表示執(zhí)行全部的IO任務(wù)。默認(rèn)ioRatio=50,表示一半時(shí)間執(zhí)行IO任務(wù),另外一半時(shí)間執(zhí)行非IO任務(wù)。如何控制非IO任務(wù)的時(shí)間呢?
在runAllTasks函數(shù)中每執(zhí)行64個(gè)任務(wù)會(huì)判斷是否超時(shí)。
接著我們?cè)倏?code>select 函數(shù)做了什么
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 1.定時(shí)任務(wù)截至事時(shí)間快到了,中斷本次輪詢
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
//2.輪詢過(guò)程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 3.阻塞式select操作
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
// 4.解決jdk的nio bug
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
1、計(jì)算select過(guò)程可以執(zhí)行到的時(shí)間點(diǎn)selectDeadLineNanos
2、把時(shí)間點(diǎn)轉(zhuǎn)化成毫秒,如果時(shí)間很短,即timeoutMillis<=0,并且是第一次執(zhí)行 selectCnt==0,那么執(zhí)行selectNow,該函數(shù)返回已經(jīng)準(zhǔn)備好IO操作的select key 集合
3、如果有任務(wù)需要執(zhí)行,那么就跳出for 循環(huán)
4、執(zhí)行selector.select(timeoutMillis),如果有事件那么跳出for 循環(huán),如果沒(méi)有事件發(fā)生,那么會(huì)進(jìn)入下一個(gè)循環(huán)直到timeoutMillis小于0,跳出循環(huán)。 這一步同時(shí)會(huì)對(duì)selectCnt變量加1操作,SELECTOR_AUTO_REBUILD_THRESHOLD 默認(rèn)為512,當(dāng)selectCnt超過(guò)這個(gè)閥值時(shí)就會(huì),重新建立新的Selector,把原來(lái)的Channel遷移到新的Selector 上。為什么要設(shè)立一個(gè)selectCnt變量呢?因?yàn)镴DK nio可能會(huì)有Bug,具體可以參照http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055,這個(gè)bug會(huì)導(dǎo)致select 空輪詢,在執(zhí)行select操作時(shí)如果沒(méi)有事件發(fā)生,直接返回,沒(méi)有等待timeoutMillis, 這是一次空輪詢,selectCnt加1。
下圖是NioEventLoop的工作示意圖。

bind過(guò)程分析
ServerBootstrap 繼承自AbstractBootstrap,AbstractBootstrap的bind過(guò)程如下圖。

其中doBind的代碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
1、initAndRegister返回ChannelFuture實(shí)例regFuture,以此來(lái)判斷initAndRegister 是否執(zhí)行完畢。
- 如果執(zhí)行完畢那么調(diào)用doBind0進(jìn)行socket綁定
- 否則添加listener監(jiān)聽(tīng)器,當(dāng)initAndRegister完成時(shí),再調(diào)用doBind0進(jìn)行綁定
2、initAndRegister函數(shù)會(huì)創(chuàng)建NioServerSocketChannel,主要做一些初始化的工作,包括pipeline添加handler,把channel注冊(cè)到seletor上。
3、回到ServerBootstrap類,ServerBootstrap的init(Channel channel)方法,會(huì)添加handler到channel的pipeline中該handler就是ServerBootstrapAcceptor。代碼如下:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
4、如何把channel注冊(cè)到selector上呢?EventLoopGroup bossGroup會(huì)選擇內(nèi)部的一個(gè)EventLoop來(lái)執(zhí)行實(shí)際的注冊(cè)行為然后開(kāi)始將該Channel注冊(cè)到上述EventLoopGroup bossGroup。注冊(cè)成功后會(huì)執(zhí)行上述的initChannel方法。
bind過(guò)程結(jié)束,當(dāng)Selector檢測(cè)到NioServerSocketChannel有新的連接事件時(shí),就會(huì)交給NioServerSocketChannel的ChannelPipeline中的ServerBootstrapAcceptor處理。
ServerBootstrapAcceptor做兩件事:
- 為新的Channel的ChannelPipeline配置我們上述代碼中的childHandler指定的ChannelHandler
- 將新的Channel注冊(cè)到了上述EventLoopGroup workerGroup中
sync介紹
bind方法是異步方法,返回ChannelFuture,sync可以等待該異步過(guò)程結(jié)束。每一個(gè)ChannelFuture都是和一個(gè)Channel綁定的,而每一個(gè)ChannelFuture又有一個(gè)closeFuture方法,然后調(diào)用sync方法等待ChannelFuture的結(jié)束,只有當(dāng)Channel 關(guān)閉,closeFuture才會(huì)被調(diào)用,一般正常情況下不會(huì)被調(diào)用,所以主線程會(huì)一直阻塞在sync方法上。
注意點(diǎn)
Reactor模型中可以通過(guò)多個(gè)Acceptor線程加快accept操作,我們是否可以增大bossGroup線程數(shù)來(lái)加快accept呢?答案是否,沒(méi)有效果因?yàn)閎ossGroup中多線程是為了綁定多個(gè)端口,ServerSocketChannel創(chuàng)建后會(huì)綁定到bossGroup中的一個(gè)Eventloop, 由他來(lái)負(fù)責(zé)accept操作。
Tomocat 6開(kāi)始也支持NIO模型,可以開(kāi)啟多個(gè)Acceptor線程,可以參照這篇文章
參考文章:
http://www.itdecent.cn/p/e577803f0fb8
https://my.oschina.net/pingpangkuangmo/blog/742929