基于Netty實(shí)現(xiàn)服務(wù)端與客戶端通信

個(gè)人博客

http://www.milovetingting.cn

基于Netty實(shí)現(xiàn)服務(wù)端與客戶端通信

前言

本文介紹基于Netty實(shí)現(xiàn)的服務(wù)端與客戶端通信的簡單使用方法,并在此基礎(chǔ)上實(shí)現(xiàn)一個(gè)簡單的服務(wù)端-客戶端指令通信的Demo。

Netty是什么

Netty是一個(gè)NIO客戶端-服務(wù)器框架,可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,例如協(xié)議服務(wù)器和客戶端。它極大地簡化了網(wǎng)絡(luò)編程,例如TCP和UDP套接字服務(wù)器的開發(fā)。提供一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,以快速開發(fā)可維護(hù)的高性能和高可擴(kuò)展性協(xié)議服務(wù)器和客戶端。

以上內(nèi)容摘選自https://netty.io/wiki/user-guide-for-4.x.html

Netty具有以下特點(diǎn):

  • 適用于各種傳輸類型的統(tǒng)一API-阻塞和非阻塞套接字
  • 更高的吞吐量,更低的延遲
  • 減少資源消耗
  • 減少不必要的內(nèi)存復(fù)制
  • 完整的SSL / TLS和StartTLS支持

以上內(nèi)容摘選自https://netty.io/

使用入門

Netty的使用,可以參照Netty的官方文檔,這里以4.x為例來演示Netty在服務(wù)端和客戶端上使用。文檔地址:https://netty.io/wiki/user-guide-for-4.x.html

這里用Eclipse來進(jìn)行開發(fā),服務(wù)端和客戶端都放在一個(gè)工程里。

新建Java工程

服務(wù)端

首先需要導(dǎo)入netty的jar包。這里使用netty-all-4.1.48.Final.jar。

NettyServer

新建NettyServer類

public class NettyServer {

    private int mPort;

    public NettyServer(int port) {
        this.mPort = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    // 指定連接隊(duì)列大小
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //KeepAlive
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //Handler
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(mPort).sync();
            if (f.isSuccess()) {
                LogUtil.log("Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):" + mPort);
            }
            // f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // workerGroup.shutdownGracefully();
            // bossGroup.shutdownGracefully();
        }
    }

}

NettyServerHandler

在初始化時(shí),需要指定Handle,用來處理Channel相關(guān)業(yè)務(wù)。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log("Server,接收到客戶端發(fā)來的消息:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LogUtil.log("Server,exceptionCaught");
        cause.printStackTrace();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelInactive");
    }

}

經(jīng)過上面這些步驟后,服務(wù)端最基本的設(shè)置就完成了。

客戶端

客戶端和服務(wù)端在初始化時(shí)大體是類似的,不過相比服務(wù)端要簡單一些。

NettyClient

public class NettyClient {

    private String mHost;

    private int mPort;

    private NettyClientHandler mClientHandler;

    private ChannelFuture mChannelFuture;

    public NettyClient(String host, int port) {
        this.mHost = host;
        this.mPort = port;
    }

    public void connect() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            mClientHandler = new NettyClientHandler();
            b.group(workerGroup).channel(NioSocketChannel.class)
                    // KeepAlive
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // Handler
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(mClientHandler);
                        }
                    });
            mChannelFuture = b.connect(mHost, mPort).sync();
            if (mChannelFuture.isSuccess()) {
                LogUtil.log("Client,連接服務(wù)端成功");
            }
            mChannelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Client,channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LogUtil.log("Client,exceptionCaught");
        cause.printStackTrace();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Client,channelInactive");
    }

}

到這里,客戶端最基本設(shè)置就完成了。

連接服務(wù)端

新建一個(gè)Main類,用于測試服務(wù)端和客戶端是否能正常連接。

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            NettyClient client = new NettyClient(host, port);
            client.connect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

運(yùn)行main方法,輸出日志如下:

2020-4-13 0:11:02--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:11:03--Client,channelActive
2020-4-13 0:11:03--Client,連接服務(wù)端成功
2020-4-13 0:11:03--Server,channelActive

