java 從零開始手寫 RPC (04) -序列化

序列化

java 從零開始手寫 RPC (01) 基于 socket 實(shí)現(xiàn)

java 從零開始手寫 RPC (02)-netty4 實(shí)現(xiàn)客戶端和服務(wù)端

java 從零開始手寫 RPC (03) 如何實(shí)現(xiàn)客戶端調(diào)用服務(wù)端?

前面幾節(jié)我們實(shí)現(xiàn)了最基礎(chǔ)的客戶端調(diào)用服務(wù)端,這一節(jié)來(lái)學(xué)習(xí)一下通訊中的對(duì)象序列化。

fastjson

為什么需要序列化

netty 底層都是基于 ByteBuf 進(jìn)行通訊的。

前面我們通過(guò)編碼器/解碼器專門為計(jì)算的入?yún)?出參進(jìn)行處理,這樣方便我們直接使用 pojo。

但是有一個(gè)問(wèn)題,如果想把我們的項(xiàng)目抽象為框架,那就需要為所有的對(duì)象編寫編碼器/解碼器。

顯然,直接通過(guò)每一個(gè)對(duì)象寫一對(duì)的方式是不現(xiàn)實(shí)的,而且用戶如何使用,也是未知的。

序列化的方式

基于字節(jié)的實(shí)現(xiàn),性能好,可讀性不高。

基于字符串的實(shí)現(xiàn),比如 json 序列化,可讀性好,性能相對(duì)較差。

ps: 可以根據(jù)個(gè)人還好選擇,相關(guān)序列化可參考下文,此處不做展開。

json 序列化框架簡(jiǎn)介

實(shí)現(xiàn)思路

可以將我們的 Pojo 全部轉(zhuǎn)化為 byte,然后 Byte 轉(zhuǎn)換為 ByteBuf 即可。

反之亦然。

代碼實(shí)現(xiàn)

maven

引入序列化包:

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>json</artifactId>
    <version>0.1.1</version>
</dependency>

服務(wù)端

核心

服務(wù)端的代碼可以大大簡(jiǎn)化:

serverBootstrap.group(workerGroup, bossGroup)
    .channel(NioServerSocketChannel.class)
    // 打印日志
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new RpcServerHandler());
        }
    })
    // 這個(gè)參數(shù)影響的是還沒(méi)有被accept 取出的連接
    .option(ChannelOption.SO_BACKLOG, 128)
    // 這個(gè)參數(shù)只是過(guò)一段時(shí)間內(nèi)客戶端沒(méi)有響應(yīng),服務(wù)端會(huì)發(fā)送一個(gè) ack 包,以判斷客戶端是否還活著。
    .childOption(ChannelOption.SO_KEEPALIVE, true);

這里只需要一個(gè)實(shí)現(xiàn)類即可。

RpcServerHandler

服務(wù)端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。

package com.github.houbb.rpc.server.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class RpcServerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();

        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
        log.info("[Server] receive channel {} request: {} from ", id, request);

        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // 回寫到 client 端
        byte[] responseBytes = JsonBs.serializeBytes(response);
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
        ctx.writeAndFlush(responseBuffer);
        log.info("[Server] channel {} response {}", id, response);
    }

}

客戶端

核心

客戶端可以簡(jiǎn)化如下:

channelFuture = bootstrap.group(workerGroup)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .handler(new ChannelInitializer<Channel>(){
        @Override
        protected void initChannel(Channel ch) throws Exception {
            channelHandler = new RpcClientHandler();
            ch.pipeline()
                    .addLast(new LoggingHandler(LogLevel.INFO))
                    .addLast(channelHandler);
        }
    })
    .connect(RpcConstant.ADDRESS, port)
    .syncUninterruptibly();

RpcClientHandler

客戶端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。

package com.github.houbb.rpc.client.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * <p> 客戶端處理類 </p>
 *
 * <pre> Created: 2019/10/16 11:30 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.2
 */
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    /**
     * 響應(yīng)信息
     * @since 0.0.4
     */
    private CalculateResponse response;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
        log.info("[Client] response is :{}", response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 每次用完要關(guān)閉,不然拿不到response,我也不知道為啥(目測(cè)得了解netty才行)
        // 個(gè)人理解:如果不關(guān)閉,則永遠(yuǎn)會(huì)被阻塞。
        ctx.flush();
        ctx.close();
    }

    public CalculateResponse getResponse() {
        return response;
    }

}

小結(jié)

為了便于大家學(xué)習(xí),以上源碼已經(jīng)開源:

https://github.com/houbb/rpc

希望本文對(duì)你有所幫助,如果喜歡,歡迎點(diǎn)贊收藏轉(zhuǎn)發(fā)一波。

我是老馬,期待與你的下次相遇。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容