一、Reactor線程
??源碼基于4.1.6.Final版本。
1.1 Reactor線程啟動(dòng)
??NioEventLoop的run方法是reactor線程的主體,在第一次添加任務(wù)的時(shí)候被啟動(dòng)。
- 入口:NioEventLoop父類SingleThreadEventExecutor的execute方法
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
- netty的reactor線程在添加一個(gè)任務(wù)的時(shí)候被創(chuàng)建,該線程實(shí)體為FastThreadLocalThread。
- 最后線程執(zhí)行主體為NioEventLoop的run方法。
1.2 Reactor線程執(zhí)行
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
processSelectedKeys();
runAllTasks(...);
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}

- 輪詢注冊到reactor線程對應(yīng)的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
定時(shí)任務(wù)截止事時(shí)間快到了,中斷本次輪詢
輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
阻塞式select操作
??netty會(huì)在每次進(jìn)行selector.select(timeoutMillis)之前記錄一下開始時(shí)間currentTimeNanos,在select之后記錄一下結(jié)束時(shí)間,判斷select操作是否至少持續(xù)了timeoutMillis秒,如果持續(xù)的時(shí)間大于等于timeoutMillis,說明就是一次有效的輪詢,重置selectCnt標(biāo)志,否則,表明該阻塞方法并沒有阻塞這么長時(shí)間,可能觸發(fā)了jdk的空輪詢bug,當(dāng)空輪詢的次數(shù)超過一個(gè)閥值的時(shí)候,默認(rèn)是512,就開始重建selector。
- 處理產(chǎn)生網(wǎng)絡(luò)IO事件的channel
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
取出IO事件以及對應(yīng)的netty channel類
處理該channel
判斷是否該再來次輪詢
對于boss NioEventLoop來說,輪詢到的是基本上就是連接事件,后續(xù)的事情就通過他的pipeline將連接扔給一個(gè)worker NioEventLoop處理
對于worker NioEventLoop來說,輪詢到的基本上都是io讀寫事件,后續(xù)的事情就是通過他的pipeline將讀取到的字節(jié)流傳遞給每個(gè)channelHandler來處理SelectedSelectionKeySet
??netty使用數(shù)組替換掉jdk原生的HashSet來保證IO事件的高效處理,每個(gè)SelectionKey上綁定了netty類AbstractChannel對象作為attachment,在處理每個(gè)SelectionKey的時(shí)候,就可以找到AbstractChannel,然后通過pipeline的方式將處理串行到ChannelHandler,回調(diào)到用戶方法。
- 處理任務(wù)隊(duì)列
用戶自定義普通任務(wù)
非當(dāng)前reactor線程調(diào)用channel的各種方法
用戶自定義定時(shí)任務(wù):1)若干時(shí)間后執(zhí)行一次 2)每隔一段時(shí)間執(zhí)行一次 3)每次執(zhí)行結(jié)束,隔一定時(shí)間再執(zhí)行一次
??taskQueue在NioEventLoop中默認(rèn)是mpsc隊(duì)列,mpsc隊(duì)列,即多生產(chǎn)者單消費(fèi)者隊(duì)列,netty使用mpsc,方便的將外部線程的task聚集,在reactor線程內(nèi)部用單線程來串行執(zhí)行。
??reactor線程task調(diào)度:
- 從scheduledTaskQueue轉(zhuǎn)移到期的定時(shí)任務(wù)到taskQueue(mpsc queue)
- 計(jì)算本次任務(wù)循環(huán)的截止時(shí)間
- 執(zhí)行任務(wù)
- 收尾
當(dāng)前reactor線程調(diào)用當(dāng)前eventLoop執(zhí)行任務(wù),直接執(zhí)行,否則,添加到任務(wù)隊(duì)列稍后執(zhí)行
netty內(nèi)部的任務(wù)分為普通任務(wù)和定時(shí)任務(wù),分別落地到MpscQueue和PriorityQueue
netty每次執(zhí)行任務(wù)循環(huán)之前,會(huì)將已經(jīng)到期的定時(shí)任務(wù)從PriorityQueue轉(zhuǎn)移到MpscQueue
netty每隔64個(gè)任務(wù)檢查一下是否該退出任務(wù)循環(huán)
二、服務(wù)端啟動(dòng)
b.bind(8888).sync();
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//...
final ChannelFuture regFuture = initAndRegister();
//...
final Channel channel = regFuture.channel();
//...
doBind0(regFuture, channel, localAddress, promise);
//...
return promise;
}
- new一個(gè)channel