可以看到,客戶端成功連接上了服務(wù)端,服務(wù)端和客戶端里設(shè)置的Handler的channelActive方法都會(huì)回調(diào)。

服務(wù)端與客戶端通信

在服務(wù)端與客戶端連接成功后,我們往往需要在雙方間進(jìn)行通信。這里假定,在連接成功后,服務(wù)端給客戶端發(fā)送一個(gè)歡迎信息"你好,客戶端",而客戶端在收到服務(wù)端的消息后,也給服務(wù)端回復(fù)一個(gè)消息"你好,服務(wù)端"。下面來實(shí)現(xiàn)具體的功能。

修改服務(wù)端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中給客戶端發(fā)送消息,在channelRead方法中解析客戶端發(fā)來的消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer, "utf-8");
        LogUtil.log("Server,接收到客戶端發(fā)來的消息:" + message);
    }

}

修改客戶端NettyClientHandler中的channelRead方法,當(dāng)收到服務(wù)端的消息時(shí),回復(fù)服務(wù)端

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer,"utf-8");
        LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + message);
        
        ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務(wù)端", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }

}

運(yùn)行后,輸出日志如下:

2020-4-13 0:29:16--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:29:17--Client,channelActive
2020-4-13 0:29:17--Client,連接服務(wù)端成功
2020-4-13 0:29:17--Server,channelActive
2020-4-13 0:29:17--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 0:29:17--Server,接收到客戶端發(fā)來的消息:你好,服務(wù)端

可以看到,服務(wù)端與客戶端已經(jīng)可以正常通信。

粘包與拆包

在實(shí)際的使用場景中,可能會(huì)存在短時(shí)間內(nèi)大量數(shù)據(jù)發(fā)送的問題。我們模擬這個(gè)場景。在客戶端連接上服務(wù)端后,服務(wù)端給客戶端發(fā)送100個(gè)消息,而為便于分析,客戶端在收到服務(wù)端消息后,不作回復(fù)。

修改服務(wù)端中NettyServerHandler的channelActive方法

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        for (int i = 0; i < 100; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

修改客戶端中NettyClientHandler的channelRead方法

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message = new String(buffer, "utf-8");
        LogUtil.log("Client,接收到服務(wù)端發(fā)來的消息:" + message);

        //ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務(wù)端", Charset.forName("utf-8"));
        //ctx.writeAndFlush(byteBuf);
    }

運(yùn)行后,輸出的部分結(jié)果如下:

2020-4-13 0:35:28--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 0:35:29--Client,channelActive
2020-4-13 0:35:29--Client,連接服務(wù)端成功
2020-4-13 0:35:29--Server,channelActive
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端
2020-4-13 0:35:29--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端

可以看到,出現(xiàn)了多條消息"粘"在一起的情況。

什么是粘包與拆包

TCP是個(gè)"流"協(xié)議,所謂流,就是沒有界限的一串?dāng)?shù)據(jù)。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。

以上內(nèi)容摘選自TCP粘包/拆包與Netty解決方案

解決方案

在沒有 Netty 的情況下,用戶如果自己需要拆包,基本原理就是不斷從 TCP 緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包 如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從 TCP 緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包。 如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),構(gòu)成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接。

以上內(nèi)容摘選自徹底理解Netty,這一篇文章就夠了

而使用Netty,則解決這個(gè)問題的方法就簡單多了。Netty已經(jīng)提供了四個(gè)拆包器:

  • FixedLengthFrameDecoder:固定長度的拆包器,Netty會(huì)把固定長度的數(shù)據(jù)包發(fā)送給下一個(gè)channelHandler
  • LineBasedFrameDecoder:行拆包器,每個(gè)數(shù)據(jù)包以換行符分隔發(fā)送
  • DelimiterBasedFrameDecoder:分隔符拆包器,可以自定義分隔符,行拆包器是分隔符拆包器的一種特例
  • LengthFieldBasedFrameDecoder:基于長度域的拆包器,如果自定義協(xié)議中包含長度域的字段,就可以使用這個(gè)拆包器

在這里,我們選用分隔符拆包器

首先定義分隔符

public class Config {
    public static final String DATA_PACK_SEPARATOR = "#$&*";
}

