通用調(diào)用
java 從零開(kāi)始手寫(xiě) RPC (01) 基于 socket 實(shí)現(xiàn)
java 從零開(kāi)始手寫(xiě) RPC (02)-netty4 實(shí)現(xiàn)客戶端和服務(wù)端
java 從零開(kāi)始手寫(xiě) RPC (03) 如何實(shí)現(xiàn)客戶端調(diào)用服務(wù)端?
java 從零開(kāi)始手寫(xiě) RPC (04) -序列化
前面我們的例子是一個(gè)固定的出參和入?yún)?,固定的方法?shí)現(xiàn)。
本節(jié)將實(shí)現(xiàn)通用的調(diào)用,讓框架具有更廣泛的實(shí)用性。
基本思路
所有的方法調(diào)用,基于反射進(jìn)行相關(guān)處理實(shí)現(xiàn)。
服務(wù)端
核心類(lèi)
- RpcServer
調(diào)整如下:
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
// 打印日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
// 解碼 bytes=>resp
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
// request=>bytes
.addLast(new ObjectEncoder())
.addLast(new RpcServerHandler());
}
})
// 這個(gè)參數(shù)影響的是還沒(méi)有被accept 取出的連接
.option(ChannelOption.SO_BACKLOG, 128)
// 這個(gè)參數(shù)只是過(guò)一段時(shí)間內(nèi)客戶端沒(méi)有響應(yīng),服務(wù)端會(huì)發(fā)送一個(gè) ack 包,以判斷客戶端是否還活著。
.childOption(ChannelOption.SO_KEEPALIVE, true);
其中 ObjectDecoder 和 ObjectEncoder 都是 netty 內(nèi)置的實(shí)現(xiàn)。
RpcServerHandler
package com.github.houbb.rpc.server.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author binbin.hou
* @since 0.0.1
*/
public class RpcServerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFactory.getLog(RpcServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] channel {} connected " + id);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] channel read start: {}", id);
// 接受客戶端請(qǐng)求
RpcRequest rpcRequest = (RpcRequest)msg;
log.info("[Server] receive channel {} request: {}", id, rpcRequest);
// 回寫(xiě)到 client 端
DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
ctx.writeAndFlush(rpcResponse);
log.info("[Server] channel {} response {}", id, rpcResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 處理請(qǐng)求信息
* @param rpcRequest 請(qǐng)求信息
* @return 結(jié)果信息
* @since 0.0.6
*/
private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
rpcResponse.seqId(rpcRequest.seqId());
try {
// 獲取對(duì)應(yīng)的 service 實(shí)現(xiàn)類(lèi)
// rpcRequest=>invocationRequest
// 執(zhí)行 invoke
Object result = DefaultServiceFactory.getInstance()
.invoke(rpcRequest.serviceId(),
rpcRequest.methodName(),
rpcRequest.paramTypeNames(),
rpcRequest.paramValues());
rpcResponse.result(result);
} catch (Exception e) {
rpcResponse.error(e);
log.error("[Server] execute meet ex for request", rpcRequest, e);
}
// 構(gòu)建結(jié)果值
return rpcResponse;
}
}
和以前類(lèi)似,不過(guò) handleRpcRequest 要稍微麻煩一點(diǎn)。
這里需要根據(jù)發(fā)射,調(diào)用對(duì)應(yīng)的方法。
pojo
其中使用的出參、入?yún)?shí)現(xiàn)如下:
RpcRequest
package com.github.houbb.rpc.common.rpc.domain;
import java.util.List;
/**
* 序列化相關(guān)處理
* (1)調(diào)用創(chuàng)建時(shí)間-createTime
* (2)調(diào)用方式 callType
* (3)超時(shí)時(shí)間 timeOut
*
* 額外信息:
* (1)上下文信息
*
* @author binbin.hou
* @since 0.0.6
*/
public interface RpcRequest extends BaseRpc {
/**
* 創(chuàng)建時(shí)間
* @return 創(chuàng)建時(shí)間
* @since 0.0.6
*/
long createTime();
/**
* 服務(wù)唯一標(biāo)識(shí)
* @return 服務(wù)唯一標(biāo)識(shí)
* @since 0.0.6
*/
String serviceId();
/**
* 方法名稱(chēng)
* @return 方法名稱(chēng)
* @since 0.0.6
*/
String methodName();
/**
* 方法類(lèi)型名稱(chēng)列表
* @return 名稱(chēng)列表
* @since 0.0.6
*/
List<String> paramTypeNames();
// 調(diào)用參數(shù)信息列表
/**
* 調(diào)用參數(shù)值
* @return 參數(shù)值數(shù)組
* @since 0.0.6
*/
Object[] paramValues();
}
RpcResponse
package com.github.houbb.rpc.common.rpc.domain;
/**
* 序列化相關(guān)處理
* @author binbin.hou
* @since 0.0.6
*/
public interface RpcResponse extends BaseRpc {
/**
* 異常信息
* @return 異常信息
* @since 0.0.6
*/
Throwable error();
/**
* 請(qǐng)求結(jié)果
* @return 請(qǐng)求結(jié)果
* @since 0.0.6
*/
Object result();
}
BaseRpc
package com.github.houbb.rpc.common.rpc.domain;
import java.io.Serializable;
/**
* 序列化相關(guān)處理
* @author binbin.hou
* @since 0.0.6
*/
public interface BaseRpc extends Serializable {
/**
* 獲取唯一標(biāo)識(shí)號(hào)
* (1)用來(lái)唯一標(biāo)識(shí)一次調(diào)用,便于獲取該調(diào)用對(duì)應(yīng)的響應(yīng)信息。
* @return 唯一標(biāo)識(shí)號(hào)
*/
String seqId();
/**
* 設(shè)置唯一標(biāo)識(shí)號(hào)
* @param traceId 唯一標(biāo)識(shí)號(hào)
* @return this
*/
BaseRpc seqId(final String traceId);
}
ServiceFactory-服務(wù)工廠
為了便于對(duì)所有的 service 實(shí)現(xiàn)類(lèi)統(tǒng)一管理,這里定義 service 工廠類(lèi)。
ServiceFactory
package com.github.houbb.rpc.server.service;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import java.util.List;
/**
* 服務(wù)方法類(lèi)倉(cāng)庫(kù)管理類(lèi)-接口
*
*
* (1)對(duì)外暴露的方法,應(yīng)該盡可能的少。
* (2)對(duì)于外部的調(diào)用,后期比如 telnet 治理,可以使用比如有哪些服務(wù)列表?
* 單個(gè)服務(wù)有哪些方法名稱(chēng)?
*
* 等等基礎(chǔ)信息的查詢,本期暫時(shí)全部隱藏掉。
*
* (3)前期盡可能的少暴露方法。
* @author binbin.hou
* @since 0.0.6
* @see ServiceRegistry 服務(wù)注冊(cè),將服務(wù)信息放在這個(gè)類(lèi)中,進(jìn)行統(tǒng)一的管理。
* @see ServiceMethod 方法信息
*/
public interface ServiceFactory {
/**
* 注冊(cè)服務(wù)列表信息
* @param serviceConfigList 服務(wù)配置列表
* @return this
* @since 0.0.6
*/
ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);
/**
* 直接反射調(diào)用
* (1)此處對(duì)于方法反射,為了提升性能,所有的 class.getFullName() 進(jìn)行拼接然后放進(jìn) key 中。
*
* @param serviceId 服務(wù)名稱(chēng)
* @param methodName 方法名稱(chēng)
* @param paramTypeNames 參數(shù)類(lèi)型名稱(chēng)列表
* @param paramValues 參數(shù)值
* @return 方法調(diào)用返回值
* @since 0.0.6
*/
Object invoke(final String serviceId, final String methodName,
List<String> paramTypeNames, final Object[] paramValues);
}
DefaultServiceFactory
作為默認(rèn)實(shí)現(xiàn),如下:
package com.github.houbb.rpc.server.service.impl;
import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.service.ServiceFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 默認(rèn)服務(wù)倉(cāng)庫(kù)實(shí)現(xiàn)
* @author binbin.hou
* @since 0.0.6
*/
public class DefaultServiceFactory implements ServiceFactory {
/**
* 服務(wù) map
* @since 0.0.6
*/
private Map<String, Object> serviceMap;
/**
* 直接獲取對(duì)應(yīng)的 method 信息
* (1)key: serviceId:methodName:param1@param2@param3
* (2)value: 對(duì)應(yīng)的 method 信息
*/
private Map<String, Method> methodMap;
private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();
private DefaultServiceFactory(){}
public static DefaultServiceFactory getInstance() {
return INSTANCE;
}
/**
* 服務(wù)注冊(cè)一般在項(xiàng)目啟動(dòng)的時(shí)候,進(jìn)行處理。
* 屬于比較重的操作,而且一個(gè)服務(wù)按理說(shuō)只應(yīng)該初始化一次。
* 此處加鎖為了保證線程安全。
* @param serviceConfigList 服務(wù)配置列表
* @return this
*/
@Override
public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {
ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");
// 集合初始化
serviceMap = new HashMap<>(serviceConfigList.size());
// 這里只是預(yù)估,一般為2個(gè)服務(wù)。
methodMap = new HashMap<>(serviceConfigList.size()*2);
for(ServiceConfig serviceConfig : serviceConfigList) {
serviceMap.put(serviceConfig.id(), serviceConfig.reference());
}
// 存放方法名稱(chēng)
for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {
String serviceId = entry.getKey();
Object reference = entry.getValue();
//獲取所有方法列表
Method[] methods = reference.getClass().getMethods();
for(Method method : methods) {
String methodName = method.getName();
if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
continue;
}
List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
String key = buildMethodKey(serviceId, methodName, paramTypeNames);
methodMap.put(key, method);
}
}
return this;
}
@Override
public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {
//參數(shù)校驗(yàn)
ArgUtil.notEmpty(serviceId, "serviceId");
ArgUtil.notEmpty(methodName, "methodName");
// 提供 cache,可以根據(jù)前三個(gè)值快速定位對(duì)應(yīng)的 method
// 根據(jù) method 進(jìn)行反射處理。
// 對(duì)于 paramTypes 進(jìn)行 string 連接處理。
final Object reference = serviceMap.get(serviceId);
final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
final Method method = methodMap.get(methodKey);
try {
return method.invoke(reference, paramValues);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RpcRuntimeException(e);
}
}
/**
* (1)多個(gè)之間才用 : 分隔
* (2)參數(shù)之間采用 @ 分隔
* @param serviceId 服務(wù)標(biāo)識(shí)
* @param methodName 方法名稱(chēng)
* @param paramTypeNames 參數(shù)類(lèi)型名稱(chēng)
* @return 構(gòu)建完整的 key
* @since 0.0.6
*/
private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {
String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON
+param;
}
}
ServiceRegistry-服務(wù)注冊(cè)類(lèi)
接口
package com.github.houbb.rpc.server.registry;
/**
* 服務(wù)注冊(cè)類(lèi)
* (1)每個(gè)應(yīng)用唯一
* (2)每個(gè)服務(wù)的暴露協(xié)議應(yīng)該保持一致
* 暫時(shí)不提供單個(gè)服務(wù)的特殊處理,后期可以考慮添加
*
* @author binbin.hou
* @since 0.0.6
*/
public interface ServiceRegistry {
/**
* 暴露的 rpc 服務(wù)端口信息
* @param port 端口信息
* @return this
* @since 0.0.6
*/
ServiceRegistry port(final int port);
/**
* 注冊(cè)服務(wù)實(shí)現(xiàn)
* @param serviceId 服務(wù)標(biāo)識(shí)
* @param serviceImpl 服務(wù)實(shí)現(xiàn)
* @return this
* @since 0.0.6
*/
ServiceRegistry register(final String serviceId, final Object serviceImpl);
/**
* 暴露所有服務(wù)信息
* (1)啟動(dòng)服務(wù)端
* @return this
* @since 0.0.6
*/
ServiceRegistry expose();
}
實(shí)現(xiàn)
package com.github.houbb.rpc.server.registry.impl;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.core.RpcServer;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import java.util.ArrayList;
import java.util.List;
/**
* 默認(rèn)服務(wù)端注冊(cè)類(lèi)
* @author binbin.hou
* @since 0.0.6
*/
public class DefaultServiceRegistry implements ServiceRegistry {
/**
* 單例信息
* @since 0.0.6
*/
private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();
/**
* rpc 服務(wù)端端口號(hào)
* @since 0.0.6
*/
private int rpcPort;
/**
* 協(xié)議配置
* (1)默認(rèn)只實(shí)現(xiàn) tcp
* (2)后期可以拓展實(shí)現(xiàn) web-service/http/https 等等。
* @since 0.0.6
*/
private ProtocolConfig protocolConfig;
/**
* 服務(wù)配置列表
* @since 0.0.6
*/
private List<ServiceConfig> serviceConfigList;
private DefaultServiceRegistry(){
// 初始化默認(rèn)參數(shù)
this.serviceConfigList = new ArrayList<>();
this.rpcPort = 9527;
}
public static DefaultServiceRegistry getInstance() {
return INSTANCE;
}
@Override
public ServiceRegistry port(int port) {
ArgUtil.positive(port, "port");
this.rpcPort = port;
return this;
}
/**
* 注冊(cè)服務(wù)實(shí)現(xiàn)
* (1)主要用于后期服務(wù)調(diào)用
* (2)如何根據(jù) id 獲取實(shí)現(xiàn)?非常簡(jiǎn)單,id 是唯一的。
* 有就是有,沒(méi)有就拋出異常,直接返回。
* (3)如果根據(jù) {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest} 獲取對(duì)應(yīng)的方法。
*
* 3.1 根據(jù) serviceId 獲取唯一的實(shí)現(xiàn)
* 3.2 根據(jù) {@link Class#getMethod(String, Class[])} 方法名稱(chēng)+參數(shù)類(lèi)型唯一獲取方法
* 3.3 根據(jù) {@link java.lang.reflect.Method#invoke(Object, Object...)} 執(zhí)行方法
*
* @param serviceId 服務(wù)標(biāo)識(shí)
* @param serviceImpl 服務(wù)實(shí)現(xiàn)
* @return this
* @since 0.0.6
*/
@Override
@SuppressWarnings("unchecked")
public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
ArgUtil.notEmpty(serviceId, "serviceId");
ArgUtil.notNull(serviceImpl, "serviceImpl");
// 構(gòu)建對(duì)應(yīng)的其他信息
ServiceConfig serviceConfig = new DefaultServiceConfig();
serviceConfig.id(serviceId).reference(serviceImpl);
serviceConfigList.add(serviceConfig);
return this;
}
@Override
public ServiceRegistry expose() {
// 注冊(cè)所有服務(wù)信息
DefaultServiceFactory.getInstance()
.registerServices(serviceConfigList);
// 暴露 netty server 信息
new RpcServer(rpcPort).start();
return this;
}
}
ServiceConfig 是一些服務(wù)的配置信息,接口定義如下:
package com.github.houbb.rpc.server.config.service;
/**
* 單個(gè)服務(wù)配置類(lèi)
*
* 簡(jiǎn)化用戶使用:
* 在用戶使用的時(shí)候,這個(gè)類(lèi)應(yīng)該是不可見(jiàn)的。
* 直接提供對(duì)應(yīng)的服務(wù)注冊(cè)類(lèi)即可。
*
* 后續(xù)拓展
* (1)版本信息
* (2)服務(wù)端超時(shí)時(shí)間
*
* @author binbin.hou
* @since 0.0.6
* @param <T> 實(shí)現(xiàn)類(lèi)泛型
*/
public interface ServiceConfig<T> {
/**
* 獲取唯一標(biāo)識(shí)
* @return 獲取唯一標(biāo)識(shí)
* @since 0.0.6
*/
String id();
/**
* 設(shè)置唯一標(biāo)識(shí)
* @param id 標(biāo)識(shí)信息
* @return this
* @since 0.0.6
*/
ServiceConfig<T> id(String id);
/**
* 獲取引用實(shí)體實(shí)現(xiàn)
* @return 實(shí)體實(shí)現(xiàn)
* @since 0.0.6
*/
T reference();
/**
* 設(shè)置引用實(shí)體實(shí)現(xiàn)
* @param reference 引用實(shí)現(xiàn)
* @return this
* @since 0.0.6
*/
ServiceConfig<T> reference(T reference);
}
測(cè)試
maven 引入
引入服務(wù)端的對(duì)應(yīng) maven 包:
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>rpc-server</artifactId>
<version>0.0.6</version>
</dependency>
服務(wù)端啟動(dòng)
// 啟動(dòng)服務(wù)
DefaultServiceRegistry.getInstance()
.register(ServiceIdConst.CALC, new CalculatorServiceImpl())
.expose();
這里注冊(cè)了一個(gè)計(jì)算服務(wù),并且設(shè)置對(duì)應(yīng)的實(shí)現(xiàn)。
和以前實(shí)現(xiàn)類(lèi)似,此處不再贅述。
啟動(dòng)日志:
[DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)開(kāi)始啟動(dòng)服務(wù)端
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xec4dc74f] REGISTERED
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)端啟動(dòng)完成,監(jiān)聽(tīng)【9527】端口
ps: 寫(xiě)到這里忽然發(fā)現(xiàn)忘記添加對(duì)應(yīng)的 register 日志了,這里可以添加對(duì)應(yīng)的 registerListener 拓展。
小結(jié)
為了便于大家學(xué)習(xí),以上源碼已經(jīng)開(kāi)源:
希望本文對(duì)你有所幫助,如果喜歡,歡迎點(diǎn)贊收藏轉(zhuǎn)發(fā)一波。
我是老馬,期待與你的下次重逢。