SOFABolt 源碼分析19 - Codec 編解碼設(shè)計(jì)

注意:本文部分內(nèi)容摘抄自 SOFABolt 源碼解析系列文章(螞蟻金服將會(huì)在最近不斷推出 SOFA 系列的博文),大家可以關(guān)注微信公眾號:金融級分布式架構(gòu) 來得到通知。部分內(nèi)容參考自:螞蟻通信框架實(shí)踐

image.png

類組成

  • 編解碼器工廠 - 工廠模式
  • Codec 是編解碼器工廠類接口
  • RpcCodec 是針對 Rpc 場景下的編解碼器工廠類
    附:SOFABolt 高度提取了一些接口,是的我們可以自己去進(jìn)行擴(kuò)展,假設(shè)我們要做 mq 的編解碼,可以模仿實(shí)現(xiàn) RpcCodec 的實(shí)現(xiàn)方式
  • RpcCodec 用于創(chuàng)建 ProtocolCodeBasedEncoder 和 ProtocolCodeBasedDecoder 的子類 RpcProtocolDecoder,二者被設(shè)置為 netty 的編解碼器 handler
  • 編解碼模板 - 模板模式
  • MessageToByteEncoder 是 Netty 提供的一個(gè)編碼模板
  • AbstractBatchDecoder 是 SOFABolt hack 了 Netty 的 ByteToMessageDecoder 來提供一個(gè)解碼模板(相較于 Netty 增加了批量提交的功能)
  • 編解碼代理類 - 代理模式策略模式
  • ProtocolCodeBasedEncoder 是 CommandEncoder 的代理類,通過不同的 protocol 協(xié)議,指定使用不同的編碼器
  • ProtocolCodeBasedDecoder 是 CommandDecoder 的代理類,通過不同的 protocol 協(xié)議,指定使用不同的解碼器
  • RpcProtocolDecoder 是 ProtocolCodeBasedDecoder 的子類,針對于 Rpc 場景定制了 decodeProtocolVersion 方法
  • 真正的解碼器
  • CommandEncoder 提供了編碼接口
  • CommandDecoder 提供了解碼接口
  • RpcCommandEncoder 提供了 RpcProtocol 協(xié)議數(shù)據(jù)的編碼器
  • RpcCommandEncoderV2 提供了 RpcProtocolV2 協(xié)議數(shù)據(jù)的編碼器
  • RpcCommandDecoder 提供了 RpcProtocol 協(xié)議數(shù)據(jù)的解碼器
  • RpcCommandDecoderV2 提供了 RpcProtocolV2 協(xié)議數(shù)據(jù)的解碼器

一、編碼分析

本質(zhì):序列化會(huì)將業(yè)務(wù)數(shù)據(jù)轉(zhuǎn)化為 byte[],編碼按照私有協(xié)議將 byte[] 寫入到 ByteBuf 中

基本流程

  1. 判斷傳入的數(shù)據(jù)是否是 Serializable 類型(該類型由 MessageToByteEncoder 的泛型指定),如果不是,直接傳播給 pipeline 中的下一個(gè) handler;否則
  2. 創(chuàng)建一個(gè) ByteBuf 實(shí)例,用于存儲(chǔ)最終的編碼數(shù)據(jù)
  3. 從 channel 的附加屬性中獲取協(xié)議標(biāo)識 protocolCode,之后從協(xié)議管理器中獲取相應(yīng)的 Protocol 對象
  4. 從 Protocol 對象中獲取相應(yīng)的 CommandEncoder 實(shí)現(xiàn)類實(shí)例,使用 CommandEncoder 實(shí)現(xiàn)類實(shí)例按照 SOFABolt 源碼分析18 - Protocol 私有協(xié)議的設(shè)計(jì) 所介紹的協(xié)議規(guī)則將數(shù)據(jù)寫入到第二步創(chuàng)建好的 ByteBuf 實(shí)例中
  5. 如果原始數(shù)據(jù)是 ReferenceCounted 實(shí)現(xiàn)類,則釋放原始數(shù)據(jù)
  6. 如果 ByteBuf 中有數(shù)據(jù)了,則傳播給 pipeline 中的下一個(gè) handler;否則,釋放該 ByteBuf 對象,傳遞一個(gè)空的 ByteBuf 給下一個(gè) handler

