一、為什么會有粘包/拆包問題
1.1 根本原因:TCP 是面向流的協(xié)議
TCP 是一個面向字節(jié)流(byte-stream)的協(xié)議,不是面向消息的協(xié)議。它沒有消息邊界的概念,只保證:
- 數(shù)據(jù)有序到達(dá)
- 數(shù)據(jù)不丟失
- 數(shù)據(jù)不重復(fù)
但完全不關(guān)心你的應(yīng)用層數(shù)據(jù)是怎么劃分消息的。
應(yīng)用層視角: [Message A] [Message B] [Message C] ← 離散的、有邊界的消息
| | |
v v v
TCP 層視角: [byte][byte][byte][byte][byte][byte] ← 連續(xù)的、無邊界的字節(jié)流
1.2 具體觸發(fā)因素
| 因素 | 導(dǎo)致的現(xiàn)象 | 說明 |
|---|---|---|
| 發(fā)送緩沖區(qū) | 粘包 | 多次快速 send() 的數(shù)據(jù)被 OS 合并到同一個 TCP 段 |
| Nagle 算法 | 粘包 | 默認(rèn)開啟,會將小包攢成大包再發(fā)送 |
| MSS/MTU 限制 | 拆包 | 數(shù)據(jù)超過 MSS(通常 1460 字節(jié))會被拆分 |
| 接收緩沖區(qū) | 粘包 | 應(yīng)用讀取慢,多個段在接收緩沖區(qū)累積 |
| 網(wǎng)絡(luò)延遲波動 | 拆包 | 數(shù)據(jù)被路由器分片、重傳導(dǎo)致到達(dá)順序與發(fā)送不完全對應(yīng) |
1.3 一個具體的例子
發(fā)送方連續(xù)調(diào)用兩次 send():
send("Hello") // 調(diào)用1: 5 bytes
send("World") // 調(diào)用2: 5 bytes
接收方調(diào)用 recv() 時可能遇到以下任意情況:
場景1 - 正常: recv() → "Hello" recv() → "World"
場景2 - 完全粘包: recv() → "HelloWorld"
場景3 - 拆包: recv() → "Hel" recv() → "loWorld"
場景4 - 混合: recv() → "HelloWor" recv() → "ld"
場景5 - 任意碎片: recv() → "He" recv() → "lloWo" recv() → "rld"
關(guān)鍵結(jié)論:send() 的調(diào)用次數(shù)和 recv() 的返回次數(shù)之間沒有任何對應(yīng)關(guān)系。TCP 保證的是字節(jié)順序,不是消息邊界。
1.4 UDP 為什么沒有這個問題?
UDP 是面向消息(message-oriented)的協(xié)議。每次 sendto() 都產(chǎn)生一個獨立的數(shù)據(jù)報,一次 sendto() 嚴(yán)格對應(yīng)一次 recvfrom(),消息邊界由協(xié)議本身保證。代價是不保證可靠、有序,所以不存在粘包問題。
舉個例子:有三個數(shù)據(jù)包,大小分別為2k、4k、6k,
- 如果采用UDP發(fā)送的話,不管接受方的接收緩存有多大,我們必須要進行至少三次以上的發(fā)送才能把數(shù)據(jù)包發(fā)送完;
- 但是使用TCP協(xié)議發(fā)送的話,我們只需要接受方的接收緩存有12k的大小,就可以一次把這3個數(shù)據(jù)包全部發(fā)送完畢。
二、對日常協(xié)議設(shè)計的影響
既然 TCP 不提供消息邊界,應(yīng)用層協(xié)議必須自己定義消息的邊界。這就是所謂的"消息成幀(framing)"。常見的成幀策略:
策略 A:定長消息
每條消息固定 N 字節(jié),不足的用填充符補齊。
[ 10 bytes ][ 10 bytes ][ 10 bytes ]
- 優(yōu)點:實現(xiàn)最簡單
- 缺點:浪費帶寬,不靈活
應(yīng)用場景:固定大小的二進制結(jié)構(gòu)體通信
策略 B:分隔符
用特殊字符/字節(jié)序列標(biāo)記消息結(jié)束,如 \r\n、\0。
Hello\r\nWorld\r\n
- 優(yōu)點:人類可讀,適合文本協(xié)議
- 缺點:需要轉(zhuǎn)義,不適合二進制數(shù)據(jù)
應(yīng)用場景:HTTP 頭部、Redis RESP 協(xié)議、SMTP、FTP
策略 C:長度前綴(Length Field)—— 最推薦
每條消息前面加一個固定長度的字段表示消息體長度。
+--------+------------------+
| Length | Content |
| 4字節(jié) | N字節(jié) |
+--------+------------------+
- 優(yōu)點:高效、無需轉(zhuǎn)義、原生支持二進制
- 缺點:需要緩沖管理,需要校驗惡意長度值
應(yīng)用場景:WebSocket、gRPC/HTTP2、MQTT、TLS Record、Thrift 以及絕大多數(shù)現(xiàn)代二進制協(xié)議
策略 D:TLV(Type-Length-Value)
+------+--------+---------+
| Type | Length | Value |
| 1字節(jié) | 4字節(jié) | N字節(jié) |
+------+--------+---------+
- 優(yōu)點:可擴展、支持可選字段、前/后向兼容
- 缺點:每個字段都有額外開銷
應(yīng)用場景:ASN.1、DHCP Options、Protobuf 線格式、IPv6 擴展頭
設(shè)計建議
| 關(guān)注點 | 建議 |
|---|---|
| 最大消息大小 | 必須設(shè)上限,防止內(nèi)存耗盡攻擊(OOM) |
| 字節(jié)序 | 明確定義,網(wǎng)絡(luò)字節(jié)序(大端)是標(biāo)準(zhǔn) |
| 部分讀取 | TCP 可能只傳來半條消息 → 必須緩沖累積 |
| 魔數(shù)/版本號 | 協(xié)議頭加入魔數(shù)用于快速識別,版本號便于升級 |
三、Netty 如何解決——源碼分析
Netty 提供了一套完整的解碼器框架來解決粘包/拆包問題。核心架構(gòu)分兩層:
ByteToMessageDecoder ← 字節(jié)累積器(框架層,解決"不夠一幀就攢著"的問題)
├── FixedLengthFrameDecoder ← 定長解碼
├── LineBasedFrameDecoder ← 換行符解碼
├── DelimiterBasedFrameDecoder ← 自定義分隔符解碼
├── LengthFieldBasedFrameDecoder ← 長度字段解碼(最常用)
└── 自定義 Decoder 數(shù)據(jù)被路由器分片、重傳導(dǎo)致到達(dá)順序與發(fā)送不完全對應(yīng)← 你自己寫的解碼邏輯
3.1 框架層:ByteToMessageDecoder —— 字節(jié)累積器
源文件:
codec-base/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
這是所有解碼器的基類,核心職責(zé):把每次收到的零散 ByteBuf 累積起來,然后循環(huán)調(diào)用子類的 decode() 方法,直到不夠組成一幀為止。
關(guān)鍵字段
// ByteToMessageDecoder.java:78-192
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 累積策略1:內(nèi)存拷貝合并(默認(rèn))
// 將新數(shù)據(jù) copy 到累積緩沖區(qū)的尾部
public static final Cumulator MERGE_CUMULATOR = ...;
// 累積策略2:零拷貝合并
// 使用 CompositeByteBuf 將多個 ByteBuf 邏輯上拼接,無需物理拷貝
public static final Cumulator COMPOSITE_CUMULATOR = ...;
// 狀態(tài)機:INIT → CALLING_CHILD_DECODE → HANDLER_REMOVED_PENDING
private static final byte STATE_INIT = 0;
private static final byte STATE_CALLING_CHILD_DECODE = 1;
private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
private Queue<Object> inputMessages; // 重入調(diào)用時的消息隊列
ByteBuf cumulation; // ★ 核心字段:累積緩沖區(qū)
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode; // 是否每次只解碼一條消息(用于協(xié)議升級)
private boolean first;
private byte decodeState = STATE_INIT;
private int discardAfterReads = 16; // 讀 16 次后嘗試丟棄已讀字節(jié)
private int numReads;
}
MERGE_CUMULATOR 累積策略(默認(rèn))
// ByteToMessageDecoder.java:83-116
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (cumulation == in) {
in.release();
return cumulation;
}
if (!cumulation.isReadable() && in.isContiguous()) {
// 累積區(qū)為空且輸入是連續(xù)的 → 直接復(fù)用輸入緩沖區(qū),零分配
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
cumulation.isReadOnly()) {
// 容量不夠或緩沖區(qū)被共享 → 擴容(新分配一個更大的緩沖區(qū))
return expandCumulation(alloc, cumulation, in);
}
// 容量足夠 → 直接將 in 寫入 cumulation 尾部
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
in.release(); // 寫完后釋放輸入緩沖區(qū)
}
}
};
核心方法:channelRead() — 入口
// ByteToMessageDecoder.java:286-341
@Override
public void channelRead(ChannelHandlerContext ctx, Object input) throws Exception {
if (decodeState == STATE_INIT) {
do {
if (input instanceof ByteBuf) {
selfFiredChannelRead = true;
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
// ★ 關(guān)鍵步驟1:將新到達(dá)的數(shù)據(jù)累積到 cumulation 緩沖區(qū)
cumulation = cumulator.cumulate(ctx.alloc(),
first ? EMPTY_BUFFER : cumulation, (ByteBuf) input);
// ★ 關(guān)鍵步驟2:嘗試從累積數(shù)據(jù)中循環(huán)解碼出完整消息
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
// 累積區(qū)已全部讀完 → 釋放內(nèi)存
numReads = 0;
try {
cumulation.release();
} catch (IllegalReferenceCountException e) {
throw new IllegalReferenceCountException(
getClass().getSimpleName() +
"#decode() might have released its input buffer, " +
"or passed it down the pipeline without a retain() call, " +
"which is not allowed.", e);
}
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// 讀太多次還沒消耗完 → 丟棄已讀部分,防止 OOM
// 參見 https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
// ★ 關(guān)鍵步驟3:將解碼出的消息傳遞給 Pipeline 下游
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
// 不是 ByteBuf,直接透傳
ctx.fireChannelRead(input);
}
} while (inputMessages != null && (input = inputMessages.poll()) != null);
} else {
// 重入調(diào)用(decode() 中觸發(fā)了 fireChannelRead,又回到了 channelRead)
// 消息暫存到隊列,由原始調(diào)用處理
if (inputMessages == null) {
inputMessages = new ArrayDeque<>(2);
}
inputMessages.offer(input);
}
}
核心方法:callDecode() — 循環(huán)解碼引擎
// ByteToMessageDecoder.java:464-517
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) { // ★ 只要還有數(shù)據(jù)可讀就循環(huán)
final int outSize = out.size();
if (outSize > 0) {
// 上一輪解碼出了消息 → 先傳遞下去
fireChannelRead(ctx, out, outSize);
out.clear();
// 安全檢查:decode() 可能移除了自身 handler
if (ctx.isRemoved()) {
break;
}
}
int oldInputLength = in.readableBytes();
// ★ 調(diào)用子類的 decode() 方法(帶重入保護)
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (out.isEmpty()) {
if (oldInputLength == in.readableBytes()) {
// 子類既沒讀數(shù)據(jù)也沒產(chǎn)出消息 → 數(shù)據(jù)不夠組成一幀,退出循環(huán)等下次
break;
} else {
// 子類讀了數(shù)據(jù)但沒產(chǎn)出消息(比如跳過了一些非法字節(jié))→ 繼續(xù)循環(huán)
continue;
}
}
if (oldInputLength == in.readableBytes()) {
// 子類產(chǎn)出了消息但沒讀任何數(shù)據(jù) → 實現(xiàn)有 bug,拋異常
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
// 單次解碼模式(用于協(xié)議升級等場景),只解碼一條就退出
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
callDecode 的精髓:這個循環(huán)邏輯完美解決了粘包/拆包問題:
-
拆包(數(shù)據(jù)不夠):子類
decode()返回 null,不添加到 out,out.isEmpty()為 true 且沒讀數(shù)據(jù) →break,數(shù)據(jù)留在cumulation里等下次 -
粘包(數(shù)據(jù)多了):子類
decode()成功提取一幀,out不為空 → 回到while(in.isReadable())繼續(xù)嘗試提取下一幀
3.2 FixedLengthFrameDecoder —— 定長解碼
源文件:
codec-base/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java
// FixedLengthFrameDecoder.java:41-79
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
*
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
*
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
*
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
checkPositive(frameLength, "frameLength");
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
// ★ 核心邏輯:極簡實現(xiàn)
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null; // 不夠一幀,等下次
} else {
return in.readRetainedSlice(frameLength); // 夠了,切出 frameLength 字節(jié)
}
}
}
精髓:只做一件事——判斷 readableBytes() < frameLength。不夠就返回 null,夠了就切。所有累積、循環(huán)、內(nèi)存管理的復(fù)雜度都由父類 ByteToMessageDecoder 處理了。
3.3 LengthFieldBasedFrameDecoder —— 長度字段解碼(最常用)
源文件:
codec-base/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java
這是工業(yè)界使用最廣泛的解碼器,支持高度靈活的協(xié)議格式配置。
構(gòu)造參數(shù)
// LengthFieldBasedFrameDecoder.java:269-329
public LengthFieldBasedFrameDecoder(
ByteOrder byteOrder, // 字節(jié)序(默認(rèn)大端)
int maxFrameLength, // ★ 最大幀長度,防止惡意大幀導(dǎo)致 OOM
int lengthFieldOffset, // ★ 長度字段在幀中的偏移量(前面可能有其他頭部)
int lengthFieldLength, // ★ 長度字段本身的字節(jié)數(shù)(1/2/3/4/8)
int lengthAdjustment, // 長度補償值(長度字段值不包含自身頭部時需要補償)
int initialBytesToStrip, // 解碼后跳過前面多少字節(jié)(通常用來跳過長度字段頭)
boolean failFast // 超過最大長度時是否立即拋異常(true=立即,false=讀完再拋)
)
關(guān)鍵字段
// LengthFieldBasedFrameDecoder.java:189-200
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteOrder byteOrder;
private final int maxFrameLength; // 最大允許幀長度
private final int lengthFieldOffset; // 長度字段起始偏移
private final int lengthFieldLength; // 長度字段字節(jié)數(shù)
private final int lengthFieldEndOffset; // = lengthFieldOffset + lengthFieldLength
private final int lengthAdjustment; // 長度補償值
private final int initialBytesToStrip; // 解碼后跳過的字節(jié)數(shù)
private final boolean failFast;
private boolean discardingTooLongFrame; // 是否正在丟棄超長幀
private long tooLongFrameLength;
private long bytesToDiscard;
private int frameLengthInt = -1; // ★ 緩存當(dāng)前幀長度,-1表示正在解析新幀
}
核心方法:decode() — 幀解析
// LengthFieldBasedFrameDecoder.java:331-444
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
// ★ 核心解析邏輯
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
long frameLength = 0;
if (frameLengthInt == -1) { // -1 表示需要解析新的幀
// 如果上一幀超長,先丟棄
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
// ★ 步驟1:檢查是否有足夠字節(jié)讀取整個 length field
if (in.readableBytes() < lengthFieldEndOffset) {
return null; // 連長度字段都不完整,等下次
}
// ★ 步驟2:讀取長度字段的值
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset,
lengthFieldLength, byteOrder);
// 步驟3:校驗長度字段不為負(fù)
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// ★ 步驟4:計算完整幀長度
// 完整幀 = lengthFieldEndOffset(長度字段及其之前的部分) + frameLength(長度字段指示的大小) + lengthAdjustment(補償)
frameLength += lengthAdjustment + lengthFieldEndOffset;
// 步驟5:校驗幀長度合理性
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
// ★ 步驟6:安全校驗——防止惡意超大幀
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
// 緩存幀長度,下次進入不用重新解析 length field
frameLengthInt = (int) frameLength;
}
// ★ 步驟7:檢查是否有足夠字節(jié)組成完整幀
if (in.readableBytes() < frameLengthInt) {
return null; // 數(shù)據(jù)不夠完整幀,等下次
}
// 步驟8:校驗 initialBytesToStrip 不能超過幀長度
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
// ★ 步驟9:跳過不需要的字節(jié)(通常是長度字段頭)
in.skipBytes(initialBytesToStrip);
// ★ 步驟10:提取完整幀
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
frameLengthInt = -1; // 重置,準(zhǔn)備解析下一幀
return frame;
}
超長幀丟棄邏輯
// LengthFieldBasedFrameDecoder.java:364-378
private void exceededFrameLength(ByteBuf in, long frameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// 緩沖區(qū)數(shù)據(jù)比 frameLength 還多 → 直接跳過 frameLength 字節(jié)
in.skipBytes((int) frameLength);
} else {
// 緩沖區(qū)不夠 → 進入丟棄模式,后續(xù)數(shù)據(jù)繼續(xù)丟棄直到夠數(shù)
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
}
官方注釋中的配置示例
示例1:基礎(chǔ) — 2字節(jié)長度字段,不剝離頭部
lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = 0, initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
示例2:剝離頭部 — 只拿內(nèi)容
lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = 0, initialBytesToStrip = 2
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
示例3:長度字段包含自身 — 需要負(fù)補償
lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = -2, initialBytesToStrip = 0
// 長度字段值 0x000E(14) 包含了自身2字節(jié),所以補償 -2
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
示例4:復(fù)雜頭部 — HDR1 + Length + HDR2 + Content
lengthFieldOffset = 1, lengthFieldLength = 2, lengthAdjustment = 1, initialBytesToStrip = 3
// offset=1 跳過 HDR1, adjustment=1 補償 HDR2, strip=3 剝離 HDR1+LEN
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
3.4 LineBasedFrameDecoder —— 換行符解碼
源文件:
codec-base/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java
以 \n 或 \r\n 為分隔符,是 DelimiterBasedFrameDecoder 的優(yōu)化特例。
// LineBasedFrameDecoder.java:42-187
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
private final int maxLength; // 最大允許幀長度
private final boolean failFast; // 超長時是否立即失敗
private final boolean stripDelimiter; // 是否剝離分隔符
private boolean discarding; // 是否正在丟棄超長幀
private int discardedBytes; // 已丟棄的字節(jié)數(shù)
private int offset; // 上次掃描位置(增量掃描優(yōu)化)
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer); // ★ 查找換行符位置
if (!discarding) {
if (eol >= 0) {
// ★ 找到換行符 → 提取一幀
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; // \r\n=2字節(jié), \n=1字節(jié)
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length); // 讀內(nèi)容(不含分隔符)
buffer.skipBytes(delimLength); // 跳過分隔符
} else {
frame = buffer.readRetainedSlice(length + delimLength); // 內(nèi)容+分隔符一起
}
return frame;
} else {
// 沒找到換行符 → 檢查是否已超長
final int length = buffer.readableBytes();
if (length > maxLength) {
// 超長了還沒遇到換行符 → 進入丟棄模式
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
offset = 0;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null; // 不夠一幀,等下次
}
} else {
// 丟棄模式:繼續(xù)丟棄直到找到換行符
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
// 還沒找到 → 繼續(xù)丟棄
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
offset = 0;
}
return null;
}
}
// ★ 增量掃描優(yōu)化:從上次停止的位置繼續(xù)找 \n
private int findEndOfLine(final ByteBuf buffer) {
int totalLength = buffer.readableBytes();
int i = buffer.indexOf(buffer.readerIndex() + offset,
buffer.readerIndex() + totalLength, (byte) '\n');
if (i >= 0) {
offset = 0;
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--; // \r\n → 指向 \r 的位置
}
} else {
offset = totalLength; // 記住已掃描的位置,下次從這里繼續(xù)
}
return i;
}
}
關(guān)鍵設(shè)計點:
-
增量掃描:
offset字段記錄上次掃描位置,下次只掃描新增數(shù)據(jù),避免重復(fù)掃描已掃描過的字節(jié) - 丟棄模式:超長幀不會導(dǎo)致 OOM,而是進入丟棄模式,后續(xù)數(shù)據(jù)直接跳過直到找到下一個分隔符
四、整體工作流程圖
TCP 字節(jié)流到達(dá)
│
▼
┌─────────────────────┐
│ ByteToMessageDecoder│
│ channelRead() │
│ │
│ 1. cumulate: │
│ 新數(shù)據(jù)追加到 │
│ cumulation 緩沖區(qū)│
│ │
│ 2. callDecode: │
│ while(可讀) { │
│ 子類.decode() │
│ 返回null → break(等下次)│
│ 返回frame → 繼續(xù)(粘包處理)│
│ } │
│ │
│ 3. 內(nèi)存管理: │
│ 全部讀完 → release│
│ 讀16次還沒完 → discardSomeReadBytes│
└─────────────────────┘
│
┌──────────┼──────────┐──────────┐
▼ ▼ ▼ ▼
FixedLength LineBased Delimiter LengthField
定長判斷 找\n 找自定義 讀length字段
┌──┘ 分隔符 等夠完整幀
│ ┌──┘ ┌──┘
▼ ▼ ▼
所有策略統(tǒng)一返回 null(等數(shù)據(jù))或 ByteBuf(一幀完整消息)
五、總結(jié)
┌──────────────────────────────────────────────────────┐
│ 粘包/拆包的完整圖景 │
├──────────────────────────────────────────────────────┤
│ │
│ 根因:TCP 是字節(jié)流協(xié)議,沒有消息邊界 │
│ │
│ 對協(xié)議設(shè)計的影響: │
│ → 應(yīng)用層必須自己定義消息邊界(消息成幀) │
│ → 策略:定長 / 分隔符 / 長度前綴 / TLV │
│ │
│ Netty 的解決方案:兩層架構(gòu) │
│ → 框架層 ByteToMessageDecoder: │
│ 字節(jié)累積 + 循環(huán)解碼 + 內(nèi)存管理 │
│ → 策略層子類實現(xiàn)不同成幀方式: │
│ FixedLengthFrameDecoder 定長 │
│ LineBasedFrameDecoder 換行符 │
│ DelimiterBasedFrameDecoder 自定義分隔符 │
│ LengthFieldBasedFrameDecoder 長度字段(最常用) │
│ │
│ 設(shè)計模式:模板方法模式(Template Method) │
│ 父類定義骨架流程,子類實現(xiàn)具體的幀判定邏輯 │
│ │
└──────────────────────────────────────────────────────┘
六、延伸思考
- 為什么 Netty 的解碼器框架要設(shè)計成兩層(框架層 + 策略層)? 這是什么設(shè)計模式?(提示:模板方法模式)
- 如果協(xié)議是變長頭部(如 HTTP),應(yīng)該怎么處理? (提示:HttpObjectDecoder 的實現(xiàn)方式)
- ReplayingDecoder 和 ByteToMessageDecoder 有什么區(qū)別?各適合什么場景?
- LengthFieldBasedFrameDecoder 的
frameLengthInt字段為什么要緩存?不緩存會怎樣? - 如果攻擊者發(fā)送一個超大 length 值但從不發(fā)送后續(xù)數(shù)據(jù),Netty 會怎樣?
七、相關(guān)源碼文件索引
| 文件 | 說明 |
|---|---|
codec-base/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java |
字節(jié)累積解碼器基類 |
codec-base/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java |
定長幀解碼器 |
codec-base/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java |
換行符幀解碼器 |
codec-base/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java |
分隔符幀解碼器 |
codec-base/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java |
長度字段幀解碼器 |