2026-04-16 TCP粘包/拆包問題與 Netty 解決方案

一、為什么會有粘包/拆包問題

1.1 根本原因:TCP 是面向流的協(xié)議

TCP 是一個面向字節(jié)流(byte-stream)的協(xié)議,不是面向消息的協(xié)議。它沒有消息邊界的概念,只保證:

  • 數(shù)據(jù)有序到達(dá)
  • 數(shù)據(jù)不丟失
  • 數(shù)據(jù)不重復(fù)

完全不關(guān)心你的應(yīng)用層數(shù)據(jù)是怎么劃分消息的。

應(yīng)用層視角:   [Message A] [Message B] [Message C]    ← 離散的、有邊界的消息
                  |            |            |
                  v            v            v
TCP 層視角:   [byte][byte][byte][byte][byte][byte]   ← 連續(xù)的、無邊界的字節(jié)流

1.2 具體觸發(fā)因素

因素 導(dǎo)致的現(xiàn)象 說明
發(fā)送緩沖區(qū) 粘包 多次快速 send() 的數(shù)據(jù)被 OS 合并到同一個 TCP 段
Nagle 算法 粘包 默認(rèn)開啟,會將小包攢成大包再發(fā)送
MSS/MTU 限制 拆包 數(shù)據(jù)超過 MSS(通常 1460 字節(jié))會被拆分
接收緩沖區(qū) 粘包 應(yīng)用讀取慢,多個段在接收緩沖區(qū)累積
網(wǎng)絡(luò)延遲波動 拆包 數(shù)據(jù)被路由器分片、重傳導(dǎo)致到達(dá)順序與發(fā)送不完全對應(yīng)

1.3 一個具體的例子

發(fā)送方連續(xù)調(diào)用兩次 send()

send("Hello")    // 調(diào)用1: 5 bytes
send("World")    // 調(diào)用2: 5 bytes

接收方調(diào)用 recv()可能遇到以下任意情況:

場景1 - 正常:     recv() → "Hello"   recv() → "World"
場景2 - 完全粘包:  recv() → "HelloWorld"
場景3 - 拆包:     recv() → "Hel"     recv() → "loWorld"
場景4 - 混合:     recv() → "HelloWor"  recv() → "ld"
場景5 - 任意碎片:  recv() → "He"  recv() → "lloWo"  recv() → "rld"

關(guān)鍵結(jié)論send() 的調(diào)用次數(shù)和 recv() 的返回次數(shù)之間沒有任何對應(yīng)關(guān)系。TCP 保證的是字節(jié)順序,不是消息邊界。

1.4 UDP 為什么沒有這個問題?

UDP 是面向消息(message-oriented)的協(xié)議。每次 sendto() 都產(chǎn)生一個獨立的數(shù)據(jù)報,一次 sendto() 嚴(yán)格對應(yīng)一次 recvfrom(),消息邊界由協(xié)議本身保證。代價是不保證可靠、有序,所以不存在粘包問題。

舉個例子:有三個數(shù)據(jù)包,大小分別為2k、4k、6k,

  • 如果采用UDP發(fā)送的話,不管接受方的接收緩存有多大,我們必須要進行至少三次以上的發(fā)送才能把數(shù)據(jù)包發(fā)送完;
  • 但是使用TCP協(xié)議發(fā)送的話,我們只需要接受方的接收緩存有12k的大小,就可以一次把這3個數(shù)據(jù)包全部發(fā)送完畢。

二、對日常協(xié)議設(shè)計的影響

既然 TCP 不提供消息邊界,應(yīng)用層協(xié)議必須自己定義消息的邊界。這就是所謂的"消息成幀(framing)"。常見的成幀策略:

策略 A:定長消息

每條消息固定 N 字節(jié),不足的用填充符補齊。

[   10 bytes   ][   10 bytes   ][   10 bytes   ]
  • 優(yōu)點:實現(xiàn)最簡單
  • 缺點:浪費帶寬,不靈活

應(yīng)用場景:固定大小的二進制結(jié)構(gòu)體通信

策略 B:分隔符

用特殊字符/字節(jié)序列標(biāo)記消息結(jié)束,如 \r\n、\0

Hello\r\nWorld\r\n
  • 優(yōu)點:人類可讀,適合文本協(xié)議
  • 缺點:需要轉(zhuǎn)義,不適合二進制數(shù)據(jù)

應(yīng)用場景:HTTP 頭部、Redis RESP 協(xié)議、SMTP、FTP

策略 C:長度前綴(Length Field)—— 最推薦

