1、 ReplayingDecoder
- ReplayingDecoder S 是指一個枚舉,如果不需要指定Void即可
- ReplayingDecoder 不需要判斷(ByteBuf)中的數(shù)量是否足夠
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {

ReplayingDecoder
- 可見ByteToMessageDecoder的子類。類定義中的泛型 S 是一個用于記錄解碼狀態(tài)的狀態(tài)機枚舉類,在state(S s)、checkpoint(S s)等方法中會用到。在簡單解碼時也可以用java.lang.Void來占位。
- 與ByteToMessageDecoder不同,該類可以在接收到所需要長度的字節(jié)之后再調(diào)用
decode方法,而不用一遍又一遍的手動檢查流中的字節(jié)長度 - 從源碼上看ReplayingDecoder重寫了ByteToMessageDecoder的
callDecode()方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();//清除
//在繼續(xù)解碼之前,檢查這個處理程序是否已被刪除。
//如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
S oldState = state;
int oldInputLength = in.readableBytes();
try {
//解碼移除再入保護
decodeRemovalReentryProtection(ctx, replayable, out);
//在繼續(xù)循環(huán)之前,檢查是否刪除了這個處理程序。
//如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
"data or change its state if it did not decode anything.");
} else {
//以前的數(shù)據(jù)已被丟棄或?qū)е聽顟B(tài)轉(zhuǎn)換。
//也許它還在繼續(xù)讀。
continue;
}
}
} catch (Signal replay) {
replay.expect(REPLAY);
//在繼續(xù)循環(huán)之前,檢查是否刪除了此處理程序。
//如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
//返回到這個檢查點(或是舊的位置)和重試
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
}
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
"or change its state if it decoded something.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
2、ReplayingDecoder如何工作?
- ReplayingDecoder傳遞一個專門的ByteBuf實現(xiàn),當緩沖區(qū)中沒有足夠的數(shù)據(jù)時,這個實現(xiàn)會拋出某種類型的錯誤。在上面的IntegerHeaderFrameDecoder中,您只是假設(shè)在調(diào)用
buf.readInt()時,緩沖區(qū)中有4個或更多字節(jié)。如果緩沖區(qū)中確實有4個字節(jié),它將像您期望的那樣返回整數(shù)報頭。否則,將引發(fā)錯誤并將控制返回到ReplayingDecoder。如果ReplayingDecoder捕捉到錯誤,那么它會將緩沖區(qū)的readerIndex倒回“初始”位置(即緩沖區(qū)的開始),并在緩沖區(qū)接收到更多數(shù)據(jù)時再次調(diào)用decode(..)方法。 - 請注意,ReplayingDecoder總是拋出相同的緩存錯誤實例,以避免每次拋出時創(chuàng)建新錯誤并填充其堆棧跟蹤的開銷。
3、ReplayingDecoder如何提高性能
- 幸運的是,使用
checkpoint()方法可以顯著提高復(fù)雜解碼器實現(xiàn)的性能。checkpoint()方法更新緩沖區(qū)的“初始”位置,以便重新播放解碼器將緩沖區(qū)的readerIndex回滾到調(diào)用checkpoint點()方法的最后位置。
4、ReplayingDecoder使用枚舉Enum調(diào)用 checkpoint(T)
- 即使您可以只使用
checkpoint()方法并自己管理解碼器的狀態(tài),但是管理解碼器狀態(tài)的最簡單方法是創(chuàng)建一個表示解碼器當前狀態(tài)的Enum類型,并在狀態(tài)發(fā)生變化時調(diào)用checkpoint(T)方法。根據(jù)要解碼的消息的復(fù)雜性,可以有任意多個狀態(tài):
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
5、ReplayingDecoder調(diào)用沒有參數(shù)的checkpoint()方法
- 管理解碼器狀態(tài)的另一種方法是自己管理它。
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {}
6、 ReplayingDecoder用管道中的另一個解碼器替換一個解碼器
- 如果您要編寫一個協(xié)議多路復(fù)用器,您可能需要用另一個重放解碼器(ByteToMessageDecoder或MessageToMessageDecoder,實際的協(xié)議解碼器)替換ReplayingDecoder(協(xié)議檢測器)。僅僅通過調(diào)用ChannelPipeline是不可能實現(xiàn)這一點的。替換(ChannelHandler, String, ChannelHandler),但需要一些額外的步驟:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
}
7、開發(fā)過程中編寫解碼器與編碼器的建議(注意點)
- 建議要么解碼器,要么編碼器,遵循單一原則,不會那么困擾,方便開發(fā)與維護