在RPC框架中,粘包和拆包問題是必須解決一個問題,因為RPC框架中,各個微服務(wù)相互之間都是維系了一個TCP長連接,比如dubbo就是一個全雙工的長連接。由于微服務(wù)往對方發(fā)送信息的時候,所有的請求都是使用的同一個連接,這樣就會產(chǎn)生粘包和拆包的問題。本文首先會對粘包和拆包問題進行描述,然后介紹其常用的解決方案,最后會對Netty提供的幾種解決方案進行講解。這里說明一下,由于oschina將“jie ma qi”認定為敏感文字,因而本文統(tǒng)一使用“解碼一器”表示該含義
1. 粘包和拆包
產(chǎn)生粘包和拆包問題的主要原因是,操作系統(tǒng)在發(fā)送TCP數(shù)據(jù)的時候,底層會有一個緩沖區(qū),例如1024個字節(jié)大小,如果一次請求發(fā)送的數(shù)據(jù)量比較小,沒達到緩沖區(qū)大小,TCP則會將多個請求合并為同一個請求進行發(fā)送,這就形成了粘包問題;如果一次請求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP就會將其拆分為多次發(fā)送,這就是拆包,也就是將一個大的包拆分為多個小包進行發(fā)送。如下圖展示了粘包和拆包的一個示意圖:

