上篇文章講解了自定義通信協(xié)議,本章節(jié)介紹如何支持多種協(xié)議。
會構建一個Server,同時支持Cat,Dog和People通信協(xié)議。有二種實現(xiàn)方式:
- 第一種方式利用了自定義協(xié)議,傳遞消息的時候,對消息的前幾位(比如2位)進行自定義的位置(比如AB)解碼器解析的時候前二位為AB表示一種協(xié)議類型,CD一種協(xié)議類型。這種方式?jīng)]有利用protobuf,而是直接使用Netty自定義協(xié)議來解決的方案。
- 第二種方式使用protobuf來實現(xiàn),實際上是對消息的定義方式進行規(guī)定,因為netty本身,客戶端和服務器端建立的是一條TCP連接,一方必須要判斷對方發(fā)送過來的對象是什么類型。
Protocol Buffers實現(xiàn)netty的多種傳輸協(xié)議
我們知道使用Protocol Buffers首先定義一個.proto文件
定義一個最外層的消息,最外層的消息(MyMessage)包含了所有傳遞的消息類型,所有的消息類型嵌套在最外層的消息類型中,每次傳遞都將傳遞具體消息類型(以最外層消息類型的枚舉類型傳遞)
syntax ="proto2";
package com.zhihao.miao.netty.sixthexample;
option optimize_for = SPEED;
option java_package = "com.zhihao.miao.netty.seventhexample";
option java_outer_classname="MyDataInfo";
message MyMessage {
enum DataType{
PeopleType = 1;
DogType = 2;
CatType = 3;
}
required DataType data_type = 1;
//oneof的意思:如果有多個可選字段,在某一個時刻只能只有一個值被設置,可以節(jié)省內(nèi)存空間
oneof dataBody {
People people = 2;
Dog dog = 3;
Cat cat = 4;
}
}
message People{
optional string name = 1;
optional int32 age = 2;
optional string address = 3;
}
message Dog{
optional string name = 1;
optional string age = 2;
}
message Cat{
optional string name = 1;
optional string city = 2;
}
使用編譯器編譯生成代碼
protoc --java_out=src/main/java src/protobuf/People.proto
關于proto協(xié)議中的Oneof含義,如果有多個可選字段,在某一個時刻只能只有一個值被設置,官方鏈接,生成MyDataInfo類,類代碼太多,這邊不貼出了
服務端代碼:
package com.zhihao.miao.netty.seventhexample;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
服務端初始化鏈接:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//使用最外層的消息實例
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
其實實現(xiàn)的關鍵就在于此,使用MyDataInfo.MyMessage實列(MyDataInfo.MyMessage是枚舉類型),而我們定義的三種對象剛好就是其枚舉對象
自定義的服務端的Handler,根據(jù)通道中傳遞數(shù)據(jù)的不同dataType值來解析程具體的類型:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.PeopleType){
MyDataInfo.People people = msg.getPeople();
System.out.println(people.getName());
System.out.println(people.getAge());
System.out.println(people.getAddress());
}else if(dataType == MyDataInfo.MyMessage.DataType.DogType){
MyDataInfo.Dog dog = msg.getDog();
System.out.println(dog.getName());
System.out.println(dog.getAge());
}else if(dataType == MyDataInfo.MyMessage.DataType.CatType){
MyDataInfo.Cat cat = msg.getCat();
System.out.println(cat.getName());
System.out.println(cat.getCity());
}
}
}
客戶端代碼:
package com.zhihao.miao.netty.seventhexample;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TestClient {
public static void main(String[] args) throws Exception{
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客戶端的初始化鏈接:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//使用最外層的消息實例
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}
自定義處理器端的handler,隨機發(fā)送不同協(xié)議的數(shù)據(jù):
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
}
//客戶端像服務器端發(fā)送數(shù)據(jù)
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int randomInt = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.PeopleType).
setPeople(MyDataInfo.People.newBuilder().setName("張三").
setAddress("上海").setAge(26).build()).build();
}else if(1 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.DogType).
setDog(MyDataInfo.Dog.newBuilder().setName("旺財")
.setAge("2").build()).build();
}else if(2 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.CatType).
setCat(MyDataInfo.Cat.newBuilder().setName("湯姆")
.setCity("上海").build()).build();
}
ctx.channel().writeAndFlush(myMessage);
}
}
啟動服務器端,然后啟動客戶端多執(zhí)行幾次,服務器的控制臺顯示:
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x82a26e9f, L:/127.0.0.1:8888 - R:/127.0.0.1:51777]
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
湯姆
上海
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x128da3e7, L:/127.0.0.1:8888 - R:/127.0.0.1:52049]
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
張三
26
上海
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xa8220c73, L:/127.0.0.1:8888 - R:/127.0.0.1:52097]
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
湯姆
上海
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x9ac52ec1, L:/127.0.0.1:8888 - R:/127.0.0.1:52125]
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
張三
26
上海
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x797d03b6, L:/127.0.0.1:8888 - R:/127.0.0.1:52178]
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
旺財
2
使用netty實現(xiàn)多種傳輸協(xié)議
官網(wǎng)類似的demo,自己寫了很長也參考了官網(wǎng)才寫出這個demo,對netty的理解又加深了:
三種協(xié)議實體類:
Person協(xié)議
package com.zhihao.miao.test.day10;
public class Person {
private String username;
private int age;
//get set方法
}
Dog協(xié)議
package com.zhihao.miao.test.day10;
public class Dog {
private String name;
private String age;
//get set方法
}
Cat協(xié)議
package com.zhihao.miao.test.day10;
public class Cat {
private String name;
private String city;
//get set方法
}
服務端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class MultiServer {
public static void main(String args[]) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 指定socket的一些屬性
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定是一個NIO連接通道
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerChannelInitializer());
// 綁定對應的端口號,并啟動開始監(jiān)聽端口上的連接
Channel ch = serverBootstrap.bind(8899).sync().channel();
// 等待關閉,同步端口
ch.closeFuture().sync();
}
}
服務器端初始化lInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//解析handler
pipeline.addLast(new ServlerDecoder());
pipeline.addLast(new TestServerHandler());
}
}
服務端解碼器Handler,如果解析的位置數(shù)據(jù)是0則按照 Person協(xié)議進行解碼,如果傳遞的位置數(shù)據(jù)是1,則按照Dog協(xié)議進行解碼,如果傳遞的位置數(shù)據(jù)是2,則按照Cat協(xié)議進行解碼:
public class ServlerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int flag = in.readInt();
if(flag == 0){
int usernamelength = in.readInt();
byte[] usernamebys = new byte[usernamelength];
in.readBytes(usernamebys);
String username = new String(usernamebys);
int age = in.readInt();
Person pserson = new Person();
pserson.setUsername(username);
pserson.setAge(age);
out.add(pserson);
}
if(flag ==1){
int namelength =in.readInt();
byte[] namebys = new byte[namelength];
in.readBytes(namebys);
String name = new String(namebys);
byte[] agebys = new byte[in.readableBytes()];
in.readBytes(agebys);
String age = new String(agebys);
Dog dog = new Dog();
dog.setName(name);
dog.setAge(age);
out.add(dog);
}
if(flag ==2){
int namelength = in.readInt();
byte[] nameByte = new byte[namelength];
in.readBytes(nameByte);
String name = new String(nameByte);
byte[] colorbys = new byte[in.readableBytes()];
in.readBytes(colorbys);
String color = new String(colorbys);
Cat cat = new Cat();
cat.setName(name);
cat.setColor(color);
out.add(cat);
}
}
自定義服務器端Handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof Person){
System.out.println(((Person) msg).getUsername());
System.out.println(((Person) msg).getAge());
}
if(msg instanceof Dog){
System.out.println(((Dog) msg).getName());
System.out.println(((Dog) msg).getAge());
}
if(msg instanceof Cat){
System.out.println(((Cat) msg).getName());
System.out.println(((Cat) msg).getColor());
}
}
}
客戶端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MultiClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());
// Start the connection attempt.
Channel ch = b.connect("127.0.0.1", 8899).sync().channel();
ch.flush();
}
}
客戶端初始化Initializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Random;
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
int randomInt = new Random().nextInt(3);
/**
* 編碼動作,如果隨機參數(shù)是1,則傳輸Person協(xié)議,如果隨機參數(shù)是2,則傳遞Dog協(xié)議,
* 如果隨機參數(shù)是3,則傳遞Cat協(xié)議
*
* Person協(xié)議就是傳遞一個標識位為0,然后將Person對象編碼成二進制傳輸
* Dog協(xié)議傳遞一個標識位為1,然后將Dog對象編碼成二進制進行傳輸
* Cat協(xié)議傳遞一個標識為2,然后將Cat對象編碼成二進制進行傳輸
*/
if(0 == randomInt){
pipeline.addLast(new PersonEncoder());
Person person = new Person();
person.setUsername("zhihao");
person.setAge(27);
pipeline.addLast(new TestClientHandler(person));
}
if(1 == randomInt){
pipeline.addLast(new DogEncoder());
Dog dog = new Dog();
dog.setName("wangcai");
dog.setAge("2");
pipeline.addLast(new TestClientHandler(dog));
}
if(2 == randomInt){
pipeline.addLast(new CatEncoder());
Cat cat = new Cat();
cat.setName("maomi");
cat.setColor("yellow");
pipeline.addLast(new TestClientHandler(cat));
}
}
}
三種自定義編碼協(xié)議,與服務器端進行對應傳輸Person數(shù)據(jù)的時候,在Person數(shù)據(jù)之前加上標識位置數(shù)據(jù)0,在Dog數(shù)據(jù)之前加上標識位置數(shù)據(jù)1,在Cat數(shù)據(jù)之前加上標識位置數(shù)據(jù)2,然后將其與本身的數(shù)據(jù)一起編碼成二進制進行傳輸。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class PersonEncoder extends MessageToByteEncoder<Person> {
@Override
protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
String username = msg.getUsername();
int usernamelength = username.length();
int age = msg.getAge();
out.writeInt(0); //標識位
out.writeInt(usernamelength);
out.writeBytes(username.getBytes());
out.writeInt(age);
}
}
Dog協(xié)議編碼器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class DogEncoder extends MessageToByteEncoder<Dog> {
@Override
protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {
String name = msg.getName();
int namelength = name.length();
String age = msg.getAge();
out.writeInt(1); //標識位
out.writeInt(namelength);
out.writeBytes(name.getBytes());
out.writeBytes(age.getBytes());
}
}
Cat協(xié)議編碼器:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class CatEncoder extends MessageToByteEncoder<Cat> {
@Override
protected void encode(ChannelHandlerContext ctx, Cat msg, ByteBuf out) throws Exception {
String name = msg.getName();
int namelength = name.length();
String color = msg.getColor();
out.writeInt(2); //標識位
out.writeInt(namelength);
out.writeBytes(name.getBytes());
out.writeBytes(color.getBytes());
}
}
自定義客戶端處理器:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TestClientHandler extends ChannelInboundHandlerAdapter {
private Person person;
private Cat cat;
private Dog dog;
public TestClientHandler(Person person){
this.person = person;
}
public TestClientHandler(Dog dog){
this.dog = dog;
}
public TestClientHandler(Cat cat){
this.cat =cat;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if(person != null){
ctx.channel().writeAndFlush(person);
}
if(dog != null){
ctx.channel().writeAndFlush(dog);
}
if(cat != null){
ctx.channel().writeAndFlush(cat);
}
}
}
啟動服務端,再多次啟動客戶端,服務器控制臺打印出不同協(xié)議傳輸?shù)慕Y(jié)果
maomi
yellow
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf40f7b07, L:/127.0.0.1:8899 - R:/127.0.0.1:57879]
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
wangcai
2
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x3384f158, L:/127.0.0.1:8899 - R:/127.0.0.1:57914]
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao
27