Netty Notes, Part 9: Supporting Multiple Communication Protocols in Netty

The previous article covered custom communication protocols. This one shows how to support several protocols at once.

We will build a server that supports the Cat, Dog, and People protocols at the same time. There are two ways to implement this:

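  • The first approach uses a custom protocol: when sending a message, the first few bytes (say, two) carry a custom marker (say, AB). When the decoder parses the message, a leading AB indicates one protocol type and CD another. This approach does not use protobuf; it is built directly on Netty's custom codec support.
  • The second approach uses protobuf and is really a convention for how messages are defined. Because a Netty client and server share a single TCP connection, each side must be able to tell what type of object the other side sent.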
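The marker idea in the first approach can be sketched without Netty at all. The following is a minimal illustration in plain Java, assuming a two-character ASCII marker and a UTF-8 text payload; the class name `TagPrefixSketch` and the labels `protocol-A` and `protocol-B` are hypothetical and not part of the demo code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch: a two-byte ASCII marker in front of each payload selects the protocol. */
final class TagPrefixSketch {

    /** Prepends the two-character marker (for example "AB") to the UTF-8 payload. */
    static byte[] encode(String tag, String payload) {
        byte[] tagBytes = tag.getBytes(StandardCharsets.US_ASCII);
        if (tagBytes.length != 2) {
            throw new IllegalArgumentException("tag must be exactly 2 ASCII characters");
        }
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 + body.length).put(tagBytes).put(body).array();
    }

    /** Reads the marker, then hands the rest of the frame to the matching (hypothetical) protocol. */
    static String decode(byte[] frame) {
        if (frame.length < 2) {
            throw new IllegalArgumentException("frame too short to contain a marker");
        }
        String tag = new String(frame, 0, 2, StandardCharsets.US_ASCII);
        String body = new String(frame, 2, frame.length - 2, StandardCharsets.UTF_8);
        switch (tag) {
            case "AB": return "protocol-A:" + body;
            case "CD": return "protocol-B:" + body;
            default: throw new IllegalArgumentException("unknown marker " + tag);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode("AB", "hello")));
        System.out.println(decode(encode("CD", "world")));
    }
}
```

The plain-Netty implementation later in this article follows the same pattern, except that it uses a 4-byte int flag (0, 1, 2) instead of a two-character marker.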

Implementing multiple protocols in Netty with Protocol Buffers

As we know, using Protocol Buffers starts with defining a .proto file.

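Define one outermost message (MyMessage) that covers every message type to be sent. MyMessage declares an enum that identifies the concrete type and a oneof that holds the concrete message, so every transmission sends a MyMessage whose enum value says which concrete message it carries.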

syntax = "proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;
option java_package = "com.zhihao.miao.netty.seventhexample";
option java_outer_classname = "MyDataInfo";

message MyMessage {

    enum DataType{
        PeopleType = 1;
        DogType = 2;
        CatType = 3;
    }

    required DataType data_type = 1;

    // oneof: of several optional fields, at most one can be set at any given time, which saves memory
    oneof dataBody {
        People people = 2;
        Dog dog = 3;
        Cat cat = 4;
    }
}

message People{
    optional string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Dog{
    optional string name = 1;
    optional string age = 2;
}

message Cat{
    optional string name = 1;
    optional string city = 2;
}

Compile it with the protoc compiler to generate the Java code:

protoc --java_out=src/main/java src/protobuf/People.proto

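As for oneof in the proto definition: when a message has several optional fields, at most one of them can be set at any given time (see the official documentation). Compiling generates the MyDataInfo class, which is too long to reproduce here.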

Server code:

package com.zhihao.miao.netty.seventhexample;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class TestServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Server-side channel initializer:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;


public class TestServerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Use the default instance of the outermost message type
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new TestServerHandler());
    }
}

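This is the key to the implementation: the decoder is given the default instance of MyDataInfo.MyMessage. MyMessage is the wrapper message type (its nested DataType is the enum), and the three message types we defined are exactly the members of its oneof.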
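The pipeline above also depends on framing: ProtobufVarint32LengthFieldPrepender writes the serialized length in front of each message as a Base 128 varint, and ProtobufVarint32FrameDecoder uses that prefix to cut the TCP byte stream back into whole messages. The following is a rough sketch of that framing in plain Java, not Netty's actual implementation; the class name `Varint32FramingSketch` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Sketch of varint32 length-prefixed framing, written against java.nio instead of Netty. */
final class Varint32FramingSketch {

    /** Writes the body length as a Base 128 varint, followed by the body itself. */
    static byte[] frame(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int value = body.length;
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value);                     // last group, continuation bit clear
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    /** Returns the next body, or null (with the position restored) if the frame is incomplete. */
    static byte[] unframe(ByteBuffer in) {
        in.mark();
        int length = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) {
                in.reset();
                return null;                  // length prefix not complete yet
            }
            byte b = in.get();
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (length < 0) {
                    throw new IllegalArgumentException("negative length");
                }
                if (in.remaining() < length) {
                    in.reset();
                    return null;              // body not complete yet
                }
                byte[] body = new byte[length];
                in.get(body);
                return body;
            }
        }
        throw new IllegalArgumentException("malformed varint32 length prefix");
    }
}
```

Returning null and restoring the read position mirrors what a frame decoder has to do when only part of a message has arrived: wait for more bytes and try again.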

The custom server-side handler uses the dataType value of each incoming message to extract the concrete type:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();

        if(dataType == MyDataInfo.MyMessage.DataType.PeopleType){
            MyDataInfo.People people = msg.getPeople();

            System.out.println(people.getName());
            System.out.println(people.getAge());
            System.out.println(people.getAddress());
        }else if(dataType == MyDataInfo.MyMessage.DataType.DogType){
            MyDataInfo.Dog dog = msg.getDog();

            System.out.println(dog.getName());
            System.out.println(dog.getAge());
        }else if(dataType == MyDataInfo.MyMessage.DataType.CatType){
            MyDataInfo.Cat cat = msg.getCat();

            System.out.println(cat.getName());
            System.out.println(cat.getCity());
        }
    }
}

Client code:

package com.zhihao.miao.netty.seventhexample;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TestClient {

    public static void main(String[] args) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8888).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

Client-side channel initializer:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Use the default instance of the outermost message type
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new TestClientHandler());
    }
}

The custom client-side handler sends a message of a randomly chosen protocol:

package com.zhihao.miao.netty.seventhexample;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Random;

public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

    }

    // Send data to the server once the connection is active
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int randomInt = new Random().nextInt(3);

        MyDataInfo.MyMessage myMessage = null;

        if(0 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.PeopleType).
                    setPeople(MyDataInfo.People.newBuilder().setName("張三").
                            setAddress("上海").setAge(26).build()).build();
        }else if(1 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.DogType).
                    setDog(MyDataInfo.Dog.newBuilder().setName("旺財")
                            .setAge("2").build()).build();
        }else if(2 == randomInt){
            myMessage = MyDataInfo.MyMessage.newBuilder().
                    setDataType(MyDataInfo.MyMessage.DataType.CatType).
                    setCat(MyDataInfo.Cat.newBuilder().setName("湯姆")
                            .setCity("上海").build()).build();
        }

        ctx.channel().writeAndFlush(myMessage);
    }
}

Start the server, then run the client several times. The server console shows:

七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x82a26e9f, L:/127.0.0.1:8888 - R:/127.0.0.1:51777]
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
湯姆
上海
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x128da3e7, L:/127.0.0.1:8888 - R:/127.0.0.1:52049]
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
張三
26
上海
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xa8220c73, L:/127.0.0.1:8888 - R:/127.0.0.1:52097]
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
湯姆
上海
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x9ac52ec1, L:/127.0.0.1:8888 - R:/127.0.0.1:52125]
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
張三
26
上海
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x797d03b6, L:/127.0.0.1:8888 - R:/127.0.0.1:52178]
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
旺財
2

Implementing multiple protocols with plain Netty

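The official Netty site has a similar demo. It took me a long time, and some reference to that example, to write this one, and it deepened my understanding of Netty: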

The three protocol entity classes:

Person protocol

package com.zhihao.miao.test.day10;

public class Person {

    private String username;

    private int age;
    
    // getters and setters

}

Dog protocol

package com.zhihao.miao.test.day10;
public class Dog {

    private String name;

    private String age;

    // getters and setters
}

Cat protocol

package com.zhihao.miao.test.day10;

public class Cat {
    private String name;
    private String color;
    // getters and setters
}

Server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MultiServer {

    public static void main(String args[]) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Set socket options
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // use an NIO server channel
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerChannelInitializer());

            // Bind the port and start listening for connections
            Channel ch = serverBootstrap.bind(8899).sync().channel();

            // Block until the server channel is closed
            ch.closeFuture().sync();
        } finally {
            // Release the event loop threads
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Server-side ChannelInitializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Decoder handler
        pipeline.addLast(new ServlerDecoder());
        pipeline.addLast(new TestServerHandler());
    }
}

The server-side decoder handler: if the flag read from the stream is 0, the data is decoded as the Person protocol; if it is 1, as the Dog protocol; if it is 2, as the Cat protocol:

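import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class ServlerDecoder extends ByteToMessageDecoder {

    // The Dog and Cat layouts do not length-prefix their last field, so this decoder
    // assumes each message arrives complete in a single read, as it does in this demo.
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Wait until at least the 4-byte flag is readable
        if (in.readableBytes() < 4) {
            return;
        }

        int flag = in.readInt();

        if (flag == 0) {
            // Person: [name length][name bytes][age]
            int usernameLength = in.readInt();
            byte[] usernameBytes = new byte[usernameLength];
            in.readBytes(usernameBytes);
            String username = new String(usernameBytes, StandardCharsets.UTF_8);

            int age = in.readInt();

            Person person = new Person();
            person.setUsername(username);
            person.setAge(age);

            out.add(person);
        } else if (flag == 1) {
            // Dog: [name length][name bytes][age bytes, up to the end of the buffer]
            int nameLength = in.readInt();
            byte[] nameBytes = new byte[nameLength];
            in.readBytes(nameBytes);
            String name = new String(nameBytes, StandardCharsets.UTF_8);

            byte[] ageBytes = new byte[in.readableBytes()];
            in.readBytes(ageBytes);
            String age = new String(ageBytes, StandardCharsets.UTF_8);

            Dog dog = new Dog();
            dog.setName(name);
            dog.setAge(age);

            out.add(dog);
        } else if (flag == 2) {
            // Cat: [name length][name bytes][color bytes, up to the end of the buffer]
            int nameLength = in.readInt();
            byte[] nameBytes = new byte[nameLength];
            in.readBytes(nameBytes);
            String name = new String(nameBytes, StandardCharsets.UTF_8);

            byte[] colorBytes = new byte[in.readableBytes()];
            in.readBytes(colorBytes);
            String color = new String(colorBytes, StandardCharsets.UTF_8);

            Cat cat = new Cat();
            cat.setName(name);
            cat.setColor(color);

            out.add(cat);
        }
    }
}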

Custom server-side handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TestServerHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof Person){
            System.out.println(((Person) msg).getUsername());
            System.out.println(((Person) msg).getAge());
        }

        if(msg instanceof Dog){
            System.out.println(((Dog) msg).getName());
            System.out.println(((Dog) msg).getAge());
        }

        if(msg instanceof Cat){
            System.out.println(((Cat) msg).getName());
            System.out.println(((Cat) msg).getColor());
        }
    }
}

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MultiClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());

        // Start the connection attempt.
        Channel ch = b.connect("127.0.0.1", 8899).sync().channel();

        ch.flush();
    }
}

Client-side ChannelInitializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

import java.util.Random;

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        int randomInt = new Random().nextInt(3);

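        /**
         * Encoding step: if the random value is 0, send the Person protocol; if it is 1,
         * send the Dog protocol; if it is 2, send the Cat protocol.
         *
         * The Person protocol writes a flag of 0, then the Person object encoded as binary.
         * The Dog protocol writes a flag of 1, then the Dog object encoded as binary.
         * The Cat protocol writes a flag of 2, then the Cat object encoded as binary.
         */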
        if(0 == randomInt){
            pipeline.addLast(new PersonEncoder());
            Person person = new Person();
            person.setUsername("zhihao");
            person.setAge(27);
            pipeline.addLast(new TestClientHandler(person));
        }

        if(1 == randomInt){
            pipeline.addLast(new DogEncoder());
            Dog dog = new Dog();
            dog.setName("wangcai");
            dog.setAge("2");
            pipeline.addLast(new TestClientHandler(dog));
        }

        if(2 == randomInt){
            pipeline.addLast(new CatEncoder());
            Cat cat = new Cat();
            cat.setName("maomi");
            cat.setColor("yellow");
            pipeline.addLast(new TestClientHandler(cat));
        }
    }
}

The three custom encoders mirror the server-side decoder. Person data is prefixed with the flag 0, Dog data with the flag 1, and Cat data with the flag 2; the flag and the object's own data are then encoded together as binary and sent.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

public class PersonEncoder extends MessageToByteEncoder<Person> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
        // Write the encoded byte count, not String.length(), so non-ASCII names decode correctly
        byte[] usernameBytes = msg.getUsername().getBytes(StandardCharsets.UTF_8);
        int age = msg.getAge();

        out.writeInt(0); // flag: Person
        out.writeInt(usernameBytes.length);
        out.writeBytes(usernameBytes);
        out.writeInt(age);
    }
}

Dog protocol encoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

public class DogEncoder extends MessageToByteEncoder<Dog> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {
        // Write the encoded byte count, not String.length(), so non-ASCII names decode correctly
        byte[] nameBytes = msg.getName().getBytes(StandardCharsets.UTF_8);
        byte[] ageBytes = msg.getAge().getBytes(StandardCharsets.UTF_8);

        out.writeInt(1); // flag: Dog
        out.writeInt(nameBytes.length);
        out.writeBytes(nameBytes);
        out.writeBytes(ageBytes); // no length prefix: the decoder reads to the end of the buffer
    }
}

Cat protocol encoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

public class CatEncoder extends MessageToByteEncoder<Cat> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Cat msg, ByteBuf out) throws Exception {
        // Write the encoded byte count, not String.length(), so non-ASCII names decode correctly
        byte[] nameBytes = msg.getName().getBytes(StandardCharsets.UTF_8);
        byte[] colorBytes = msg.getColor().getBytes(StandardCharsets.UTF_8);

        out.writeInt(2); // flag: Cat
        out.writeInt(nameBytes.length);
        out.writeBytes(nameBytes);
        out.writeBytes(colorBytes); // no length prefix: the decoder reads to the end of the buffer
    }
}
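In the Dog and Cat layouts the last field has no length prefix, so the decoder can only find its end if the whole message arrives in one read. One possible variation, sketched here in plain java.nio rather than Netty, length-prefixes every string field so that a partial message can be detected and retried. The class `DogWireSketch` and its layout are assumptions made for illustration; the demo code does not use them.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch of a Dog wire layout: [flag][name length][name][age length][age]. */
final class DogWireSketch {

    static final int DOG_FLAG = 1;

    /** Encodes both fields with explicit byte-length prefixes; the buffer is ready for reading. */
    static ByteBuffer encode(String name, String age) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        byte[] ageBytes = age.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + 4 + nameBytes.length + 4 + ageBytes.length);
        out.putInt(DOG_FLAG);
        out.putInt(nameBytes.length); // byte count, which can differ from String.length()
        out.put(nameBytes);
        out.putInt(ageBytes.length);
        out.put(ageBytes);
        out.flip();
        return out;
    }

    /** Returns {name, age}, or null (with the position restored) if the message is incomplete. */
    static String[] decode(ByteBuffer in) {
        in.mark();
        try {
            if (in.getInt() != DOG_FLAG) {
                throw new IllegalArgumentException("not a Dog message");
            }
            String name = readString(in);
            String age = readString(in);
            return new String[] {name, age};
        } catch (BufferUnderflowException e) {
            in.reset(); // wait for more bytes, then try again from the start
            return null;
        }
    }

    private static String readString(ByteBuffer in) {
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative length");
        }
        byte[] bytes = new byte[length];
        in.get(bytes); // throws BufferUnderflowException if the field is not complete
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In a Netty decoder the same idea would typically be expressed with markReaderIndex() and resetReaderIndex() on the ByteBuf, or by putting a length-based frame decoder in front of the protocol decoder.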

Custom client-side handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TestClientHandler extends ChannelInboundHandlerAdapter {

    private Person person;

    private Cat cat;

    private Dog dog;

    public TestClientHandler(Person person){
        this.person = person;
    }

    public TestClientHandler(Dog dog){
        this.dog = dog;
    }

    public TestClientHandler(Cat cat){
        this.cat =cat;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if(person != null){
            ctx.channel().writeAndFlush(person);
        }

        if(dog != null){
            ctx.channel().writeAndFlush(dog);
        }

        if(cat != null){
            ctx.channel().writeAndFlush(cat);
        }
    }
}

Start the server, then start the client several times. The server console prints the results sent over the different protocols:

maomi
yellow
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf40f7b07, L:/127.0.0.1:8899 - R:/127.0.0.1:57879]
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
wangcai
2
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x3384f158, L:/127.0.0.1:8899 - R:/127.0.0.1:57914]
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao
27

Demo link
