前言
在前面的文章中,我們已經(jīng)詳細(xì)闡述了事件和異常傳播在netty中的實(shí)現(xiàn),(netty源碼分析之pipeline(一),netty源碼分析之pipeline(二)),其中有一類事件我們?cè)趯?shí)際編碼中用得最多,那就是 write或者writeAndFlush,也就是我們今天的主要內(nèi)容
主要內(nèi)容
本文分以下幾個(gè)部分闡述一個(gè)java對(duì)象最后是如何轉(zhuǎn)變成字節(jié)流,寫到socket緩沖區(qū)中去的
- pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)
- java對(duì)象編碼過程
- write:寫隊(duì)列
- flush:刷新寫隊(duì)列
- writeAndFlush: 寫隊(duì)列并刷新
pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)
一個(gè)標(biāo)準(zhǔn)的pipeline鏈?zhǔn)浇Y(jié)構(gòu)如下(我們省去了異常處理Handler)

數(shù)據(jù)從head節(jié)點(diǎn)流入,先拆包,然后解碼成業(yè)務(wù)對(duì)象,最后經(jīng)過業(yè)務(wù)Handler處理,調(diào)用write,將結(jié)果對(duì)象寫出去。而寫的過程先通過tail節(jié)點(diǎn),然后通過encoder節(jié)點(diǎn)將對(duì)象編碼成ByteBuf,最后將該ByteBuf對(duì)象傳遞到head節(jié)點(diǎn),調(diào)用底層的Unsafe寫到j(luò)dk底層管道
java對(duì)象編碼過程
為什么我們?cè)趐ipeline中添加了encoder節(jié)點(diǎn),java對(duì)象就轉(zhuǎn)換成netty可以處理的ByteBuf,寫到管道里?
我們先看下調(diào)用write的code
BusinessHandler
protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
Response response = doBusiness(request);
if (response != null) {
ctx.channel().write(response);
}
}
業(yè)務(wù)處理器接受到請(qǐng)求之后,做一些業(yè)務(wù)處理,返回一個(gè)Response,然后,response在pipeline中傳遞,落到 Encoder節(jié)點(diǎn),下面是 Encoder 的處理流程
Encoder
public class Encoder extends MessageToByteEncoder<Response> {
@Override
protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
out.writeByte(response.getVersion());
out.writeInt(4 + response.getData().length);
out.writeBytes(response.getData());
}
}
Encoder的處理流程很簡單,按照簡單自定義協(xié)議,將java對(duì)象 Response 寫到傳入的參數(shù) out中,這個(gè)out到底是什么?
為了回答這個(gè)問題,我們需要了解到 Response 對(duì)象,從 BusinessHandler 傳入到 MessageToByteEncoder的時(shí)候,首先是傳入到 write 方法
MessageToByteEncoder
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判斷當(dāng)前Handelr是否能處理寫入的消息
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 強(qiáng)制換換
I cast = (I) msg;
// 分配一段ButeBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 調(diào)用encode,這里就調(diào)回到 `Encoder` 這個(gè)Handelr中
encode(ctx, cast, buf);
} finally {
// 既然自定義java對(duì)象轉(zhuǎn)換成ByteBuf了,那么這個(gè)對(duì)象就已經(jīng)無用了,釋放掉
// (當(dāng)傳入的msg類型是ByteBuf的時(shí)候,就不需要自己手動(dòng)釋放了)
ReferenceCountUtil.release(cast);
}
// 如果buf中寫入了數(shù)據(jù),就把buf傳到下一個(gè)節(jié)點(diǎn)
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 否則,釋放buf,將空數(shù)據(jù)傳到下一個(gè)節(jié)點(diǎn)
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果當(dāng)前節(jié)點(diǎn)不能處理傳入的對(duì)象,直接扔給下一個(gè)節(jié)點(diǎn)處理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 當(dāng)buf在pipeline中處理完之后,釋放
if (buf != null) {
buf.release();
}
}
}
其實(shí),這一小節(jié)的內(nèi)容,在前面的博文中,已經(jīng)提到過,這里,我們?cè)敿?xì)闡述一下Encoder是如何處理傳入的java對(duì)象的
1.判斷當(dāng)前Handler是否能處理寫入的消息,如果能處理,進(jìn)入下面的流程,否則,直接扔給下一個(gè)節(jié)點(diǎn)處理
2.將對(duì)象強(qiáng)制轉(zhuǎn)換成Encoder可以處理的 Response對(duì)象
3.分配一個(gè)ByteBuf
4.調(diào)用encoder,即進(jìn)入到 Encoder 的 encode方法,該方法是用戶代碼,用戶將數(shù)據(jù)寫入ByteBuf
5.既然自定義java對(duì)象轉(zhuǎn)換成ByteBuf了,那么這個(gè)對(duì)象就已經(jīng)無用了,釋放掉,(當(dāng)傳入的msg類型是ByteBuf的時(shí)候,就不需要自己手動(dòng)釋放了)
6.如果buf中寫入了數(shù)據(jù),就把buf傳到下一個(gè)節(jié)點(diǎn),否則,釋放buf,將空數(shù)據(jù)傳到下一個(gè)節(jié)點(diǎn)
7.最后,當(dāng)buf在pipeline中處理完之后,釋放節(jié)點(diǎn)
總結(jié)一點(diǎn)就是,Encoder節(jié)點(diǎn)分配一個(gè)ByteBuf,調(diào)用encode方法,將java對(duì)象根據(jù)自定義協(xié)議寫入到ByteBuf,然后再把ByteBuf傳入到下一個(gè)節(jié)點(diǎn),在我們的例子中,最終會(huì)傳入到head節(jié)點(diǎn)
HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
這里的msg就是前面在Encoder節(jié)點(diǎn)中,載有java對(duì)象數(shù)據(jù)的自定義ByteBuf對(duì)象,進(jìn)入下一節(jié)
write:寫隊(duì)列
AbstractChannel
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
1.首先,調(diào)用 assertEventLoop 確保該方法的調(diào)用是在reactor線程中,關(guān)于reactor線程可以查看我前面的文章
2.然后,調(diào)用 filterOutboundMessage() 方法,將待寫入的對(duì)象過濾,把非ByteBuf對(duì)象和FileRegion過濾,把所有的非直接內(nèi)存轉(zhuǎn)換成直接內(nèi)存DirectBuffer
AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
3.接下來,估算出需要寫入的ByteBuf的size
4.最后,調(diào)用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下來,我們需要重點(diǎn)看一下這個(gè)方法干了什么事情
ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 創(chuàng)建一個(gè)待寫出的消息節(jié)點(diǎn)
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}
想要理解上面這段代碼,必須得掌握寫緩存中的幾個(gè)消息指針,如下圖