用戶調(diào)用方法Bootstrap.bind(port)第一步就是通過反射的方式new一個(gè)NioServerSocketChannel對象,并且在new的過程中創(chuàng)建了一系列的核心組件。
- init這個(gè)channel
設(shè)置option和attr
設(shè)置新接入channel的option和attr
加入新連接處理器
- 將這個(gè)channel register到某個(gè)對象
1)設(shè)置啟動(dòng)類參數(shù),最重要的就是設(shè)置channel創(chuàng)建server對應(yīng)的channel,創(chuàng)建各大組件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server對應(yīng)的channel,設(shè)置一些attr,option,以及設(shè)置子channel的attr,option,給server的channel添加新channel接入器,并觸發(fā)addHandler,register等事件
3)調(diào)用jdk底層做端口綁定,并觸發(fā)active事件,active觸發(fā)的時(shí)候,真正做服務(wù)端口綁定
三、新連接接入
所有的channel底層都會(huì)有一個(gè)與unsafe綁定,每種類型的channel實(shí)際的操作都由unsafe來實(shí)現(xiàn)

??流水線的開始就是HeadContxt,流水線的結(jié)束就是TailConext,HeadContxt中調(diào)用Unsafe做具體的操作,TailConext中用于向用戶拋出pipeline中未處理異常以及對未處理消息的警告。
- 檢測到有新連接進(jìn)入
- 將新的連接注冊到worker線程組
- 注冊新連接的讀事件
//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
- boos reactor線程輪詢到有新的連接進(jìn)入
- 通過封裝jdk底層的channel創(chuàng)建NioSocketChannel以及一系列的netty核心組件
- 將該條連接通過chooser,選擇一條worker reactor線程綁定上去
- 注冊讀事件,開始新連接的讀寫
四、pipeline
4.1 pipeline 初始化
??pipeline是channel其中的一員,在AbstractChannel中被創(chuàng)建。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
??pipeline中保存了channel的引用,默認(rèn)情況下,一條pipeline會(huì)有兩個(gè)節(jié)點(diǎn),head和tail。

4.2 pipeline添加節(jié)點(diǎn)
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.檢查是否有重復(fù)handler
checkMultiplicity(handler);
// 2.創(chuàng)建節(jié)點(diǎn)
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加節(jié)點(diǎn)
addLast0(newCtx);
}
// 4.回調(diào)用戶方法
callHandlerAdded0(handler);
return this;
}
netty中用兩個(gè)字段來表示這個(gè)channelHandlerContext屬于inBound還是outBound,或者兩者都是。
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}

- 檢查是否有重復(fù)handler
- 創(chuàng)建節(jié)點(diǎn)
- 添加節(jié)點(diǎn)
- 回調(diào)用戶方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}

4.3 pipeline刪除節(jié)點(diǎn)
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
- 找到待刪除的節(jié)點(diǎn)
- 調(diào)整雙向鏈表指針刪除
- 回調(diào)用戶函數(shù)

4.4 pipeline其他
- 一個(gè)Channel對應(yīng)一個(gè)Unsafe,Unsafe處理底層操作,NioServerSocketChannel對應(yīng)NioMessageUnsafe, NioSocketChannel對應(yīng)NioByteUnsafe
- inBound事件從head節(jié)點(diǎn)傳播到tail節(jié)點(diǎn),outBound事件從tail節(jié)點(diǎn)傳播到head節(jié)點(diǎn)
- 異常傳播只會(huì)往后傳播,而且不分inbound還是outbound節(jié)點(diǎn),不像outBound事件一樣會(huì)往前傳播
五、writeAndFlush
- pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)

- Encoder節(jié)點(diǎn)分配一個(gè)ByteBuf,調(diào)用encode方法,將java對象根據(jù)自定義協(xié)議寫入到ByteBuf,然后再把ByteBuf傳入到下一個(gè)節(jié)點(diǎn)。
- pipeline中的編碼器原理是創(chuàng)建一個(gè)ByteBuf,將java對象轉(zhuǎn)換為ByteBuf,然后再把ByteBuf繼續(xù)向前傳遞
- 調(diào)用write方法并沒有將數(shù)據(jù)寫到Socket緩沖區(qū)中,而是寫到了一個(gè)單向鏈表的數(shù)據(jù)結(jié)構(gòu)中,flush才是真正的寫出
- writeAndFlush等價(jià)于先將數(shù)據(jù)寫到netty的緩沖區(qū),再將netty緩沖區(qū)中的數(shù)據(jù)寫到Socket緩沖區(qū)中,寫的過程與并發(fā)編程類似,用自旋鎖保證寫成功
- netty中的緩沖區(qū)中的ByteBuf為DirectByteBuf
六、拆包器
- 不斷從TCP緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包
- 如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包
- 如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接
netty中的拆包內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做ByteToMessageDecoder
netty將具體如何拆包抽象出一個(gè)decode方法,不同的拆包器實(shí)現(xiàn)不同的decode方法,就能實(shí)現(xiàn)不同協(xié)議的拆包