本文是Netty文集中“Netty in action”系列的文章。主要是對(duì)Norman Maurer and Marvin Allen Wolfthal 的 《Netty in action》一書(shū)簡(jiǎn)要翻譯,同時(shí)對(duì)重要點(diǎn)加上一些自己補(bǔ)充和擴(kuò)展。
本章含蓋
- 解碼器、編碼器、編解碼器綜述
- Netty 的編解碼類(lèi)
Netty提供可以簡(jiǎn)化各種協(xié)議的自定義編解碼器創(chuàng)建的組件。
什么是編解碼器?
每個(gè)網(wǎng)絡(luò)應(yīng)用都會(huì)定義端之間傳輸?shù)亩M(jìn)制字節(jié)該如何被解析和轉(zhuǎn)換,從發(fā)送端到目標(biāo)程序的數(shù)據(jù)類(lèi)型。這個(gè)轉(zhuǎn)換邏輯通過(guò)編解碼器來(lái)完成,編解碼器包含了一個(gè)編碼器和一個(gè)解碼器,每個(gè)編解碼器將一個(gè)字節(jié)流從一個(gè)格式轉(zhuǎn)換為另一個(gè)格式。那么怎么區(qū)分它們了?
一個(gè)編碼器轉(zhuǎn)換消息為一個(gè)適當(dāng)?shù)母袷接糜趥鬏?大部分情況下是一個(gè)字節(jié)流);對(duì)應(yīng)的解碼器轉(zhuǎn)換網(wǎng)絡(luò)流為一個(gè)程序的消息格式。一個(gè)編碼器操作一個(gè)出站數(shù)據(jù)(outbound data),一個(gè)解碼器處理一個(gè)入站數(shù)據(jù)(inbound data)。
Decoders
Decoder 實(shí)現(xiàn)了ChannelInboundHandler。
解碼器類(lèi)包含了兩個(gè)不同的使用場(chǎng)景:
- 解碼字節(jié)到消息 —— ByteToMessageDecoder 和 ReplayingDecoder
- 解碼一種消息類(lèi)型到另外一種消息類(lèi)型 —— MessageToMessageDecoder
因?yàn)榻獯a器的責(zé)任是轉(zhuǎn)換入站數(shù)據(jù)從一種格式到另一種格式,Netty的解碼器實(shí)現(xiàn)了ChannelInboundHandler。
因?yàn)镹etty的ChannelPipeline的設(shè)計(jì),你能夠鏈接多個(gè)解碼器去實(shí)現(xiàn)任意復(fù)雜的轉(zhuǎn)換邏輯,這是Netty支持代碼模塊化和重用的一個(gè)很好的例子。
ByteToMessageDecoder 抽象類(lèi)
由于你不知道遠(yuǎn)端是否會(huì)一次性發(fā)送一個(gè)完整的數(shù)據(jù),ByteToMessageDecoder類(lèi)緩存入站數(shù)據(jù)直到數(shù)據(jù)準(zhǔn)備好可用于處理。

注意,decodeLast()方法是在當(dāng)ByteBuf還有可讀數(shù)據(jù)時(shí),默認(rèn)調(diào)用decode()方法。
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(ctx, in, out);
}
}
示例:

盡管ByteToMessageDecoder使得此模式實(shí)現(xiàn)簡(jiǎn)單,你可能發(fā)現(xiàn)有一點(diǎn)比較煩人,就是在調(diào)用readInt()前必須校驗(yàn)input ByteBuf是否有足夠的數(shù)據(jù)。下一章我們將討論ReplayingDecoder,一個(gè)特殊的解碼器,它能夠消除這個(gè)一步驟,只需要消耗很小的性能損耗。
編解碼器中的引用計(jì)數(shù)
正如我們?cè)诘谖逭潞偷诹滤岬降?,引用?jì)數(shù)是需要特別注意的。在編碼器和解碼器情況下,這個(gè)過(guò)程是相當(dāng)簡(jiǎn)單的:一旦一個(gè)消息被編碼或解碼,它將自動(dòng)被釋放通過(guò)調(diào)用ReferenceCountUtil.release(message)。如果你需要持有該引用以便后面使用,你能調(diào)用ReferenceCountUtil.retain(message)。該調(diào)用會(huì)增加引用計(jì)數(shù),防止消息被釋放。
ReplayingDecoder 抽象類(lèi)
ReplayingDecoder繼承了ByteToMessageDecoder,并將我們從調(diào)用readableBytes()中解放。它實(shí)現(xiàn)這個(gè)通過(guò)使用一個(gè)自定義ByteBuf的實(shí)現(xiàn)(ReplayingDecoderBuffer)來(lái)封裝入站ByteBuf。ReplayingDecoderBuffer在內(nèi)部執(zhí)行時(shí)調(diào)用。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
參數(shù)S指定用于狀態(tài)管理的類(lèi)型,Void表示不使用狀態(tài)管理。
示例:基于ReplayingDecoder來(lái)實(shí)現(xiàn)ToIntegerDecoder

