netty源碼分析之拆包器的奧秘

為什么要粘包拆包

為什么要粘包

首先你得了解一下TCP/IP協(xié)議,在用戶數(shù)據(jù)量非常小的情況下,極端情況下,一個(gè)字節(jié),該TCP數(shù)據(jù)包的有效載荷非常低,傳遞100字節(jié)的數(shù)據(jù),需要100次TCP傳送,100次ACK,在應(yīng)用及時(shí)性要求不高的情況下,將這100個(gè)有效數(shù)據(jù)拼接成一個(gè)數(shù)據(jù)包,那會(huì)縮短到一個(gè)TCP數(shù)據(jù)包,以及一個(gè)ack,有效載荷提高了,帶寬也節(jié)省了

非極端情況,有可能兩個(gè)數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能一個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能兩個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包

為什么要拆包

拆包和粘包是相對(duì)的,一端粘了包,另外一端就需要將粘過的包拆開,舉個(gè)栗子,發(fā)送端將三個(gè)數(shù)據(jù)包粘成兩個(gè)TCP數(shù)據(jù)包發(fā)送到接收端,接收端就需要根據(jù)應(yīng)用協(xié)議將兩個(gè)數(shù)據(jù)包重新組裝成三個(gè)數(shù)據(jù)包

還有一種情況就是用戶數(shù)據(jù)包超過了mss(最大報(bào)文長(zhǎng)度),那么這個(gè)數(shù)據(jù)包在發(fā)送的時(shí)候必須拆分成幾個(gè)數(shù)據(jù)包,接收端收到之后需要將這些數(shù)據(jù)包粘合起來之后,再拆開

拆包的原理

在沒有netty的情況下,用戶如果自己需要拆包,基本原理就是不斷從TCP緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包

1.如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包
2.如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接

netty中拆包的基類

netty 中的拆包也是如上這個(gè)原理,內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對(duì)累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做 ByteToMessageDecoder,下面我們先詳細(xì)分析下這個(gè)類

累加器

ByteToMessageDecoder 中定義了兩個(gè)累加器

public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;

默認(rèn)情況下,會(huì)使用 MERGE_CUMULATOR

private Cumulator cumulator = MERGE_CUMULATOR;

MERGE_CUMULATOR 的原理是每次都將讀取到的數(shù)據(jù)通過內(nèi)存拷貝的方式,拼接到一個(gè)大的字節(jié)容器中,這個(gè)字節(jié)容器在 ByteToMessageDecoder中叫做 cumulation

ByteBuf cumulation;

下面我們看一下 MERGE_CUMULATOR 是如何將新讀取到的數(shù)據(jù)累加到字節(jié)容器里的

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) {
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
}

netty 中ByteBuf的抽象,使得累加非常簡(jiǎn)單,通過一個(gè)簡(jiǎn)單的api調(diào)用 buffer.writeBytes(in); 便將新數(shù)據(jù)累加到字節(jié)容器中,為了防止字節(jié)容器大小不夠,在累加之前還進(jìn)行了擴(kuò)容處理

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
}

擴(kuò)容也是一個(gè)內(nèi)存拷貝操作,新增的大小即是新讀取數(shù)據(jù)的大小

拆包抽象

累加器原理清楚之后,下面我們回到主流程,目光集中在 channelRead 方法,channelRead方法是每次從TCP緩沖區(qū)讀到數(shù)據(jù)都會(huì)調(diào)用的方法,觸發(fā)點(diǎn)在AbstractNioByteChannelread方法中,里面有個(gè)while循環(huán)不斷讀取,讀取到一次就觸發(fā)一次channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

方法體不長(zhǎng)不短,可以分為以下幾個(gè)邏輯步驟

1.累加數(shù)據(jù)
2.將累加到的數(shù)據(jù)傳遞給業(yè)務(wù)進(jìn)行業(yè)務(wù)拆包
3.清理字節(jié)容器
4.傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理

1 累加數(shù)據(jù)

如果當(dāng)前累加器沒有數(shù)據(jù),就直接跳過內(nèi)存拷貝,直接將字節(jié)容器的指針指向新讀取的數(shù)據(jù),否則,調(diào)用累加器累加數(shù)據(jù)至字節(jié)容器

ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
    cumulation = data;
} else {
    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}

2 將累加到的數(shù)據(jù)傳遞給業(yè)務(wù)進(jìn)行拆包

