【第23篇】Netty的ReplayingDecoder源碼分析與特性解讀

1、 ReplayingDecoder

  • ReplayingDecoder S 是指一個枚舉,如果不需要指定Void即可
  • ReplayingDecoder 不需要判斷(ByteBuf)中的數(shù)量是否足夠
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
ReplayingDecoder
  • 可見ByteToMessageDecoder的子類。類定義中的泛型 S 是一個用于記錄解碼狀態(tài)的狀態(tài)機枚舉類,在state(S s)、checkpoint(S s)等方法中會用到。在簡單解碼時也可以用java.lang.Void來占位。
  • 與ByteToMessageDecoder不同,該類可以在接收到所需要長度的字節(jié)之后再調(diào)用decode方法,而不用一遍又一遍的手動檢查流中的字節(jié)長度
  • 從源碼上看ReplayingDecoder重寫了ByteToMessageDecoder的callDecode()方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
       replayable.setCumulation(in);
       try {
           while (in.isReadable()) {
               int oldReaderIndex = checkpoint = in.readerIndex();
               int outSize = out.size();

               if (outSize > 0) {
                   fireChannelRead(ctx, out, outSize);
                   out.clear();//清除
                   //在繼續(xù)解碼之前,檢查這個處理程序是否已被刪除。
                   //如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
                   // See:
                   // - https://github.com/netty/netty/issues/4635
                   if (ctx.isRemoved()) {
                       break;
                   }
                   outSize = 0;
               }

               S oldState = state;
               int oldInputLength = in.readableBytes();
               try {
                   //解碼移除再入保護
                   decodeRemovalReentryProtection(ctx, replayable, out);
                   //在繼續(xù)循環(huán)之前,檢查是否刪除了這個處理程序。
                   //如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
                   // See https://github.com/netty/netty/issues/1664
                   if (ctx.isRemoved()) {
                       break;
                   }

                   if (outSize == out.size()) {
                       if (oldInputLength == in.readableBytes() && oldState == state) {
                           throw new DecoderException(
                                   StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                   "data or change its state if it did not decode anything.");
                       } else {
                           //以前的數(shù)據(jù)已被丟棄或?qū)е聽顟B(tài)轉(zhuǎn)換。
                           //也許它還在繼續(xù)讀。
                           continue;
                       }
                   }
               } catch (Signal replay) {
                   replay.expect(REPLAY);
                   //在繼續(xù)循環(huán)之前,檢查是否刪除了此處理程序。
                   //如果它被移除,繼續(xù)在緩沖區(qū)上操作是不安全的。
                   // See https://github.com/netty/netty/issues/1664
                   if (ctx.isRemoved()) {
                       break;
                   }

                   //返回到這個檢查點(或是舊的位置)和重試
                   int checkpoint = this.checkpoint;
                   if (checkpoint >= 0) {
                       in.readerIndex(checkpoint);
                   } else {
                       
                   }
                   break;
               }

               if (oldReaderIndex == in.readerIndex() && oldState == state) {
                   throw new DecoderException(
                          StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                          "or change its state if it decoded something.");
               }
               if (isSingleDecode()) {
                   break;
               }
           }
       } catch (DecoderException e) {
           throw e;
       } catch (Throwable cause) {
           throw new DecoderException(cause);
       }
   }

2、ReplayingDecoder如何工作?

  • ReplayingDecoder傳遞一個專門的ByteBuf實現(xiàn),當緩沖區(qū)中沒有足夠的數(shù)據(jù)時,這個實現(xiàn)會拋出某種類型的錯誤。在上面的IntegerHeaderFrameDecoder中,您只是假設(shè)在調(diào)用buf.readInt()時,緩沖區(qū)中有4個或更多字節(jié)。如果緩沖區(qū)中確實有4個字節(jié),它將像您期望的那樣返回整數(shù)報頭。否則,將引發(fā)錯誤并將控制返回到ReplayingDecoder。如果ReplayingDecoder捕捉到錯誤,那么它會將緩沖區(qū)的readerIndex倒回“初始”位置(即緩沖區(qū)的開始),并在緩沖區(qū)接收到更多數(shù)據(jù)時再次調(diào)用decode(..)方法。
  • 請注意,ReplayingDecoder總是拋出相同的緩存錯誤實例,以避免每次拋出時創(chuàng)建新錯誤并填充其堆棧跟蹤的開銷。

3、ReplayingDecoder如何提高性能

  • 幸運的是,使用checkpoint()方法可以顯著提高復(fù)雜解碼器實現(xiàn)的性能。checkpoint()方法更新緩沖區(qū)的“初始”位置,以便重新播放解碼器將緩沖區(qū)的readerIndex回滾到調(diào)用checkpoint點()方法的最后位置。

4、ReplayingDecoder使用枚舉Enum調(diào)用 checkpoint(T)

  • 即使您可以只使用checkpoint()方法并自己管理解碼器的狀態(tài),但是管理解碼器狀態(tài)的最簡單方法是創(chuàng)建一個表示解碼器當前狀態(tài)的Enum類型,并在狀態(tài)發(fā)生變化時調(diào)用checkpoint(T)方法。根據(jù)要解碼的消息的復(fù)雜性,可以有任意多個狀態(tài):
 public enum MyDecoderState {
     READ_LENGTH,
     READ_CONTENT;
   }
   
  public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
  
     private int length;
  
     public IntegerHeaderFrameDecoder() {
       // Set the initial state.
       super(MyDecoderState.READ_LENGTH);
     }
  
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
        case READ_LENGTH:
            length = buf.readInt();
            checkpoint(MyDecoderState.READ_CONTENT);
        case READ_CONTENT:
            ByteBuf frame = buf.readBytes(length);
            checkpoint(MyDecoderState.READ_LENGTH);
            out.add(frame);
         break;
       default:
         throw new Error("Shouldn't reach here.");
       }
     }
   }

5、ReplayingDecoder調(diào)用沒有參數(shù)的checkpoint()方法

  • 管理解碼器狀態(tài)的另一種方法是自己管理它。
  public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {}

6、 ReplayingDecoder用管道中的另一個解碼器替換一個解碼器

  • 如果您要編寫一個協(xié)議多路復(fù)用器,您可能需要用另一個重放解碼器(ByteToMessageDecoder或MessageToMessageDecoder,實際的協(xié)議解碼器)替換ReplayingDecoder(協(xié)議檢測器)。僅僅通過調(diào)用ChannelPipeline是不可能實現(xiàn)這一點的。替換(ChannelHandler, String, ChannelHandler),但需要一些額外的步驟:
 public class FirstDecoder extends ReplayingDecoder<Void> {
  
        @Override
       protected void decode(ChannelHandlerContext ctx,  ByteBuf buf, List<Object> out) {
           ...
           // Decode the first message
           Object firstMessage = ...;
  
           // Add the second decoder
           ctx.pipeline().addLast("second", new SecondDecoder());
  
           if (buf.isReadable()) {
               // Hand off the remaining data to the second decoder
               out.add(firstMessage);
               out.add(buf.readBytes(super.actualReadableBytes()));
           } else {
               // Nothing to hand off
               out.add(firstMessage);
           }
           // Remove the first decoder (me)
           ctx.pipeline().remove(this);
       }
}       

7、開發(fā)過程中編寫解碼器與編碼器的建議(注意點)

  • 建議要么解碼器,要么編碼器,遵循單一原則,不會那么困擾,方便開發(fā)與維護
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容