在服務(wù)端的channelHandler配置中,需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //這個(gè)配置需要在添加Handler前設(shè)置
    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
    channel.pipeline().addLast(new NettyServerHandler());
    }

在客戶端的channelHandler的配置中,同樣也需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //這個(gè)配置需要在添加Handler前設(shè)置
    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
    channel.pipeline().addLast(new NettyServerHandler());
    }

發(fā)送數(shù)據(jù)時(shí),在數(shù)據(jù)的末尾增加分隔符:

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        for (int i = 0; i < 100; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

運(yùn)行后,可以發(fā)現(xiàn),已經(jīng)解決"粘包"與"拆包"的問題。

心跳

在網(wǎng)絡(luò)應(yīng)用中,為了判斷連接是否還存在,一般會(huì)通過發(fā)送心跳包來檢測。在Netty中,配置心跳包的步驟如下

在客戶端的channelHandler的配置中,需要增加

@Override
protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
            //...
                        }

在NettyClientHandler中,重寫userEventTriggered方法

@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        LogUtil.log("Client,Idle:" + event.state());
        switch (event.state()) {
        case READER_IDLE:

            break;
        case WRITER_IDLE:
            ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8"));
            break;
        case ALL_IDLE:
            break;
        default:
            super.userEventTriggered(ctx, evt);
            break;
        }
    }

當(dāng)寫空閑達(dá)到配置的時(shí)間時(shí),往服務(wù)端發(fā)送一個(gè)心跳消息

運(yùn)行后,日志輸出如下:

2020-4-13 1:22:50--Server,啟動(dòng)Netty服務(wù)端成功,端口號(hào):12345
2020-4-13 1:22:51--Client,channelActive
2020-4-13 1:22:51--Client,連接服務(wù)端成功
2020-4-13 1:22:51--Server,channelActive
2020-4-13 1:22:51--Client,接收到服務(wù)端發(fā)來的消息:你好,客戶端
2020-4-13 1:22:56--Client,Idle:WRITER_IDLE
2020-4-13 1:22:56--Server,接收到客戶端發(fā)來的消息:心跳^v^
2020-4-13 1:22:56--Client,Idle:READER_IDLE
2020-4-13 1:23:01--Client,Idle:WRITER_IDLE
2020-4-13 1:23:01--Server,接收到客戶端發(fā)來的消息:心跳^v^
2020-4-13 1:23:01--Client,Idle:READER_IDLE

可以看到,心跳包按我們配置的時(shí)間正常輸出了。

配置編碼器與解碼器

我們上面在發(fā)送數(shù)據(jù)時(shí),需要通過ByteBuf來轉(zhuǎn)換String,而通過配置編碼,解碼器,我們就可以直接發(fā)送字符串。配置如下:

在服務(wù)端與客戶端的channelHandler分別增加以下配置:

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //...
    //這個(gè)配置需要在添加Handler前設(shè)置
    channel.pipeline().addLast("encoder", new StringEncoder());
    channel.pipeline().addLast("decoder", new StringDecoder());
    //...
}

在發(fā)送消息時(shí),則可以直接通過ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)的形式來發(fā)送。

源碼

到此,最簡單的服務(wù)端與客戶端通信的Demo已經(jīng)完成。源碼地址:https://github.com/milovetingting/Samples/tree/master/NettyDemo

使用進(jìn)階

在上面的基礎(chǔ)上,我們來實(shí)現(xiàn)一個(gè)下面的需求:

  • 客戶端需要登錄到服務(wù)端

  • 客戶端登錄成功后,服務(wù)端可以給客戶端發(fā)送指令消息,客戶端在收到消息及處理完消息后,都需要上報(bào)給服務(wù)端

封裝連接

為便于程序擴(kuò)展,我們將客戶端連接服務(wù)端的部分抽取出來。通過一個(gè)接口來定義連接的方法,而連接的具體實(shí)現(xiàn)由子類來實(shí)現(xiàn)。

定義接口

public interface IConnection {

    /**
     * 連接服務(wù)器
     * 
     * @param host     服務(wù)器地址
     * @param port     端口
     * @param callback 連接回調(diào)
     */
    public void connect(String host, int port, IConnectionCallback callback);

}

