Netty自定義協(xié)議

將消息定義結(jié)構(gòu)為消息頭和消息體兩部分,消息頭中存儲消息的長度。netty讀取消息頭后,就能知道消息體的長度了。

自定義協(xié)議

協(xié)議開始標志 長度 數(shù)據(jù)
  1. 協(xié)議開始標志head_data,為int類型的數(shù)據(jù),16進制表示為0X76;
  2. 傳輸數(shù)據(jù)的長度contentLength,int類型;
  3. 要傳輸?shù)臄?shù)據(jù)。

自定義協(xié)議類的封裝:

package learn.netty.protocal;

/**
 * @author stone
 * @date 2019/7/29 17:38
 */
public class ConstantValue {
    /**
     * 自定義協(xié)議,報文開始標志
     */
    public static final int HEAD_DATA = 0X76;
}

package learn.netty.protocal;

import java.io.UnsupportedEncodingException;

/**
 * @author stone
 * @date 2019/7/29 17:39
 */
public class MyLsProtocol {
    /**
     * 消息頭標志
     */
    private int header = ConstantValue.HEAD_DATA;

    /**
     * 消息長度
     */
    private int contentLength;

    /**
     * 消息內(nèi)容
     */
    private byte[] content;

    public MyLsProtocol(int contentLength, byte[] content) {
        this.contentLength = contentLength;
        this.content = content;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    public int getHeader() {
        return header;
    }

    @Override
    public String toString() {
        try {
            return "MyLsProtocol{" +
                    "header=" + header +
                    ", contentLength=" + contentLength +
                    ", content=" + new String(content, "utf-8") +
                    '}';
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return "";
        }
    }
}

自定義協(xié)議的編碼器:

package learn.netty.protocal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 自定義協(xié)議編碼器(對象轉(zhuǎn)為字節(jié))
 *
 * @author stone
 * @date 2019/7/29 17:41
 */
public class LsEncoder extends MessageToByteEncoder<MyLsProtocol> {
    @Override
    public void encode(ChannelHandlerContext ctx, MyLsProtocol msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getHeader());
        out.writeInt(msg.getContentLength());
        out.writeBytes(msg.getContent());
    }
}

自定義協(xié)議的解碼器:

package learn.netty.protocal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 自定義協(xié)議解碼器 (字節(jié)轉(zhuǎn)對象)
 *
 * @author stone
 * @date 2019/7/29 17:43
 */
public class LsDecoder extends ByteToMessageDecoder {
    /**
     * 報文開始的標志 header是int類型,占4個字節(jié)
     * 表示報文長度的contentLength是int類型,占4個字節(jié)
     */
    public final int BASE_LENGTH = 8;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 可讀長度必須大于基本長度
        if (in.readableBytes() >= BASE_LENGTH) {
            // 防止socket字節(jié)流攻擊
            // 防止客戶端傳來的數(shù)據(jù)過大(太大的數(shù)據(jù)是不合理的)
            if (in.readableBytes() > 2048) {
                in.skipBytes(in.readableBytes());
            }

            // 記錄包頭開始的index
            int beginReader;
            while (true) {
                // 獲取包頭開始的index
                beginReader = in.readerIndex();
                // 標記包頭開始的index
                in.markReaderIndex();
                // 讀到協(xié)議的開始標志,結(jié)束while循環(huán)
                if (in.readInt() == ConstantValue.HEAD_DATA) {
                    break;
                }

                // 未讀到包頭,跳過一個字節(jié)
                // 每次跳過一個字節(jié)后,再去讀取包頭信息的開始標記
                in.resetReaderIndex();
                in.readByte();

                // 當跳過一個字節(jié)后,數(shù)據(jù)包的長度又變的不滿足,此時應該結(jié)束,等待后邊數(shù)據(jù)流的到達
                if (in.readableBytes() < BASE_LENGTH) {
                    return;
                }
            }

            // 代碼到這里,說明已經(jīng)讀到了報文標志

            // 消息長度
            int length = in.readInt();
            // 判斷請求數(shù)據(jù)包是否到齊
            if (in.readableBytes() < length) { // 數(shù)據(jù)不齊,回退讀指針
                // 還原讀指針
                in.readerIndex(beginReader);
                return;
            }

            // 至此,讀到一條完整報文
            byte[] data = new byte[length];
            in.readBytes(data);
            MyLsProtocol protocol = new MyLsProtocol(data.length, data);
            out.add(protocol);
        }
    }
}