到這一步,字節(jié)容器里的數(shù)據(jù)已是目前未拆包部分的所有的數(shù)據(jù)了

CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);

callDecode 將嘗試將字節(jié)容器的數(shù)據(jù)拆分成業(yè)務(wù)數(shù)據(jù)包塞到業(yè)務(wù)數(shù)據(jù)容器out

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    while (in.isReadable()) {
        // 記錄一下字節(jié)容器中有多少字節(jié)待拆
        int oldInputLength = in.readableBytes();
        decode(ctx, in, out);
        if (out.size() == 0) {
            // 拆包器未讀取任何數(shù)據(jù)
            if (oldInputLength == in.readableBytes()) {
                break;
            } else {
             // 拆包器已讀取部分?jǐn)?shù)據(jù),還需要繼續(xù)
                continue;
            }
        }

        if (oldInputLength == in.readableBytes()) {
            throw new DecoderException(
                    StringUtil.simpleClassName(getClass()) +
                    ".decode() did not read anything but decoded a message.");
        }

        if (isSingleDecode()) {
            break;
        }
    }
}

我將原始代碼做了一些精簡(jiǎn),在解碼之前,先記錄一下字節(jié)容器中有多少字節(jié)待拆,然后調(diào)用抽象函數(shù) decode 進(jìn)行拆包

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

netty中對(duì)各種用戶協(xié)議的支持就體現(xiàn)在這個(gè)抽象函數(shù)中,傳進(jìn)去的是當(dāng)前讀取到的未被消費(fèi)的所有的數(shù)據(jù),以及業(yè)務(wù)協(xié)議包容器,所有的拆包器最終都實(shí)現(xiàn)了該抽象方法

業(yè)務(wù)拆包完成之后,如果發(fā)現(xiàn)并沒有拆到一個(gè)完整的數(shù)據(jù)包,這個(gè)時(shí)候又分兩種情況

1.一個(gè)是拆包器什么數(shù)據(jù)也沒讀取,可能數(shù)據(jù)還不夠業(yè)務(wù)拆包器處理,直接break等待新的數(shù)據(jù)
2.拆包器已讀取部分?jǐn)?shù)據(jù),說明解碼器仍然在工作,繼續(xù)解碼

業(yè)務(wù)拆包完成之后,如果發(fā)現(xiàn)已經(jīng)解到了數(shù)據(jù)包,但是,發(fā)現(xiàn)并沒有讀取任何數(shù)據(jù),這個(gè)時(shí)候就會(huì)拋出一個(gè)Runtime異常 DecoderException,告訴你,你什么數(shù)據(jù)都沒讀取,卻解析出一個(gè)業(yè)務(wù)數(shù)據(jù)包,這是有問題的

3 清理字節(jié)容器

業(yè)務(wù)拆包完成之后,只是從字節(jié)容器中取走了數(shù)據(jù),但是這部分空間對(duì)于字節(jié)容器來說依然保留著,而字節(jié)容器每次累加字節(jié)數(shù)據(jù)的時(shí)候都是將字節(jié)數(shù)據(jù)追加到尾部,如果不對(duì)字節(jié)容器做清理,那么時(shí)間一長(zhǎng)就會(huì)OOM

正常情況下,其實(shí)每次讀取完數(shù)據(jù),netty都會(huì)在下面這個(gè)方法中將字節(jié)容器清理,只不過,當(dāng)發(fā)送端發(fā)送數(shù)據(jù)過快,channelReadComplete可能會(huì)很久才被調(diào)用一次

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    numReads = 0;
    discardSomeReadBytes();
    if (decodeWasNull) {
        decodeWasNull = false;
        if (!ctx.channel().config().isAutoRead()) {
            ctx.read();
        }
    }
    ctx.fireChannelReadComplete();
}

這里順帶插一句,如果一次數(shù)據(jù)讀取完畢之后(可能接收端一邊收,發(fā)送端一邊發(fā),這里的讀取完畢指的是接收端在某個(gè)時(shí)間不再接受到數(shù)據(jù)為止),發(fā)現(xiàn)仍然沒有拆到一個(gè)完整的用戶數(shù)據(jù)包,即使該channel的設(shè)置為非自動(dòng)讀取,也會(huì)觸發(fā)一次讀取操作 ctx.read(),該操作會(huì)重新向selector注冊(cè)op_read事件,以便于下一次能讀到數(shù)據(jù)之后拼接成一個(gè)完整的數(shù)據(jù)包

