netty 數(shù)據(jù)包黏包拆包處理器使用及遇到的問(wèn)題
最近因?yàn)樵谧鲆粋€(gè)游戲后端,需要用到netty,在與前端溝通之后規(guī)定了數(shù)據(jù)包結(jié)構(gòu):
| tag | encode | encrypt | command | length | body |
| 結(jié)構(gòu) | 類型 | 解釋 |
|---|---|---|
| tag | byte | 標(biāo)簽,默認(rèn)值為0x01 |
| encode | byte | 編碼格式,默認(rèn)值為0x01 |
| encrypt | byte | 加密類型,默認(rèn)值為0x01 |
| command | int | 指令,根據(jù)指令去解析body |
| length | int | 長(zhǎng)度,body內(nèi)容的長(zhǎng)度 |
| body | string | 內(nèi)容,json序列化之后的對(duì)象 |
剛開始使用繼承ByteToMessageDecoder和MessageToByteEncoder做拆包黏包處理。
ByteToMessageDecoder 抽象方法實(shí)現(xiàn)
public static final byte PACKAGE_TAG = 0x01;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
buf.markReaderIndex();
byte tag = buf.readByte();
if (tag != PACKAGE_TAG) {
throw new CorruptedFrameException("標(biāo)志錯(cuò)誤");
}
byte encode = buf.readByte();
byte encrypt = buf.readByte();
int command = buf.readInt();
int length = buf.readInt();
byte[] data = new byte[length];
buf.readBytes(data);
Message message = new Message(tag, encode, encrypt, command, length,
new String(data, "UTF-8"));
out.add(message);
}
MessageToByteEncoder 抽象方法實(shí)現(xiàn)
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
out.writeByte(MessageDecoder.PACKAGE_TAG);
out.writeByte(msg.getEncode());
out.writeByte(msg.getEncrypt());
out.writeInt(msg.getCommand());
byte[] bytes = msg.getBody().getBytes("UTF-8");
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
Message.class
public class Message {
private byte tag;
/* 編碼*/
private byte encode;
/*加密*/
private byte encrypt;
/* 類型**/
private int command;
/*包的長(zhǎng)度*/
private int length;
/*內(nèi)容*/
private String body;
}
這樣在剛開始的工作中數(shù)據(jù)包傳輸沒(méi)有問(wèn)題,不過(guò)數(shù)據(jù)包的大小超過(guò)512b的時(shí)候就會(huì)拋出異常了。
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException:
readerIndex(11) + length(565) exceeds writerIndex(512): PooledUnsafeDirectByteBuf(ridx: 11, widx: 512, cap: 512)
數(shù)據(jù)包的長(zhǎng)度為565,而ByteToMessageDecoder只處理到了512。我并沒(méi)有找到控制ByteToMessageDecoder最大讀寫的方法。
但是,因?yàn)榻獯a器繼承ChannelInboundHandlerAdapter類,而我們可以使用多個(gè)處理器一起處理數(shù)據(jù)。
解決辦法
配合解碼器DelimiterBasedFrameDecoder一起使用,在數(shù)據(jù)包的末尾使用換行符\n表示本次數(shù)據(jù)包已經(jīng)結(jié)束,當(dāng)DelimiterBasedFrameDecoder把數(shù)據(jù)切割之后,再使用ByteToMessageDecoder實(shí)現(xiàn)decode方法把數(shù)據(jù)流轉(zhuǎn)換為Message對(duì)象。
我們?cè)?code>ChannelPipeline加入DelimiterBasedFrameDecoder解碼器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//使用\n作為分隔符
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageHandler());
}
}
在MessageToByteEncoder的實(shí)現(xiàn)方法encode()增加out.writeBytes(new byte[]{'\n'});
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
out.writeByte(MessageDecoder.PACKAGE_TAG);
out.writeByte(msg.getEncode());
out.writeByte(msg.getEncrypt());
out.writeInt(msg.getCommand());
byte[] bytes = msg.getBody().getBytes("UTF-8");
out.writeInt(bytes.length);
out.writeBytes(bytes);
//在寫出字節(jié)流的末尾增加\n表示數(shù)據(jù)結(jié)束
out.writeBytes(new byte[]{'\n'});
}
這時(shí)候就可以愉快的繼續(xù)處理數(shù)據(jù)了。
等我還沒(méi)有高興半天的時(shí)候,問(wèn)題又來(lái)了。
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(11) + length(379) exceeds writerIndex(276): PooledUnsafeDirectByteBuf(ridx: 11, widx: 276, cap: 276)
等等等,,,怎么又報(bào)錯(cuò)了,不是已經(jīng)加了黏包處理了嗎??,解決問(wèn)題把,首先看解析的數(shù)據(jù)包結(jié)構(gòu)
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 01 01 00 00 00 06 00 00 01 0a 7b 22 69 64 22 |...........{"id"|
|00000010| 3a 33 2c 22 75 73 65 72 6e 61 6d 65 22 3a 22 31 |:3,"username":"1|
|00000020| 38 35 30 30 33 34 30 31 36 39 22 2c 22 6e 69 63 |8500340169","nic|
|00000030| 6b 6e 61 6d 65 22 3a 22 e4 bb 96 e5 9b 9b e5 a4 |kname":"........|
|00000040| a7 e7 88 b7 22 2c 22 72 6f 6f 6d 49 64 22 3a 31 |....","roomId":1|
|00000050| 35 32 37 32 33 38 35 36 39 34 37 34 2c 22 74 65 |527238569474,"te|
|00000060| 61 6d 4e 61 6d 65 22 3a 22 e4 bf 84 e7 bd 97 e6 |amName":".......|
|00000070| 96 af 22 2c 22 75 6e 69 74 73 22 3a 7b 22 75 6e |..","units":{"un|
|00000080| 69 74 31 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it1":{"x":10.0,"|
|00000090| 79 22 3a 31 30 2e 30 7d 2c 22 75 6e 69 74 32 22 |y":10.0},"unit2"|
|000000a0| 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 79 22 3a 31 |:{"x":10.0,"y":1|
|000000b0| 30 2e 30 7d 2c 22 75 6e 69 74 33 22 3a 7b 22 78 |0.0},"unit3":{"x|
|000000c0| 22 3a 31 30 2e 30 2c 22 79 22 3a 31 30 2e 30 7d |":10.0,"y":10.0}|
|000000d0| 2c 22 75 6e 69 74 34 22 3a 7b 22 78 22 3a 31 30 |,"unit4":{"x":10|
|000000e0| 2e 30 2c 22 79 22 3a 31 30 2e 30 7d 2c 22 75 6e |.0,"y":10.0},"un|
|000000f0| 69 74 35 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it5":{"x":10.0,"|
|00000100| 79 22 3a 31 30 2e 30 7d 7d 2c 22 73 74 61 74 75 |y":10.0}},"statu|
|00000110| 73 22 3a 31 7d 0a |s":1}. |
+--------+-------------------------------------------------+----------------+
接收到的數(shù)據(jù)是完整的沒(méi)錯(cuò),但是還是報(bào)錯(cuò)了,而且數(shù)據(jù)結(jié)尾的字節(jié)的確是0a,轉(zhuǎn)化成字符就是\n沒(méi)有問(wèn)題啊。
在ByteToMessageDecoder的decode方法里打印ByteBuf buf的長(zhǎng)度之后,問(wèn)題找到了
長(zhǎng)度 : 10
這就是說(shuō)在進(jìn)入到ByteToMessageDecoder這個(gè)解碼器的時(shí)候,數(shù)據(jù)包已經(jīng)只剩下10個(gè)長(zhǎng)度了,那么長(zhǎng)的數(shù)據(jù)被上個(gè)解碼器DelimiterBasedFrameDecoder隔空劈開了- -。問(wèn)題出現(xiàn)在哪呢,看上面那塊字節(jié)流的字節(jié),找到第11個(gè)字節(jié),是0a。。。。因?yàn)椴皇菢?biāo)準(zhǔn)的json格式,最前面使用了3個(gè)字節(jié) 加上2個(gè)int長(zhǎng)度的屬性,所以 數(shù)據(jù)包頭應(yīng)該是11個(gè)字節(jié)長(zhǎng)。
而DelimiterBasedFrameDecoder在讀到第11個(gè)字節(jié)的時(shí)候讀成了\n,自然而然的就認(rèn)為這個(gè)數(shù)據(jù)包已經(jīng)結(jié)束了,而數(shù)據(jù)進(jìn)入到ByteToMessageDecoder的時(shí)候就會(huì)因?yàn)橐?guī)定的body長(zhǎng)度不等于length長(zhǎng)度而出現(xiàn)問(wèn)題。
再次解決問(wèn)題
思來(lái)想去 不實(shí)用\n 這樣的單字節(jié)作為換行符,很容易在數(shù)據(jù)流中遇到,轉(zhuǎn)而使用\r\n倆字節(jié)來(lái)處理,而這倆字節(jié)出現(xiàn)在前面兩個(gè)int長(zhǎng)度中的幾率應(yīng)該很小。
看最后的代碼
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//這里使用自定義分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageHandler());
}
}
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
out.writeByte(MessageDecoder.PACKAGE_TAG);
out.writeByte(msg.getEncode());
out.writeByte(msg.getEncrypt());
out.writeInt(msg.getCommand());
byte[] bytes = msg.getBody().getBytes("UTF-8");
out.writeInt(bytes.length);
out.writeBytes(bytes);
//這里最后修改使用\r\n
out.writeBytes(new byte[]{'\r','\n'});
}
}
再次運(yùn)行程序 數(shù)據(jù)包可以正常接收了。
總結(jié)
- 以前使用netty的時(shí)候也僅限于和硬件交互,而當(dāng)時(shí)的硬件受限于成本問(wèn)題是一條一條處理數(shù)據(jù)包的,所以基本上不會(huì)考慮黏包問(wèn)題
- 然后就是
ByteToMessageDecoder和MessageToByteEncoder兩個(gè)類是比較底層實(shí)現(xiàn)數(shù)據(jù)流處理的,并沒(méi)有帶有拆包黏包的處理機(jī)制,需要自己在數(shù)據(jù)包頭規(guī)定包的長(zhǎng)度,而且無(wú)法處理過(guò)大的數(shù)據(jù)包,因?yàn)槲乙婚_始首先使用了這種方式處理數(shù)據(jù),所以后來(lái)就沒(méi)有再換成DelimiterBasedFrameDecoder加StringDecoder來(lái)解析數(shù)據(jù)包,最后使用json直接轉(zhuǎn)化為對(duì)象。