Netty 權(quán)威指南筆記(三):TCP 粘包和拆包

Netty 權(quán)威指南筆記(三):TCP 粘包和拆包

什么是 TCP 粘包和拆包?

TCP 是一個(gè)“流”協(xié)議,所謂“流”就是沒(méi)有界限的一串?dāng)?shù)據(jù)。大家可以想像河流里的水,期間并沒(méi)有分界線。TCP 底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù) TCP 緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分。所以,在業(yè)務(wù)上,一個(gè)完整的包可能會(huì)被 TCP 拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包,封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的 TCP 粘包和拆包問(wèn)題。

為什么會(huì)發(fā)生粘包和拆包?

主要是應(yīng)用程序?qū)懭霐?shù)據(jù)大小、緩沖區(qū)大小、TCP/IP 最大報(bào)文大小、以太網(wǎng)幀 payload 大小不一致造成。

  1. 應(yīng)用程序一次寫(xiě)入的字節(jié)數(shù)大于發(fā)送緩沖區(qū)大小。
  2. 進(jìn)行 TSS 大小的 TCP 分段。
  3. 以太網(wǎng)幀的 payload 大于 MTU 進(jìn)行 IP 分片。
  4. 應(yīng)用程序多次寫(xiě)入少量數(shù)據(jù),導(dǎo)致粘包。

解決策略

由于底層的 TCP 無(wú)法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無(wú)法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問(wèn)題只能通過(guò)上層的應(yīng)用協(xié)議棧設(shè)計(jì)來(lái)解決。根據(jù)業(yè)界的主流協(xié)議的解決方案,歸納如下:

  1. 消息定長(zhǎng)。
  2. 包尾增加分隔符,比如 FTP 協(xié)議使用回車(chē)換行符進(jìn)行分割。
  3. 將消息分為消息頭和消息體,消息頭中包含表示消息總長(zhǎng)度的字段,這正是 TCP/UDP/IP 報(bào)文采用的方案。
  4. 更復(fù)雜的應(yīng)用層協(xié)議設(shè)計(jì)。

在 Netty 中,有處理定長(zhǎng)消息的 FixedLengthFrameDecoder、回車(chē)換行符分隔的 LineBasedFrameDecoder、特殊分隔符的 DelimiterBasedFrameDecoder,以及處理特殊協(xié)議的 Decoder,比如處理 HTTP 協(xié)議的 HttpRequestDecoder、HttpResponseDecoder 等。

下面,我們以定長(zhǎng)消息解碼器 FixedLengthFrameDecoder 為例,分析源碼,學(xué)習(xí)一下其工作原理。

FixedLengthFrameDecoder 源碼分析

從下面的類圖中可以看出來(lái),這些 Decoder 都繼承自 ByteToMessageDecoder、ChannelHandlerAdapter,實(shí)現(xiàn)了 ChannelHandler 接口?;仡欀笆褂?Netty 開(kāi)發(fā)的 TimeServer 程序中,TimeServerHandler 也是繼承自 ChannelHandlerAdapter。

Netty Decoder 類圖.png

ChannelHandler 接口中,與讀取數(shù)據(jù)相關(guān)的主要是 channelRead 方法。

public interface ChannelHandler {
    // 讀取數(shù)據(jù)
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    // 讀取數(shù)據(jù)完成
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
}

下面我們看一下 ByteToMessageDecoder 中實(shí)現(xiàn)的 channelRead 方法:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 只處理 ByteBuf 類型數(shù)據(jù)
        if (msg instanceof ByteBuf) {
            // RecyclableArrayList 是一個(gè)可循環(huán)使用的 ArrayList,使用它是為了減少 GC
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // cumulation 是上次處理數(shù)據(jù)后遺留的半包數(shù)據(jù)
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 上次遺留數(shù)據(jù)和本次數(shù)據(jù)進(jìn)行合并
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 對(duì)數(shù)據(jù)進(jìn)行解碼,解碼成功的數(shù)據(jù)存入 out,半包數(shù)據(jù)賦值給 cumulation
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();

                // 如果 out.size 大于 0,表示有解碼成功的數(shù)據(jù),發(fā)送到下一個(gè) ChannelAdapter 進(jìn)行處理
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                // 解析數(shù)據(jù),并存入 out 中
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                // 如果成功解碼出數(shù)據(jù),但是 ByteBuf 中數(shù)據(jù)長(zhǎng)度不變,可能會(huì)導(dǎo)致死循環(huán)
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
  1. 首先判斷輸入數(shù)據(jù)是否是 ByteBuf 類型,是則處理,否則傳遞下去。
  2. 然后初始化一個(gè) RecyclableArrayList 用來(lái)保存解析成功的數(shù)據(jù)片。
  3. 將上次解析遺留的數(shù)據(jù) cumulation 和本次到來(lái)的數(shù)據(jù)進(jìn)行合并。
  4. 調(diào)用 decode 方法循環(huán)解析合并后的數(shù)據(jù),存入列表 out。
  5. 如果列表 out 中有解析成功的數(shù)據(jù),則調(diào)用 fireChannelRead 方法發(fā)送給下一個(gè) ChannelHandler 處理。

decode 方法是一個(gè)抽象方法,由子類 FixedLengthFrameDecoder 負(fù)責(zé)實(shí)現(xiàn),具體源碼如下所示。其原理是,如果輸入數(shù)據(jù) ByteBuf 長(zhǎng)度超過(guò) frameLength,則截取前 frameLength 字節(jié)數(shù)據(jù)為一個(gè)新的 ByteBuf 數(shù)據(jù)分片,存入列表 out 中。

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            // 讀取固定長(zhǎng)度 frameLength 的數(shù)據(jù)分片,存入列表 out 中
            out.add(decoded);
        }
    }
    protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 如果小于長(zhǎng)度 frameLength,則返回 null
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            // 否則,讀取長(zhǎng)度為 frameLength 的數(shù)據(jù),為一個(gè) ByteBuf 數(shù)據(jù)分片
            return in.readSlice(frameLength).retain();
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容