所以為了防止發(fā)送端發(fā)送數(shù)據(jù)過快,netty會(huì)在每次讀取到一次數(shù)據(jù),業(yè)務(wù)拆包之后對(duì)字節(jié)字節(jié)容器做清理,清理部分的代碼如下

if (cumulation != null && !cumulation.isReadable()) {
    numReads = 0;
    cumulation.release();
    cumulation = null;
} else if (++ numReads >= discardAfterReads) {
    numReads = 0;
    discardSomeReadBytes();
}

如果字節(jié)容器當(dāng)前已無數(shù)據(jù)可讀取,直接銷毀字節(jié)容器,并且標(biāo)注一下當(dāng)前字節(jié)容器一次數(shù)據(jù)也沒讀取

如果連續(xù)16次(discardAfterReads的默認(rèn)值),字節(jié)容器中仍然有未被業(yè)務(wù)拆包器讀取的數(shù)據(jù),那就做一次壓縮,有效數(shù)據(jù)段整體移到容器首部

discardSomeReadBytes之前,字節(jié)累加器中的數(shù)據(jù)分布

+--------------+----------+----------+
|   readed     | unreaded | writable | 
+--------------+----------+----------+

discardSomeReadBytes之后,字節(jié)容器中的數(shù)據(jù)分布

+----------+-------------------------+
| unreaded |      writable           | 
+----------+-------------------------+

這樣字節(jié)容器又可以承載更多的數(shù)據(jù)了

4 傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理

以上三個(gè)步驟完成之后,就可以將拆成的包丟到業(yè)務(wù)解碼器處理了,代碼如下

int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();

期間用一個(gè)成員變量 decodeWasNull 來標(biāo)識(shí)本次讀取數(shù)據(jù)是否拆到一個(gè)業(yè)務(wù)數(shù)據(jù)包,然后調(diào)用 fireChannelRead 將拆到的業(yè)務(wù)數(shù)據(jù)包都傳遞到后續(xù)的handler

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

這樣,就可以把一個(gè)個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到后續(xù)的業(yè)務(wù)解碼器進(jìn)行解碼,隨后處理業(yè)務(wù)邏輯

行拆包器

下面,以一個(gè)具體的例子來看看業(yè)netty自帶的拆包器是如何來拆包的

這個(gè)類叫做 LineBasedFrameDecoder,基于行分隔符的拆包器,TA可以同時(shí)處理 \n以及\r\n兩種類型的行分隔符,核心方法都在繼承的 decode 方法中

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

netty 中自帶的拆包器都是如上這種模板,其實(shí)可以加一層,把這這層模板抽取出來的,不知道為什么netty沒有這么做,我們接著跟進(jìn)去,代碼比較長(zhǎng),我們還是分模塊來剖析

1 找到換行符位置

final int eol = findEndOfLine(buffer);

private static int findEndOfLine(final ByteBuf buffer) {
    int i = buffer.forEachByte(ByteProcessor.FIND_LF);
    if (i > 0 && buffer.getByte(i - 1) == '\r') {
        i--;
    }
    return i;
}

ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');

for循環(huán)遍歷,找到第一個(gè) \n 的位置,如果\n前面的字符為\r,那就返回\r的位置

2 非discarding模式的處理

接下來,netty會(huì)判斷,當(dāng)前拆包是否屬于丟棄模式,用一個(gè)成員變量來標(biāo)識(shí)

private boolean discarding;

第一次拆包不在discarding模式( 后面的分支會(huì)講何為非discarding模式),于是進(jìn)入以下環(huán)節(jié)

2.1 非discarding模式下找到行分隔符的處理

// 1.計(jì)算分隔符和包長(zhǎng)度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

// 丟棄異常數(shù)據(jù)
if (length > maxLength) {
    buffer.readerIndex(eol + delimLength);
    fail(ctx, length);
    return null;
}

// 取包的時(shí)候是否包括分隔符
if (stripDelimiter) {
    frame = buffer.readRetainedSlice(length);
    buffer.skipBytes(delimLength);
} else {
    frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;

1.首先,新建一個(gè)幀,計(jì)算一下當(dāng)前包的長(zhǎng)度和分隔符的長(zhǎng)度(因?yàn)橛袃煞N分隔符)
2.然后判斷一下需要拆包的長(zhǎng)度是否大于該拆包器允許的最大長(zhǎng)度(maxLength),這個(gè)參數(shù)在構(gòu)造函數(shù)中被傳遞進(jìn)來,如超出允許的最大長(zhǎng)度,就將這段數(shù)據(jù)拋棄,返回null
3.最后,將一個(gè)完整的數(shù)據(jù)包取出,如果構(gòu)造本解包器的時(shí)候指定 stripDelimiter為false,即解析出來的包包含分隔符,默認(rèn)為不包含分隔符