上圖中演示了粘包和拆包的三種情況:
- A和B兩個包都剛好滿足TCP緩沖區(qū)的大小,或者說其等待時間已經(jīng)達到TCP等待時長,從而還是使用兩個獨立的包進行發(fā)送;
- A和B兩次請求間隔時間內(nèi)較短,并且數(shù)據(jù)包較小,因而合并為同一個包發(fā)送給服務(wù)端;
- B包比較大,因而將其拆分為兩個包B_1和B_2進行發(fā)送,而這里由于拆分后的B_2比較小,其又與A包合并在一起發(fā)送。
2. 常見解決方案
對于粘包和拆包問題,常見的解決方案有四種:
- 客戶端在發(fā)送數(shù)據(jù)包的時候,每個包都固定長度,比如1024個字節(jié)大小,如果客戶端發(fā)送的數(shù)據(jù)長度不足1024個字節(jié),則通過補充空格的方式補全到指定長度;
- 客戶端在每個包的末尾使用固定的分隔符,例如\r\n,如果一個包被拆分了,則等待下一個包發(fā)送過來之后找到其中的\r\n,然后對其拆分后的頭部部分與前一個包的剩余部分進行合并,這樣就得到了一個完整的包;
- 將消息分為頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息;
- 通過自定義協(xié)議進行粘包和拆包的處理。
3. Netty提供的粘包拆包解決方案
3.1 FixedLengthFrameDecoder
對于使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼一器會每次讀取固定長度的消息,如果當前讀取到的消息不足指定長度,那么就會等待下一個消息到達后進行補足。其使用也比較簡單,只需要在構(gòu)造函數(shù)中指定每個消息的長度即可。這里需要注意的是,F(xiàn)ixedLengthFrameDecoder只是一個解碼一器,Netty也只提供了一個解碼一器,這是因為對于解碼是需要等待下一個包的進行補全的,代碼相對復(fù)雜,而對于編碼器,用戶可以自行編寫,因為編碼時只需要將不足指定長度的部分進行補全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:
public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 將前一步解碼得到的數(shù)據(jù)轉(zhuǎn)碼為字符串
ch.pipeline().addLast(new StringDecoder());
// 這里FixedLengthFrameEncoder是我們自定義的,用于將長度不足20的消息進行補全空格
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 最終的數(shù)據(jù)處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}
上面的pipeline中,對于入棧數(shù)據(jù),這里主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個用于處理固定長度的消息的粘包和拆包問題,第二個則是將處理之后的消息轉(zhuǎn)換為字符串。最后由EchoServerHandler處理最終得到的數(shù)據(jù),處理完成后,將處理得到的數(shù)據(jù)交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實現(xiàn),主要作用是將長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實現(xiàn)代碼:
public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
private int length;
public FixedLengthFrameEncoder(int length) {
this.length = length;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 對于超過指定長度的消息,這里直接拋出異常
if (msg.length() > length) {
throw new UnsupportedOperationException(
"message length is too large, it's limited " + length);
}
// 如果長度不足,則進行補全
if (msg.length() < length) {
msg = addSpace(msg);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
}
// 進行空格補全
private String addSpace(String msg) {
StringBuilder builder = new StringBuilder(msg);
for (int i = 0; i < length - msg.length(); i++) {
builder.append(" ");
}
return builder.toString();
}
}
這里FixedLengthFrameEncoder實現(xiàn)了decode()方法,在該方法中,主要是將消息長度不足20的消息進行空格補全。EchoServerHandler的作用主要是打印接收到的消息,然后發(fā)送響應(yīng)給客戶端:
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server receives message: " + msg.trim());
ctx.writeAndFlush("hello client!");
}
}
對于客戶端,其實現(xiàn)方式基本與服務(wù)端的使用方式類似,只是在最后進行消息發(fā)送的時候與服務(wù)端的處理方式不同。如下是客戶端EchoClient的代碼:
public class EchoClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 對服務(wù)端發(fā)送的消息進行粘包和拆包處理,由于服務(wù)端發(fā)送的消息已經(jīng)進行了空格補全,
// 并且長度為20,因而這里指定的長度也為20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 將粘包和拆包處理得到的消息轉(zhuǎn)換為字符串
ch.pipeline().addLast(new StringDecoder());
// 對客戶端發(fā)送的消息進行空格補全,保證其長度為20
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 客戶端發(fā)送消息給服務(wù)端,并且處理服務(wù)端響應(yīng)的消息
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8080);
}
}
對于客戶端而言,其消息的處理流程其實與服務(wù)端是相似的,對于入站消息,需要對其進行粘包和拆包處理,然后將其轉(zhuǎn)碼為字符串,對于出站消息,則需要將長度不足20的消息進行空格補全。客戶端與服務(wù)端處理的主要區(qū)別在于最后的消息處理handler不一樣,也即這里的EchoClientHandler,如下是該handler的源碼:
public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client receives message: " + msg.trim());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("hello server!");
}
}
這里客戶端的處理主要是重寫了channelActive()和channelRead0()兩個方法,這兩個方法的主要作用在于,channelActive()會在客戶端連接上服務(wù)器時執(zhí)行,也就是說,其連上服務(wù)器之后就會往服務(wù)器發(fā)送消息。而channelRead0()主要是在服務(wù)器發(fā)送響應(yīng)給客戶端時執(zhí)行,這里主要是打印服務(wù)器的響應(yīng)消息。對于服務(wù)端而言,前面我們我們可以看到,EchoServerHandler只重寫了channelRead0()方法,這是因為服務(wù)器只需要等待客戶端發(fā)送消息過來,然后在該方法中進行處理,處理完成后直接將響應(yīng)發(fā)送給客戶端。如下是分別啟動服務(wù)端和客戶端之后控制臺打印的數(shù)據(jù):
// server
server receives message: hello server!
// client
client receives message: hello client!
3.2 LineBasedFrameDecoder與DelimiterBasedFrameDecoder
對于通過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這里LineBasedFrameDecoder的作用主要是通過換行符,即\n或者\r\n對數(shù)據(jù)進行處理;而DelimiterBasedFrameDecoder的作用則是通過用戶指定的分隔符對數(shù)據(jù)進行粘包和拆包處理。同樣的,這兩個類都是解碼一器類,而對于數(shù)據(jù)的編碼,也即在每個數(shù)據(jù)包最后添加換行符或者指定分割符的部分需要用戶自行進行處理。這里以DelimiterBasedFrameDecoder為例進行講解,如下是EchoServer中使用該類的代碼片段,其余部分與前面的例子中的完全一致:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 將delimiter設(shè)置到DelimiterBasedFrameDecoder中,經(jīng)過該解碼一器進行處理之后,源數(shù)據(jù)將會
// 被按照_$進行分隔,這里1024指的是分隔的最大長度,即當讀取到1024個字節(jié)的數(shù)據(jù)之后,若還是未
// 讀取到分隔符,則舍棄當前數(shù)據(jù)段,因為其很有可能是由于碼流紊亂造成的
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串數(shù)據(jù)
ch.pipeline().addLast(new StringDecoder());
// 這是我們自定義的一個編碼器,主要作用是在返回的響應(yīng)數(shù)據(jù)最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 最終處理數(shù)據(jù)并且返回響應(yīng)的handler
ch.pipeline().addLast(new EchoServerHandler());
}
上面pipeline的設(shè)置中,添加的解碼一器主要有DelimiterBasedFrameDecoder和StringDecoder,經(jīng)過這兩個處理器處理之后,接收到的字節(jié)流就會被分隔,并且轉(zhuǎn)換為字符串數(shù)據(jù),最終交由EchoServerHandler處理。這里DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在返回的響應(yīng)數(shù)據(jù)之后添加分隔符。如下是該編碼器的源碼:
public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {
private String delimiter;
public DelimiterBasedFrameEncoder(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 在響應(yīng)的數(shù)據(jù)后面添加分隔符
ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
}
}
對于客戶端而言,這里的處理方式與服務(wù)端類似,其pipeline的添加方式如下:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 對服務(wù)端返回的消息通過_$進行分隔,并且每次查找的最大大小為1024字節(jié)
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 將分隔之后的字節(jié)數(shù)據(jù)轉(zhuǎn)換為字符串
ch.pipeline().addLast(new StringDecoder());
// 對客戶端發(fā)送的數(shù)據(jù)進行編碼,這里主要是在客戶端發(fā)送的數(shù)據(jù)最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 客戶端發(fā)送數(shù)據(jù)給服務(wù)端,并且處理從服務(wù)端響應(yīng)的數(shù)據(jù)
ch.pipeline().addLast(new EchoClientHandler());
}
這里客戶端的處理方式與服務(wù)端基本一致,關(guān)于這里沒展示的代碼,其與示例一中的代碼完全一致,這里則不予展示。
3.3 LengthFieldBasedFrameDecoder與LengthFieldPrepender
這里LengthFieldBasedFrameDecoder與LengthFieldPrepender需要配合起來使用,其實本質(zhì)上來講,這兩者一個是解碼,一個是編碼的關(guān)系。它們處理粘拆包的主要思想是在生成的數(shù)據(jù)包中添加一個長度字段,用于記錄當前數(shù)據(jù)包的長度。LengthFieldBasedFrameDecoder會按照參數(shù)指定的包長度偏移量數(shù)據(jù)對接收到的數(shù)據(jù)進行解碼,從而得到目標消息體數(shù)據(jù);而LengthFieldPrepender則會在響應(yīng)的數(shù)據(jù)前面添加指定的字節(jié)數(shù)據(jù),這個字節(jié)數(shù)據(jù)中保存了當前消息體的整體字節(jié)數(shù)據(jù)長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:

關(guān)于LengthFieldBasedFrameDecoder,這里需要對其構(gòu)造函數(shù)參數(shù)進行介紹:
- maxFrameLength:指定了每個包所能傳遞的最大數(shù)據(jù)包大小;
- lengthFieldOffset:指定了長度字段在字節(jié)碼中的偏移量;
- lengthFieldLength:指定了長度字段所占用的字節(jié)長度;
- lengthAdjustment:對一些不僅包含有消息頭和消息體的數(shù)據(jù)進行消息頭的長度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長度;
- initialBytesToStrip:對于長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的字節(jié)。
這里我們以json序列化為例對LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進行講解。如下是EchoServer的源碼:
public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數(shù)據(jù)
// 進行長度字段解碼,這里也會對數(shù)據(jù)進行粘包和拆包處理
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// LengthFieldPrepender是一個編碼器,主要是在響應(yīng)字節(jié)數(shù)據(jù)前面添加字節(jié)長度字段
ch.pipeline().addLast(new LengthFieldPrepender(2));
// 對經(jīng)過粘包和拆包處理之后的數(shù)據(jù)進行json反序列化,從而得到User對象
ch.pipeline().addLast(new JsonDecoder());
// 對響應(yīng)數(shù)據(jù)進行編碼,主要是將User對象序列化為json
ch.pipeline().addLast(new JsonEncoder());
// 處理客戶端的請求的數(shù)據(jù),并且進行響應(yīng)
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}
這里EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負責將響應(yīng)的User對象序列化為json對象,然后在其字節(jié)數(shù)組前面添加一個長度字段的字節(jié)數(shù)組;解碼一器主要是對接收到的數(shù)據(jù)進行長度字段的解碼,然后將其反序列化為一個User對象。下面是JsonDecoder的源碼:
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
throws Exception {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
out.add(user);
}
}
JsonDecoder首先從接收到的數(shù)據(jù)流中讀取字節(jié)數(shù)組,然后將其反序列化為一個User對象。下面我們看看JsonEncoder的源碼:
public class JsonEncoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
throws Exception {
String json = JSON.toJSONString(user);
ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
}
}
JsonEncoder將響應(yīng)得到的User對象轉(zhuǎn)換為一個json對象,然后寫入響應(yīng)中。對于EchoServerHandler,其主要作用就是接收客戶端數(shù)據(jù),并且進行響應(yīng),如下是其源碼:
public class EchoServerHandler extends SimpleChannelInboundHandler<User> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive from client: " + user);
ctx.write(user);
}
}
對于客戶端,其主要邏輯與服務(wù)端的基本類似,這里主要展示其pipeline的添加方式,以及最后發(fā)送請求,并且對服務(wù)器響應(yīng)進行處理的過程:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new JsonDecoder());
ch.pipeline().addLast(new JsonEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.write(getUser());
}
private User getUser() {
User user = new User();
user.setAge(27);
user.setName("zhangxufeng");
return user;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive message from server: " + user);
}
}
這里客戶端首先會在連接上服務(wù)器時,往服務(wù)器發(fā)送一個User對象數(shù)據(jù),然后在接收到服務(wù)器響應(yīng)之后,會打印服務(wù)器響應(yīng)的數(shù)據(jù)。
3.4 自定義粘包與拆包器
對于粘包與拆包問題,其實前面三種基本上已經(jīng)能夠滿足大多數(shù)情形了,但是對于一些更加復(fù)雜的協(xié)議,可能有一些定制化的需求。對于這些場景,其實本質(zhì)上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來實現(xiàn)粘包和拆包的處理。
如果用戶確實需要不通過繼承的方式實現(xiàn)自己的粘包和拆包處理器,這里可以通過實現(xiàn)MessageToByteEncoder和ByteToMessageDecoder來實現(xiàn)。這里MessageToByteEncoder的作用是將響應(yīng)數(shù)據(jù)編碼為一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為某個對象數(shù)據(jù)。通過實現(xiàn)這兩個抽象類,用戶就可以達到實現(xiàn)自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception;
}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
throws Exception;
}
4. 小結(jié)
本文首先對粘包和拆包的問題原理進行描述,幫助讀者理解粘包和拆包問題所在。然后對處理粘包和拆包的幾種常用解決方案進行講解。接著通過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進行了詳細講解。
大家可以加我的程序員交流群:833145934,群內(nèi)每晚都會有阿里技術(shù)大牛講解的最新Java架構(gòu)技術(shù)。并會錄制錄播視頻分享在群公告中,作為給廣大朋友的加群的福利——分布式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高并發(fā)、高可用架構(gòu))/微服務(wù)(Spring Boot、Spring Cloud)/源碼(Spring、Mybatis)/性能優(yōu)化(JVM、TomCat、MySQL)【加群備注好消息領(lǐng)取最新面試資料】