源碼分析

======================= 編解碼工廠 =======================
public class RpcCodec implements Codec {
    @Override
    public ChannelHandler newEncoder() {
        return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
    }
    @Override
    public ChannelHandler newDecoder() {
        return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
    }
}

======================= 服務(wù)端 Netty 設(shè)置(客戶端類似) =======================
// 編解碼工廠
private Codec codec = new RpcCodec();
public class RpcServer {
    @Override
    protected void doInit() {
        ...
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                ...
                // 創(chuàng)建解碼器
                pipeline.addLast("decoder", codec.newDecoder());
                // 創(chuàng)建編碼器
                pipeline.addLast("encoder", codec.newEncoder());
                ...
            }
    }
}

======================= 編碼模板 =======================
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            // 判斷是否可編碼(SOFABolt 中判斷 msg instanceof Serializable)
            if (acceptOutboundMessage(msg)) {
                I cast = (I) msg;
                // 分配 ByteBuffer,默認(rèn)為 DirectByteBuf
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 調(diào)用子類進(jìn)行編碼
                    encode(ctx, cast, buf);
                } finally {
                    // 編碼結(jié)束后,如果原始數(shù)據(jù)是 ReferenceCounted 實(shí)現(xiàn)類,則釋放原始數(shù)據(jù)
                    ReferenceCountUtil.release(cast);
                }
                // 如果 ByteBuf 中有數(shù)據(jù)了,則傳播給 pipeline 中的下一個(gè) handler
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    // 如果 ByteBuf 中沒有數(shù)據(jù),釋放該 ByteBuf 對象,傳遞一個(gè)空的 ByteBuf 給下一個(gè) handler
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 如果不可進(jìn)行編碼,直接傳播給 pipeline 中的下一個(gè) handler
                ctx.write(msg, promise);
            }
        } finally {
            if (buf != null) {
                // 釋放開始分配的 ByteBuf
                buf.release();
            }
        }
    }
    private final TypeParameterMatcher matcher = TypeParameterMatcher.get(outboundMessageType); // 只要泛型 I 不是 Object,就是 ReflectiveMatcher 實(shí)例;如果 I 是 Object,則全部可編碼

    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }
}

public abstract class TypeParameterMatcher {
    private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() {
        @Override
        public boolean match(Object msg) {
            return true;
        }
    };
    // parameterType 是 MessageToByteEncoder 的泛型
    public static TypeParameterMatcher get(final Class<?> parameterType) {
        ...
        // 從緩存獲取,緩存有直接返回,沒有則 new
        TypeParameterMatcher matcher = getCache.get(parameterType);
        if (matcher == null) {
            // 如果泛型是 Object,全部可編碼
            if (parameterType == Object.class) {
                matcher = NOOP;
            } else {
                // 如果泛型不是 Object,則使用 ReflectiveMatcher
                matcher = new ReflectiveMatcher(parameterType);
            }
            // 設(shè)置到緩存
            getCache.put(parameterType, matcher);
        }
        return matcher;
    }

    private static final class ReflectiveMatcher extends TypeParameterMatcher {
        // 判斷傳遞進(jìn)來的將要編碼的數(shù)據(jù)是不是 MessageToByteEncoder 的泛型類型的數(shù)據(jù)
        @Override
        public boolean match(Object msg) {
            return type.isInstance(msg);
        }
    }
}

