個(gè)人博客
基于Netty實(shí)現(xiàn)服務(wù)端與客戶端通信
前言
本文介紹基于Netty實(shí)現(xiàn)的服務(wù)端與客戶端通信的簡單使用方法,并在此基礎(chǔ)上實(shí)現(xiàn)一個(gè)簡單的服務(wù)端-客戶端指令通信的Demo。
Netty是什么
Netty是一個(gè)NIO客戶端-服務(wù)器框架,可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,例如協(xié)議服務(wù)器和客戶端。它極大地簡化了網(wǎng)絡(luò)編程,例如TCP和UDP套接字服務(wù)器的開發(fā)。提供一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,以快速開發(fā)可維護(hù)的高性能和高可擴(kuò)展性協(xié)議服務(wù)器和客戶端。
以上內(nèi)容摘選自https://netty.io/wiki/user-guide-for-4.x.html
Netty具有以下特點(diǎn):
- 適用于各種傳輸類型的統(tǒng)一API-阻塞和非阻塞套接字
- 更高的吞吐量,更低的延遲
- 減少資源消耗
- 減少不必要的內(nèi)存復(fù)制
- 完整的SSL / TLS和StartTLS支持
以上內(nèi)容摘選自https://netty.io/
使用入門
Netty的使用,可以參照Netty的官方文檔,這里以4.x為例來演示Netty在服務(wù)端和客戶端上使用。文檔地址:https://netty.io/wiki/user-guide-for-4.x.html
這里用Eclipse來進(jìn)行開發(fā),服務(wù)端和客戶端都放在一個(gè)工程里。
新建Java工程
服務(wù)端
首先需要導(dǎo)入netty的jar包。這里使用netty-all-4.1.48.Final.jar。
NettyServer
新建NettyServer類
public class NettyServer {
private int mPort;
public NettyServer(int port) {
this.mPort = port;
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
// 指定連接隊(duì)列大小
.option(ChannelOption.SO_BACKLOG, 128)
//KeepAlive
.childOption(ChannelOption.SO_KEEPALIVE, true)
//Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture f = b.bind(mPort).sync();
if (f.isSuccess()) {
LogUtil.log("Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):" + mPort);
}
// f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// workerGroup.shutdownGracefully();
// bossGroup.shutdownGracefully();
}
}
}
NettyServerHandler
在初始化時(shí),需要指定Handle,用來處理Channel相關(guān)業(yè)務(wù)。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LogUtil.log("Server,接收到客戶端發(fā)來的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LogUtil.log("Server,exceptionCaught");
cause.printStackTrace();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelInactive");
}
}
經(jīng)過上面這些步驟后,服務(wù)端最基本的設(shè)置就完成了。
客戶端
客戶端和服務(wù)端在初始化時(shí)大體是類似的,不過相比服務(wù)端要簡單一些。
NettyClient
public class NettyClient {
private String mHost;
private int mPort;
private NettyClientHandler mClientHandler;
private ChannelFuture mChannelFuture;
public NettyClient(String host, int port) {
this.mHost = host;
this.mPort = port;
}
public void connect() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
mClientHandler = new NettyClientHandler();
b.group(workerGroup).channel(NioSocketChannel.class)
// KeepAlive
.option(ChannelOption.SO_KEEPALIVE, true)
// Handler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(mClientHandler);
}
});
mChannelFuture = b.connect(mHost, mPort).sync();
if (mChannelFuture.isSuccess()) {
LogUtil.log("Client,連接服務(wù)端成功");
}
mChannelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Client,channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LogUtil.log("Client,exceptionCaught");
cause.printStackTrace();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Client,channelInactive");
}
}
到這里,客戶端最基本設(shè)置就完成了。
連接服務(wù)端
新建一個(gè)Main類,用于測試服務(wù)端和客戶端是否能正常連接。
public class Main {
public static void main(String[] args) {
try {
String host = "127.0.0.1";
int port = 12345;
NettyServer server = new NettyServer(port);
server.run();
Thread.sleep(1000);
NettyClient client = new NettyClient(host, port);
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
運(yùn)行main方法,輸出日志如下:
2020-4-13 0:11:02--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:11:03--Client,channelActive
2020-4-13 0:11:03--Client,連接服務(wù)端成功
2020-4-13 0:11:03--Server,channelActive
可以看到,客戶端成功連接上了服務(wù)端,服務(wù)端和客戶端里設(shè)置的Handler的channelActive方法都會(huì)回調(diào)。
服務(wù)端與客戶端通信
在服務(wù)端與客戶端連接成功后,我們往往需要在雙方間進(jìn)行通信。這里假定,在連接成功后,服務(wù)端給客戶端發(fā)送一個(gè)歡迎信息"你好,客戶端",而客戶端在收到服務(wù)端的消息后,也給服務(wù)端回復(fù)一個(gè)消息"你好,服務(wù)端"。下面來實(shí)現(xiàn)具體的功能。
修改服務(wù)端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中給客戶端發(fā)送消息,在channelRead方法中解析客戶端發(fā)來的消息
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelActive");
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
ctx.writeAndFlush(byteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer);
String message = new String(buffer, "utf-8");
LogUtil.log("Server,接收到客戶端發(fā)來的消息:" + message);
}
}
修改客戶端NettyClientHandler中的channelRead方法,當(dāng)收到服務(wù)端的消息時(shí),回復(fù)服務(wù)端
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer);
String message = new String(buffer,"utf-8");
LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + message);
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務(wù)端", Charset.forName("utf-8"));
ctx.writeAndFlush(byteBuf);
}
}
運(yùn)行后,輸出日志如下:
2020-4-13 0:29:16--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:29:17--Client,channelActive
2020-4-13 0:29:17--Client,連接服務(wù)端成功
2020-4-13 0:29:17--Server,channelActive
2020-4-13 0:29:17--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 0:29:17--Server,接收到客戶端發(fā)來的消息:你好,服務(wù)端
可以看到,服務(wù)端與客戶端已經(jīng)可以正常通信。
粘包與拆包
在實(shí)際的使用場景中,可能會(huì)存在短時(shí)間內(nèi)大量數(shù)據(jù)發(fā)送的問題。我們模擬這個(gè)場景。在客戶端連接上服務(wù)端后,服務(wù)端給客戶端發(fā)送100個(gè)消息,而為便于分析,客戶端在收到服務(wù)端消息后,不作回復(fù)。
修改服務(wù)端中NettyServerHandler的channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelActive");
for (int i = 0; i < 100; i++) {
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
ctx.writeAndFlush(byteBuf);
}
}
修改客戶端中NettyClientHandler的channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer);
String message = new String(buffer, "utf-8");
LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + message);
//ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務(wù)端", Charset.forName("utf-8"));
//ctx.writeAndFlush(byteBuf);
}
運(yùn)行后,輸出的部分結(jié)果如下:
2020-4-13 0:35:28--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:35:29--Client,channelActive
2020-4-13 0:35:29--Client,連接服務(wù)端成功
2020-4-13 0:35:29--Server,channelActive
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
可以看到,出現(xiàn)了多條消息"粘"在一起的情況。
什么是粘包與拆包
TCP是個(gè)"流"協(xié)議,所謂流,就是沒有界限的一串?dāng)?shù)據(jù)。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。
以上內(nèi)容摘選自TCP粘包/拆包與Netty解決方案
解決方案
在沒有 Netty 的情況下,用戶如果自己需要拆包,基本原理就是不斷從 TCP 緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包 如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 TCP 緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包。 如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),構(gòu)成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接。
以上內(nèi)容摘選自徹底理解Netty,這一篇文章就夠了
而使用Netty,則解決這個(gè)問題的方法就簡單多了。Netty已經(jīng)提供了四個(gè)拆包器:
- FixedLengthFrameDecoder:固定長度的拆包器,Netty會(huì)把固定長度的數(shù)據(jù)包發(fā)送給下一個(gè)channelHandler
- LineBasedFrameDecoder:行拆包器,每個(gè)數(shù)據(jù)包以換行符分隔發(fā)送
- DelimiterBasedFrameDecoder:分隔符拆包器,可以自定義分隔符,行拆包器是分隔符拆包器的一種特例
- LengthFieldBasedFrameDecoder:基于長度域的拆包器,如果自定義協(xié)議中包含長度域的字段,就可以使用這個(gè)拆包器
在這里,我們選用分隔符拆包器
首先定義分隔符
public class Config {
public static final String DATA_PACK_SEPARATOR = "#$&*";
}
在服務(wù)端的channelHandler配置中,需要增加
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//這個(gè)配置需要在添加Handler前設(shè)置
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
channel.pipeline().addLast(new NettyServerHandler());
}
在客戶端的channelHandler的配置中,同樣也需要增加
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//這個(gè)配置需要在添加Handler前設(shè)置
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
channel.pipeline().addLast(new NettyServerHandler());
}
發(fā)送數(shù)據(jù)時(shí),在數(shù)據(jù)的末尾增加分隔符:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelActive");
for (int i = 0; i < 100; i++) {
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
ctx.writeAndFlush(byteBuf);
}
}
運(yùn)行后,可以發(fā)現(xiàn),已經(jīng)解決"粘包"與"拆包"的問題。
心跳
在網(wǎng)絡(luò)應(yīng)用中,為了判斷連接是否還存在,一般會(huì)通過發(fā)送心跳包來檢測。在Netty中,配置心跳包的步驟如下
在客戶端的channelHandler的配置中,需要增加
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
//...
}
在NettyClientHandler中,重寫userEventTriggered方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
LogUtil.log("Client,Idle:" + event.state());
switch (event.state()) {
case READER_IDLE:
break;
case WRITER_IDLE:
ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8"));
break;
case ALL_IDLE:
break;
default:
super.userEventTriggered(ctx, evt);
break;
}
}
當(dāng)寫空閑達(dá)到配置的時(shí)間時(shí),往服務(wù)端發(fā)送一個(gè)心跳消息
運(yùn)行后,日志輸出如下:
2020-4-13 1:22:50--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 1:22:51--Client,channelActive
2020-4-13 1:22:51--Client,連接服務(wù)端成功
2020-4-13 1:22:51--Server,channelActive
2020-4-13 1:22:51--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 1:22:56--Client,Idle:WRITER_IDLE
2020-4-13 1:22:56--Server,接收到客戶端發(fā)來的消息:心跳^v^
2020-4-13 1:22:56--Client,Idle:READER_IDLE
2020-4-13 1:23:01--Client,Idle:WRITER_IDLE
2020-4-13 1:23:01--Server,接收到客戶端發(fā)來的消息:心跳^v^
2020-4-13 1:23:01--Client,Idle:READER_IDLE
可以看到,心跳包按我們配置的時(shí)間正常輸出了。
配置編碼器與解碼器
我們上面在發(fā)送數(shù)據(jù)時(shí),需要通過ByteBuf來轉(zhuǎn)換String,而通過配置編碼,解碼器,我們就可以直接發(fā)送字符串。配置如下:
在服務(wù)端與客戶端的channelHandler分別增加以下配置:
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//...
//這個(gè)配置需要在添加Handler前設(shè)置
channel.pipeline().addLast("encoder", new StringEncoder());
channel.pipeline().addLast("decoder", new StringDecoder());
//...
}
在發(fā)送消息時(shí),則可以直接通過ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)的形式來發(fā)送。
源碼
到此,最簡單的服務(wù)端與客戶端通信的Demo已經(jīng)完成。源碼地址:https://github.com/milovetingting/Samples/tree/master/NettyDemo
使用進(jìn)階
在上面的基礎(chǔ)上,我們來實(shí)現(xiàn)一個(gè)下面的需求:
客戶端需要登錄到服務(wù)端
客戶端登錄成功后,服務(wù)端可以給客戶端發(fā)送指令消息,客戶端在收到消息及處理完消息后,都需要上報(bào)給服務(wù)端
封裝連接
為便于程序擴(kuò)展,我們將客戶端連接服務(wù)端的部分抽取出來。通過一個(gè)接口來定義連接的方法,而連接的具體實(shí)現(xiàn)由子類來實(shí)現(xiàn)。
定義接口
public interface IConnection {
/**
* 連接服務(wù)器
*
* @param host 服務(wù)器地址
* @param port 端口
* @param callback 連接回調(diào)
*/
public void connect(String host, int port, IConnectionCallback callback);
}
在這里還需要定義連接的回調(diào)接口
public interface IConnectionCallback {
/**
* 連接成功
*/
public void onConnected();
}
具體的連接實(shí)現(xiàn)類
public class NettyConnection implements IConnection {
private NettyClient mClient;
@Override
public void connect(String host, int port, IConnectionCallback callback) {
if (mClient == null) {
mClient = new NettyClient(host, port);
mClient.setConnectionCallBack(callback);
mClient.connect();
}
}
}
為便于管理連接,定義一個(gè)連接的管理類
public class ConnectionManager implements IConnection {
private static IConnection mConnection;
private ConnectionManager() {
}
static class ConnectionManagerInner {
private static ConnectionManager INSTANCE = new ConnectionManager();
}
public static ConnectionManager getInstance() {
return ConnectionManagerInner.INSTANCE;
}
public static void initConnection(IConnection connection) {
mConnection = connection;
}
private void checkInit() {
if (mConnection == null) {
throw new IllegalAccessError("please invoke initConnection first!");
}
}
@Override
public void connect(String host, int port, IConnectionCallback callback) {
checkInit();
mConnection.connect(host, port, callback);
}
}
調(diào)用連接:
public class Main {
public static void main(String[] args) {
try {
String host = "127.0.0.1";
int port = 12345;
NettyServer server = new NettyServer(port);
server.run();
Thread.sleep(1000);
ConnectionManager.initConnection(new NettyConnection());
ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {
@Override
public void onConnected() {
LogUtil.log("Main,onConnected"););
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
在調(diào)用connect方法前,需要先調(diào)用initConnection來指定具體的連接類
消息Bean的定義
在連接成功后,服務(wù)端會(huì)給客戶端發(fā)送一個(gè)歡迎的消息。為便于管理,我們定義一個(gè)消息Bean
public class Msg {
/**
* 歡迎
*/
public static final int TYPE_WELCOME = 0;
public int type;
public String msg;
}
服務(wù)端發(fā)送歡迎消息
服務(wù)端發(fā)送消息
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContextWrapper mChannelHandlerContextWrapper;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.log("Server,channelActive");
mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx);
MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper);
}
}
在這里,通過定義一個(gè)ChannelHandlerContextWrapper類來統(tǒng)一管理消息分隔符
public class ChannelHandlerContextWrapper {
private ChannelHandlerContext mContext;
public ChannelHandlerContextWrapper(ChannelHandlerContext context) {
this.mContext = context;
}
/**
* 包裝writeAndFlush方法
*
* @param object
*/
public void writeAndFlush(Object object) {
mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR);
}
}
再進(jìn)一步,通過定義MsgUtil類來封裝發(fā)送歡迎消息
public class MsgUtil {
/**
* 發(fā)送歡迎消息
*
* @param wrapper
*/
public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) {
Msg msg = new Msg();
msg.type = Msg.TYPE_WELCOME;
msg.msg = "你好,客戶端";
wrapper.writeAndFlush(Global.sGson.toJson(msg));
}
}
客戶端消息接收
對(duì)于客戶端而言,為方便處理消息,我們需要定義一個(gè)方法來接收消息。通過在IConnection接口中新增一個(gè)registerMsgCallback方法來實(shí)現(xiàn)
public interface IConnection {
/**
* 連接服務(wù)器
*
* @param host 服務(wù)器地址
* @param port 端口
* @param callback 連接回調(diào)
*/
public void connect(String host, int port, IConnectionCallback callback);
/**
* 注冊(cè)消息回調(diào)
*
* @param callback
*/
public void registerMsgCallback(IMsgCallback callback);
}
在這里,還需要新增IMsgCallback接口
public interface IMsgCallback {
/**
* 接收到消息時(shí)的回調(diào)
*
* @param msg
*/
public void onMsgReceived(Msg msg);
}
對(duì)應(yīng)到實(shí)現(xiàn)類
public class NettyConnection implements IConnection {
private NettyClient mClient;
@Override
public void connect(String host, int port, IConnectionCallback callback) {
if (mClient == null) {
mClient = new NettyClient(host, port);
mClient.setConnectionCallBack(callback);
mClient.connect();
}
}
@Override
public void registerMsgCallback(IMsgCallback callback) {
if (mClient == null) {
throw new IllegalAccessError("please invoke connect first!");
}
mClient.registerMsgCallback(callback);
}
}
消息的分發(fā)
在客戶端,為便于處理消息,我們對(duì)消息類型進(jìn)行劃分
修改消息Bean
public class Msg {
/**
* 歡迎
*/
public static final int TYPE_WELCOME = 0;
/**
* 心跳
*/
public static final int TYPE_HEART_BEAT = 1;
/**
* 登錄
*/
public static final int TYPE_LOGIN = 2;
public static final int TYPE_COMMAND_A = 3;
public static final int TYPE_COMMAND_B = 4;
public static final int TYPE_COMMAND_C = 5;
public int type;
public String msg;
}
假定消息是串行的,需要一個(gè)一個(gè)地處理。為便于管理消息,增加MsgQueue類
public class MsgQueue {
private PriorityBlockingQueue<Msg> mQueue;
private boolean using;
private MsgQueue() {
mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() {
@Override
public int compare(Msg msg1, Msg msg2) {
int res = msg2.priority - msg1.priority;
if (res == 0 && msg1.time != msg2.time) {
return (int) (msg2.time - msg1.time);
}
return res;
}
});
}
public static MsgQueue getInstance() {
return MsgQueueInner.INSTANCE;
}
private static class MsgQueueInner {
private static final MsgQueue INSTANCE = new MsgQueue();
}
/**
* 將消息加入消息隊(duì)列
*
* @param msg
*/
public void enqueueMsg(Msg msg) {
mQueue.add(msg);
}
/**
* 從消息隊(duì)列獲取消息
*
* @return
*/
public synchronized Msg next() {
if (using) {
return null;
}
Msg msg = mQueue.poll();
if (msg != null) {
makeUse(true);
}
return msg;
}
/**
* 標(biāo)記使用狀態(tài)
*
* @param use
*/
public synchronized void makeUse(boolean use) {
using = use;
}
/**
* 是否能夠使用
*
* @return
*/
public synchronized boolean canUse() {
return !using;
}
}
增加消息的分發(fā)類MsgDispatcher
public class MsgDispatcher {
private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap;
static {
mHandlerMap = new HashMap<>();
mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class);
mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class);
mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class);
mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class);
mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class);
mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class);
}
public static void dispatch() {
if (MsgQueue.getInstance().canUse()) {
Msg msg = MsgQueue.getInstance().next();
if (msg == null) {
return;
}
dispatch(msg);
}
}
public static void dispatch(Msg msg) {
try {
IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance();
handler.handle(msg);
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
消息的處理
定義IMsgHandler,在這里定義了處理的方法,具體實(shí)現(xiàn)由子類實(shí)現(xiàn)
public interface IMsgHandler {
/**
* 處理消息
*
* @param msg
*/
public void handle(Msg msg);
}
為統(tǒng)一管理,定義Base類BaseCommandHandler
public abstract class BaseCommandHandler implements IMsgHandler {
@Override
public void handle(Msg msg) {
execute(msg);
}
public final void execute(Msg msg) {
LogUtil.log("Client,received command:" + msg);
doHandle(msg);
MsgQueue.getInstance().makeUse(false);
LogUtil.log("Client,report command:" + msg);
MsgDispatcher.dispatch();
}
public abstract void doHandle(Msg msg);
}
在BaseCommandHandler中,定義execute方法,順序調(diào)用:上報(bào)消息已接收成功、處理消息、上報(bào)消息已處理完成。這里的消息上報(bào)部分,都只是輸出一個(gè)日志來代替,在實(shí)際的業(yè)務(wù)中,可以抽取出一個(gè)抽象方法,讓子類來實(shí)現(xiàn)。
定義子類,繼承自BaseCommandHandler
public class LoginMsgHandler extends BaseCommandHandler {
@Override
public void doHandle(Msg msg) {
LogUtil.log("Client,handle msg:" + msg);
}
}
對(duì)應(yīng)的心跳類型消息、歡迎類型消息等,都可以新增對(duì)應(yīng)的處理類來實(shí)現(xiàn),這里不再展開。
接收到消息時(shí)的處理
public class Main {
public static void main(String[] args) {
try {
String host = "127.0.0.1";
int port = 12345;
NettyServer server = new NettyServer(port);
server.run();
Thread.sleep(1000);
ConnectionManager.initConnection(new NettyConnection());
ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {
@Override
public void onConnected() {
LogUtil.log("Main,onConnected");
ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {
@Override
public void onMsgReceived(Msg msg) {
MsgQueue.getInstance().enqueueMsg(msg);
MsgDispatcher.dispatch();
}
});
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端登錄
修改消息Bean,增加登錄的請(qǐng)求和響應(yīng)
public class Msg {
/**
* 歡迎
*/
public static final int TYPE_WELCOME = 0;
/**
* 心跳
*/
public static final int TYPE_HEART_BEAT = 1;
/**
* 登錄
*/
public static final int TYPE_LOGIN = 2;
public static final int TYPE_COMMAND_A = 3;
public static final int TYPE_COMMAND_B = 4;
public static final int TYPE_COMMAND_C = 5;
public int type;
public String msg;
public int priority;
public long time;
/**
* 登錄請(qǐng)求信息
*
* @author Administrator
*
*/
public static class LoginRuquestInfo {
/**
* 用戶名
*/
public String user;
/**
* 密碼
*/
public String pwd;
@Override
public String toString() {
return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]";
}
}
/**
* 登錄響應(yīng)信息
*
* @author Administrator
*
*/
public static class LoginResponseInfo {
/**
* 登錄成功
*/
public static final int CODE_SUCCESS = 0;
/**
* 登錄失敗
*/
public static final int CODE_FAILED = 100;
/**
* 響應(yīng)碼
*/
public int code;
/**
* 響應(yīng)數(shù)據(jù)
*/
public String data;
public static class ResponseData {
public String token;
}
@Override
public String toString() {
return "LoginResponseInfo [code=" + code + ", data=" + data + "]";
}
}
}
發(fā)送登錄請(qǐng)求
public class Main {
public static void main(String[] args) {
try {
String host = "127.0.0.1";
int port = 12345;
NettyServer server = new NettyServer(port);
server.run();
Thread.sleep(1000);
ConnectionManager.initConnection(new NettyConnection());
ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {
@Override
public void onConnected() {
LogUtil.log("Main,onConnected");
ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {
@Override
public void onMsgReceived(Msg msg) {
MsgQueue.getInstance().enqueueMsg(msg);
MsgDispatcher.dispatch();
}
});
Msg msg = new Msg();
msg.type = Msg.TYPE_LOGIN;
Msg.LoginRuquestInfo request = new LoginRuquestInfo();
request.user = "wangyz";
request.pwd = "wangyz";
Gson gson = new Gson();
msg.msg = gson.toJson(request);
ConnectionManager.getInstance().sendMsg(msg);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
這里,引入Gson,將消息Bean轉(zhuǎn)成json字符串后發(fā)送。
對(duì)應(yīng)到服務(wù)端,為便于解析出消息,也需要對(duì)應(yīng)的修改消息的Bean。服務(wù)端對(duì)消息的具體分發(fā)與處理,和客戶端類似,這里不再展開。
源碼
由于篇幅限制,Demo中指令的優(yōu)先級(jí)處理,模擬服務(wù)端指令下發(fā)等,這里沒有再進(jìn)一步詳細(xì)介紹,具體可以參考源碼:https://github.com/milovetingting/Samples/tree/master/Netty
后記
本文介紹了基于Netty實(shí)現(xiàn)服務(wù)端與客戶端通信的基本用法,以及在此基礎(chǔ)上,實(shí)現(xiàn)處理服務(wù)端指令并上報(bào)。Demo中通信的數(shù)據(jù)格式,用到了json,而優(yōu)化的做法,可以用protobuf來實(shí)現(xiàn),這里只展示通信的流程及簡單的封裝,因而未使用protobuf。Demo中只實(shí)現(xiàn)大體的流程,可能存在未測試到的Bug,權(quán)當(dāng)一個(gè)參考的思路吧。
End~