ChannelOutboundBuffer 里面的數(shù)據(jù)結(jié)構(gòu)是一個(gè)單鏈表結(jié)構(gòu),每個(gè)節(jié)點(diǎn)是一個(gè) Entry,Entry 里面包含了待寫出ByteBuf 以及消息回調(diào) promise,下面分別是三個(gè)指針的作用
1.flushedEntry 指針表示第一個(gè)被寫到操作系統(tǒng)Socket緩沖區(qū)中的節(jié)點(diǎn)
2.unFlushedEntry 指針表示第一個(gè)未被寫入到操作系統(tǒng)Socket緩沖區(qū)中的節(jié)點(diǎn)
3.tailEntry指針表示ChannelOutboundBuffer緩沖區(qū)的最后一個(gè)節(jié)點(diǎn)
初次調(diào)用 addMessage 之后,各個(gè)指針的情況為

fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的節(jié)點(diǎn)
第二次調(diào)用 addMessage之后,各個(gè)指針的情況為

第n次調(diào)用 addMessage之后,各個(gè)指針的情況為

可以看到,調(diào)用n次addMessage,flushedEntry指針一直指向NULL,表示現(xiàn)在還未有節(jié)點(diǎn)需要寫出到Socket緩沖區(qū),而unFushedEntry之后有n個(gè)節(jié)點(diǎn),表示當(dāng)前還有n個(gè)節(jié)點(diǎn)尚未寫出到Socket緩沖區(qū)中去
flush:刷新寫隊(duì)列
不管調(diào)用channel.flush(),還是ctx.flush(),最終都會(huì)落地到pipeline中的head節(jié)點(diǎn)
HeadContext
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
之后進(jìn)入到AbstractUnsafe
AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
flush方法中,先調(diào)用
ChannelOutboundBuffer
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
可以結(jié)合前面的圖來看,首先拿到 unflushedEntry 指針,然后將 flushedEntry 指向unflushedEntry所指向的節(jié)點(diǎn),調(diào)用完畢之后,三個(gè)指針的情況如下所示