======================= 編碼代理類 =======================
@ChannelHandler.Sharable
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {

    // default protocol code, 默認(rèn)是 RpcProtocolV2
    protected ProtocolCode defaultProtocolCode;

    public ProtocolCodeBasedEncoder(ProtocolCode defaultProtocolCode) {
        super();
        this.defaultProtocolCode = defaultProtocolCode;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
        // 1. 從 Channel 獲取 ProtocolCode 附屬屬性
        Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
        ProtocolCode protocolCode;
        // 如果為 null,使用默認(rèn)協(xié)議 RpcProtocolV2;否則使用附屬屬性
        if (att == null || att.get() == null) {
            protocolCode = this.defaultProtocolCode;
        } else {
            protocolCode = att.get();
        }
        // 2. 從協(xié)議管理器中獲取 ProtocolCode 相應(yīng)的 Protocol 對象
        Protocol protocol = ProtocolManager.getProtocol(protocolCode);
        // 3. 再從協(xié)議對象中獲取相應(yīng)的 CommandEncoder 實(shí)現(xiàn)類實(shí)例
        protocol.getEncoder().encode(ctx, msg, out);
    }
}

======================= 編碼真實(shí)類(以簡單的 RpcProtocol 為例) =======================
// Encode remoting command into ByteBuf.
public class RpcCommandEncoder implements CommandEncoder {
    @Override
    public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
        if (msg instanceof RpcCommand) {
            /*
             * ver: version for protocol
             * type: request/response/request oneway
             * cmdcode: code for remoting command
             * ver2:version for remoting command
             * requestId: id of request
             * codec: code for codec
             * (req)timeout: request timeout.
             * (resp)respStatus: response status
             * classLen: length of request or response class name
             * headerLen: length of header
             * cotentLen: length of content
             * className
             * header
             * content
             */
            RpcCommand cmd = (RpcCommand) msg;
            out.writeByte(RpcProtocol.PROTOCOL_CODE); // protocol_code
            out.writeByte(cmd.getType()); // RpcCommandType
            out.writeShort(((RpcCommand) msg).getCmdCode().value()); // CommandCode
            out.writeByte(cmd.getVersion()); // commandVersion
            out.writeInt(cmd.getId()); // commandId
            out.writeByte(cmd.getSerializer()); // serializer
            if (cmd instanceof RequestCommand) {
                //timeout
                out.writeInt(((RequestCommand) cmd).getTimeout()); // timeout
            }
            if (cmd instanceof ResponseCommand) {
                //response status
                ResponseCommand response = (ResponseCommand) cmd;
                out.writeShort(response.getResponseStatus().getValue()); // responseStatus
            }
            out.writeShort(cmd.getClazzLength());
            out.writeShort(cmd.getHeaderLength());
            out.writeInt(cmd.getContentLength());
            if (cmd.getClazzLength() > 0) {
                out.writeBytes(cmd.getClazz());
            }
            if (cmd.getHeaderLength() > 0) {
                out.writeBytes(cmd.getHeader());
            }
            if (cmd.getContentLength() > 0) {
                out.writeBytes(cmd.getContent());
            }
        }
    }
}

注意事項(xiàng)

  • 由 acceptOutboundMessage 可知,在 SOFABolt 中數(shù)據(jù)要想經(jīng)過編碼器的處理,必須實(shí)現(xiàn) Serializable 接口。
  • 編碼器是無狀態(tài)的,可以標(biāo)注注解 @ChannelHandler.Sharable,見 ProtocolCodeBasedEncoder

二、解碼分析

本質(zhì):將 byte[] 按照私有協(xié)議轉(zhuǎn)化為中間數(shù)據(jù),再通過反序列化將請求信息轉(zhuǎn)化為業(yè)務(wù)數(shù)據(jù)