每條消息前面加一個固定長度的字段表示消息體長度。

+--------+------------------+
| Length |     Content      |
| 4字節(jié)  |     N字節(jié)        |
+--------+------------------+
  • 優(yōu)點:高效、無需轉(zhuǎn)義、原生支持二進制
  • 缺點:需要緩沖管理,需要校驗惡意長度值

應(yīng)用場景:WebSocket、gRPC/HTTP2、MQTT、TLS Record、Thrift 以及絕大多數(shù)現(xiàn)代二進制協(xié)議

策略 D:TLV(Type-Length-Value)

+------+--------+---------+
| Type | Length |  Value   |
| 1字節(jié) | 4字節(jié)  |  N字節(jié)   |
+------+--------+---------+
  • 優(yōu)點:可擴展、支持可選字段、前/后向兼容
  • 缺點:每個字段都有額外開銷

應(yīng)用場景:ASN.1、DHCP Options、Protobuf 線格式、IPv6 擴展頭

設(shè)計建議

關(guān)注點 建議
最大消息大小 必須設(shè)上限,防止內(nèi)存耗盡攻擊(OOM)
字節(jié)序 明確定義,網(wǎng)絡(luò)字節(jié)序(大端)是標(biāo)準(zhǔn)
部分讀取 TCP 可能只傳來半條消息 → 必須緩沖累積
魔數(shù)/版本號 協(xié)議頭加入魔數(shù)用于快速識別,版本號便于升級

三、Netty 如何解決——源碼分析

Netty 提供了一套完整的解碼器框架來解決粘包/拆包問題。核心架構(gòu)分兩層:

ByteToMessageDecoder    ← 字節(jié)累積器(框架層,解決"不夠一幀就攢著"的問題)
    ├── FixedLengthFrameDecoder        ← 定長解碼
    ├── LineBasedFrameDecoder          ← 換行符解碼
    ├── DelimiterBasedFrameDecoder     ← 自定義分隔符解碼
    ├── LengthFieldBasedFrameDecoder   ← 長度字段解碼(最常用)
    └── 自定義 Decoder                  數(shù)據(jù)被路由器分片、重傳導(dǎo)致到達(dá)順序與發(fā)送不完全對應(yīng)← 你自己寫的解碼邏輯

3.1 框架層:ByteToMessageDecoder —— 字節(jié)累積器

源文件:codec-base/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java

這是所有解碼器的基類,核心職責(zé):把每次收到的零散 ByteBuf 累積起來,然后循環(huán)調(diào)用子類的 decode() 方法,直到不夠組成一幀為止。

關(guān)鍵字段

// ByteToMessageDecoder.java:78-192

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

    // 累積策略1:內(nèi)存拷貝合并(默認(rèn))
    // 將新數(shù)據(jù) copy 到累積緩沖區(qū)的尾部
    public static final Cumulator MERGE_CUMULATOR = ...;

    // 累積策略2:零拷貝合并
    // 使用 CompositeByteBuf 將多個 ByteBuf 邏輯上拼接,無需物理拷貝
    public static final Cumulator COMPOSITE_CUMULATOR = ...;

    // 狀態(tài)機:INIT → CALLING_CHILD_DECODE → HANDLER_REMOVED_PENDING
    private static final byte STATE_INIT = 0;
    private static final byte STATE_CALLING_CHILD_DECODE = 1;
    private static final byte STATE_HANDLER_REMOVED_PENDING = 2;

    private Queue<Object> inputMessages;     // 重入調(diào)用時的消息隊列
    ByteBuf cumulation;                      // ★ 核心字段:累積緩沖區(qū)
    private Cumulator cumulator = MERGE_CUMULATOR;
    private boolean singleDecode;            // 是否每次只解碼一條消息(用于協(xié)議升級)
    private boolean first;
    private byte decodeState = STATE_INIT;
    private int discardAfterReads = 16;      // 讀 16 次后嘗試丟棄已讀字節(jié)
    private int numReads;
}

MERGE_CUMULATOR 累積策略(默認(rèn))

