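The previous section covered how Netty solves the TCP packet splitting and sticking problem. Today we look at serialization in Netty.
What is serialization
From the encyclopedia: serialization is the process of converting an object's state into a form that can be stored or transmitted. During serialization, the object writes its current state to temporary or persistent storage. Later, the object can be recreated by reading, or deserializing, its state from that storage.
Java serialization
The native Java API provides the object input and output streams ObjectInputStream and ObjectOutputStream, which can write a Java object directly as a storable byte array to a file or send it over the network.
Java serialization serves two main purposes:
1. Network transmission
2. Object persistence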
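The object streams described above can be exercised with an in-memory round trip. The following is a minimal sketch, not code from the original post: `RoundTripDemo` and `Point` are hypothetical names chosen for illustration, and the byte array stands in for a file or a network connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class RoundTripDemo {

    // Hypothetical value class used only for this illustration.
    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    // Serialize any Serializable object into an in-memory byte array.
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Rebuild the object from the bytes produced by serialize().
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Point copy = (Point) deserialize(serialize(new Point(3, 4)));
        System.out.println(copy.x + "," + copy.y);
    }
}
```

The same two calls cover both purposes listed above: write the bytes to a file for persistence, or to a socket for network transmission.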
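However, Java's built-in serialization has clear shortcomings in both encoded size and speed, and several alternative serialization frameworks have appeared. Let's first use some code to look at the drawbacks of Java serialization.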
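import java.io.Serializable;
import java.nio.ByteBuffer;

import lombok.Data;

// Lombok's @Data generates the getters and setters used by the test programs.
@Data
public class UserInfo implements Serializable {
    private static final long serialVersionUID = -3498249724990274743L;

    private String userName;
    private int userID;

    // Hand-written binary encoding: 4-byte name length, name bytes, 4-byte user ID.
    public byte[] codeC() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        byte[] value = this.userName.getBytes();
        buffer.putInt(value.length);
        buffer.put(value);
        buffer.putInt(this.userID);
        // Switch the buffer from writing to reading before copying the bytes out.
        buffer.flip();
        value = null;
        byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }
}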
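The post only shows the encoding half of the hand-written format. As a rough sketch of what the matching decoder might look like, the following reads the fields back in the order `codeC()` writes them. `UserInfoCodec` and `Decoded` are hypothetical names, the class avoids Lombok so that it stands alone, and UTF-8 is an assumption (the original `getBytes()` uses the platform default charset).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UserInfoCodec {

    // Hypothetical holder for the decoded fields.
    static class Decoded {
        final String userName;
        final int userID;

        Decoded(String userName, int userID) {
            this.userName = userName;
            this.userID = userID;
        }
    }

    // Same layout as codeC(): 4-byte name length, name bytes, 4-byte user ID.
    static byte[] encode(String userName, int userID) {
        byte[] name = userName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(userID);
        // The buffer is exactly full, so its backing array is the encoded message.
        return buffer.array();
    }

    // Read the fields back in the order they were written.
    static Decoded decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLength = buffer.getInt();
        byte[] name = new byte[nameLength];
        buffer.get(name);
        int userID = buffer.getInt();
        return new Decoded(new String(name, StandardCharsets.UTF_8), userID);
    }

    public static void main(String[] args) {
        byte[] bytes = encode("wcs", 100);
        Decoded decoded = decode(bytes);
        // 4 (length) + 3 (name) + 4 (ID) = 11 bytes
        System.out.println(bytes.length + " bytes: " + decoded.userName + ", " + decoded.userID);
    }
}
```

The length prefix is what lets the decoder know where the name ends. Nothing else in the message describes its own structure, which is why this encoding is so compact.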
Test program
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class TestUserInfo {
    public static void main(String[] args) {
        UserInfo info = new UserInfo();
        info.setUserID(100);
        info.setUserName("wcs");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            // Encode with JDK serialization and print the resulting size.
            ObjectOutputStream os = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            System.out.println("the jdk serializable length is:" + b.length);
            bos.close();
            System.out.println("-----------");
            // Encode with the hand-written ByteBuffer codec for comparison.
            System.out.println("the byte array serializable length is:" + info.codeC().length);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

This shows that the byte array produced by JDK serialization is 10 times the size of the one produced by the hand-written binary encoding.
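Much of that extra size comes from metadata: besides the field values, a JDK serialization stream carries a stream header and a class descriptor that includes the class name and the field names. The sketch below checks for these directly. `StreamOverheadDemo` and `User` are hypothetical stand-ins (no Lombok), so the exact byte count will differ from the `UserInfo` result above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamOverheadDemo {

    // Hypothetical stand-in for UserInfo, without the Lombok dependency.
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        String userName = "wcs";
        int userID = 100;
    }

    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // The stream starts with a 2-byte magic number followed by a 2-byte version.
    static boolean hasStreamHeader(byte[] bytes) {
        short magic = (short) (((bytes[0] & 0xFF) << 8) | (bytes[1] & 0xFF));
        short version = (short) (((bytes[2] & 0xFF) << 8) | (bytes[3] & 0xFF));
        return magic == ObjectStreamConstants.STREAM_MAGIC
            && version == ObjectStreamConstants.STREAM_VERSION;
    }

    // Search the raw bytes for an ASCII string such as a class or field name.
    static boolean containsText(byte[] bytes, String text) {
        return new String(bytes, StandardCharsets.ISO_8859_1).contains(text);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new User());
        System.out.println("length: " + bytes.length);
        System.out.println("header present: " + hasStreamHeader(bytes));
        System.out.println("class name embedded: " + containsText(bytes, User.class.getName()));
        System.out.println("field name embedded: " + containsText(bytes, "userName"));
    }
}
```

The hand-written encoding writes none of this, because both sides are assumed to agree on the layout in advance.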
Next, let's compare how long serialization takes. We encode the same object 100,000 times with each approach and measure the total elapsed time:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class PerformsTestUserInfo {
    public static void main(String[] args) throws IOException {
        UserInfo info = new UserInfo();
        info.setUserID(100);
        info.setUserName("welcome to netty");
        int loop = 100000;

        // Time JDK serialization.
        ByteArrayOutputStream bos = null;
        ObjectOutputStream os = null;
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            bos = new ByteArrayOutputStream();
            os = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            bos.close();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---------jdk serializable cost time is " + (endTime - startTime) + " ms");
        System.out.println("-------");

        // Time the hand-written ByteBuffer codec.
        startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            byte[] b = info.codeC();
        }
        endTime = System.currentTimeMillis();
        System.out.println("---------the byte array serializable cost time is " + (endTime - startTime) + " ms");
    }
}

This shows that Java serialization delivers only 6.17% of the performance of the binary encoding.
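One detail worth noting about the hand-written side of this comparison: `codeC()` allocates a fresh 1024-byte `ByteBuffer` on every call. A possible variant, sketched here under the assumption that the caller owns a single buffer and encodes from one thread, clears and reuses that buffer instead. `ReusableBufferEncoder` is a hypothetical name, and the sketch takes the fields as parameters so that it runs without `UserInfo`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReusableBufferEncoder {

    // Same layout as codeC(), but writing into a caller-supplied buffer.
    static byte[] encode(String userName, int userID, ByteBuffer buffer) {
        // Reset position and limit so earlier contents are overwritten.
        buffer.clear();
        byte[] name = userName.getBytes(StandardCharsets.UTF_8);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(userID);
        buffer.flip();
        byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }

    public static void main(String[] args) {
        // One buffer is allocated up front and shared by every iteration.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int loop = 100000;
        long start = System.nanoTime();
        for (int i = 0; i < loop; i++) {
            encode("welcome to netty", 100, buffer);
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("reused buffer cost time is " + elapsedMs + " ms");
    }
}
```

Timings from a simple loop like this depend heavily on JIT warm-up and the machine, so they are best read as a rough indication rather than a precise measurement.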
Java serialization with Netty
We now use Netty to serialize a POJO. The POJO looks like this:
import java.io.Serializable;

import lombok.Data;

@Data
public class SubscribeResp implements Serializable {
    private static final long serialVersionUID = -4261173283103510587L;

    private int subReqId;
    private int respCode;
    private String desc;

    @Override
    public String toString() {
        return "subReqId=" + subReqId + " respCode=" + respCode + " desc=" + desc;
    }
}
In the Netty server program, add the decoder ObjectDecoder and the encoder ObjectEncoder to the pipeline.
The server code is as follows:
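import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) throws Exception {
        // Create two thread groups: one accepts client connections on the server side,
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // the other handles network reads and writes.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Decode inbound bytes into objects, limited to 1 MB per object.
                        socketChannel.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        // Encode outbound objects into bytes.
                        socketChannel.pipeline().addLast(new ObjectEncoder());
                        socketChannel.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            // Bind the port and block until the server channel is closed.
            ChannelFuture future = b.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Not a valid number: keep the default port.
            }
        }
        try {
            new SubReqServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}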
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead.
public class SubReqServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // ObjectDecoder has already turned the bytes into a SubscribeResp.
        SubscribeResp resp = (SubscribeResp) msg;
        if ("wcs".equals(resp.getDesc())) {
            System.out.println("Service accept client subscribe resp:[" + resp.toString() + "]");
            ctx.writeAndFlush(buildResponse(resp.getSubReqId()));
        }
    }

    private SubscribeResp buildResponse(int subReqId) {
        SubscribeResp resp = new SubscribeResp();
        resp.setSubReqId(subReqId);
        resp.setRespCode(0);
        resp.setDesc("receive success");
        return resp;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
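The `1024 * 1024` passed to `ObjectDecoder` in the server caps the size of a single serialized object. As far as I understand Netty's implementation, `ObjectEncoder` writes a 4-byte length in front of each serialized object and `ObjectDecoder` uses that length to cut the byte stream into frames, rejecting any frame above the limit. The following JDK-only sketch illustrates that idea. It is not Netty's code, and `LengthPrefixedObjectCodec` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class LengthPrefixedObjectCodec {

    // Frame layout: 4-byte payload length, then the JDK-serialized object.
    static byte[] encode(Serializable obj) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(body)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] payload = body.toByteArray();
        return ByteBuffer.allocate(4 + payload.length)
            .putInt(payload.length)
            .put(payload)
            .array();
    }

    // Check the declared length against the limit before deserializing anything.
    static Object decode(byte[] frame, int maxObjectSize) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        int length = buffer.getInt();
        if (length < 0 || length > maxObjectSize) {
            throw new IllegalArgumentException(
                "frame length " + length + " is outside the allowed range 0.." + maxObjectSize);
        }
        if (buffer.remaining() < length) {
            throw new IllegalArgumentException("incomplete frame");
        }
        byte[] payload = new byte[length];
        buffer.get(payload);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello");
        System.out.println(decode(frame, 1024 * 1024));
    }
}
```

Because every frame announces its own length, this style of codec also takes care of the packet splitting and sticking problem from the previous section, so no separate frame decoder appears in the pipeline.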
In the Netty client program, likewise add the decoder ObjectDecoder and the encoder ObjectEncoder:
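import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SubReqClient {
    public void connect(int port, String host) throws Exception {
        // Create the I/O thread group used for reads and writes.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Encode outbound objects into bytes.
                        socketChannel.pipeline().addLast(new ObjectEncoder());
                        // Decode inbound bytes into objects, limited to 1 MB per object.
                        socketChannel.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                        socketChannel.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // Connect and block until the channel is closed.
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Not a valid number: keep the default port.
            }
        }
        try {
            new SubReqClient().connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}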
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead.
public class SubReqClientHandler extends ChannelHandlerAdapter {
    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Queue 10000 messages once the connection is up, then flush them in one go.
        for (int i = 0; i < 10000; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive server response:[" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private SubscribeResp subReq(int i) {
        SubscribeResp subscribeResp = new SubscribeResp();
        subscribeResp.setSubReqId(i);
        subscribeResp.setDesc("wcs");
        return subscribeResp;
    }
}
Server output

Client output