請(qǐng)注意ReplayingDecoder的這些方面:
- 不是所有的ByteBuf操作都支持。如果一個(gè)不支持的方法被調(diào)用了,那么將拋出一個(gè)UnsupportedOperationException異常。
- ReplayingDecoder略慢與ByteToMessageDecoder
在現(xiàn)實(shí)環(huán)境總,在復(fù)雜情況下是使用ByteToMessageDecoder還是ReplayingDecoder的區(qū)別是很大的。
更多關(guān)于解碼器
下面的類(lèi)處理更復(fù)雜的使用情況:
- io.netty.handler.codec.LineBasedFrameDecoder —— 這個(gè)類(lèi)用于Netty內(nèi)部,使用'結(jié)束換行'控制字符( \n or \r\n )來(lái)解析消息數(shù)據(jù)
- io.netty.handler.codec.http.HttpObjectDecoder —— 用于Http數(shù)據(jù)的解碼器
MessageToMessageDecoder 抽象類(lèi)
使用MessageToMessageDecoder進(jìn)行消息格式間的轉(zhuǎn)換。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
參數(shù) “I" 指定了輸入消息的類(lèi)型,作為decode的輸入消息類(lèi)型參數(shù),decode()是你唯一需要實(shí)現(xiàn)的方法。


一個(gè)更復(fù)雜的例子,請(qǐng)看io.netty.handler.codec.http.HttpObjectAggregator,該類(lèi)間接繼承了MessageToMessageDecoder<HttpObject>.
TooLongFrameException 類(lèi)
因?yàn)镹etty是一個(gè)異步框架,你需要去緩存字節(jié)到內(nèi)存中直到你能夠去解析它。所以,你不應(yīng)該允許你的解碼器去緩存足夠的數(shù)據(jù)來(lái)耗盡可用內(nèi)存。為了解決這個(gè)共同關(guān)心的問(wèn)題,Netty提供了一個(gè)TooLongFrameException,如果一個(gè)幀的大小超過(guò)了指定大小則拋出該異常。
為了避免內(nèi)存被耗盡,你能夠設(shè)置一個(gè)最大字節(jié)數(shù)的閾值,如果超過(guò)了這個(gè)閾值,將導(dǎo)致一個(gè)TooLongFrameException異常拋出( 并被ChannelHandler.exceptionCaught()捕獲)。然后由解碼器的用戶(hù)來(lái)決定如果處理該異常。一些協(xié)議,例如HTTP,允許你返回一個(gè)特殊的響應(yīng)。在其他情況下,唯一的選擇可能就是關(guān)閉連接。

Encoders
Encoder 實(shí)現(xiàn)了ChannelOutboundHandler。
Netty提供了一個(gè)集合的類(lèi)來(lái)幫助你寫(xiě)支持如下功能的編碼器:
- 將一消息編碼為字節(jié)
- 將一個(gè)消息編碼為另一個(gè)消息
MessageToByteEncoder 抽象類(lèi)

你可能已經(jīng)注意到,這個(gè)類(lèi)只有一個(gè)方法,但decoder有兩個(gè)。這是因?yàn)榻獯a器經(jīng)常需要產(chǎn)生一個(gè)最后消息在channel已經(jīng)關(guān)閉前( 因此有了 decodeLast() 方法 )(注意,decodeLast會(huì)在channelInactive之前被調(diào)用)。我們清楚的知道編碼器是沒(méi)有這種情況的 —— 沒(méi)有必要在連接斷開(kāi)后去產(chǎn)生一個(gè)消息。
示例:
MessageToMessageEncoder 抽象類(lèi)