// ByteToMessageDecoder.java:83-116

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (cumulation == in) {
            in.release();
            return cumulation;
        }
        if (!cumulation.isReadable() && in.isContiguous()) {
            // 累積區(qū)為空且輸入是連續(xù)的 → 直接復(fù)用輸入緩沖區(qū),零分配
            cumulation.release();
            return in;
        }
        try {
            final int required = in.readableBytes();
            if (required > cumulation.maxWritableBytes() ||
                required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
                cumulation.isReadOnly()) {
                // 容量不夠或緩沖區(qū)被共享 → 擴容(新分配一個更大的緩沖區(qū))
                return expandCumulation(alloc, cumulation, in);
            }
            // 容量足夠 → 直接將 in 寫入 cumulation 尾部
            cumulation.writeBytes(in, in.readerIndex(), required);
            in.readerIndex(in.writerIndex());
            return cumulation;
        } finally {
            in.release();  // 寫完后釋放輸入緩沖區(qū)
        }
    }
};

核心方法:channelRead() — 入口

// ByteToMessageDecoder.java:286-341

@Override
public void channelRead(ChannelHandlerContext ctx, Object input) throws Exception {
    if (decodeState == STATE_INIT) {
        do {
            if (input instanceof ByteBuf) {
                selfFiredChannelRead = true;
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    first = cumulation == null;
                    // ★ 關(guān)鍵步驟1:將新到達(dá)的數(shù)據(jù)累積到 cumulation 緩沖區(qū)
                    cumulation = cumulator.cumulate(ctx.alloc(),
                            first ? EMPTY_BUFFER : cumulation, (ByteBuf) input);
                    // ★ 關(guān)鍵步驟2:嘗試從累積數(shù)據(jù)中循環(huán)解碼出完整消息
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Exception e) {
                    throw new DecoderException(e);
                } finally {
                    try {
                        if (cumulation != null && !cumulation.isReadable()) {
                            // 累積區(qū)已全部讀完 → 釋放內(nèi)存
                            numReads = 0;
                            try {
                                cumulation.release();
                            } catch (IllegalReferenceCountException e) {
                                throw new IllegalReferenceCountException(
                                        getClass().getSimpleName() +
                                                "#decode() might have released its input buffer, " +
                                                "or passed it down the pipeline without a retain() call, " +
                                                "which is not allowed.", e);
                            }
                            cumulation = null;
                        } else if (++numReads >= discardAfterReads) {
                            // 讀太多次還沒消耗完 → 丟棄已讀部分,防止 OOM
                            // 參見 https://github.com/netty/netty/issues/4275
                            numReads = 0;
                            discardSomeReadBytes();
                        }

                        int size = out.size();
                        firedChannelRead |= out.insertSinceRecycled();
                        // ★ 關(guān)鍵步驟3:將解碼出的消息傳遞給 Pipeline 下游
                        fireChannelRead(ctx, out, size);
                    } finally {
                        out.recycle();
                    }
                }
            } else {
                // 不是 ByteBuf,直接透傳
                ctx.fireChannelRead(input);
            }
        } while (inputMessages != null && (input = inputMessages.poll()) != null);
    } else {
        // 重入調(diào)用(decode() 中觸發(fā)了 fireChannelRead,又回到了 channelRead)
        // 消息暫存到隊列,由原始調(diào)用處理
        if (inputMessages == null) {
            inputMessages = new ArrayDeque<>(2);
        }
        inputMessages.offer(input);
    }
}

核心方法:callDecode() — 循環(huán)解碼引擎

