前言:demo演示
首先,我們來(lái)看個(gè)demo

1、EchoServer
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
EchoServer echoServer = new EchoServer(9999);
System.out.println("服務(wù)器即將啟動(dòng)");
echoServer.start();
System.out.println("服務(wù)器關(guān)閉");
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
ServerBootstrap b = new ServerBootstrap();/*服務(wù)端啟動(dòng)必須*/
b.group(group)/*將線程組傳入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO進(jìn)行網(wǎng)絡(luò)傳輸*/
.localAddress(new InetSocketAddress(port))/*指定服務(wù)器監(jiān)聽(tīng)端口*/
/*服務(wù)端每接收到一個(gè)連接請(qǐng)求,就會(huì)新啟一個(gè)socket通信,也就是channel,
所以下面這段代碼的作用就是為這個(gè)子channel增加handle*/
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);/*添加到該子channel的pipeline的尾部*/
}
});
ChannelFuture f = b.bind().sync();/*異步綁定到服務(wù)器,sync()會(huì)阻塞直到完成*/
System.out.println("服務(wù)器啟動(dòng)完成,等待客戶端的連接和數(shù)據(jù).....");
f.channel().closeFuture().sync();/*阻塞直到服務(wù)器的channel關(guān)閉*/
} finally {
group.shutdownGracefully().sync();/*優(yōu)雅關(guān)閉線程組*/
}
}
}
2、EchoServerHandler
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
/*** 服務(wù)端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept[" + request
+ "] and the counter is:" + counter.incrementAndGet());
String resp = "Hello," + request + ". Welcome to Netty World!"
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 發(fā)生異常后的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
使用netty實(shí)現(xiàn)了個(gè)服務(wù)端,當(dāng)接收到客戶端的消息是,打印出來(lái)請(qǐng)求的內(nèi)容,并統(tǒng)計(jì)接收請(qǐng)求的次數(shù)。
3、EchoClient
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
final Bootstrap b = new Bootstrap();
/*客戶端啟動(dòng)必須*/
b.group(group)/*將線程組傳入*/
.channel(NioSocketChannel.class)/*指定使用NIO進(jìn)行網(wǎng)絡(luò)傳輸*/
.remoteAddress(new InetSocketAddress(host, port))/*配置要連接服務(wù)器的ip地址和端口*/
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999, "127.0.0.1").start();
}
}
4、EchoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private AtomicInteger counter = new AtomicInteger(0);
/*** 客戶端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
+ "] and the counter is:" + counter.incrementAndGet());
}
/*** 客戶端被通知channel活躍后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "test1,test2,test3,test4"
+ System.getProperty("line.separator");
for (int i = 0; i < 100; i++) {
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 發(fā)生異常后的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
使用netty實(shí)現(xiàn)了個(gè)客戶端,鏈接建立完成之后向服務(wù)端發(fā)送消息。循環(huán)100次。并且打印服務(wù)端返回的消息。并統(tǒng)計(jì)返回次數(shù)。
執(zhí)行結(jié)果
服務(wù)端輸出

客戶端打印

結(jié)果發(fā)現(xiàn),我們客戶單發(fā)送了100次數(shù)據(jù),但實(shí)際上只接收了30次。而且每次消息發(fā)送的是test1,test2,test3,test4,test5,但實(shí)際接受的卻有很多相鏈接起來(lái)的。這是為什么呢?為什么不是100次test1,test2,test3,test4,test5呢?這就是TCP傳輸?shù)恼嘲?半包問(wèn)題。
一、什么是TCP粘包半包?

假設(shè)客戶端分別發(fā)送了兩個(gè)數(shù)據(jù)包D1和D2給服務(wù)端,由于服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,故可能存在以下4種情況。
- 服務(wù)端分兩次讀取到了兩個(gè)獨(dú)立的數(shù)據(jù)包,分別是D1和D2,沒(méi)有粘包和拆包;
- 服務(wù)端一次接收到了兩個(gè)數(shù)據(jù)包,D1和D2粘合在一起,被稱(chēng)為T(mén)CP粘包;
- 服務(wù)端分兩次讀取到了兩個(gè)數(shù)據(jù)包,第一次讀取到了完整的D1包和D2包的部分內(nèi)容,第二次讀取到了D2包的剩余內(nèi)容,這被稱(chēng)為T(mén)CP拆包;
- 服務(wù)端分兩次讀取到了兩個(gè)數(shù)據(jù)包,第一次讀取到了D1包的部分內(nèi)容D1_1,第二次讀取到了D1包的剩余內(nèi)容D1_2和D2包的整包。
如果此時(shí)服務(wù)端TCP接收滑窗非常小,而數(shù)據(jù)包D1和D2比較大,很有可能會(huì)發(fā)生第五種可能,即服務(wù)端分多次才能將D1和D2包接收完全,期間發(fā)生多次拆包。
二、TCP粘包/半包發(fā)生的原因
由于TCP協(xié)議本身的機(jī)制(面向連接的可靠地協(xié)議-三次握手機(jī)制)客戶端與服務(wù)器會(huì)維持一個(gè)連接(Channel),數(shù)據(jù)在連接不斷開(kāi)的情況下,可以持續(xù)不斷地將多個(gè)數(shù)據(jù)包發(fā)往服務(wù)器,但是如果發(fā)送的網(wǎng)絡(luò)數(shù)據(jù)包太小,那么他本身會(huì)啟用Nagle算法(可配置是否啟用)對(duì)較小的數(shù)據(jù)包進(jìn)行合并(基于此,TCP的網(wǎng)絡(luò)延遲要UDP的高些)然后再發(fā)送(超時(shí)或者包大小足夠)。那么這樣的話,服務(wù)器在接收到消息(數(shù)據(jù)流)的時(shí)候就無(wú)法區(qū)分哪些數(shù)據(jù)包是客戶端自己分開(kāi)發(fā)送的,這樣產(chǎn)生了粘包;服務(wù)器在接收到數(shù)據(jù)庫(kù)后,放到緩沖區(qū)中,如果消息沒(méi)有被及時(shí)從緩存區(qū)取走,下次在取數(shù)據(jù)的時(shí)候可能就會(huì)出現(xiàn)一次取出多個(gè)數(shù)據(jù)包的情況,造成粘包現(xiàn)象
UDP:本身作為無(wú)連接的不可靠的傳輸協(xié)議(適合頻繁發(fā)送較小的數(shù)據(jù)包),他不會(huì)對(duì)數(shù)據(jù)包進(jìn)行合并發(fā)送(也就沒(méi)有Nagle算法之說(shuō)了),他直接是一端發(fā)送什么數(shù)據(jù),直接就發(fā)出去了,既然他不會(huì)對(duì)數(shù)據(jù)合并,每一個(gè)數(shù)據(jù)包都是完整的(數(shù)據(jù)+UDP頭+IP頭等等發(fā)一次數(shù)據(jù)封裝一次)也就沒(méi)有粘包一說(shuō)了。
分包產(chǎn)生的原因就簡(jiǎn)單的多:可能是IP分片傳輸導(dǎo)致的,也可能是傳輸過(guò)程中丟失部分包導(dǎo)致出現(xiàn)的半包,還有可能就是一個(gè)包可能被分成了兩次傳輸,在取數(shù)據(jù)的時(shí)候,先取到了一部分(還可能與接收的緩沖區(qū)大小有關(guān)系),總之就是一個(gè)數(shù)據(jù)包被分成了多次接收。
更具體的原因有三個(gè),分別如下。
- 應(yīng)用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小
- 進(jìn)行MSS大小的TCP分段。MSS是最大報(bào)文段長(zhǎng)度的縮寫(xiě)。MSS是TCP報(bào)文段中的數(shù)據(jù)字段的最大長(zhǎng)度。數(shù)據(jù)字段加上TCP首部才等于整個(gè)的TCP報(bào)文段。所以MSS并不是TCP報(bào)文段的最大長(zhǎng)度,而是:MSS=TCP報(bào)文段長(zhǎng)度-TCP首部長(zhǎng)
- 以太網(wǎng)的payload大于MTU進(jìn)行IP分片。MTU指:一種通信協(xié)議的某一層上面所能通過(guò)的最大數(shù)據(jù)包大小。如果IP層有一個(gè)數(shù)據(jù)包要傳,而且數(shù)據(jù)的長(zhǎng)度比鏈路層的MTU大,那么IP層就會(huì)進(jìn)行分片,把數(shù)據(jù)包分成托干片,讓每一片都不超過(guò)MTU。注意,IP分片可以發(fā)生在原始發(fā)送端主機(jī)上,也可以發(fā)生在中間路由器上。
三、解決粘包半包問(wèn)題
由于底層的TCP無(wú)法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無(wú)法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問(wèn)題只能通過(guò)上層的應(yīng)用協(xié)議棧設(shè)計(jì)來(lái)解決,根據(jù)業(yè)界的主流協(xié)議的解決方案,可以歸納如下。
1、在包尾增加分割符
在包尾增加分割符,比如回車(chē)換行符進(jìn)行分割,例如FTP協(xié)議;
demo如下:
LineBaseEchoServer
public class LineBaseEchoServer {
public static final int PORT = 9998;
public static void main(String[] args) throws InterruptedException {
LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
System.out.println("服務(wù)器即將啟動(dòng)");
lineBaseEchoServer.start();
}
public void start() throws InterruptedException {
final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
ServerBootstrap b = new ServerBootstrap();/*服務(wù)端啟動(dòng)必須*/
b.group(group)/*將線程組傳入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO進(jìn)行網(wǎng)絡(luò)傳輸*/
.localAddress(new InetSocketAddress(PORT))/*指定服務(wù)器監(jiān)聽(tīng)端口*/
/*服務(wù)端每接收到一個(gè)連接請(qǐng)求,就會(huì)新啟一個(gè)socket通信,也就是channel,
所以下面這段代碼的作用就是為這個(gè)子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*異步綁定到服務(wù)器,sync()會(huì)阻塞直到完成*/
System.out.println("服務(wù)器啟動(dòng)完成,等待客戶端的連接和數(shù)據(jù).....");
f.channel().closeFuture().sync();/*阻塞直到服務(wù)器的channel關(guān)閉*/
} finally {
group.shutdownGracefully().sync();/*優(yōu)雅關(guān)閉線程組*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//添加換行解碼器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LineBaseServerHandler());
}
}
}
LineBaseEchoServer
public class LineBaseEchoServer {
public static final int PORT = 9998;
public static void main(String[] args) throws InterruptedException {
LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
System.out.println("服務(wù)器即將啟動(dòng)");
lineBaseEchoServer.start();
}
public void start() throws InterruptedException {
final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
ServerBootstrap b = new ServerBootstrap();/*服務(wù)端啟動(dòng)必須*/
b.group(group)/*將線程組傳入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO進(jìn)行網(wǎng)絡(luò)傳輸*/
.localAddress(new InetSocketAddress(PORT))/*指定服務(wù)器監(jiān)聽(tīng)端口*/
/*服務(wù)端每接收到一個(gè)連接請(qǐng)求,就會(huì)新啟一個(gè)socket通信,也就是channel,
所以下面這段代碼的作用就是為這個(gè)子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*異步綁定到服務(wù)器,sync()會(huì)阻塞直到完成*/
System.out.println("服務(wù)器啟動(dòng)完成,等待客戶端的連接和數(shù)據(jù).....");
f.channel().closeFuture().sync();/*阻塞直到服務(wù)器的channel關(guān)閉*/
} finally {
group.shutdownGracefully().sync();/*優(yōu)雅關(guān)閉線程組*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//添加換行解碼器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LineBaseServerHandler());
}
}
}
LineBaseEchoClient
public class LineBaseEchoClient {
private final String host;
public LineBaseEchoClient(String host) {
this.host = host;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
try {
final Bootstrap b = new Bootstrap();
b.group(group)/*將線程組傳入*/
.channel(NioSocketChannel.class)/*指定使用NIO進(jìn)行網(wǎng)絡(luò)傳輸*/
.remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要連接服務(wù)器的ip地址和端口*/
.handler(new ChannelInitializerImp());
ChannelFuture f = b.connect().sync();
System.out.println("已連接到服務(wù)器.....");
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//回車(chē)符做了分割
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LineBaseClientHandler());
}
}
public static void main(String[] args) throws InterruptedException {
new LineBaseEchoClient("127.0.0.1").start();
}
}
LineBaseClientHandler
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private AtomicInteger counter = new AtomicInteger(0);
/*** 客戶端讀取到網(wǎng)絡(luò)數(shù)據(jù)后的處理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
+ "] and the counter is:" + counter.incrementAndGet());
ctx.close();
}
/*** 客戶端被通知channel活躍后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "test1,test2,test3,test4,test5"
+ System.getProperty("line.separator");
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
System.out.println(System.currentTimeMillis() + ":即將發(fā)送數(shù)據(jù):"
+ request);
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 發(fā)生異常后的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
執(zhí)行效果

2、消息定長(zhǎng)
例如每個(gè)報(bào)文的大小為固定長(zhǎng)度200字節(jié),如果不夠,空位補(bǔ)空格;
服務(wù)端只需將服務(wù)端的ChannelInitializerImp 解碼器new LineBasedFrameDecoder(1024)替換為new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())即可。
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//添加定長(zhǎng)報(bào)文長(zhǎng)度解碼器,長(zhǎng)度問(wèn)請(qǐng)求的長(zhǎng)度
ch.pipeline().addLast(
new FixedLengthFrameDecoder(
FixedLengthEchoClient.REQUEST.length()));
ch.pipeline().addLast(new FixedLengthServerHandler());
}
}
3、將消息分為消息頭和消息體
消息頭中包含表示消息總長(zhǎng)度(或者消息體長(zhǎng)度)的字段,通常設(shè)計(jì)思路為消息頭的第一個(gè)字段使用int32來(lái)表示消息的總長(zhǎng)度。類(lèi)似與第二條,只是我們按照頭部的content-length長(zhǎng)度進(jìn)行定長(zhǎng)解碼。