接下來,調(diào)用 flush0();
AbstractUnsafe
protected void flush0() {
doWrite(outboundBuffer);
}
發(fā)現(xiàn)這里的核心代碼就一個(gè) doWrite,繼續(xù)跟
AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
// 拿到第一個(gè)需要flush的節(jié)點(diǎn)的數(shù)據(jù)
Object msg = in.current();
if (msg instanceof ByteBuf) {
// 強(qiáng)轉(zhuǎn)為ByteBuf,若發(fā)現(xiàn)沒有數(shù)據(jù)可讀,直接刪除該節(jié)點(diǎn)
ByteBuf buf = (ByteBuf) msg;
boolean done = false;
long flushedAmount = 0;
// 拿到自旋鎖迭代次數(shù)
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
// 自旋,將當(dāng)前節(jié)點(diǎn)寫出
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// 寫完之后,將當(dāng)前節(jié)點(diǎn)刪除
if (done) {
in.remove();
} else {
break;
}
}
}
}
這里略微有點(diǎn)復(fù)雜,我們分析一下
1.第一步,調(diào)用current()先拿到第一個(gè)需要flush的節(jié)點(diǎn)的數(shù)據(jù)
ChannelOutBoundBuffer
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
2.第二步,拿到自旋鎖的迭代次數(shù)
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
關(guān)于為什么要用自旋鎖,netty的文檔已經(jīng)解釋得很清楚,這里不過多解釋
ChannelConfig
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
3.自旋的方式將ByteBuf寫出到j(luò)dk nio的Channel
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
doWriteBytes 方法跟進(jìn)去
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
我們發(fā)現(xiàn),出現(xiàn)了 javaChannel(),表明已經(jīng)進(jìn)入到了jdk nio Channel的領(lǐng)域,有關(guān)netty中ByteBuf的介紹不打算在這里展開
4.刪除該節(jié)點(diǎn)
節(jié)點(diǎn)的數(shù)據(jù)已經(jīng)寫入完畢,接下來就需要?jiǎng)h除該節(jié)點(diǎn)
ChannelOutBoundBuffer
public boolean remove() {
Entry e = flushedEntry;
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
}
// recycle the entry
e.recycle();
return true;
}
首先拿到當(dāng)前被flush掉的節(jié)點(diǎn)(flushedEntry所指),然后拿到該節(jié)點(diǎn)的回調(diào)對(duì)象 ChannelPromise, 調(diào)用 removeEntry()方法移除該節(jié)點(diǎn)
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
這里的remove是邏輯移除,只是將flushedEntry指針移到下個(gè)節(jié)點(diǎn),調(diào)用完畢之后,節(jié)點(diǎn)圖示如下

隨后,釋放該節(jié)點(diǎn)數(shù)據(jù)的內(nèi)存,調(diào)用 safeSuccess 進(jìn)行回調(diào),用戶代碼可以在回調(diào)里面做一些記錄,下面是一段Example
用戶代碼
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
// 回調(diào)
}
})
最后,調(diào)用 recycle方法,將當(dāng)前節(jié)點(diǎn)回收
writeAndFlush: 寫隊(duì)列并刷新
理解了write和flush這兩個(gè)過程,writeAndFlush 也就不難了
writeAndFlush在某個(gè)Handler中被調(diào)用之后,最終會(huì)落到 TailContext 節(jié)點(diǎn),見 netty源碼分析之pipeline(二)
TailContext
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
}
}
可以看到,最終,通過一個(gè)boolean變量,表示是調(diào)用 invokeWriteAndFlush,還是 invokeWrite,invokeWrite便是我們上文中的write過程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
invokeFlush0();
}
可以看到,最終調(diào)用的底層方法和單獨(dú)調(diào)用 write 和 flush 是一樣的
private void invokeWrite(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
}
private void invokeFlush(Object msg, ChannelPromise promise) {
invokeFlush0(msg, promise);
}
由此看來,invokeWriteAndFlush基本等價(jià)于write方法之后再來一次flush
另外,由于對(duì)端消費(fèi)不及時(shí)導(dǎo)致writeAndFlush引發(fā)頻繁O(jiān)ld GC的問題和解決思路可以看下 一次netty"引發(fā)的"詭異old gc問題排查過程
總結(jié)
1.pipeline中的編碼器原理是創(chuàng)建一個(gè)ByteBuf,將java對(duì)象轉(zhuǎn)換為ByteBuf,然后再把ByteBuf繼續(xù)向前傳遞
2.調(diào)用write方法并沒有將數(shù)據(jù)寫到Socket緩沖區(qū)中,而是寫到了一個(gè)單向鏈表的數(shù)據(jù)結(jié)構(gòu)中,flush才是真正的寫出
3.writeAndFlush等價(jià)于先將數(shù)據(jù)寫到netty的緩沖區(qū),再將netty緩沖區(qū)中的數(shù)據(jù)寫到Socket緩沖區(qū)中,寫的過程與并發(fā)編程類似,用自旋鎖保證寫成功
4.netty中的緩沖區(qū)中的ByteBuf為DirectByteBuf
如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html