Netty 權(quán)威指南筆記(三):TCP 粘包和拆包
什么是 TCP 粘包和拆包?
TCP 是一個(gè)“流”協(xié)議,所謂“流”就是沒(méi)有界限的一串?dāng)?shù)據(jù)。大家可以想像河流里的水,期間并沒(méi)有分界線。TCP 底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù) TCP 緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分。所以,在業(yè)務(wù)上,一個(gè)完整的包可能會(huì)被 TCP 拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包,封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的 TCP 粘包和拆包問(wèn)題。
為什么會(huì)發(fā)生粘包和拆包?
主要是應(yīng)用程序?qū)懭霐?shù)據(jù)大小、緩沖區(qū)大小、TCP/IP 最大報(bào)文大小、以太網(wǎng)幀 payload 大小不一致造成。
- 應(yīng)用程序一次寫(xiě)入的字節(jié)數(shù)大于發(fā)送緩沖區(qū)大小。
- 進(jìn)行 TSS 大小的 TCP 分段。
- 以太網(wǎng)幀的 payload 大于 MTU 進(jìn)行 IP 分片。
- 應(yīng)用程序多次寫(xiě)入少量數(shù)據(jù),導(dǎo)致粘包。
解決策略
由于底層的 TCP 無(wú)法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無(wú)法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問(wèn)題只能通過(guò)上層的應(yīng)用協(xié)議棧設(shè)計(jì)來(lái)解決。根據(jù)業(yè)界的主流協(xié)議的解決方案,歸納如下:
- 消息定長(zhǎng)。
- 包尾增加分隔符,比如 FTP 協(xié)議使用回車(chē)換行符進(jìn)行分割。
- 將消息分為消息頭和消息體,消息頭中包含表示消息總長(zhǎng)度的字段,這正是 TCP/UDP/IP 報(bào)文采用的方案。
- 更復(fù)雜的應(yīng)用層協(xié)議設(shè)計(jì)。
在 Netty 中,有處理定長(zhǎng)消息的 FixedLengthFrameDecoder、回車(chē)換行符分隔的 LineBasedFrameDecoder、特殊分隔符的 DelimiterBasedFrameDecoder,以及處理特殊協(xié)議的 Decoder,比如處理 HTTP 協(xié)議的 HttpRequestDecoder、HttpResponseDecoder 等。
下面,我們以定長(zhǎng)消息解碼器 FixedLengthFrameDecoder 為例,分析源碼,學(xué)習(xí)一下其工作原理。
FixedLengthFrameDecoder 源碼分析
從下面的類圖中可以看出來(lái),這些 Decoder 都繼承自 ByteToMessageDecoder、ChannelHandlerAdapter,實(shí)現(xiàn)了 ChannelHandler 接口?;仡欀笆褂?Netty 開(kāi)發(fā)的 TimeServer 程序中,TimeServerHandler 也是繼承自 ChannelHandlerAdapter。

ChannelHandler 接口中,與讀取數(shù)據(jù)相關(guān)的主要是 channelRead 方法。
public interface ChannelHandler {
// 讀取數(shù)據(jù)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
// 讀取數(shù)據(jù)完成
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
}
下面我們看一下 ByteToMessageDecoder 中實(shí)現(xiàn)的 channelRead 方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 只處理 ByteBuf 類型數(shù)據(jù)
if (msg instanceof ByteBuf) {
// RecyclableArrayList 是一個(gè)可循環(huán)使用的 ArrayList,使用它是為了減少 GC
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// cumulation 是上次處理數(shù)據(jù)后遺留的半包數(shù)據(jù)
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 上次遺留數(shù)據(jù)和本次數(shù)據(jù)進(jìn)行合并
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 對(duì)數(shù)據(jù)進(jìn)行解碼,解碼成功的數(shù)據(jù)存入 out,半包數(shù)據(jù)賦值給 cumulation
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();
// 如果 out.size 大于 0,表示有解碼成功的數(shù)據(jù),發(fā)送到下一個(gè) ChannelAdapter 進(jìn)行處理
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
// 解析數(shù)據(jù),并存入 out 中
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
// 如果成功解碼出數(shù)據(jù),但是 ByteBuf 中數(shù)據(jù)長(zhǎng)度不變,可能會(huì)導(dǎo)致死循環(huán)
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
- 首先判斷輸入數(shù)據(jù)是否是 ByteBuf 類型,是則處理,否則傳遞下去。
- 然后初始化一個(gè) RecyclableArrayList 用來(lái)保存解析成功的數(shù)據(jù)片。
- 將上次解析遺留的數(shù)據(jù) cumulation 和本次到來(lái)的數(shù)據(jù)進(jìn)行合并。
- 調(diào)用 decode 方法循環(huán)解析合并后的數(shù)據(jù),存入列表 out。
- 如果列表 out 中有解析成功的數(shù)據(jù),則調(diào)用 fireChannelRead 方法發(fā)送給下一個(gè) ChannelHandler 處理。
decode 方法是一個(gè)抽象方法,由子類 FixedLengthFrameDecoder 負(fù)責(zé)實(shí)現(xiàn),具體源碼如下所示。其原理是,如果輸入數(shù)據(jù) ByteBuf 長(zhǎng)度超過(guò) frameLength,則截取前 frameLength 字節(jié)數(shù)據(jù)為一個(gè)新的 ByteBuf 數(shù)據(jù)分片,存入列表 out 中。
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
// 讀取固定長(zhǎng)度 frameLength 的數(shù)據(jù)分片,存入列表 out 中
out.add(decoded);
}
}
protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 如果小于長(zhǎng)度 frameLength,則返回 null
if (in.readableBytes() < frameLength) {
return null;
} else {
// 否則,讀取長(zhǎng)度為 frameLength 的數(shù)據(jù),為一個(gè) ByteBuf 數(shù)據(jù)分片
return in.readSlice(frameLength).retain();
}
}
}