Netty學(xué)習(xí)筆記四-序列化

上一節(jié)學(xué)習(xí)了Netty的TCP拆包粘包問(wèn)題的解決之道,今天學(xué)習(xí)Netty的序列化。

什么是序列化

引入百科:序列化 (Serialization)是將對(duì)象的狀態(tài)信息轉(zhuǎn)換為可以存儲(chǔ)或傳輸?shù)男问降倪^(guò)程。在序列化期間,對(duì)象將其當(dāng)前狀態(tài)寫入到臨時(shí)或持久性存儲(chǔ)區(qū)。以后,可以通過(guò)從存儲(chǔ)區(qū)中讀取或反序列化對(duì)象的狀態(tài),重新創(chuàng)建該對(duì)象

Java的序列化

Java原生API提供了對(duì)象的輸入輸出流ObjectIntputStream和ObjectOutputStream,可直接將Java對(duì)象作為可存儲(chǔ)的字節(jié)數(shù)組寫入到文件,也可以傳輸?shù)骄W(wǎng)絡(luò)上。
Java序列化的目的主要有兩個(gè):
1、網(wǎng)絡(luò)傳輸
2、對(duì)象持久化
但是基于Java的序列化方式在效率和速度上都有明顯缺陷,目前已出現(xiàn)多個(gè)序列化框架,我們先通過(guò)代碼比較下Java序列化的缺點(diǎn)

@Data
public class UserInfo implements Serializable {

  private static final long serialVersionUID = -3498249724990274743L;

  private String userName;

  private int userID;

  public byte[] codeC() {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      byte[] value = this.userName.getBytes();
      buffer.putInt(value.length);
      buffer.put(value);
      buffer.putInt(this.userID);
      buffer.flip();
      value = null;
      byte[] result = new byte[buffer.remaining()];
      buffer.get(result);
      return result;
  }
}

測(cè)試程序

public class TestUserInfo {
    public static void main(String[] args) {
        UserInfo info = new UserInfo();
        info.setUserID(100);
        info.setUserName("wcs");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream os  = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            System.out.println("the jdk serializable length is:"+b.length);
            bos.close();
            System.out.println("-----------");
            System.out.println("the byte array Serializable length is:"+info.codeC().length);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
image.png

由此可見采用JDK序列化編碼后的二進(jìn)制數(shù)組大小是二進(jìn)制編碼的10倍。
下面再比較下序列化的時(shí)間長(zhǎng)短,對(duì)同一個(gè)對(duì)象進(jìn)行100萬(wàn)次編碼實(shí)驗(yàn),然后統(tǒng)計(jì)耗費(fèi)的總時(shí)間:

public class PerformsTestUserInfo {
    public static void main(String[] args) throws IOException{
        UserInfo info = new UserInfo();
        info.setUserID(100);
        info.setUserName("welcome to netty");
        int loop = 100000;
        ByteArrayOutputStream bos = null;
        ObjectOutputStream os = null;
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            bos = new ByteArrayOutputStream();
            os = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            bos.close();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---------jbk serializable cost time"+(endTime - startTime)+" ms");
        System.out.println("-------");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            byte[] b = info.codeC();
        }
        endTime = System.currentTimeMillis();
        System.out.println("---------the byte array serializable cost time is"+(endTime - startTime)+" ms");
    }
}
image.png

由此可見Java序列化的性能只有二進(jìn)制編碼的6.17%.

Netty Java序列化開發(fā)

使用Netty對(duì)POJO對(duì)象進(jìn)行序列化開發(fā),POJO對(duì)象如下:

@Data
public class SubscribeResp  implements Serializable {
    private static final long serialVersionUID = -4261173283103510587L;

    private int subReqId;

    private int respCode;

    private String desc;
    @Override
    public String toString() {
        return "subReqId="+subReqId+" respCode="+respCode+" desc="+desc;
    }
}

在Netty服務(wù)端程序中添加解碼器ObjectDecode和編碼器ObjectEncode
服務(wù)端代碼如下

public class SubReqServer {
    public void bind(int port) throws Exception {
        //創(chuàng)建兩個(gè)線程組 一個(gè)用于服務(wù)端接收客戶端的連接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //一個(gè)用于網(wǎng)絡(luò)讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        socketChannel.pipeline().addLast(new ObjectEncoder());
                        socketChannel.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            ChannelFuture future = b.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {

            }
        }
        try {
            new SubReqServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class SubReqServerHandler  extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeResp resp = (SubscribeResp) msg;
        if ("wcs".equals(resp.getDesc())) {
            System.out.println("Service accept client subsribe resp:[" + resp.toString()+"]");
            ctx.writeAndFlush(buildResponse(resp.getSubReqId()));
        }
    }

    private SubscribeResp buildResponse(int subReqId) {
        SubscribeResp resp = new SubscribeResp();
        resp.setSubReqId(subReqId);
        resp.setRespCode(0);
        resp.setDesc("receive success");
        return resp;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在Netty客戶端程序中添加解碼器ObjectDecode和編碼器ObjectEncoder

public class SubReqClient {
    public void connect(int port,String host) throws Exception {
        //創(chuàng)建讀寫io線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ObjectEncoder());
                        socketChannel.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                        socketChannel.pipeline().addLast(new SubReqClientHandler());

                    }
                });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length >0) {
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e) {

            }
        }
        try {
            new SubReqClient().connect(port,"127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class SubReqClientHandler extends ChannelHandlerAdapter {

    public SubReqClientHandler() {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i< 10000; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Service accept client subsribe resp:[" + msg+"]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private SubscribeResp subReq(int i) {
        SubscribeResp subscribeResp = new SubscribeResp();
        subscribeResp.setSubReqId(i);
        subscribeResp.setDesc("wcs");
        return subscribeResp;
    }
}

服務(wù)端運(yùn)行結(jié)果


image.png

客戶端運(yùn)行結(jié)果


image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • JAVA序列化機(jī)制的深入研究 對(duì)象序列化的最主要的用處就是在傳遞,和保存對(duì)象(object)的時(shí)候,保證對(duì)象的完整...
    時(shí)待吾閱讀 11,203評(píng)論 0 24
  • Zookeeper用于集群主備切換。 YARN讓集群具備更好的擴(kuò)展性。 Spark沒(méi)有存儲(chǔ)能力。 Spark的Ma...
    Yobhel閱讀 7,605評(píng)論 0 34
  • 作者:李林鋒 原文:http://www.infoq.com/cn/articles/netty-high-per...
    楊鑫科閱讀 4,117評(píng)論 0 64
  • today is Monday。 本以為今天會(huì)有事,要到變電站去。但是昨晚沒(méi)有人通知,我就沒(méi)有去。 早上默默打開電...
    xiao錢錢閱讀 229評(píng)論 0 0
  • 稱呼自己的男人,也有學(xué)問(wèn)和講究,正確的稱呼,快樂(lè)幸福多多! 以后除了“老公”二字不用,否則是傻女人!“郎君”,在當(dāng)...
    緣悅閱讀 8,532評(píng)論 2 2

友情鏈接更多精彩內(nèi)容