
前言
Netty 的解碼器有很多種,比如基于長度的,基于分割符的,私有協(xié)議的。但是,總體的思路都是一致的。
拆包思路:當數(shù)據(jù)滿足了 解碼條件時,將其拆開。放到數(shù)組。然后發(fā)送到業(yè)務 handler 處理。
半包思路: 當讀取的數(shù)據(jù)不夠時,先存起來,直到滿足解碼條件后,放進數(shù)組。送到業(yè)務 handler 處理。
而實現(xiàn)這個邏輯的就是我們今天的主角:ByteToMessageDecoder。
看名字的意思是:將字節(jié)轉(zhuǎn)換成消息的解碼器。人如其名。而他本身也是一個入站 handler,所以,我們還是從他的 channelRead 方法入手。
1. channelRead 方法
精簡過的代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 從對象池中取出一個List
CodecOutputList out = CodecOutputList.newInstance();
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 第一次解碼
cumulation = data;// 累計
} else {
// 第二次解碼,就將 data 向 cumulation 追加,并釋放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加后的 cumulation 后,調(diào)用 decode 方法進行解碼
// 解碼過程中,調(diào)用 fireChannelRead 方法,主要目的是將累積區(qū)的內(nèi)容 decode 到 數(shù)組中
callDecode(ctx, cumulation, out);
// 如果累計區(qū)沒有可讀字節(jié)了
if (cumulation != null && !cumulation.isReadable()) {
// 將次數(shù)歸零
numReads = 0;
// 釋放累計區(qū)
cumulation.release();
// 等待 gc
cumulation = null;
} // 如果超過了 16 次,就壓縮累計區(qū),主要是將已經(jīng)讀過的數(shù)據(jù)丟棄,將 readIndex 歸零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
// 如果沒有向數(shù)組插入過任何數(shù)據(jù)
decodeWasNull = !out.insertSinceRecycled();
// 循環(huán)數(shù)組,向后面的 handler 發(fā)送數(shù)據(jù),如果數(shù)組是空,那不會調(diào)用
fireChannelRead(ctx, out, size);
// 將數(shù)組中的內(nèi)容清空,將數(shù)組的數(shù)組的下標恢復至原來
out.recycle();
}
樓主已經(jīng)在方法中寫了注釋,但還是說說主要的步驟:
- 從對象池中取出一個空的數(shù)組。
- 判斷成員變量是否是第一次使用,(注意,既然使用了成員變量,所以這個 handler 不能是 handler 的。)將 unsafe 中傳遞來的數(shù)據(jù)寫入到這個 cumulation 累積區(qū)中。
- 寫到累積區(qū)后,調(diào)用子類的 decode 方法,嘗試將累積區(qū)的內(nèi)容解碼,每成功解碼一個,就調(diào)用后面節(jié)點的 channelRead 方法。若沒有解碼成功,什么都不做。
- 如果累積區(qū)沒有未讀數(shù)據(jù)了,就釋放累積區(qū)。
- 如果還有未讀數(shù)據(jù),且解碼超過了 16 次(默認),就對累積區(qū)進行壓縮。將讀取過的數(shù)據(jù)清空,也就是將 readIndex 設置為0.
- 設置 decodeWasNull 的值,如果上一次沒有插入任何數(shù)據(jù),這個值就是 ture。該值在 調(diào)用 channelReadComplete 方法的時候,會觸發(fā) read 方法(不是自動讀取的話),嘗試從 JDK 的通道中讀取數(shù)據(jù),并將之前的邏輯重來。主要應該是怕如果什么數(shù)據(jù)都沒有插入,就執(zhí)行 channelReadComplete 會遺漏數(shù)據(jù)。
- 調(diào)用 fireChannelRead 方法,將數(shù)組中的元素發(fā)送到后面的 handler 中。
- 將數(shù)組清空。并還給對象池。
下面來說說詳細的步驟。
2. 從對象池中取出一個空的數(shù)組
代碼:
@1
CodecOutputList out = CodecOutputList.newInstance();
@2
static CodecOutputList newInstance() {
return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3
private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL =
new FastThreadLocal<CodecOutputLists>() {
@Override
protected CodecOutputLists initialValue() throws Exception {
// 16 CodecOutputList per Thread are cached.
return new CodecOutputLists(16);
}
};
@4
CodecOutputLists(int numElements) {
elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)];
for (int i = 0; i < elements.length; ++i) {
// Size of 16 should be good enough for the majority of all users as an initial capacity.
elements[i] = new CodecOutputList(this, 16);
}
count = elements.length;
currentIdx = elements.length;
mask = elements.length - 1;
}
@5
private CodecOutputList(CodecOutputListRecycler recycler, int size) {
this.recycler = recycler;
array = new Object[size];
}
@6
public CodecOutputList getOrCreate() {
if (count == 0) {
// Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
// low.
return new CodecOutputList(NOOP_RECYCLER, 4);
}
--count;
int idx = (currentIdx - 1) & mask;
CodecOutputList list = elements[idx];
currentIdx = idx;
return list;
}
代碼分為 1,2,3,4,5, 6 步驟。
- 靜態(tài)方法調(diào)用。
- 從 FastThreadLocal 中取出一個 CodecOutputLists 對象,并從這個集合中再取出一個 List。也就是 List 中有 List。可以理解為雙重數(shù)組。
- 調(diào)用 FastThreadLocal 的 initialValue 方法返回一個 CodecOutputLists 對象。
- 創(chuàng)建數(shù)組。數(shù)組大小默認16,循環(huán)填充 CodecOutputList 元素。設置 count,currentIdx ,mask 屬性。
- 創(chuàng)建 CodecOutputList 對象,這個 recycler 就是他的父 CodecOutputLists,并創(chuàng)建一個默認 16 的空數(shù)組。
- 首次進入 count 不是0,應該是 16,隨后將 count -1,并與運算出 Lists 中的下標,獲取到下標的內(nèi)容。也就是一個 List。在調(diào)用 recycle 方法還給對象池的時候,會將所有參數(shù)恢復。
由于這個 getOrCreate 方法會被一個線程的多個地方使用,因此 16 是個統(tǒng)計值。當 16 不夠的時候,就會創(chuàng)建一個新的 List。也就是 count == 0 的邏輯。而 & mask 的操作就是一個取模的操作。
3. 寫入累積區(qū)
代碼如下:
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
這個 cumulator 默認是個 Cumulator 類型的 MERGE_CUMULATOR,該實例最主要的是從重寫了 cumulate 方法:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
可以看到該方法,主要是將 unsafe.read 傳遞過來的 ByteBuf 的內(nèi)容寫入到 cumulation 累積區(qū)中,然后釋放掉舊的內(nèi)容,由于這個變量是成員變量,因此可以多次調(diào)用 channelRead 方法寫入。
同時這個方法也考慮到了擴容的問題,總的來說就是 copy。
當然,ByteToMessageDecoder 中還有一個 Cumulator 實例,稱之為 COMPOSITE_CUMULATOR,混合累積。由于上個實例的 cumulate 方法是使用內(nèi)存拷貝的,因此,這里提供了使用混合內(nèi)存。相較于拷貝,性能會更好點,但同時也會更復雜。
4. decode 方法的作用
當數(shù)據(jù)追擊到累積區(qū)之后,需要調(diào)用 decode 方法進行解碼,代碼如下:
@ 1
callDecode(ctx, cumulation, out);
@2
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 如果累計區(qū)還有可讀字節(jié)
while (in.isReadable()) {
int outSize = out.size();
// 上次循環(huán)成功解碼
if (outSize > 0) {
// 調(diào)用后面的業(yè)務 handler 的 ChannelRead 方法
fireChannelRead(ctx, out, outSize);
// 將 size 置為0
out.clear();//
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 得到可讀字節(jié)數(shù)
int oldInputLength = in.readableBytes();
// 調(diào)用 decode 方法,將成功解碼后的數(shù)據(jù)放入道 out 數(shù)組中,可能會刪除當前節(jié)點,刪除之前會將數(shù)據(jù)發(fā)送到最后的 handler
decodeRemovalReentryProtection(ctx, in, out);// decode()
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (isSingleDecode()) {
break;
}
}
}
該方法主要邏輯:只要累積區(qū)還有未讀數(shù)據(jù),就循環(huán)進行讀取。
調(diào)用 decodeRemovalReentryProtection 方法,內(nèi)部調(diào)用了子類重寫的 decode 方法,很明顯,這里是個模板模式。decode 方法的邏輯就是將累積區(qū)的內(nèi)容按照約定進行解碼,如果成功解碼,就添加到數(shù)組中。同時該方法也會檢查該 handler 的狀態(tài),如果被移除出 pipeline 了,就將累積區(qū)的內(nèi)容直接刷新到后面的 handler 中。
如果 Context 節(jié)點被移除了,直接結(jié)束循環(huán)。如果解碼前的數(shù)組大小和解碼后的數(shù)組大小相等,且累積區(qū)的可讀字節(jié)數(shù)沒有變化,說明此次讀取什么都沒做,就直接結(jié)束。如果字節(jié)數(shù)變化了,說明雖然數(shù)組沒有增加,但確實在讀取字節(jié),就再繼續(xù)讀取。
如果上面的判斷過了,說明數(shù)組讀到數(shù)據(jù)了,但如果累積區(qū)的 readIndex 沒有變化,則拋出異常,說明沒有讀取數(shù)據(jù),但數(shù)組卻增加了,子類的操作是不對的。
如果是個單次解碼器,解碼一次就直接結(jié)束了。
所以,這段代碼的關鍵就是子類需要重寫 decode 方法,將累積區(qū)的數(shù)據(jù)正確的解碼并添加到數(shù)組中。每添加一次成功,就會調(diào)用 fireChannelRead 方法,將數(shù)組中的數(shù)據(jù)傳遞給后面的 handler。完成之后將數(shù)組的 size 設置為 0.
所以,如果你的業(yè)務 handler 在這個地方可能會被多次調(diào)用。也可能一次也不調(diào)用。取決于數(shù)組中的值。當然,如果解碼 handler 被移除了,就會將累積區(qū)的所有數(shù)據(jù)刷到后面的 handler。
5. 剩下的邏輯
上面的邏輯就是解碼器最主要的邏輯:
將 read 方法的數(shù)據(jù)讀取到累積區(qū),使用解碼器解碼累積區(qū)的數(shù)據(jù),解碼成功一個就放入到一個數(shù)組中,并將數(shù)組中的數(shù)據(jù)一次次的傳遞到后面的handler。
從上面的邏輯看,除非 handler 被移除,否則不會調(diào)用后面的 handler 方法,也就是說,只要不滿足解碼器的解碼規(guī)則,就不會傳遞給后面的 handler。
再看看后面的邏輯,主要在 finally 塊中:
- 如果累積區(qū)沒有可讀數(shù)據(jù)了,將計數(shù)器歸零,并釋放累積區(qū)。
- 如果不滿足上面的條件,且計數(shù)器超過了 16 次,就壓縮累積區(qū)的內(nèi)容,壓縮手段是刪除已讀的數(shù)據(jù)。將 readIndex 置為 0。還記得 ByteBuf 的指針結(jié)構(gòu)嗎?