在這里還需要定義連接的回調(diào)接口

public interface IConnectionCallback {

    /**
     * 連接成功
     */
    public void onConnected();

}

具體的連接實(shí)現(xiàn)類

public class NettyConnection implements IConnection {

    private NettyClient mClient;

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        if (mClient == null) {
            mClient = new NettyClient(host, port);
            mClient.setConnectionCallBack(callback);
            mClient.connect();
        }
    }

}

為便于管理連接,定義一個(gè)連接的管理類

public class ConnectionManager implements IConnection {

    private static IConnection mConnection;

    private ConnectionManager() {

    }

    static class ConnectionManagerInner {
        private static ConnectionManager INSTANCE = new ConnectionManager();
    }

    public static ConnectionManager getInstance() {
        return ConnectionManagerInner.INSTANCE;
    }

    public static void initConnection(IConnection connection) {
        mConnection = connection;
    }

    private void checkInit() {
        if (mConnection == null) {
            throw new IllegalAccessError("please invoke initConnection first!");
        }
    }

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        checkInit();
        mConnection.connect(host, port, callback);
    }

}

調(diào)用連接:

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected"););
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

在調(diào)用connect方法前,需要先調(diào)用initConnection來指定具體的連接類

消息Bean的定義

在連接成功后,服務(wù)端會(huì)給客戶端發(fā)送一個(gè)歡迎的消息。為便于管理,我們定義一個(gè)消息Bean

public class Msg {

    /**
     * 歡迎
     */
    public static final int TYPE_WELCOME = 0;

    public int type;

    public String msg;

}

服務(wù)端發(fā)送歡迎消息

服務(wù)端發(fā)送消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private ChannelHandlerContextWrapper mChannelHandlerContextWrapper;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log("Server,channelActive");
        mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx);
        MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper);
    }
}

在這里,通過定義一個(gè)ChannelHandlerContextWrapper類來統(tǒng)一管理消息分隔符

public class ChannelHandlerContextWrapper {

    private ChannelHandlerContext mContext;

    public ChannelHandlerContextWrapper(ChannelHandlerContext context) {
        this.mContext = context;
    }

    /**
     * 包裝writeAndFlush方法
     * 
     * @param object
     */
    public void writeAndFlush(Object object) {
        mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR);
    }

}

再進(jìn)一步,通過定義MsgUtil類來封裝發(fā)送歡迎消息

public class MsgUtil {

    /**
     * 發(fā)送歡迎消息
     * 
     * @param wrapper
     */
    public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) {
        Msg msg = new Msg();
        msg.type = Msg.TYPE_WELCOME;
        msg.msg = "你好,客戶端";
        wrapper.writeAndFlush(Global.sGson.toJson(msg));
    }

}

客戶端消息接收

對(duì)于客戶端而言,為方便處理消息,我們需要定義一個(gè)方法來接收消息。通過在IConnection接口中新增一個(gè)registerMsgCallback方法來實(shí)現(xiàn)

public interface IConnection {

    /**
     * 連接服務(wù)器
     * 
     * @param host     服務(wù)器地址
     * @param port     端口
     * @param callback 連接回調(diào)
     */
    public void connect(String host, int port, IConnectionCallback callback);

    /**
     * 注冊(cè)消息回調(diào)
     * 
     * @param callback
     */
    public void registerMsgCallback(IMsgCallback callback);

}

在這里,還需要新增IMsgCallback接口

public interface IMsgCallback {

    /**
     * 接收到消息時(shí)的回調(diào)
     * 
     * @param msg
     */
    public void onMsgReceived(Msg msg);

}

對(duì)應(yīng)到實(shí)現(xiàn)類

public class NettyConnection implements IConnection {

    private NettyClient mClient;

    @Override
    public void connect(String host, int port, IConnectionCallback callback) {
        if (mClient == null) {
            mClient = new NettyClient(host, port);
            mClient.setConnectionCallBack(callback);
            mClient.connect();
        }
    }

    @Override
    public void registerMsgCallback(IMsgCallback callback) {
        if (mClient == null) {
            throw new IllegalAccessError("please invoke connect first!");
        }
        mClient.registerMsgCallback(callback);
    }

}

