注意:本文部分內(nèi)容摘抄自 SOFABolt 源碼解析系列文章(螞蟻金服將會(huì)在最近不斷推出 SOFA 系列的博文),大家可以關(guān)注微信公眾號:金融級分布式架構(gòu) 來得到通知。部分內(nèi)容參考自:螞蟻通信框架實(shí)踐

image.png
類組成
- 編解碼器工廠 -
工廠模式
- Codec 是編解碼器工廠類接口
- RpcCodec 是針對 Rpc 場景下的編解碼器工廠類
附:SOFABolt 高度提取了一些接口,是的我們可以自己去進(jìn)行擴(kuò)展,假設(shè)我們要做 mq 的編解碼,可以模仿實(shí)現(xiàn) RpcCodec 的實(shí)現(xiàn)方式- RpcCodec 用于創(chuàng)建 ProtocolCodeBasedEncoder 和 ProtocolCodeBasedDecoder 的子類 RpcProtocolDecoder,二者被設(shè)置為 netty 的編解碼器 handler
- 編解碼模板 -
模板模式
- MessageToByteEncoder 是 Netty 提供的一個(gè)編碼模板
- AbstractBatchDecoder 是 SOFABolt hack 了 Netty 的 ByteToMessageDecoder 來提供一個(gè)解碼模板(相較于 Netty 增加了
批量提交的功能)
- 編解碼代理類 -
代理模式和策略模式
- ProtocolCodeBasedEncoder 是 CommandEncoder 的代理類,通過不同的 protocol 協(xié)議,指定使用不同的編碼器
- ProtocolCodeBasedDecoder 是 CommandDecoder 的代理類,通過不同的 protocol 協(xié)議,指定使用不同的解碼器
- RpcProtocolDecoder 是 ProtocolCodeBasedDecoder 的子類,針對于 Rpc 場景定制了 decodeProtocolVersion 方法
- 真正的解碼器
- CommandEncoder 提供了編碼接口
- CommandDecoder 提供了解碼接口
- RpcCommandEncoder 提供了 RpcProtocol 協(xié)議數(shù)據(jù)的編碼器
- RpcCommandEncoderV2 提供了 RpcProtocolV2 協(xié)議數(shù)據(jù)的編碼器
- RpcCommandDecoder 提供了 RpcProtocol 協(xié)議數(shù)據(jù)的解碼器
- RpcCommandDecoderV2 提供了 RpcProtocolV2 協(xié)議數(shù)據(jù)的解碼器
一、編碼分析
本質(zhì):序列化會(huì)將業(yè)務(wù)數(shù)據(jù)轉(zhuǎn)化為 byte[],編碼按照私有協(xié)議將 byte[] 寫入到 ByteBuf 中
基本流程
- 判斷傳入的數(shù)據(jù)是否是 Serializable 類型(該類型由 MessageToByteEncoder 的泛型指定),如果不是,直接傳播給 pipeline 中的下一個(gè) handler;否則
- 創(chuàng)建一個(gè) ByteBuf 實(shí)例,用于存儲(chǔ)最終的編碼數(shù)據(jù)
- 從 channel 的附加屬性中獲取協(xié)議標(biāo)識 protocolCode,之后從協(xié)議管理器中獲取相應(yīng)的 Protocol 對象
- 從 Protocol 對象中獲取相應(yīng)的 CommandEncoder 實(shí)現(xiàn)類實(shí)例,使用 CommandEncoder 實(shí)現(xiàn)類實(shí)例按照 SOFABolt 源碼分析18 - Protocol 私有協(xié)議的設(shè)計(jì) 所介紹的協(xié)議規(guī)則將數(shù)據(jù)寫入到第二步創(chuàng)建好的 ByteBuf 實(shí)例中
- 如果原始數(shù)據(jù)是 ReferenceCounted 實(shí)現(xiàn)類,則釋放原始數(shù)據(jù)
- 如果 ByteBuf 中有數(shù)據(jù)了,則傳播給 pipeline 中的下一個(gè) handler;否則,釋放該 ByteBuf 對象,傳遞一個(gè)空的 ByteBuf 給下一個(gè) handler
源碼分析
======================= 編解碼工廠 =======================
public class RpcCodec implements Codec {
@Override
public ChannelHandler newEncoder() {
return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
}
@Override
public ChannelHandler newDecoder() {
return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
}
}
======================= 服務(wù)端 Netty 設(shè)置(客戶端類似) =======================
// 編解碼工廠
private Codec codec = new RpcCodec();
public class RpcServer {
@Override
protected void doInit() {
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
// 創(chuàng)建解碼器
pipeline.addLast("decoder", codec.newDecoder());
// 創(chuàng)建編碼器
pipeline.addLast("encoder", codec.newEncoder());
...
}
}
}
======================= 編碼模板 =======================
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判斷是否可編碼(SOFABolt 中判斷 msg instanceof Serializable)
if (acceptOutboundMessage(msg)) {
I cast = (I) msg;
// 分配 ByteBuffer,默認(rèn)為 DirectByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 調(diào)用子類進(jìn)行編碼
encode(ctx, cast, buf);
} finally {
// 編碼結(jié)束后,如果原始數(shù)據(jù)是 ReferenceCounted 實(shí)現(xiàn)類,則釋放原始數(shù)據(jù)
ReferenceCountUtil.release(cast);
}
// 如果 ByteBuf 中有數(shù)據(jù)了,則傳播給 pipeline 中的下一個(gè) handler
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 如果 ByteBuf 中沒有數(shù)據(jù),釋放該 ByteBuf 對象,傳遞一個(gè)空的 ByteBuf 給下一個(gè) handler
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果不可進(jìn)行編碼,直接傳播給 pipeline 中的下一個(gè) handler
ctx.write(msg, promise);
}
} finally {
if (buf != null) {
// 釋放開始分配的 ByteBuf
buf.release();
}
}
}
private final TypeParameterMatcher matcher = TypeParameterMatcher.get(outboundMessageType); // 只要泛型 I 不是 Object,就是 ReflectiveMatcher 實(shí)例;如果 I 是 Object,則全部可編碼
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
}
public abstract class TypeParameterMatcher {
private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() {
@Override
public boolean match(Object msg) {
return true;
}
};
// parameterType 是 MessageToByteEncoder 的泛型
public static TypeParameterMatcher get(final Class<?> parameterType) {
...
// 從緩存獲取,緩存有直接返回,沒有則 new
TypeParameterMatcher matcher = getCache.get(parameterType);
if (matcher == null) {
// 如果泛型是 Object,全部可編碼
if (parameterType == Object.class) {
matcher = NOOP;
} else {
// 如果泛型不是 Object,則使用 ReflectiveMatcher
matcher = new ReflectiveMatcher(parameterType);
}
// 設(shè)置到緩存
getCache.put(parameterType, matcher);
}
return matcher;
}
private static final class ReflectiveMatcher extends TypeParameterMatcher {
// 判斷傳遞進(jìn)來的將要編碼的數(shù)據(jù)是不是 MessageToByteEncoder 的泛型類型的數(shù)據(jù)
@Override
public boolean match(Object msg) {
return type.isInstance(msg);
}
}
}
======================= 編碼代理類 =======================
@ChannelHandler.Sharable
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {
// default protocol code, 默認(rèn)是 RpcProtocolV2
protected ProtocolCode defaultProtocolCode;
public ProtocolCodeBasedEncoder(ProtocolCode defaultProtocolCode) {
super();
this.defaultProtocolCode = defaultProtocolCode;
}
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
// 1. 從 Channel 獲取 ProtocolCode 附屬屬性
Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
ProtocolCode protocolCode;
// 如果為 null,使用默認(rèn)協(xié)議 RpcProtocolV2;否則使用附屬屬性
if (att == null || att.get() == null) {
protocolCode = this.defaultProtocolCode;
} else {
protocolCode = att.get();
}
// 2. 從協(xié)議管理器中獲取 ProtocolCode 相應(yīng)的 Protocol 對象
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 3. 再從協(xié)議對象中獲取相應(yīng)的 CommandEncoder 實(shí)現(xiàn)類實(shí)例
protocol.getEncoder().encode(ctx, msg, out);
}
}
======================= 編碼真實(shí)類(以簡單的 RpcProtocol 為例) =======================
// Encode remoting command into ByteBuf.
public class RpcCommandEncoder implements CommandEncoder {
@Override
public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
if (msg instanceof RpcCommand) {
/*
* ver: version for protocol
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* (req)timeout: request timeout.
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* cotentLen: length of content
* className
* header
* content
*/
RpcCommand cmd = (RpcCommand) msg;
out.writeByte(RpcProtocol.PROTOCOL_CODE); // protocol_code
out.writeByte(cmd.getType()); // RpcCommandType
out.writeShort(((RpcCommand) msg).getCmdCode().value()); // CommandCode
out.writeByte(cmd.getVersion()); // commandVersion
out.writeInt(cmd.getId()); // commandId
out.writeByte(cmd.getSerializer()); // serializer
if (cmd instanceof RequestCommand) {
//timeout
out.writeInt(((RequestCommand) cmd).getTimeout()); // timeout
}
if (cmd instanceof ResponseCommand) {
//response status
ResponseCommand response = (ResponseCommand) cmd;
out.writeShort(response.getResponseStatus().getValue()); // responseStatus
}
out.writeShort(cmd.getClazzLength());
out.writeShort(cmd.getHeaderLength());
out.writeInt(cmd.getContentLength());
if (cmd.getClazzLength() > 0) {
out.writeBytes(cmd.getClazz());
}
if (cmd.getHeaderLength() > 0) {
out.writeBytes(cmd.getHeader());
}
if (cmd.getContentLength() > 0) {
out.writeBytes(cmd.getContent());
}
}
}
}
注意事項(xiàng)
- 由 acceptOutboundMessage 可知,在 SOFABolt 中數(shù)據(jù)要想經(jīng)過編碼器的處理,必須實(shí)現(xiàn) Serializable 接口。
- 編碼器是無狀態(tài)的,可以標(biāo)注注解 @ChannelHandler.Sharable,見 ProtocolCodeBasedEncoder
二、解碼分析
本質(zhì):將 byte[] 按照私有協(xié)議轉(zhuǎn)化為中間數(shù)據(jù),再通過反序列化將請求信息轉(zhuǎn)化為業(yè)務(wù)數(shù)據(jù)
基本流程
- 創(chuàng)建或者從 Netty 的回收池中獲取一個(gè) RecyclableArrayList 實(shí)例,用于存儲(chǔ)最終的解碼數(shù)據(jù)
- 將傳入的 ByteBuf 添加到 Cumulator 累加器實(shí)例中
- 之后
不斷的從 ByteBuf 中讀取數(shù)據(jù):首先解碼出 protocolCode,之后從協(xié)議管理器中獲取相應(yīng)的協(xié)議對象,再從協(xié)議對象中獲取相應(yīng)的 CommandDecoder 實(shí)現(xiàn)類實(shí)例 -Netty 的 ByteToMessageDecoder 具備 accumulate 批量解包能力,可以盡可能的從 socket 里讀取字節(jié),然后同步調(diào)用 decode 方法,解碼出業(yè)務(wù)對象,并組成一個(gè) List- 使用 CommandDecoder 實(shí)現(xiàn)類實(shí)例按照上文所介紹的協(xié)議規(guī)則進(jìn)行解碼,將解碼好的數(shù)據(jù)放到 RecyclableArrayList 實(shí)例中,需要注意的是在解碼之前必須先記錄當(dāng)前 ByteBuf 的 readerIndex,如果發(fā)現(xiàn)數(shù)據(jù)不夠一個(gè)整包長度(發(fā)生了拆包粘包問題),則將當(dāng)前 ByteBuf 的 readerIndex 復(fù)原到解碼之前,然后直接返回,等待讀取更多的數(shù)據(jù)
- 為了防止發(fā)送端發(fā)送數(shù)據(jù)太快導(dǎo)致OOM,會(huì)清理 Cumulator 累加器實(shí)例或者其空間,將已經(jīng)讀取的字節(jié)刪除,向左壓縮 ByteBuf 空間
- 判斷 RecyclableArrayList 中的元素個(gè)數(shù),如果是1個(gè),則將這個(gè)元素單個(gè)發(fā)送給 pipeline 的下一個(gè) handler;如果元素大于1個(gè),則將整個(gè) RecyclableArrayList 以 List 形式發(fā)送給 pipeline 的下一個(gè) handler。- 這就是 SOFABolt 相較于 Netty 改進(jìn)的地方,提供了
批量提交的功能(Netty 本身的做法是循環(huán)遍歷該 List ,依次提交到 ChannelPipeline 進(jìn)行處理。Bolt 是將提交的內(nèi)容從單個(gè) command ,改為整個(gè) List 一起提交,如此能減少 pipeline 的執(zhí)行次數(shù),同時(shí)提升吞吐量。這個(gè)模式在低并發(fā)場景,并沒有什么優(yōu)勢,而在高并發(fā)場景下對提升吞吐量有不小的性能提升),批量提交的數(shù)據(jù)會(huì)經(jīng)過如下的處理:
@Sharable
public class RpcCommandHandler implements CommandHandler {
private void handle(RemotingContext ctx, Object msg) {
// 批量提交來的數(shù)據(jù)
if (msg instanceof List) {
final Runnable handleTask = new Runnable() {
public void run() {
// 循環(huán)處理
for (final Object m : (List<?>) msg) {
RpcCommandHandler.this.process(ctx, m);
}
}
};
// 批量提交的數(shù)據(jù)是否在 processorManager#defaultExecutor 中執(zhí)行
// -Dbolt.rpc.dispatch-msg-list-in-default-executor=true
if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
// If msg is list ,then the batch submission to biz threadpool can save io thread.
processorManager.getDefaultExecutor().execute(handleTask);
} else {
handleTask.run();
}
} else {
// 處理單條數(shù)據(jù)
process(ctx, msg);
}
}
}
- 回收 RecyclableArrayList 實(shí)例
源碼分析
======================= 解碼模板 =======================
public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter {
/**
* 累加器常量
*/
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
// 將 in 累加到 cumulation
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
// 累加之后的 ByteBuf
ByteBuf buffer;
// 如果 cumulation 放不下 in 了,則進(jìn)行擴(kuò)容
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 將 in 寫入 buffer
buffer.writeBytes(in);
// 釋放 in
in.release();
// 返回類加后的結(jié)果
return buffer;
}
};
// 累加 ByteBuf
ByteBuf cumulation;
// 當(dāng)前 decoder 實(shí)例的累加器
private Cumulator cumulator = MERGE_CUMULATOR;
// 每調(diào)用一次 channelRead,只解碼一個(gè)消息(默認(rèn)為false,即批量盡可能多的對消息進(jìn)行解碼)
private boolean singleDecode;
private boolean decodeWasNull;
// 是否是第一次將 ByteBuf 添加到累加器
private boolean first;
// 連續(xù)調(diào)用 16 次 channelRead,而沒有調(diào)用 channelReadComplete 時(shí),進(jìn)行 Bytebuf 的壓縮,去掉已經(jīng)讀取的 byte,防止 OOM
private int discardAfterReads = 16;
private int numReads;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果是 ByteBuf,進(jìn)行解碼
if (msg instanceof ByteBuf) {
// 1. 創(chuàng)建或者從 netty 的回收池中獲取一個(gè) RecyclableArrayList 實(shí)例
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
// 2. 將傳入的 ByteBuf 添加到 Cumulator 累加器實(shí)例中
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 將當(dāng)前的 data 設(shè)置為 cumulation
cumulation = data;
} else {
// 將 data 累加到 cumulation 中
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 3 ~ 4
callDecode(ctx, cumulation, out);
} finally {
// 如果解碼后的 cumulation 沒有可讀字節(jié),直接釋放
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
// If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered.
// Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder.
} else if (++numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
// 查看解碼后的消息列表大小
// 如果是一個(gè),則直接傳遞單條消息
// 如果是多個(gè),則直接傳遞消息列表
int size = out.size();
if (size == 0) {
decodeWasNull = true;
} else if (size == 1) {
ctx.fireChannelRead(out.get(0));
} else {
ArrayList<Object> ret = new ArrayList<Object>(size);
for (int i = 0; i < size; i++) {
ret.add(out.get(i));
}
ctx.fireChannelRead(ret);
}
// 回收 RecyclableArrayList
out.recycle();
}
} else {
// 如果不是 ByteBuf,直接傳遞
ctx.fireChannelRead(msg);
}
}
// 每次讀取完成之后,進(jìn)行 ByteBuf 壓縮
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
ctx.fireChannelReadComplete();
}
protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
cumulation.discardSomeReadBytes();
}
}
// 將 in 解碼到 out 中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 循環(huán)不斷的讀取數(shù)據(jù),批量解包
while (in.isReadable()) {
// 獲取當(dāng)前的 out 中的消息個(gè)數(shù)
int outSize = out.size();
//
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// 如果解碼后的 out 中的消息個(gè)數(shù)與原先的相等,即沒有消息可讀或者沒有完整的消息了,后者繼續(xù)循環(huán),前者直接退出
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
// 如果一次只解碼一個(gè)消息
if (isSingleDecode()) {
break;
}
}
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
// 獲取舊的 cumulation
ByteBuf oldCumulation = cumulation;
// 分配一個(gè)正好可以容納老的 oldCumulation 已有字節(jié) + readable 字節(jié)個(gè)數(shù)的 ByteBuf
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
// 將老的 oldCumulation 寫入新分配的 cumulation
cumulation.writeBytes(oldCumulation);
// 釋放老的 oldCumulation
oldCumulation.release();
// 返回新的 cumulation
return cumulation;
}
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);
}
======================= 解碼代理類 =======================
public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
/** by default, suggest design a single byte for protocol version. */
public static final int DEFAULT_PROTOCOL_VERSION_LENGTH = 1;
/** protocol version should be a positive number, we use -1 to represent illegal */
public static final int DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH = -1;
/** the length of protocol code,默認(rèn)為 1 */
protected int protocolCodeLength;
protected ProtocolCode decodeProtocolCode(ByteBuf in) {
// 從 in 的 readerIndex 開始讀取 protocolCodeBytes.length(默認(rèn)為 1)個(gè)字節(jié)
// 即只解碼 protocolCode
if (in.readableBytes() >= protocolCodeLength) {
byte[] protocolCodeBytes = new byte[protocolCodeLength];
in.readBytes(protocolCodeBytes);
return ProtocolCode.fromBytes(protocolCodeBytes);
}
return null;
}
// RpcProtocolDecoder#decodeProtocolVersion
protected byte decodeProtocolVersion(ByteBuf in) {
// 恢復(fù)到 decode 中開始設(shè)置的讀指針
in.resetReaderIndex();
if (in.readableBytes() >= protocolCodeLength + 1) {
byte rpcProtocolCodeByte = in.readByte();
// 如果 ProtocolCode>=2,則繼續(xù)讀取 version,否則不讀?。粗蛔x取 RpcProtocolV2 及以上的 protocolVersion)
if (rpcProtocolCodeByte >= 2) {
return in.readByte();
}
}
return -1;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 標(biāo)記當(dāng)前的讀指針
in.markReaderIndex();
// 解碼出 protocol_code
ProtocolCode protocolCode = decodeProtocolCode(in);
if (null != protocolCode) {
// 解碼出 protocol_version
byte protocolVersion = decodeProtocolVersion(in);
...
// 根據(jù) protocolCode 獲取 Protocol
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 恢復(fù)讀指針到解析 protocolCode 之前
in.resetReaderIndex();
// 根據(jù)
protocol.getDecoder().decode(ctx, in, out);
}
}
}
======================= 解碼真實(shí)類(以簡單的 RpcProtocol 為例) =======================
public class RpcCommandDecoder implements CommandDecoder {
// 獲取 RpcProtocol 協(xié)議下的響應(yīng)頭(20)和請求頭(22)的長度,即為 20
private int lessLen;
{
lessLen = RpcProtocol.getResponseHeaderLength() < RpcProtocol.getRequestHeaderLength() ? RpcProtocol
.getResponseHeaderLength() : RpcProtocol.getRequestHeaderLength();
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// the less length between response header and request header
if (in.readableBytes() >= lessLen) {
// 標(biāo)記當(dāng)前讀指針的位置
in.markReaderIndex();
// 讀取 protocolCode
byte protocol = in.readByte();
// 恢復(fù)讀指針到讀取 protocolCode 之前
in.resetReaderIndex();
if (protocol == RpcProtocol.PROTOCOL_CODE) {
/*
* code: protocolCode
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* (req)timeout: request timeout
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* contentLen: length of content
* className
* header
* content
*/
if (in.readableBytes() > 2) {
in.markReaderIndex();
// 讀取 protocol_code
in.readByte();
// 讀取 type
byte type = in.readByte();
// 解碼請求
if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
if (in.readableBytes() >= RpcProtocol.getRequestHeaderLength() - 2) {
// CommandCode :請求命令類型,request / response / heartbeat
short cmdCode = in.readShort();
// CommandVersion
byte ver2 = in.readByte();
// 請求ID
int requestId = in.readInt();
// 序列化器
byte serializer = in.readByte();
int timeout = in.readInt();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
if (in.readableBytes() >= classLen + headerLen + contentLen) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
// 解碼內(nèi)容,注意此時(shí)解碼出來的都是一個(gè) byte[],需要序列化才能轉(zhuǎn)化為真正對象
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
} else {
// 不夠一個(gè)完整包,返回,等待累加器類架構(gòu)足夠的內(nèi)容
in.resetReaderIndex();
return;
}
RequestCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
// 如果是心跳請求消息,直接創(chuàng)建一個(gè) HeartbeatCommand
command = new HeartbeatCommand();
} else {
// 如果是正常請求,創(chuàng)建一個(gè) RpcRequestCommand
command = createRequestCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setTimeout(timeout);
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
out.add(command);
} else {
in.resetReaderIndex();
}
// 解碼響應(yīng)
} else if (type == RpcCommandType.RESPONSE) {
//decode response
if (in.readableBytes() >= RpcProtocol.getResponseHeaderLength() - 2) {
short cmdCode = in.readShort();
byte ver2 = in.readByte();
int requestId = in.readInt();
byte serializer = in.readByte();
short status = in.readShort();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
if (in.readableBytes() >= classLen + headerLen + contentLen) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
} else {// not enough data
in.resetReaderIndex();
return;
}
ResponseCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatAckCommand();
} else {
command = createResponseCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setResponseStatus(ResponseStatus.valueOf(status));
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
command.setResponseTimeMillis(System.currentTimeMillis());
command.setResponseHost((InetSocketAddress) ctx.channel()
.remoteAddress());
out.add(command);
} else {
in.resetReaderIndex();
}
}
}
}
}
}
private ResponseCommand createResponseCommand(short cmdCode) {
ResponseCommand command = new RpcResponseCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
return command;
}
private RpcRequestCommand createRequestCommand(short cmdCode) {
RpcRequestCommand command = new RpcRequestCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
command.setArriveTime(System.currentTimeMillis());
return command;
}
}
注意事項(xiàng)
- 解碼器是有狀態(tài)的(含有 累加器),不可標(biāo)注注解 @ChannelHandler.Sharable
- 解碼器解決的很重要的一個(gè)問題就是 tcp 的拆包粘包問題,最常見的解決方案是
基于變長消息協(xié)議:每一個(gè)消息分為消息頭和消息體兩部分,在編碼時(shí),將消息體的長度設(shè)置到消息頭部,在解碼的時(shí)候,首先解析出消息頭部的長度信息,之后拆分或合并出該長度的消息體。