基本流程

  1. 創(chuàng)建或者從 Netty 的回收池中獲取一個(gè) RecyclableArrayList 實(shí)例,用于存儲(chǔ)最終的解碼數(shù)據(jù)
  2. 將傳入的 ByteBuf 添加到 Cumulator 累加器實(shí)例中
  3. 之后不斷的從 ByteBuf 中讀取數(shù)據(jù):首先解碼出 protocolCode,之后從協(xié)議管理器中獲取相應(yīng)的協(xié)議對象,再從協(xié)議對象中獲取相應(yīng)的 CommandDecoder 實(shí)現(xiàn)類實(shí)例 - Netty 的 ByteToMessageDecoder 具備 accumulate 批量解包能力,可以盡可能的從 socket 里讀取字節(jié),然后同步調(diào)用 decode 方法,解碼出業(yè)務(wù)對象,并組成一個(gè) List
  4. 使用 CommandDecoder 實(shí)現(xiàn)類實(shí)例按照上文所介紹的協(xié)議規(guī)則進(jìn)行解碼,將解碼好的數(shù)據(jù)放到 RecyclableArrayList 實(shí)例中,需要注意的是在解碼之前必須先記錄當(dāng)前 ByteBuf 的 readerIndex,如果發(fā)現(xiàn)數(shù)據(jù)不夠一個(gè)整包長度(發(fā)生了拆包粘包問題),則將當(dāng)前 ByteBuf 的 readerIndex 復(fù)原到解碼之前,然后直接返回,等待讀取更多的數(shù)據(jù)
  5. 為了防止發(fā)送端發(fā)送數(shù)據(jù)太快導(dǎo)致OOM,會(huì)清理 Cumulator 累加器實(shí)例或者其空間,將已經(jīng)讀取的字節(jié)刪除,向左壓縮 ByteBuf 空間
  6. 判斷 RecyclableArrayList 中的元素個(gè)數(shù),如果是1個(gè),則將這個(gè)元素單個(gè)發(fā)送給 pipeline 的下一個(gè) handler;如果元素大于1個(gè),則將整個(gè) RecyclableArrayList 以 List 形式發(fā)送給 pipeline 的下一個(gè) handler。- 這就是 SOFABolt 相較于 Netty 改進(jìn)的地方,提供了批量提交的功能(Netty 本身的做法是循環(huán)遍歷該 List ,依次提交到 ChannelPipeline 進(jìn)行處理。Bolt 是將提交的內(nèi)容從單個(gè) command ,改為整個(gè) List 一起提交,如此能減少 pipeline 的執(zhí)行次數(shù),同時(shí)提升吞吐量。這個(gè)模式在低并發(fā)場景,并沒有什么優(yōu)勢,而在高并發(fā)場景下對提升吞吐量有不小的性能提升),批量提交的數(shù)據(jù)會(huì)經(jīng)過如下的處理:
@Sharable
public class RpcCommandHandler implements CommandHandler {
    private void handle(RemotingContext ctx, Object msg) {
        // 批量提交來的數(shù)據(jù)
        if (msg instanceof List) {
            final Runnable handleTask = new Runnable() {
                public void run() {
                    // 循環(huán)處理
                    for (final Object m : (List<?>) msg) {
                        RpcCommandHandler.this.process(ctx, m);
                    }
                }
            };
            // 批量提交的數(shù)據(jù)是否在 processorManager#defaultExecutor 中執(zhí)行
            // -Dbolt.rpc.dispatch-msg-list-in-default-executor=true
            if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
                // If msg is list ,then the batch submission to biz threadpool can save io thread.
                processorManager.getDefaultExecutor().execute(handleTask);
            } else {
                handleTask.run();
            }
        } else {
            // 處理單條數(shù)據(jù)
            process(ctx, msg);
        }
    }
}
  1. 回收 RecyclableArrayList 實(shí)例

源碼分析