消息的分發(fā)

在客戶端,為便于處理消息,我們對(duì)消息類型進(jìn)行劃分

修改消息Bean

public class Msg {

    /**
     * 歡迎
     */
    public static final int TYPE_WELCOME = 0;

    /**
     * 心跳
     */
    public static final int TYPE_HEART_BEAT = 1;

    /**
     * 登錄
     */
    public static final int TYPE_LOGIN = 2;

    public static final int TYPE_COMMAND_A = 3;

    public static final int TYPE_COMMAND_B = 4;

    public static final int TYPE_COMMAND_C = 5;

    public int type;

    public String msg;
}

假定消息是串行的,需要一個(gè)一個(gè)地處理。為便于管理消息,增加MsgQueue類

public class MsgQueue {

    private PriorityBlockingQueue<Msg> mQueue;

    private boolean using;

    private MsgQueue() {
        mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() {

            @Override
            public int compare(Msg msg1, Msg msg2) {
                int res = msg2.priority - msg1.priority;
                if (res == 0 && msg1.time != msg2.time) {
                    return (int) (msg2.time - msg1.time);
                }
                return res;
            }
        });
    }

    public static MsgQueue getInstance() {
        return MsgQueueInner.INSTANCE;
    }

    private static class MsgQueueInner {
        private static final MsgQueue INSTANCE = new MsgQueue();
    }

    /**
     * 將消息加入消息隊(duì)列
     * 
     * @param msg
     */
    public void enqueueMsg(Msg msg) {
        mQueue.add(msg);
    }

    /**
     * 從消息隊(duì)列獲取消息
     * 
     * @return
     */
    public synchronized Msg next() {
        if (using) {
            return null;
        }
        Msg msg = mQueue.poll();
        if (msg != null) {
            makeUse(true);
        }
        return msg;
    }

    /**
     * 標(biāo)記使用狀態(tài)
     * 
     * @param use
     */
    public synchronized void makeUse(boolean use) {
        using = use;
    }

    /**
     * 是否能夠使用
     * 
     * @return
     */
    public synchronized boolean canUse() {
        return !using;
    }

}

增加消息的分發(fā)類MsgDispatcher

public class MsgDispatcher {

    private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap;

    static {
        mHandlerMap = new HashMap<>();
        mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class);
        mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class);
    }

    public static void dispatch() {
        if (MsgQueue.getInstance().canUse()) {
            Msg msg = MsgQueue.getInstance().next();
            if (msg == null) {
                return;
            }
            dispatch(msg);
        }
    }

    public static void dispatch(Msg msg) {
        try {
            IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance();
            handler.handle(msg);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

}

消息的處理

定義IMsgHandler,在這里定義了處理的方法,具體實(shí)現(xiàn)由子類實(shí)現(xiàn)

public interface IMsgHandler {

    /**
     * 處理消息
     * 
     * @param msg
     */
    public void handle(Msg msg);

}

為統(tǒng)一管理,定義Base類BaseCommandHandler

public abstract class BaseCommandHandler implements IMsgHandler {

    @Override
    public void handle(Msg msg) {
        execute(msg);
    }

    public final void execute(Msg msg) {
        LogUtil.log("Client,received command:" + msg);
        doHandle(msg);
        MsgQueue.getInstance().makeUse(false);
        LogUtil.log("Client,report command:" + msg);
        MsgDispatcher.dispatch();
    }

    public abstract void doHandle(Msg msg);

}

在BaseCommandHandler中,定義execute方法,順序調(diào)用:上報(bào)消息已接收成功、處理消息、上報(bào)消息已處理完成。這里的消息上報(bào)部分,都只是輸出一個(gè)日志來代替,在實(shí)際的業(yè)務(wù)中,可以抽取出一個(gè)抽象方法,讓子類來實(shí)現(xiàn)。

定義子類,繼承自BaseCommandHandler

public class LoginMsgHandler extends BaseCommandHandler {

    @Override
    public void doHandle(Msg msg) {
        LogUtil.log("Client,handle msg:" + msg);
    }

}

對(duì)應(yīng)的心跳類型消息、歡迎類型消息等,都可以新增對(duì)應(yīng)的處理類來實(shí)現(xiàn),這里不再展開。

接收到消息時(shí)的處理

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected");

                    ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

                        @Override
                        public void onMsgReceived(Msg msg) {
                            MsgQueue.getInstance().enqueueMsg(msg);
                            MsgDispatcher.dispatch();
                        }
                    });
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