如果對(duì)特化的MessageToMessageEncoder感興趣,可以查看io.netty.handler.codec.protobuf.ProtobufEncoder類(lèi)
codec抽象類(lèi)
Netty的codec抽象類(lèi),將一個(gè)編碼器和解碼器捆綁成一對(duì)用于同時(shí)管理入站和出站消息的轉(zhuǎn)換。codec同時(shí)實(shí)現(xiàn)了ChannelInboundHandler 和 ChannelOutboundHandler。
為什么我們不是用這個(gè)復(fù)合類(lèi)在所有時(shí)候,而是更傾向于將解碼和編碼分開(kāi)了?因?yàn)閷⑦@兩個(gè)功能分開(kāi),無(wú)論何時(shí)都能最大程度上來(lái)保持代碼的重用性和可擴(kuò)展性,這是Netty的一個(gè)基本理念。
ByteToMessageCodec 抽象類(lèi)
ByteToMessageCodec 合并了ByteToMessageDecoder 和 MessageToByteEncoder。

任何 請(qǐng)求/響應(yīng) 協(xié)議都適合使用ByteToMessageCodec。
MessageToMessageCodec 抽象類(lèi)
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

INBOUND_IN消息作為寫(xiě)操作發(fā)送出去的類(lèi)型,OUTBOUND_IN消息作為被應(yīng)用處理的類(lèi)型。
CombinedChannelDuplexHandler 類(lèi)
如我們?cè)缜罢f(shuō)的,合并一個(gè)解碼器和一個(gè)編碼器可能會(huì)對(duì)復(fù)用性造成影響。然后,這里提供了一個(gè)方式去避免這個(gè)損失且不用犧牲配置一個(gè)解碼器和一個(gè)編碼器為一個(gè)單元的便利性。使用CombinedChannelDuplexHandler來(lái)解決這個(gè)問(wèn)題
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
該類(lèi)扮演一個(gè)包含有ChannelInboundHandler和一個(gè)ChannelOutboundHandler的容器。通過(guò)分別提供一個(gè)docoder類(lèi)和一個(gè)encoder類(lèi),我們能夠?qū)崿F(xiàn)編解碼器而不需要直接繼承一個(gè)codec抽象類(lèi)。
也就是說(shuō),CombinedChannelDuplexHandler使用組合的方式,復(fù)用已經(jīng)存在的Decoder和Encoder來(lái)實(shí)現(xiàn)編解碼器,這樣就保持了代碼的重用性和可擴(kuò)展性。而如果是直接實(shí)現(xiàn)一個(gè)Codec抽象類(lèi)的話(huà),則是通過(guò)直接實(shí)現(xiàn)相關(guān)的encode、decode方法來(lái)實(shí)現(xiàn)編解碼器,這使得程序失去了代碼的重用性和可擴(kuò)展性。
示例:

擴(kuò)展
Q:ReplayingDecoder是如何做到,不用判斷字節(jié)數(shù)是否足夠就直接調(diào)用readXXX操作并能保證正確的邏輯了?
A:我們來(lái)簡(jiǎn)單看下ReplayingDecoder中的一些實(shí)現(xiàn):