======================= 解碼模板 =======================
public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter {
    /**
     * 累加器常量
     */
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        // 將 in 累加到 cumulation
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            // 累加之后的 ByteBuf
            ByteBuf buffer;
            // 如果 cumulation 放不下 in 了,則進(jìn)行擴(kuò)容
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain().
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            // 將 in 寫入 buffer
            buffer.writeBytes(in);
            // 釋放 in
            in.release();
            // 返回類加后的結(jié)果
            return buffer;
        }
    };

    // 累加 ByteBuf
    ByteBuf cumulation;
    // 當(dāng)前 decoder 實(shí)例的累加器
    private Cumulator cumulator = MERGE_CUMULATOR;
    // 每調(diào)用一次 channelRead,只解碼一個(gè)消息(默認(rèn)為false,即批量盡可能多的對消息進(jìn)行解碼)
    private boolean singleDecode;
    private boolean decodeWasNull;
    // 是否是第一次將 ByteBuf 添加到累加器
    private boolean first;
    // 連續(xù)調(diào)用 16 次 channelRead,而沒有調(diào)用 channelReadComplete 時(shí),進(jìn)行 Bytebuf 的壓縮,去掉已經(jīng)讀取的 byte,防止 OOM
    private int discardAfterReads = 16;
    private int numReads;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果是 ByteBuf,進(jìn)行解碼
        if (msg instanceof ByteBuf) {
            // 1. 創(chuàng)建或者從 netty 的回收池中獲取一個(gè) RecyclableArrayList 實(shí)例
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                // 2. 將傳入的 ByteBuf 添加到 Cumulator 累加器實(shí)例中
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    // 將當(dāng)前的 data 設(shè)置為 cumulation
                    cumulation = data;
                } else {
                    // 將 data 累加到 cumulation 中
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 3 ~ 4
                callDecode(ctx, cumulation, out);
            } finally {
                // 如果解碼后的 cumulation 沒有可讀字節(jié),直接釋放
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                    // If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered.
                    // Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder.
                } else if (++numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                // 查看解碼后的消息列表大小
                // 如果是一個(gè),則直接傳遞單條消息
                // 如果是多個(gè),則直接傳遞消息列表
                int size = out.size();
                if (size == 0) {
                    decodeWasNull = true;
                } else if (size == 1) {
                    ctx.fireChannelRead(out.get(0));
                } else {
                    ArrayList<Object> ret = new ArrayList<Object>(size);
                    for (int i = 0; i < size; i++) {
                        ret.add(out.get(i));
                    }
                    ctx.fireChannelRead(ret);
                }
                // 回收 RecyclableArrayList
                out.recycle();
            }
        } else {
            // 如果不是 ByteBuf,直接傳遞
            ctx.fireChannelRead(msg);
        }
    }

    // 每次讀取完成之后,進(jìn)行 ByteBuf 壓縮
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;
        discardSomeReadBytes();
        ctx.fireChannelReadComplete();
    }

    protected final void discardSomeReadBytes() {
        if (cumulation != null && !first && cumulation.refCnt() == 1) {
            cumulation.discardSomeReadBytes();
        }
    }

    // 將 in 解碼到 out 中
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 循環(huán)不斷的讀取數(shù)據(jù),批量解包
        while (in.isReadable()) {
            // 獲取當(dāng)前的 out 中的消息個(gè)數(shù)
            int outSize = out.size();
            //
            int oldInputLength = in.readableBytes();
            decode(ctx, in, out);

            // 如果解碼后的 out 中的消息個(gè)數(shù)與原先的相等,即沒有消息可讀或者沒有完整的消息了,后者繼續(xù)循環(huán),前者直接退出
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            // 如果一次只解碼一個(gè)消息
            if (isSingleDecode()) {
                break;
            }
        }
    }

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        // 獲取舊的 cumulation
        ByteBuf oldCumulation = cumulation;
        // 分配一個(gè)正好可以容納老的 oldCumulation 已有字節(jié) + readable 字節(jié)個(gè)數(shù)的 ByteBuf 
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        // 將老的 oldCumulation 寫入新分配的 cumulation
        cumulation.writeBytes(oldCumulation);
        // 釋放老的 oldCumulation
        oldCumulation.release();
        // 返回新的 cumulation
        return cumulation;
    }

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);
}

