序列化
java 從零開始手寫 RPC (01) 基于 socket 實(shí)現(xiàn)
java 從零開始手寫 RPC (02)-netty4 實(shí)現(xiàn)客戶端和服務(wù)端
java 從零開始手寫 RPC (03) 如何實(shí)現(xiàn)客戶端調(diào)用服務(wù)端?
前面幾節(jié)我們實(shí)現(xiàn)了最基礎(chǔ)的客戶端調(diào)用服務(wù)端,這一節(jié)來(lái)學(xué)習(xí)一下通訊中的對(duì)象序列化。
為什么需要序列化
netty 底層都是基于 ByteBuf 進(jìn)行通訊的。
前面我們通過(guò)編碼器/解碼器專門為計(jì)算的入?yún)?出參進(jìn)行處理,這樣方便我們直接使用 pojo。
但是有一個(gè)問(wèn)題,如果想把我們的項(xiàng)目抽象為框架,那就需要為所有的對(duì)象編寫編碼器/解碼器。
顯然,直接通過(guò)每一個(gè)對(duì)象寫一對(duì)的方式是不現(xiàn)實(shí)的,而且用戶如何使用,也是未知的。
序列化的方式
基于字節(jié)的實(shí)現(xiàn),性能好,可讀性不高。
基于字符串的實(shí)現(xiàn),比如 json 序列化,可讀性好,性能相對(duì)較差。
ps: 可以根據(jù)個(gè)人還好選擇,相關(guān)序列化可參考下文,此處不做展開。
實(shí)現(xiàn)思路
可以將我們的 Pojo 全部轉(zhuǎn)化為 byte,然后 Byte 轉(zhuǎn)換為 ByteBuf 即可。
反之亦然。
代碼實(shí)現(xiàn)
maven
引入序列化包:
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>json</artifactId>
<version>0.1.1</version>
</dependency>
服務(wù)端
核心
服務(wù)端的代碼可以大大簡(jiǎn)化:
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
// 打印日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new RpcServerHandler());
}
})
// 這個(gè)參數(shù)影響的是還沒(méi)有被accept 取出的連接
.option(ChannelOption.SO_BACKLOG, 128)
// 這個(gè)參數(shù)只是過(guò)一段時(shí)間內(nèi)客戶端沒(méi)有響應(yīng),服務(wù)端會(huì)發(fā)送一個(gè) ack 包,以判斷客戶端是否還活著。
.childOption(ChannelOption.SO_KEEPALIVE, true);
這里只需要一個(gè)實(shí)現(xiàn)類即可。
RpcServerHandler
服務(wù)端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。
package com.github.houbb.rpc.server.handler;
import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author binbin.hou
* @since 0.0.1
*/
public class RpcServerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(RpcServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] channel {} connected " + id);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final String id = ctx.channel().id().asLongText();
ByteBuf byteBuf = (ByteBuf)msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
log.info("[Server] receive channel {} request: {} from ", id, request);
Calculator calculator = new CalculatorService();
CalculateResponse response = calculator.sum(request);
// 回寫到 client 端
byte[] responseBytes = JsonBs.serializeBytes(response);
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
log.info("[Server] channel {} response {}", id, response);
}
}
客戶端
核心
客戶端可以簡(jiǎn)化如下:
channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
channelHandler = new RpcClientHandler();
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(channelHandler);
}
})
.connect(RpcConstant.ADDRESS, port)
.syncUninterruptibly();
RpcClientHandler
客戶端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。
package com.github.houbb.rpc.client.handler;
import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* <p> 客戶端處理類 </p>
*
* <pre> Created: 2019/10/16 11:30 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 響應(yīng)信息
* @since 0.0.4
*/
private CalculateResponse response;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
log.info("[Client] response is :{}", response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 每次用完要關(guān)閉,不然拿不到response,我也不知道為啥(目測(cè)得了解netty才行)
// 個(gè)人理解:如果不關(guān)閉,則永遠(yuǎn)會(huì)被阻塞。
ctx.flush();
ctx.close();
}
public CalculateResponse getResponse() {
return response;
}
}
小結(jié)
為了便于大家學(xué)習(xí),以上源碼已經(jīng)開源:
希望本文對(duì)你有所幫助,如果喜歡,歡迎點(diǎn)贊收藏轉(zhuǎn)發(fā)一波。
我是老馬,期待與你的下次相遇。