// ByteToMessageDecoder.java:464-517

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {                           // ★ 只要還有數(shù)據(jù)可讀就循環(huán)
            final int outSize = out.size();

            if (outSize > 0) {
                // 上一輪解碼出了消息 → 先傳遞下去
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // 安全檢查:decode() 可能移除了自身 handler
                if (ctx.isRemoved()) {
                    break;
                }
            }

            int oldInputLength = in.readableBytes();
            // ★ 調(diào)用子類的 decode() 方法(帶重入保護)
            decodeRemovalReentryProtection(ctx, in, out);

            if (ctx.isRemoved()) {
                break;
            }

            if (out.isEmpty()) {
                if (oldInputLength == in.readableBytes()) {
                    // 子類既沒讀數(shù)據(jù)也沒產(chǎn)出消息 → 數(shù)據(jù)不夠組成一幀,退出循環(huán)等下次
                    break;
                } else {
                    // 子類讀了數(shù)據(jù)但沒產(chǎn)出消息(比如跳過了一些非法字節(jié))→ 繼續(xù)循環(huán)
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                // 子類產(chǎn)出了消息但沒讀任何數(shù)據(jù) → 實現(xiàn)有 bug,拋異常
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                // 單次解碼模式(用于協(xié)議升級等場景),只解碼一條就退出
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}

callDecode 的精髓:這個循環(huán)邏輯完美解決了粘包/拆包問題:

  • 拆包(數(shù)據(jù)不夠):子類 decode() 返回 null,不添加到 out,out.isEmpty() 為 true 且沒讀數(shù)據(jù) → break,數(shù)據(jù)留在 cumulation 里等下次
  • 粘包(數(shù)據(jù)多了):子類 decode() 成功提取一幀,out 不為空 → 回到 while(in.isReadable()) 繼續(xù)嘗試提取下一幀

3.2 FixedLengthFrameDecoder —— 定長解碼

源文件:codec-base/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java

// FixedLengthFrameDecoder.java:41-79

/**
 * A decoder that splits the received {@link ByteBuf}s by the fixed number
 * of bytes. For example, if you received the following four fragmented packets:
 *
 *   +---+----+------+----+
 *   | A | BC | DEFG | HI |
 *   +---+----+------+----+
 *
 * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
 * following three packets with the fixed length:
 *
 *   +-----+-----+-----+
 *   | ABC | DEF | GHI |
 *   +-----+-----+-----+
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    // ★ 核心邏輯:極簡實現(xiàn)
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;                          // 不夠一幀,等下次
        } else {
            return in.readRetainedSlice(frameLength);  // 夠了,切出 frameLength 字節(jié)
        }
    }
}

精髓:只做一件事——判斷 readableBytes() < frameLength。不夠就返回 null,夠了就切。所有累積、循環(huán)、內(nèi)存管理的復(fù)雜度都由父類 ByteToMessageDecoder 處理了。


3.3 LengthFieldBasedFrameDecoder —— 長度字段解碼(最常用)

源文件:codec-base/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java

這是工業(yè)界使用最廣泛的解碼器,支持高度靈活的協(xié)議格式配置。

構(gòu)造參數(shù)

// LengthFieldBasedFrameDecoder.java:269-329

public LengthFieldBasedFrameDecoder(
    ByteOrder byteOrder,       // 字節(jié)序(默認(rèn)大端)
    int maxFrameLength,        // ★ 最大幀長度,防止惡意大幀導(dǎo)致 OOM
    int lengthFieldOffset,     // ★ 長度字段在幀中的偏移量(前面可能有其他頭部)
    int lengthFieldLength,     // ★ 長度字段本身的字節(jié)數(shù)(1/2/3/4/8)
    int lengthAdjustment,      // 長度補償值(長度字段值不包含自身頭部時需要補償)
    int initialBytesToStrip,   // 解碼后跳過前面多少字節(jié)(通常用來跳過長度字段頭)
    boolean failFast           // 超過最大長度時是否立即拋異常(true=立即,false=讀完再拋)
)

關(guān)鍵字段

// LengthFieldBasedFrameDecoder.java:189-200

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {

    private final ByteOrder byteOrder;
    private final int maxFrameLength;            // 最大允許幀長度
    private final int lengthFieldOffset;         // 長度字段起始偏移
    private final int lengthFieldLength;         // 長度字段字節(jié)數(shù)
    private final int lengthFieldEndOffset;      // = lengthFieldOffset + lengthFieldLength
    private final int lengthAdjustment;          // 長度補償值
    private final int initialBytesToStrip;       // 解碼后跳過的字節(jié)數(shù)
    private final boolean failFast;
    private boolean discardingTooLongFrame;      // 是否正在丟棄超長幀
    private long tooLongFrameLength;
    private long bytesToDiscard;
    private int frameLengthInt = -1;             // ★ 緩存當(dāng)前幀長度,-1表示正在解析新幀
}

核心方法:decode() — 幀解析

// LengthFieldBasedFrameDecoder.java:331-444

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

// ★ 核心解析邏輯
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    long frameLength = 0;
    if (frameLengthInt == -1) {   // -1 表示需要解析新的幀
        // 如果上一幀超長,先丟棄
        if (discardingTooLongFrame) {
            discardingTooLongFrame(in);
        }

        // ★ 步驟1:檢查是否有足夠字節(jié)讀取整個 length field
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;   // 連長度字段都不完整,等下次
        }

        // ★ 步驟2:讀取長度字段的值
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset,
                                                lengthFieldLength, byteOrder);

        // 步驟3:校驗長度字段不為負(fù)
        if (frameLength < 0) {
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }

        // ★ 步驟4:計算完整幀長度
        // 完整幀 = lengthFieldEndOffset(長度字段及其之前的部分) + frameLength(長度字段指示的大小) + lengthAdjustment(補償)
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        // 步驟5:校驗幀長度合理性
        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        // ★ 步驟6:安全校驗——防止惡意超大幀
        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }
        // 緩存幀長度,下次進入不用重新解析 length field
        frameLengthInt = (int) frameLength;
    }

    // ★ 步驟7:檢查是否有足夠字節(jié)組成完整幀
    if (in.readableBytes() < frameLengthInt) {
        return null;   // 數(shù)據(jù)不夠完整幀,等下次
    }

    // 步驟8:校驗 initialBytesToStrip 不能超過幀長度
    if (initialBytesToStrip > frameLengthInt) {
        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
    }

    // ★ 步驟9:跳過不需要的字節(jié)(通常是長度字段頭)
    in.skipBytes(initialBytesToStrip);

    // ★ 步驟10:提取完整幀
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    in.readerIndex(readerIndex + actualFrameLength);
    frameLengthInt = -1;   // 重置,準(zhǔn)備解析下一幀
    return frame;
}

超長幀丟棄邏輯

// LengthFieldBasedFrameDecoder.java:364-378

private void exceededFrameLength(ByteBuf in, long frameLength) {
    long discard = frameLength - in.readableBytes();
    tooLongFrameLength = frameLength;

    if (discard < 0) {
        // 緩沖區(qū)數(shù)據(jù)比 frameLength 還多 → 直接跳過 frameLength 字節(jié)
        in.skipBytes((int) frameLength);
    } else {
        // 緩沖區(qū)不夠 → 進入丟棄模式,后續(xù)數(shù)據(jù)繼續(xù)丟棄直到夠數(shù)
        discardingTooLongFrame = true;
        bytesToDiscard = discard;
        in.skipBytes(in.readableBytes());
    }
    failIfNecessary(true);
}

官方注釋中的配置示例

示例1:基礎(chǔ) — 2字節(jié)長度字段,不剝離頭部

lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = 0, initialBytesToStrip = 0

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

示例2:剝離頭部 — 只拿內(nèi)容

lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = 0, initialBytesToStrip = 2

BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
+--------+----------------+      +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
+--------+----------------+      +----------------+

示例3:長度字段包含自身 — 需要負(fù)補償

lengthFieldOffset = 0, lengthFieldLength = 2, lengthAdjustment = -2, initialBytesToStrip = 0
// 長度字段值 0x000E(14) 包含了自身2字節(jié),所以補償 -2

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

示例4:復(fù)雜頭部 — HDR1 + Length + HDR2 + Content

lengthFieldOffset = 1, lengthFieldLength = 2, lengthAdjustment = 1, initialBytesToStrip = 3
// offset=1 跳過 HDR1, adjustment=1 補償 HDR2, strip=3 剝離 HDR1+LEN

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

3.4 LineBasedFrameDecoder —— 換行符解碼

源文件:codec-base/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java

\n\r\n 為分隔符,是 DelimiterBasedFrameDecoder 的優(yōu)化特例。

// LineBasedFrameDecoder.java:42-187

public class LineBasedFrameDecoder extends ByteToMessageDecoder {

    private final int maxLength;       // 最大允許幀長度
    private final boolean failFast;    // 超長時是否立即失敗
    private final boolean stripDelimiter;  // 是否剝離分隔符

    private boolean discarding;        // 是否正在丟棄超長幀
    private int discardedBytes;        // 已丟棄的字節(jié)數(shù)
    private int offset;                // 上次掃描位置(增量掃描優(yōu)化)

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);    // ★ 查找換行符位置

        if (!discarding) {
            if (eol >= 0) {
                // ★ 找到換行符 → 提取一幀
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;  // \r\n=2字節(jié), \n=1字節(jié)

                if (length > maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);    // 讀內(nèi)容(不含分隔符)
                    buffer.skipBytes(delimLength);               // 跳過分隔符
                } else {
                    frame = buffer.readRetainedSlice(length + delimLength);  // 內(nèi)容+分隔符一起
                }
                return frame;
            } else {
                // 沒找到換行符 → 檢查是否已超長
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    // 超長了還沒遇到換行符 → 進入丟棄模式
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;   // 不夠一幀,等下次
            }
        } else {
            // 丟棄模式:繼續(xù)丟棄直到找到換行符
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                // 還沒找到 → 繼續(xù)丟棄
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                offset = 0;
            }
            return null;
        }
    }

    // ★ 增量掃描優(yōu)化:從上次停止的位置繼續(xù)找 \n
    private int findEndOfLine(final ByteBuf buffer) {
        int totalLength = buffer.readableBytes();
        int i = buffer.indexOf(buffer.readerIndex() + offset,
                               buffer.readerIndex() + totalLength, (byte) '\n');
        if (i >= 0) {
            offset = 0;
            if (i > 0 && buffer.getByte(i - 1) == '\r') {
                i--;   // \r\n → 指向 \r 的位置
            }
        } else {
            offset = totalLength;  // 記住已掃描的位置,下次從這里繼續(xù)
        }
        return i;
    }
}

關(guān)鍵設(shè)計點

  • 增量掃描offset 字段記錄上次掃描位置,下次只掃描新增數(shù)據(jù),避免重復(fù)掃描已掃描過的字節(jié)
  • 丟棄模式:超長幀不會導(dǎo)致 OOM,而是進入丟棄模式,后續(xù)數(shù)據(jù)直接跳過直到找到下一個分隔符

四、整體工作流程圖

                    TCP 字節(jié)流到達(dá)
                         │
                         ▼
              ┌─────────────────────┐
              │  ByteToMessageDecoder│
              │    channelRead()     │
              │                     │
              │  1. cumulate:       │
              │     新數(shù)據(jù)追加到     │
              │     cumulation 緩沖區(qū)│
              │                     │
              │  2. callDecode:     │
              │     while(可讀) {   │
              │       子類.decode() │
              │       返回null → break(等下次)│
              │       返回frame → 繼續(xù)(粘包處理)│
              │     }               │
              │                     │
              │  3. 內(nèi)存管理:       │
              │     全部讀完 → release│
              │     讀16次還沒完 → discardSomeReadBytes│
              └─────────────────────┘
                         │
              ┌──────────┼──────────┐──────────┐
              ▼          ▼          ▼          ▼
         FixedLength  LineBased  Delimiter  LengthField
         定長判斷     找\n       找自定義    讀length字段
                     ┌──┘       分隔符      等夠完整幀
                     │          ┌──┘        ┌──┘
                     ▼          ▼          ▼
                  所有策略統(tǒng)一返回 null(等數(shù)據(jù))或 ByteBuf(一幀完整消息)

五、總結(jié)

┌──────────────────────────────────────────────────────┐
│                 粘包/拆包的完整圖景                      │
├──────────────────────────────────────────────────────┤
│                                                      │
│  根因:TCP 是字節(jié)流協(xié)議,沒有消息邊界                     │
│                                                      │
│  對協(xié)議設(shè)計的影響:                                     │
│    → 應(yīng)用層必須自己定義消息邊界(消息成幀)                │
│    → 策略:定長 / 分隔符 / 長度前綴 / TLV               │
│                                                      │
│  Netty 的解決方案:兩層架構(gòu)                              │
│    → 框架層 ByteToMessageDecoder:                     │
│       字節(jié)累積 + 循環(huán)解碼 + 內(nèi)存管理                     │
│    → 策略層子類實現(xiàn)不同成幀方式:                         │
│       FixedLengthFrameDecoder      定長               │
│       LineBasedFrameDecoder        換行符             │
│       DelimiterBasedFrameDecoder   自定義分隔符        │
│       LengthFieldBasedFrameDecoder 長度字段(最常用)    │
│                                                      │
│  設(shè)計模式:模板方法模式(Template Method)                │
│    父類定義骨架流程,子類實現(xiàn)具體的幀判定邏輯              │
│                                                      │
└──────────────────────────────────────────────────────┘

六、延伸思考

  1. 為什么 Netty 的解碼器框架要設(shè)計成兩層(框架層 + 策略層)? 這是什么設(shè)計模式?(提示:模板方法模式)
  2. 如果協(xié)議是變長頭部(如 HTTP),應(yīng)該怎么處理? (提示:HttpObjectDecoder 的實現(xiàn)方式)
  3. ReplayingDecoder 和 ByteToMessageDecoder 有什么區(qū)別?各適合什么場景?
  4. LengthFieldBasedFrameDecoder 的 frameLengthInt 字段為什么要緩存?不緩存會怎樣?
  5. 如果攻擊者發(fā)送一個超大 length 值但從不發(fā)送后續(xù)數(shù)據(jù),Netty 會怎樣?

七、相關(guān)源碼文件索引

文件 說明
codec-base/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java 字節(jié)累積解碼器基類
codec-base/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java 定長幀解碼器
codec-base/src/main/java/io/netty/handler/codec/LineBasedFrameDecoder.java 換行符幀解碼器
codec-base/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java 分隔符幀解碼器
codec-base/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java 長度字段幀解碼器

八、參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容