這樣就能節(jié)省一些內(nèi)存了,但這會引起一些內(nèi)存復制的過程,以性能損耗為前提的。
- 記錄 decodeWasNull 屬性,這個值的決定來自于你有沒有成功的向數(shù)組中插入數(shù)據(jù),如果插入了,它就是 fasle,沒有插入,他就是 true。這個值的作用在于,當 channelRead 方法結(jié)束的時候,執(zhí)行該 decoder 的 channelReadComplete 方法(如果你沒有重寫的話),會判斷這個值:

如果是 true,則會判斷 autoRead 屬性,如果是 false 的話,那么 Netty 認為還有數(shù)據(jù)沒有讀到,不然數(shù)組為什么一直是空的?就主動調(diào)用 read 方法從 Socket 讀取。
- 調(diào)用 fireChannelRead 方法,嘗試將數(shù)組中的數(shù)據(jù)發(fā)送到后面的 handler。為什么要這么做。按道理,到這一步的時候,數(shù)組不可能是空,為什么這里還要這么謹慎的再發(fā)送一次?
答:如果是單次解碼器,就需要發(fā)送了,因此單詞解碼器是不會再 callDecode 方法中發(fā)送的。
- 最后,將數(shù)組還給對象池。并清空數(shù)組內(nèi)容。