callDecode()是一個(gè)寫(xiě)循環(huán)實(shí)現(xiàn),每次都會(huì)先記錄ByteBuf in當(dāng)前的讀索引位置,然后將ByteBuf in封裝成一個(gè)ReplayingDecoderByteBuf對(duì)象(這是一個(gè)特殊的ByteBuf實(shí)現(xiàn),它重寫(xiě)了ByteBuf的各種readXXX、getXXX。這些方法在獲取真實(shí)的數(shù)據(jù)前會(huì)先判斷字節(jié)是否足夠,如果不足夠則會(huì)拋出一個(gè)Signal異常。)。然后將封裝好的ReplayingDecoderByteBuf對(duì)象傳遞給decodeRemovalReentryProtection方法(decodeRemovalReentryProtection方法底層會(huì)調(diào)用decode()方法),這樣一來(lái)當(dāng)readXXX操作的時(shí)候數(shù)據(jù)不足的話(huà)就會(huì)拋出一個(gè)Signal異常。在catch{}語(yǔ)塊中就會(huì)將ByteBuf的readerIndex重置為本次解碼前的位置。
但也正是因?yàn)槿绱?,在某些情況下ReplayingDecoder可能存在較差的性能。如,在網(wǎng)絡(luò)很慢且消息格式較復(fù)雜的情況下。比如,有個(gè)一消息格式為:“消息頭”+“消息體”兩部分組成一個(gè)完整的消息包。我們需要根據(jù)消息頭獲取消息體數(shù)據(jù)長(zhǎng)度以獲取我們所需的數(shù)據(jù)。 但是了,因?yàn)榫W(wǎng)絡(luò)比較慢的關(guān)系,我們讀取到的ByteBuf可能不是一個(gè)完整的消息格式包(可能包含了消息頭以及部分的消息體),本次decode就無(wú)法解析出一個(gè)消息包(但是我們已經(jīng)成功解碼處理消息頭的數(shù)據(jù)了),那么就會(huì)在catch中將ByteBuf的readerIndex重置。那么下次decdoe的時(shí)候,又需要重新解析一次消息(即,消息頭數(shù)據(jù)又需要重新進(jìn)行一次解析)。如果依舊無(wú)法獲取一個(gè)完整的消息包,那么前面的操作將再執(zhí)行一次。。。
當(dāng)然,我們也是有辦法來(lái)解決這個(gè)問(wèn)題的,那就是使用ReplayingDecoderByteBuf的checkpoint(T)方法來(lái)管理解碼器的狀態(tài)。嗯,這里舉個(gè)java doc中的例子來(lái)說(shuō)明checkpoint(T)的使用。
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
首先,我們根據(jù)自定協(xié)議聲明好協(xié)議的各個(gè)狀態(tài)。這里就是“READ_LENGTH”和“READ_CONTENT”兩個(gè)狀態(tài)。然后在IntegerHeaderFrameDecoder解碼器的時(shí)候,設(shè)置初始狀態(tài)為“MyDecoderState.READ_LENGTH”。在decode方法中,我們根據(jù)不同的狀態(tài)來(lái)進(jìn)行相應(yīng)的操作:
一開(kāi)始state為READ_LENGTH,則先進(jìn)行消息頭部分的數(shù)據(jù)獲取,如果此時(shí)ByteBuf中的數(shù)據(jù)不足以獲取到消息頭的數(shù)據(jù)那么就會(huì)拋出一個(gè)Signal異常,由基類(lèi)根據(jù)上面的邏輯進(jìn)行readerIndex的重置;如果ByteBuf的數(shù)據(jù)足以獲取到消息頭,那么在獲取到消息頭的值后,執(zhí)行『checkpoint(MyDecoderState.READ_CONTENT);』這步非常的重要,checkpoint方法會(huì)完成兩個(gè)操作:① 將調(diào)用decode方法前記錄的readerIndex初始值修改為當(dāng)前ByteBuf的readerIndex值,② 將state狀態(tài)修改為MyDecoderState.READ_CONTENT。然后繼續(xù)state為MyDecoderState.READ_CONTENT情況的處理(注意,這里你會(huì)發(fā)現(xiàn)switch-case中沒(méi)有break語(yǔ)句,所以流程會(huì)走到下一個(gè)狀態(tài))。這樣一來(lái),當(dāng)ByteBuf中的數(shù)據(jù)不足以讀取到完整的消息體的內(nèi)容,基類(lèi)在重置readerIndex的時(shí)候,不再是重置到讀取消息頭之前的位置了,而是重置到讀取完消息頭之后的位置。這樣,當(dāng)decode再次被調(diào)用時(shí),我們就無(wú)需再解碼一次消息頭了,這時(shí)state()方法返回的值已經(jīng)是MyDecoderState.READ_CONTENT(因?yàn)槲覀兩厦嬖诮獯a完消息頭后通過(guò)checkpoint方法設(shè)置了狀態(tài)值為MyDecoderState.READ_CONTENT),流程也會(huì)從解碼消息體開(kāi)始繼續(xù)進(jìn)行。
后記
若文章有任何錯(cuò)誤,望大家不吝指教:)
參考
《Netty in action》
圣思園《精通并發(fā)與Netty》