2.2 非discarding模式下未找到分隔符的處理

沒有找到對(duì)應(yīng)的行分隔符,說明字節(jié)容器沒有足夠的數(shù)據(jù)拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,進(jìn)入如下流程處理

final int length = buffer.readableBytes();
if (length > maxLength) {
    discardedBytes = length;
    buffer.readerIndex(buffer.writerIndex());
    discarding = true;
    if (failFast) {
        fail(ctx, "over " + discardedBytes);
    }
}
return null;

首先取得當(dāng)前字節(jié)容器的可讀字節(jié)個(gè)數(shù),接著,判斷一下是否已經(jīng)超過可允許的最大長(zhǎng)度,如果沒有超過,直接返回null,字節(jié)容器中的數(shù)據(jù)沒有任何改變,否則,就需要進(jìn)入丟棄模式

使用一個(gè)成員變量 discardedBytes 來表示已經(jīng)丟棄了多少數(shù)據(jù),然后將字節(jié)容器的讀指針移到寫指針,意味著丟棄這一部分?jǐn)?shù)據(jù),設(shè)置成員變量discarding為true表示當(dāng)前處于丟棄模式。如果設(shè)置了failFast,那么直接拋出異常,默認(rèn)情況下failFast為false,即安靜得丟棄數(shù)據(jù)

3 discarding模式

如果解包的時(shí)候處在discarding模式,也會(huì)有兩種情況發(fā)生

3.1 discarding模式下找到行分隔符

在discarding模式下,如果找到分隔符,那可以將分隔符之前的都丟棄掉

final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
    fail(ctx, length);
}

計(jì)算出分隔符的長(zhǎng)度之后,直接把分隔符之前的數(shù)據(jù)全部丟棄,當(dāng)然丟棄的字符也包括分隔符,經(jīng)過這么一次丟棄,后面就有可能是正常的數(shù)據(jù)包,下一次解包的時(shí)候就會(huì)進(jìn)入正常的解包流程

3.2discarding模式下未找到行分隔符

這種情況比較簡(jiǎn)單,因?yàn)楫?dāng)前還在丟棄模式,沒有找到行分隔符意味著當(dāng)前一個(gè)完整的數(shù)據(jù)包還沒丟棄完,當(dāng)前讀取的數(shù)據(jù)是丟棄的一部分,所以直接丟棄

discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());

特定分隔符拆包

這個(gè)類叫做 DelimiterBasedFrameDecoder,可以傳遞給TA一個(gè)分隔符列表,數(shù)據(jù)包會(huì)按照分隔符列表進(jìn)行拆分,讀者可以完全根據(jù)行拆包器的思路去分析這個(gè)DelimiterBasedFrameDecoder,這里不在贅述,有問題可以留言

總結(jié)

netty中的拆包過程其實(shí)是和你自己去拆包過程一樣,只不過TA將拆包過程中邏輯比較獨(dú)立的部分抽象出來變成幾個(gè)不同層次的類,方便各種協(xié)議的擴(kuò)展,我們平時(shí)在寫代碼過程中,也必須培養(yǎng)這種抽象能力,這樣你的coding水平才會(huì)不斷提高,完。

如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、粘包與拆包 1、發(fā)送時(shí)的粘包與拆包 TCP連接維護(hù)了一個(gè)發(fā)送緩存區(qū)。將要發(fā)送給對(duì)端的數(shù)據(jù)會(huì)由socket AP...
    益文的圈閱讀 4,436評(píng)論 6 14
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 6,201評(píng)論 0 13
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評(píng)論 19 139
  • 編解碼處理器作為Netty編程時(shí)必備的ChannelHandler,每個(gè)應(yīng)用都必不可少。Netty作為網(wǎng)絡(luò)應(yīng)用框架...
    Hypercube閱讀 3,707評(píng)論 7 12
  • BGM:晴天 & worth it 以前跟你聊天大多時(shí)候你都在看書,所以你應(yīng)該是個(gè)愛看書的人。 Re So So ...
    歐尼柚閱讀 334評(píng)論 1 0

友情鏈接更多精彩內(nèi)容