最后一行的 recycler.recycle(this),有兩種結(jié)果,如果是 CodecOutputLists 的 recycle 方法,內(nèi)容如下:

恢復數(shù)組下標,對 count ++,表示有對象可用了。
還有第二種,當 16 個數(shù)組不夠用了,就需要創(chuàng)建一個新的,在 getOrCreate 方法體現(xiàn)。而構(gòu)造函數(shù)中的 recycler 是一個空對象。我們看看這個對象:

當調(diào)用 recycle 方法的時候,什么都不做。等待 GC 回收。因為這不是個對象池的引用。
好,到這里,關于 ByteToMessageDecoder 解碼器的主要功能就解讀完了。
5. 總結(jié)
可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這里使用了模板模式,留給子類擴展的方法就是 decode 方法。
主要邏輯就是將所有的數(shù)據(jù)全部放入累積區(qū),子類從累積區(qū)取出數(shù)據(jù)進行解碼后放入到一個 數(shù)組中,ByteToMessageDecoder 會循環(huán)數(shù)組調(diào)用后面的 handler 方法,將數(shù)據(jù)一幀幀的發(fā)送到業(yè)務 handler 。完成這個的解碼邏輯。
使用這種方式,無論是粘包還是拆包,都可以完美的實現(xiàn)。
還有一些小細節(jié):
- 比如解碼器可以單次的。
- 如果解碼一直不成功,那么數(shù)據(jù)就一直無法到達后面的 handler。除非該解碼器從 pipeline 移除。
- 像其他的 Netty 模塊一樣,這里也使用了對象池的概念,數(shù)組存放在線程安全的 ThreadLocal 中,默認 16 個,當不夠時,就創(chuàng)建新的,用完即被 GC 回收。
- 當數(shù)組從未成功添加數(shù)據(jù),且程序沒有開啟 autoRead ,就主動調(diào)用 read 方法。嘗試讀取數(shù)據(jù)。
Netty 所有的解碼器,都可以在此類上擴展,一切取決于 decode 的實現(xiàn)。只要遵守 ByteToMessageDecoder 的約定即可。
good luck!?。?!