======================= 解碼代理類 =======================
public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
    /** by default, suggest design a single byte for protocol version. */
    public static final int DEFAULT_PROTOCOL_VERSION_LENGTH         = 1;
    /** protocol version should be a positive number, we use -1 to represent illegal */
    public static final int DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH = -1;

    /** the length of protocol code,默認(rèn)為 1 */
    protected int           protocolCodeLength;

    protected ProtocolCode decodeProtocolCode(ByteBuf in) {
        // 從 in 的 readerIndex 開始讀取 protocolCodeBytes.length(默認(rèn)為 1)個(gè)字節(jié)
        // 即只解碼 protocolCode
        if (in.readableBytes() >= protocolCodeLength) {
            byte[] protocolCodeBytes = new byte[protocolCodeLength];
            in.readBytes(protocolCodeBytes);
            return ProtocolCode.fromBytes(protocolCodeBytes);
        }
        return null;
    }

    // RpcProtocolDecoder#decodeProtocolVersion
    protected byte decodeProtocolVersion(ByteBuf in) {
        // 恢復(fù)到 decode 中開始設(shè)置的讀指針
        in.resetReaderIndex();
        if (in.readableBytes() >= protocolCodeLength + 1) {
            byte rpcProtocolCodeByte = in.readByte();
            // 如果 ProtocolCode>=2,則繼續(xù)讀取 version,否則不讀?。粗蛔x取 RpcProtocolV2 及以上的 protocolVersion)
            if (rpcProtocolCodeByte >= 2) {
                return in.readByte();
            }
        }
        return -1;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 標(biāo)記當(dāng)前的讀指針
        in.markReaderIndex();
        // 解碼出 protocol_code
        ProtocolCode protocolCode = decodeProtocolCode(in); 
        if (null != protocolCode) {
            // 解碼出 protocol_version
            byte protocolVersion = decodeProtocolVersion(in); 
            ...
            // 根據(jù) protocolCode 獲取 Protocol
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            // 恢復(fù)讀指針到解析 protocolCode 之前
            in.resetReaderIndex();
            // 根據(jù) 
            protocol.getDecoder().decode(ctx, in, out);
        }
    }
}

