將消息定義結(jié)構(gòu)為消息頭和消息體兩部分,消息頭中存儲消息的長度。netty讀取消息頭后,就能知道消息體的長度了。
自定義協(xié)議
| 協(xié)議開始標志 | 長度 | 數(shù)據(jù) |
|---|
- 協(xié)議開始標志head_data,為int類型的數(shù)據(jù),16進制表示為0X76;
- 傳輸數(shù)據(jù)的長度contentLength,int類型;
- 要傳輸?shù)臄?shù)據(jù)。
自定義協(xié)議類的封裝:
package learn.netty.protocal;
/**
* @author stone
* @date 2019/7/29 17:38
*/
public class ConstantValue {
/**
* 自定義協(xié)議,報文開始標志
*/
public static final int HEAD_DATA = 0X76;
}
package learn.netty.protocal;
import java.io.UnsupportedEncodingException;
/**
* @author stone
* @date 2019/7/29 17:39
*/
public class MyLsProtocol {
/**
* 消息頭標志
*/
private int header = ConstantValue.HEAD_DATA;
/**
* 消息長度
*/
private int contentLength;
/**
* 消息內(nèi)容
*/
private byte[] content;
public MyLsProtocol(int contentLength, byte[] content) {
this.contentLength = contentLength;
this.content = content;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public int getHeader() {
return header;
}
@Override
public String toString() {
try {
return "MyLsProtocol{" +
"header=" + header +
", contentLength=" + contentLength +
", content=" + new String(content, "utf-8") +
'}';
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return "";
}
}
}
自定義協(xié)議的編碼器:
package learn.netty.protocal;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 自定義協(xié)議編碼器(對象轉(zhuǎn)為字節(jié))
*
* @author stone
* @date 2019/7/29 17:41
*/
public class LsEncoder extends MessageToByteEncoder<MyLsProtocol> {
@Override
public void encode(ChannelHandlerContext ctx, MyLsProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getHeader());
out.writeInt(msg.getContentLength());
out.writeBytes(msg.getContent());
}
}
自定義協(xié)議的解碼器:
package learn.netty.protocal;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 自定義協(xié)議解碼器 (字節(jié)轉(zhuǎn)對象)
*
* @author stone
* @date 2019/7/29 17:43
*/
public class LsDecoder extends ByteToMessageDecoder {
/**
* 報文開始的標志 header是int類型,占4個字節(jié)
* 表示報文長度的contentLength是int類型,占4個字節(jié)
*/
public final int BASE_LENGTH = 8;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 可讀長度必須大于基本長度
if (in.readableBytes() >= BASE_LENGTH) {
// 防止socket字節(jié)流攻擊
// 防止客戶端傳來的數(shù)據(jù)過大(太大的數(shù)據(jù)是不合理的)
if (in.readableBytes() > 2048) {
in.skipBytes(in.readableBytes());
}
// 記錄包頭開始的index
int beginReader;
while (true) {
// 獲取包頭開始的index
beginReader = in.readerIndex();
// 標記包頭開始的index
in.markReaderIndex();
// 讀到協(xié)議的開始標志,結(jié)束while循環(huán)
if (in.readInt() == ConstantValue.HEAD_DATA) {
break;
}
// 未讀到包頭,跳過一個字節(jié)
// 每次跳過一個字節(jié)后,再去讀取包頭信息的開始標記
in.resetReaderIndex();
in.readByte();
// 當跳過一個字節(jié)后,數(shù)據(jù)包的長度又變的不滿足,此時應該結(jié)束,等待后邊數(shù)據(jù)流的到達
if (in.readableBytes() < BASE_LENGTH) {
return;
}
}
// 代碼到這里,說明已經(jīng)讀到了報文標志
// 消息長度
int length = in.readInt();
// 判斷請求數(shù)據(jù)包是否到齊
if (in.readableBytes() < length) { // 數(shù)據(jù)不齊,回退讀指針
// 還原讀指針
in.readerIndex(beginReader);
return;
}
// 至此,讀到一條完整報文
byte[] data = new byte[length];
in.readBytes(data);
MyLsProtocol protocol = new MyLsProtocol(data.length, data);
out.add(protocol);
}
}
}
服務端實現(xiàn):
package learn.netty.protocal;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* @author stone
* @date 2019/7/30 9:30
*/
public class ProtocolServer {
public ProtocolServer() {
}
public void bind(int port) throws Exception {
// 配置IO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服務器輔助啟動類配置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler())
.option(ChannelOption.SO_BACKLOG, 1024) // 設置tcp緩沖區(qū)
.option(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口,同步等待綁定成功
ChannelFuture f = b.bind(port).sync();
// 等待服務端監(jiān)聽端口關閉
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 服務端加入的協(xié)議編碼/解碼器
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加自定義協(xié)議編碼工具
ch.pipeline().addLast(new LsEncoder());
ch.pipeline().addLast(new LsDecoder());
// 處理網(wǎng)絡IO
ch.pipeline().addLast(new ProtocolServerHandler());
}
}
public static void main(String[] args) throws Exception {
new ProtocolServer().bind(9999);
}
}
服務端handler實現(xiàn):
package learn.netty.protocal;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author stone
* @date 2019/7/30 9:33
*/
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 用于獲取客戶端發(fā)來的數(shù)據(jù)信息
MyLsProtocol body = (MyLsProtocol) msg;
System.out.println("Server接收到的客戶端信息:" + body.toString());
// 寫數(shù)據(jù)給客戶端
String str = "Hi I am Server ...";
MyLsProtocol response = new MyLsProtocol(str.getBytes().length, str.getBytes());
// 當服務端完成寫操作后,關閉與客戶端的連接
ctx.writeAndFlush(response);
// 有寫操作時,不需要手動釋放msg的引用; 當只有讀操作時,才需要手動釋放msg的引用
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Server exceptionCaught");
cause.printStackTrace();
// if (ctx.channel().isActive()) {
// ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
// }
ctx.close();
}
}
客戶端實現(xiàn):
package learn.netty.protocal;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author stone
* @date 2019/7/30 9:45
*/
public class ProtocolClient {
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
// 配置啟動輔助類
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new MyChannelHandler());
// 異步連接服務器,同步等待連接成功
ChannelFuture f = b.connect(host, port).sync();
// 等待連接關閉
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new ProtocolClient().connect(9999, "127.0.0.1");
}
// 客戶端加入的協(xié)議編碼/解碼器
public class MyChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LsEncoder());
ch.pipeline().addLast(new LsDecoder());
// 處理網(wǎng)絡IO
ch.pipeline().addLast(new ProtocolClientHandler());
}
}
}
客戶端handler實現(xiàn):
package learn.netty.protocal;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 客戶端Handler
* @author
*/
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {
/**
* 客戶端一旦與服務端建立好連接,就會觸發(fā)該方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 發(fā)送消息
String data = "I am client ...";
// 獲取要發(fā)送信息的字節(jié)數(shù)組
byte[] content = data.getBytes();
// 要發(fā)送信息的長度
int contentLength = content.length;
MyLsProtocol protocol = new MyLsProtocol(contentLength, content);
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(protocol);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// 用于獲取客戶端發(fā)來的數(shù)據(jù)信息
MyLsProtocol body = (MyLsProtocol) msg;
System.out.println("Client接收的客戶端的信息:" + body.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
參考博客:
http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html