為什么要粘包拆包
為什么要粘包
首先你得了解一下TCP/IP協(xié)議,在用戶數(shù)據(jù)量非常小的情況下,極端情況下,一個(gè)字節(jié),該TCP數(shù)據(jù)包的有效載荷非常低,傳遞100字節(jié)的數(shù)據(jù),需要100次TCP傳送,100次ACK,在應(yīng)用及時(shí)性要求不高的情況下,將這100個(gè)有效數(shù)據(jù)拼接成一個(gè)數(shù)據(jù)包,那會(huì)縮短到一個(gè)TCP數(shù)據(jù)包,以及一個(gè)ack,有效載荷提高了,帶寬也節(jié)省了
非極端情況,有可能兩個(gè)數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能一個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能兩個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包
為什么要拆包
拆包和粘包是相對(duì)的,一端粘了包,另外一端就需要將粘過的包拆開,舉個(gè)栗子,發(fā)送端將三個(gè)數(shù)據(jù)包粘成兩個(gè)TCP數(shù)據(jù)包發(fā)送到接收端,接收端就需要根據(jù)應(yīng)用協(xié)議將兩個(gè)數(shù)據(jù)包重新組裝成三個(gè)數(shù)據(jù)包
還有一種情況就是用戶數(shù)據(jù)包超過了mss(最大報(bào)文長(zhǎng)度),那么這個(gè)數(shù)據(jù)包在發(fā)送的時(shí)候必須拆分成幾個(gè)數(shù)據(jù)包,接收端收到之后需要將這些數(shù)據(jù)包粘合起來之后,再拆開
拆包的原理
在沒有netty的情況下,用戶如果自己需要拆包,基本原理就是不斷從TCP緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包
1.如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包
2.如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接
netty中拆包的基類
netty 中的拆包也是如上這個(gè)原理,內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對(duì)累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做 ByteToMessageDecoder,下面我們先詳細(xì)分析下這個(gè)類
累加器
ByteToMessageDecoder 中定義了兩個(gè)累加器
public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;
默認(rèn)情況下,會(huì)使用 MERGE_CUMULATOR
private Cumulator cumulator = MERGE_CUMULATOR;
MERGE_CUMULATOR 的原理是每次都將讀取到的數(shù)據(jù)通過內(nèi)存拷貝的方式,拼接到一個(gè)大的字節(jié)容器中,這個(gè)字節(jié)容器在 ByteToMessageDecoder中叫做 cumulation
ByteBuf cumulation;
下面我們看一下 MERGE_CUMULATOR 是如何將新讀取到的數(shù)據(jù)累加到字節(jié)容器里的
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
netty 中ByteBuf的抽象,使得累加非常簡(jiǎn)單,通過一個(gè)簡(jiǎn)單的api調(diào)用 buffer.writeBytes(in); 便將新數(shù)據(jù)累加到字節(jié)容器中,為了防止字節(jié)容器大小不夠,在累加之前還進(jìn)行了擴(kuò)容處理
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
擴(kuò)容也是一個(gè)內(nèi)存拷貝操作,新增的大小即是新讀取數(shù)據(jù)的大小
拆包抽象
累加器原理清楚之后,下面我們回到主流程,目光集中在 channelRead 方法,channelRead方法是每次從TCP緩沖區(qū)讀到數(shù)據(jù)都會(huì)調(diào)用的方法,觸發(fā)點(diǎn)在AbstractNioByteChannel的read方法中,里面有個(gè)while循環(huán)不斷讀取,讀取到一次就觸發(fā)一次channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
方法體不長(zhǎng)不短,可以分為以下幾個(gè)邏輯步驟
1.累加數(shù)據(jù)
2.將累加到的數(shù)據(jù)傳遞給業(yè)務(wù)進(jìn)行業(yè)務(wù)拆包
3.清理字節(jié)容器
4.傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理
1 累加數(shù)據(jù)
如果當(dāng)前累加器沒有數(shù)據(jù),就直接跳過內(nèi)存拷貝,直接將字節(jié)容器的指針指向新讀取的數(shù)據(jù),否則,調(diào)用累加器累加數(shù)據(jù)至字節(jié)容器
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
2 將累加到的數(shù)據(jù)傳遞給業(yè)務(wù)進(jìn)行拆包
到這一步,字節(jié)容器里的數(shù)據(jù)已是目前未拆包部分的所有的數(shù)據(jù)了
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
callDecode 將嘗試將字節(jié)容器的數(shù)據(jù)拆分成業(yè)務(wù)數(shù)據(jù)包塞到業(yè)務(wù)數(shù)據(jù)容器out中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.isReadable()) {
// 記錄一下字節(jié)容器中有多少字節(jié)待拆
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
if (out.size() == 0) {
// 拆包器未讀取任何數(shù)據(jù)
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 拆包器已讀取部分?jǐn)?shù)據(jù),還需要繼續(xù)
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
}
我將原始代碼做了一些精簡(jiǎn),在解碼之前,先記錄一下字節(jié)容器中有多少字節(jié)待拆,然后調(diào)用抽象函數(shù) decode 進(jìn)行拆包
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
netty中對(duì)各種用戶協(xié)議的支持就體現(xiàn)在這個(gè)抽象函數(shù)中,傳進(jìn)去的是當(dāng)前讀取到的未被消費(fèi)的所有的數(shù)據(jù),以及業(yè)務(wù)協(xié)議包容器,所有的拆包器最終都實(shí)現(xiàn)了該抽象方法
業(yè)務(wù)拆包完成之后,如果發(fā)現(xiàn)并沒有拆到一個(gè)完整的數(shù)據(jù)包,這個(gè)時(shí)候又分兩種情況
1.一個(gè)是拆包器什么數(shù)據(jù)也沒讀取,可能數(shù)據(jù)還不夠業(yè)務(wù)拆包器處理,直接break等待新的數(shù)據(jù)
2.拆包器已讀取部分?jǐn)?shù)據(jù),說明解碼器仍然在工作,繼續(xù)解碼
業(yè)務(wù)拆包完成之后,如果發(fā)現(xiàn)已經(jīng)解到了數(shù)據(jù)包,但是,發(fā)現(xiàn)并沒有讀取任何數(shù)據(jù),這個(gè)時(shí)候就會(huì)拋出一個(gè)Runtime異常 DecoderException,告訴你,你什么數(shù)據(jù)都沒讀取,卻解析出一個(gè)業(yè)務(wù)數(shù)據(jù)包,這是有問題的
3 清理字節(jié)容器
業(yè)務(wù)拆包完成之后,只是從字節(jié)容器中取走了數(shù)據(jù),但是這部分空間對(duì)于字節(jié)容器來說依然保留著,而字節(jié)容器每次累加字節(jié)數(shù)據(jù)的時(shí)候都是將字節(jié)數(shù)據(jù)追加到尾部,如果不對(duì)字節(jié)容器做清理,那么時(shí)間一長(zhǎng)就會(huì)OOM
正常情況下,其實(shí)每次讀取完數(shù)據(jù),netty都會(huì)在下面這個(gè)方法中將字節(jié)容器清理,只不過,當(dāng)發(fā)送端發(fā)送數(shù)據(jù)過快,channelReadComplete可能會(huì)很久才被調(diào)用一次
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
這里順帶插一句,如果一次數(shù)據(jù)讀取完畢之后(可能接收端一邊收,發(fā)送端一邊發(fā),這里的讀取完畢指的是接收端在某個(gè)時(shí)間不再接受到數(shù)據(jù)為止),發(fā)現(xiàn)仍然沒有拆到一個(gè)完整的用戶數(shù)據(jù)包,即使該channel的設(shè)置為非自動(dòng)讀取,也會(huì)觸發(fā)一次讀取操作 ctx.read(),該操作會(huì)重新向selector注冊(cè)op_read事件,以便于下一次能讀到數(shù)據(jù)之后拼接成一個(gè)完整的數(shù)據(jù)包
所以為了防止發(fā)送端發(fā)送數(shù)據(jù)過快,netty會(huì)在每次讀取到一次數(shù)據(jù),業(yè)務(wù)拆包之后對(duì)字節(jié)字節(jié)容器做清理,清理部分的代碼如下
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
如果字節(jié)容器當(dāng)前已無數(shù)據(jù)可讀取,直接銷毀字節(jié)容器,并且標(biāo)注一下當(dāng)前字節(jié)容器一次數(shù)據(jù)也沒讀取
如果連續(xù)16次(discardAfterReads的默認(rèn)值),字節(jié)容器中仍然有未被業(yè)務(wù)拆包器讀取的數(shù)據(jù),那就做一次壓縮,有效數(shù)據(jù)段整體移到容器首部
discardSomeReadBytes之前,字節(jié)累加器中的數(shù)據(jù)分布
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes之后,字節(jié)容器中的數(shù)據(jù)分布
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
這樣字節(jié)容器又可以承載更多的數(shù)據(jù)了
4 傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理
以上三個(gè)步驟完成之后,就可以將拆成的包丟到業(yè)務(wù)解碼器處理了,代碼如下
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
期間用一個(gè)成員變量 decodeWasNull 來標(biāo)識(shí)本次讀取數(shù)據(jù)是否拆到一個(gè)業(yè)務(wù)數(shù)據(jù)包,然后調(diào)用 fireChannelRead 將拆到的業(yè)務(wù)數(shù)據(jù)包都傳遞到后續(xù)的handler
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
這樣,就可以把一個(gè)個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到后續(xù)的業(yè)務(wù)解碼器進(jìn)行解碼,隨后處理業(yè)務(wù)邏輯
行拆包器
下面,以一個(gè)具體的例子來看看業(yè)netty自帶的拆包器是如何來拆包的
這個(gè)類叫做 LineBasedFrameDecoder,基于行分隔符的拆包器,TA可以同時(shí)處理 \n以及\r\n兩種類型的行分隔符,核心方法都在繼承的 decode 方法中
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
netty 中自帶的拆包器都是如上這種模板,其實(shí)可以加一層,把這這層模板抽取出來的,不知道為什么netty沒有這么做,我們接著跟進(jìn)去,代碼比較長(zhǎng),我們還是分模塊來剖析
1 找到換行符位置
final int eol = findEndOfLine(buffer);
private static int findEndOfLine(final ByteBuf buffer) {
int i = buffer.forEachByte(ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
return i;
}
ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');
for循環(huán)遍歷,找到第一個(gè) \n 的位置,如果\n前面的字符為\r,那就返回\r的位置
2 非discarding模式的處理
接下來,netty會(huì)判斷,當(dāng)前拆包是否屬于丟棄模式,用一個(gè)成員變量來標(biāo)識(shí)
private boolean discarding;
第一次拆包不在discarding模式( 后面的分支會(huì)講何為非discarding模式),于是進(jìn)入以下環(huán)節(jié)
2.1 非discarding模式下找到行分隔符的處理
// 1.計(jì)算分隔符和包長(zhǎng)度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 丟棄異常數(shù)據(jù)
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 取包的時(shí)候是否包括分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
1.首先,新建一個(gè)幀,計(jì)算一下當(dāng)前包的長(zhǎng)度和分隔符的長(zhǎng)度(因?yàn)橛袃煞N分隔符)
2.然后判斷一下需要拆包的長(zhǎng)度是否大于該拆包器允許的最大長(zhǎng)度(maxLength),這個(gè)參數(shù)在構(gòu)造函數(shù)中被傳遞進(jìn)來,如超出允許的最大長(zhǎng)度,就將這段數(shù)據(jù)拋棄,返回null
3.最后,將一個(gè)完整的數(shù)據(jù)包取出,如果構(gòu)造本解包器的時(shí)候指定 stripDelimiter為false,即解析出來的包包含分隔符,默認(rèn)為不包含分隔符
2.2 非discarding模式下未找到分隔符的處理
沒有找到對(duì)應(yīng)的行分隔符,說明字節(jié)容器沒有足夠的數(shù)據(jù)拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,進(jìn)入如下流程處理
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
首先取得當(dāng)前字節(jié)容器的可讀字節(jié)個(gè)數(shù),接著,判斷一下是否已經(jīng)超過可允許的最大長(zhǎng)度,如果沒有超過,直接返回null,字節(jié)容器中的數(shù)據(jù)沒有任何改變,否則,就需要進(jìn)入丟棄模式
使用一個(gè)成員變量 discardedBytes 來表示已經(jīng)丟棄了多少數(shù)據(jù),然后將字節(jié)容器的讀指針移到寫指針,意味著丟棄這一部分?jǐn)?shù)據(jù),設(shè)置成員變量discarding為true表示當(dāng)前處于丟棄模式。如果設(shè)置了failFast,那么直接拋出異常,默認(rèn)情況下failFast為false,即安靜得丟棄數(shù)據(jù)
3 discarding模式
如果解包的時(shí)候處在discarding模式,也會(huì)有兩種情況發(fā)生
3.1 discarding模式下找到行分隔符
在discarding模式下,如果找到分隔符,那可以將分隔符之前的都丟棄掉
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
計(jì)算出分隔符的長(zhǎng)度之后,直接把分隔符之前的數(shù)據(jù)全部丟棄,當(dāng)然丟棄的字符也包括分隔符,經(jīng)過這么一次丟棄,后面就有可能是正常的數(shù)據(jù)包,下一次解包的時(shí)候就會(huì)進(jìn)入正常的解包流程
3.2discarding模式下未找到行分隔符
這種情況比較簡(jiǎn)單,因?yàn)楫?dāng)前還在丟棄模式,沒有找到行分隔符意味著當(dāng)前一個(gè)完整的數(shù)據(jù)包還沒丟棄完,當(dāng)前讀取的數(shù)據(jù)是丟棄的一部分,所以直接丟棄
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
特定分隔符拆包
這個(gè)類叫做 DelimiterBasedFrameDecoder,可以傳遞給TA一個(gè)分隔符列表,數(shù)據(jù)包會(huì)按照分隔符列表進(jìn)行拆分,讀者可以完全根據(jù)行拆包器的思路去分析這個(gè)DelimiterBasedFrameDecoder,這里不在贅述,有問題可以留言
總結(jié)
netty中的拆包過程其實(shí)是和你自己去拆包過程一樣,只不過TA將拆包過程中邏輯比較獨(dú)立的部分抽象出來變成幾個(gè)不同層次的類,方便各種協(xié)議的擴(kuò)展,我們平時(shí)在寫代碼過程中,也必須培養(yǎng)這種抽象能力,這樣你的coding水平才會(huì)不斷提高,完。
如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html