======================= 解碼真實(shí)類(以簡單的 RpcProtocol 為例) =======================
public class RpcCommandDecoder implements CommandDecoder {
    // 獲取 RpcProtocol 協(xié)議下的響應(yīng)頭(20)和請求頭(22)的長度,即為 20
    private int lessLen;
    {
        lessLen = RpcProtocol.getResponseHeaderLength() < RpcProtocol.getRequestHeaderLength() ? RpcProtocol
            .getResponseHeaderLength() : RpcProtocol.getRequestHeaderLength();
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // the less length between response header and request header
        if (in.readableBytes() >= lessLen) {
            // 標(biāo)記當(dāng)前讀指針的位置
            in.markReaderIndex();
            // 讀取 protocolCode
            byte protocol = in.readByte();
            // 恢復(fù)讀指針到讀取 protocolCode 之前
            in.resetReaderIndex();
            if (protocol == RpcProtocol.PROTOCOL_CODE) {
                /*
                 * code: protocolCode
                 * type: request/response/request oneway
                 * cmdcode: code for remoting command
                 * ver2:version for remoting command
                 * requestId: id of request
                 * codec: code for codec
                 * (req)timeout: request timeout
                 * (resp)respStatus: response status
                 * classLen: length of request or response class name
                 * headerLen: length of header
                 * contentLen: length of content
                 * className
                 * header
                 * content
                 */
                if (in.readableBytes() > 2) {
                    in.markReaderIndex();
                    // 讀取 protocol_code
                    in.readByte();
                    // 讀取 type
                    byte type = in.readByte();
                    // 解碼請求
                    if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
                        if (in.readableBytes() >= RpcProtocol.getRequestHeaderLength() - 2) {
                            // CommandCode :請求命令類型,request / response / heartbeat
                            short cmdCode = in.readShort();
                            // CommandVersion
                            byte ver2 = in.readByte();
                            // 請求ID
                            int requestId = in.readInt();
                            // 序列化器
                            byte serializer = in.readByte();
                            int timeout = in.readInt();
                            short classLen = in.readShort();
                            short headerLen = in.readShort();
                            int contentLen = in.readInt();
                            byte[] clazz = null;
                            byte[] header = null;
                            byte[] content = null;
                            if (in.readableBytes() >= classLen + headerLen + contentLen) {
                                if (classLen > 0) {
                                    clazz = new byte[classLen];
                                    in.readBytes(clazz);
                                }
                                if (headerLen > 0) {
                                    header = new byte[headerLen];
                                    in.readBytes(header);
                                }
                                // 解碼內(nèi)容,注意此時(shí)解碼出來的都是一個(gè) byte[],需要序列化才能轉(zhuǎn)化為真正對象
                                if (contentLen > 0) {
                                    content = new byte[contentLen];
                                    in.readBytes(content);
                                }
                            } else {
                                // 不夠一個(gè)完整包,返回,等待累加器類架構(gòu)足夠的內(nèi)容
                                in.resetReaderIndex();
                                return;
                            }
                            RequestCommand command;
                            if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
                                // 如果是心跳請求消息,直接創(chuàng)建一個(gè) HeartbeatCommand
                                command = new HeartbeatCommand();
                            } else {
                                // 如果是正常請求,創(chuàng)建一個(gè) RpcRequestCommand
                                command = createRequestCommand(cmdCode);
                            }
                            command.setType(type);
                            command.setVersion(ver2);
                            command.setId(requestId);
                            command.setSerializer(serializer);
                            command.setTimeout(timeout);
                            command.setClazz(clazz);
                            command.setHeader(header);
                            command.setContent(content);
                            out.add(command);

                        } else {
                            in.resetReaderIndex();
                        }
                    // 解碼響應(yīng)
                    } else if (type == RpcCommandType.RESPONSE) {
                        //decode response
                        if (in.readableBytes() >= RpcProtocol.getResponseHeaderLength() - 2) {
                            short cmdCode = in.readShort();
                            byte ver2 = in.readByte();
                            int requestId = in.readInt();
                            byte serializer = in.readByte();
                            short status = in.readShort();
                            short classLen = in.readShort();
                            short headerLen = in.readShort();
                            int contentLen = in.readInt();
                            byte[] clazz = null;
                            byte[] header = null;
                            byte[] content = null;
                            if (in.readableBytes() >= classLen + headerLen + contentLen) {
                                if (classLen > 0) {
                                    clazz = new byte[classLen];
                                    in.readBytes(clazz);
                                }
                                if (headerLen > 0) {
                                    header = new byte[headerLen];
                                    in.readBytes(header);
                                }
                                if (contentLen > 0) {
                                    content = new byte[contentLen];
                                    in.readBytes(content);
                                }
                            } else {// not enough data
                                in.resetReaderIndex();
                                return;
                            }
                            ResponseCommand command;
                            if (cmdCode == CommandCode.HEARTBEAT_VALUE) {

                                command = new HeartbeatAckCommand();
                            } else {
                                command = createResponseCommand(cmdCode);
                            }
                            command.setType(type);
                            command.setVersion(ver2);
                            command.setId(requestId);
                            command.setSerializer(serializer);
                            command.setResponseStatus(ResponseStatus.valueOf(status));
                            command.setClazz(clazz);
                            command.setHeader(header);
                            command.setContent(content);
                            command.setResponseTimeMillis(System.currentTimeMillis());
                            command.setResponseHost((InetSocketAddress) ctx.channel()
                                .remoteAddress());
                            out.add(command);
                        } else {
                            in.resetReaderIndex();
                        }
                    }
                }
            }
        }
    }

    private ResponseCommand createResponseCommand(short cmdCode) {
        ResponseCommand command = new RpcResponseCommand();
        command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
        return command;
    }

    private RpcRequestCommand createRequestCommand(short cmdCode) {
        RpcRequestCommand command = new RpcRequestCommand();
        command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
        command.setArriveTime(System.currentTimeMillis());
        return command;
    }

}

注意事項(xiàng)

  • 解碼器是有狀態(tài)的(含有 累加器),不可標(biāo)注注解 @ChannelHandler.Sharable
  • 解碼器解決的很重要的一個(gè)問題就是 tcp 的拆包粘包問題,最常見的解決方案是

基于變長消息協(xié)議:每一個(gè)消息分為消息頭和消息體兩部分,在編碼時(shí),將消息體的長度設(shè)置到消息頭部,在解碼的時(shí)候,首先解析出消息頭部的長度信息,之后拆分或合并出該長度的消息體。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容