之前實現(xiàn)了一個基于Socket傳輸?shù)暮唵蔚目蚣?,現(xiàn)在可以引入Netty進行傳輸。
將之前的Client和Server抽象成接口,方便以不同的方式來傳輸時都可以通用
public interface RpcClient {
Object sendRequest(RpcRequest rpcRequest);
}
public interface RpcServer {
void start(int port);
}
自定義一個序列化接口,包括序列化,反序列化,獲得序列化器的編號以及根據(jù)編號得到序列化器
public interface CommonSerializer {
byte[] serialize(Object obj);
Object deserialize(byte[] bytes, Class<?> clazz);
int getCode();
static CommonSerializer getByCode(int code) {
switch (code) {
case 1:
return new JsonSerializer();
default:
return null;
}
}
}
然后是它的實現(xiàn)類,使用Json進行序列化操作。序列化和反序列化都很簡單。
public class JsonSerializer implements CommonSerializer{
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
logger.error("序列化時有錯誤發(fā)生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}
@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try {
Object obj = objectMapper.readValue(bytes, clazz);
if(obj instanceof RpcRequest) {
obj = handleRequest(obj);
}
return obj;
} catch (IOException e) {
logger.error("反序列化時有錯誤發(fā)生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}
@Override
public int getCode() {
return SerializerCode.valueOf("JSON").getCode();
}
}
但有一點要注意,RpcRequest中的參數(shù)是一個Object【】數(shù)組,不是一個具體的類型,直接反序列化會失敗
private Object[] parameters;
所以要進行一些處理,通過ParamTypes獲取Object【】數(shù)組中每個實例的類型,這樣就可以正確反序列化
private Object handleRequest(Object obj) throws IOException {
RpcRequest rpcRequest = (RpcRequest) obj;
for(int i = 0; i < rpcRequest.getParamTypes().length; i ++) {
Class<?> clazz = rpcRequest.getParamTypes()[i];
if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
}
}
return rpcRequest;
}
之后我們可以自定義一個傳輸協(xié)議。
+---------------+---------------+-----------------+-------------+
| Magic Number | Package Type | Serializer Type | Data Length |
| 4 bytes | 4 bytes | 4 bytes | 4 bytes |
+---------------+---------------+-----------------+-------------+
| Data Bytes |
| Length: ${Data Length} |
+---------------------------------------------------------------+
魔數(shù)主要用來起標識作用,只有魔數(shù)匹配才會進行下一步操作。Package Type,標明這是一個RpcRequest還是RPCResponse,Serializer Type 標明了實際數(shù)據(jù)使用的序列化器,這個服務(wù)端和客戶端應(yīng)當使用統(tǒng)一標準;Data Length 就是實際數(shù)據(jù)的長度,設(shè)置這個字段主要防止粘包,最后就是經(jīng)過序列化后的實際數(shù)據(jù),可能是 RpcRequest 也可能是 RpcResponse 經(jīng)過序列化后的字節(jié),取決于 Package Type。
有了協(xié)議之后,便可定義編碼器與解碼器
編碼器根據(jù)上述規(guī)定,將各個字段寫入管道
public class CommonEncoder extends MessageToByteEncoder {
private static final int MAGIC_NUMBER = 0xCAFEBABE;
private final CommonSerializer serializer;
public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if(msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
解碼器將字節(jié)序列轉(zhuǎn)化為對象,通過序列化器編號知道這是通過Json還是其他方式序列化的,還需讀取 length 字段來確定數(shù)據(jù)包的長度,防止粘包。
public class CommonDecoder extends ReplayingDecoder {
private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if(magic != MAGIC_NUMBER) {
logger.error("不識別的協(xié)議包: {}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if(packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不識別的數(shù)據(jù)包: {}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if(serializer == null) {
logger.error("不識別的反序列化器: {}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
}
之后是以Netty形式實現(xiàn)服務(wù)端與客戶端
啟動Netty服務(wù)的方式是常規(guī)的用法
public class NettyServer implements RpcServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
@Override
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonEncoder(new JsonSerializer()));
pipeline.addLast(new CommonDecoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("啟動服務(wù)器時有錯誤發(fā)生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
其中NettyServerhandler 的作用是接收 RpcRequest,然后執(zhí)行調(diào)用,將調(diào)用結(jié)果封裝成 RpcResponse 發(fā)送出去。
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RequestHandler requestHandler;
private static ServiceRegistry serviceRegistry;
static {
requestHandler = new RequestHandler();
serviceRegistry = new ServiceRegistryImpl();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
try {
logger.info("服務(wù)器接收到請求: {}", msg);
String interfaceName = msg.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(msg, service);
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result));
future.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("處理過程調(diào)用時有錯誤發(fā)生:");
cause.printStackTrace();
ctx.close();
}
}
客戶端
public class NettyClient implements RpcClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private String host;
private int port;
private static final Bootstrap bootstrap;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
static {
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonDecoder())
.addLast(new CommonEncoder(new JsonSerializer()))
.addLast(new NettyClientHandler());
}
});
}
@Override
public Object sendRequest(RpcRequest rpcRequest) {
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
logger.info("客戶端連接到服務(wù)器 {}:{}", host, port);
Channel channel = future.channel();
if(channel != null) {
channel.writeAndFlush(rpcRequest).addListener(future1 -> {
if(future1.isSuccess()) {
logger.info(String.format("客戶端發(fā)送消息: %s", rpcRequest.toString()));
} else {
logger.error("發(fā)送消息時有錯誤發(fā)生: ", future1.cause());
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();
return rpcResponse.getData();
}
} catch (InterruptedException e) {
logger.error("發(fā)送消息時有錯誤發(fā)生: ", e);
}
return null;
}
}
NettyClientHandler負責(zé)將接受的消息放入通道中
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
logger.info(String.format("客戶端接收到消息: %s", msg));
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("過程調(diào)用時有錯誤發(fā)生:");
cause.printStackTrace();
ctx.close();
}
}