客戶端登錄

修改消息Bean,增加登錄的請(qǐng)求和響應(yīng)

public class Msg {

    /**
     * 歡迎
     */
    public static final int TYPE_WELCOME = 0;

    /**
     * 心跳
     */
    public static final int TYPE_HEART_BEAT = 1;

    /**
     * 登錄
     */
    public static final int TYPE_LOGIN = 2;

    public static final int TYPE_COMMAND_A = 3;

    public static final int TYPE_COMMAND_B = 4;

    public static final int TYPE_COMMAND_C = 5;

    public int type;

    public String msg;

    public int priority;

    public long time;

    /**
     * 登錄請(qǐng)求信息
     * 
     * @author Administrator
     *
     */
    public static class LoginRuquestInfo {
        /**
         * 用戶名
         */
        public String user;

        /**
         * 密碼
         */
        public String pwd;

        @Override
        public String toString() {
            return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]";
        }
    }

    /**
     * 登錄響應(yīng)信息
     * 
     * @author Administrator
     *
     */
    public static class LoginResponseInfo {

        /**
         * 登錄成功
         */
        public static final int CODE_SUCCESS = 0;

        /**
         * 登錄失敗
         */
        public static final int CODE_FAILED = 100;

        /**
         * 響應(yīng)碼
         */
        public int code;

        /**
         * 響應(yīng)數(shù)據(jù)
         */
        public String data;

        public static class ResponseData {
            public String token;
        }

        @Override
        public String toString() {
            return "LoginResponseInfo [code=" + code + ", data=" + data + "]";
        }

    }
}

發(fā)送登錄請(qǐng)求

public class Main {

    public static void main(String[] args) {
        try {
            String host = "127.0.0.1";
            int port = 12345;
            NettyServer server = new NettyServer(port);
            server.run();
            Thread.sleep(1000);
            ConnectionManager.initConnection(new NettyConnection());
            ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

                @Override
                public void onConnected() {
                    LogUtil.log("Main,onConnected");

                    ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

                        @Override
                        public void onMsgReceived(Msg msg) {
                            MsgQueue.getInstance().enqueueMsg(msg);
                            MsgDispatcher.dispatch();
                        }
                    });

                    Msg msg = new Msg();
                    msg.type = Msg.TYPE_LOGIN;

                    Msg.LoginRuquestInfo request = new LoginRuquestInfo();
                    request.user = "wangyz";
                    request.pwd = "wangyz";

                    Gson gson = new Gson();
                    msg.msg = gson.toJson(request);

                    ConnectionManager.getInstance().sendMsg(msg);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

這里,引入Gson,將消息Bean轉(zhuǎn)成json字符串后發(fā)送。

對(duì)應(yīng)到服務(wù)端,為便于解析出消息,也需要對(duì)應(yīng)的修改消息的Bean。服務(wù)端對(duì)消息的具體分發(fā)與處理,和客戶端類似,這里不再展開。

源碼

由于篇幅限制,Demo中指令的優(yōu)先級(jí)處理,模擬服務(wù)端指令下發(fā)等,這里沒有再進(jìn)一步詳細(xì)介紹,具體可以參考源碼:https://github.com/milovetingting/Samples/tree/master/Netty

后記

本文介紹了基于Netty實(shí)現(xiàn)服務(wù)端與客戶端通信的基本用法,以及在此基礎(chǔ)上,實(shí)現(xiàn)處理服務(wù)端指令并上報(bào)。Demo中通信的數(shù)據(jù)格式,用到了json,而優(yōu)化的做法,可以用protobuf來實(shí)現(xiàn),這里只展示通信的流程及簡單的封裝,因而未使用protobuf。Demo中只實(shí)現(xiàn)大體的流程,可能存在未測試到的Bug,權(quán)當(dāng)一個(gè)參考的思路吧。

End~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容