服務端實現(xiàn):

package learn.netty.protocal;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @author stone
 * @date 2019/7/30 9:30
 */
public class ProtocolServer {
    public ProtocolServer() {
    }

    public void bind(int port) throws Exception {
        // 配置IO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服務器輔助啟動類配置
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChildChannelHandler())
                    .option(ChannelOption.SO_BACKLOG, 1024) // 設置tcp緩沖區(qū)
                    .option(ChannelOption.SO_KEEPALIVE, true);
            // 綁定端口,同步等待綁定成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服務端監(jiān)聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

// 服務端加入的協(xié)議編碼/解碼器
    public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 添加自定義協(xié)議編碼工具
            ch.pipeline().addLast(new LsEncoder());
            ch.pipeline().addLast(new LsDecoder());
            // 處理網(wǎng)絡IO
            ch.pipeline().addLast(new ProtocolServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        new ProtocolServer().bind(9999);
    }
}

服務端handler實現(xiàn):

package learn.netty.protocal;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author stone
 * @date 2019/7/30 9:33
 */
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 用于獲取客戶端發(fā)來的數(shù)據(jù)信息
        MyLsProtocol body = (MyLsProtocol) msg;
        System.out.println("Server接收到的客戶端信息:" + body.toString());

        // 寫數(shù)據(jù)給客戶端
        String str = "Hi I am Server ...";
        MyLsProtocol response = new MyLsProtocol(str.getBytes().length, str.getBytes());
        // 當服務端完成寫操作后,關閉與客戶端的連接
        ctx.writeAndFlush(response);

        // 有寫操作時,不需要手動釋放msg的引用; 當只有讀操作時,才需要手動釋放msg的引用
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Server exceptionCaught");
        cause.printStackTrace();
//        if (ctx.channel().isActive()) {
//            ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//        }
        ctx.close();
    }
}

客戶端實現(xiàn):

package learn.netty.protocal;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author stone
 * @date 2019/7/30 9:45
 */
public class ProtocolClient {

    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 配置啟動輔助類
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new MyChannelHandler());
            // 異步連接服務器,同步等待連接成功
            ChannelFuture f = b.connect(host, port).sync();
            // 等待連接關閉
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new ProtocolClient().connect(9999, "127.0.0.1");
    }

// 客戶端加入的協(xié)議編碼/解碼器
    public class MyChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LsEncoder());
            ch.pipeline().addLast(new LsDecoder());
            // 處理網(wǎng)絡IO
            ch.pipeline().addLast(new ProtocolClientHandler());
        }
    }
}

客戶端handler實現(xiàn):

package learn.netty.protocal;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * 客戶端Handler
 * @author
 */
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 客戶端一旦與服務端建立好連接,就會觸發(fā)該方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 發(fā)送消息
        String data = "I am client ...";
        // 獲取要發(fā)送信息的字節(jié)數(shù)組
        byte[] content = data.getBytes();
        // 要發(fā)送信息的長度
        int contentLength = content.length;

        MyLsProtocol protocol = new MyLsProtocol(contentLength, content);
        for (int i = 0; i < 100; i++) {
            ctx.writeAndFlush(protocol);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // 用于獲取客戶端發(fā)來的數(shù)據(jù)信息
            MyLsProtocol body = (MyLsProtocol) msg;
            System.out.println("Client接收的客戶端的信息:" + body.toString());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

參考博客:
http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容