項目背景
中途加入一個團隊,團隊實現(xiàn)一個私有化的分布式服務,因為是私有化,不用大規(guī)模集群,服務已經(jīng)有個簡易RPC,
實現(xiàn)思路大致是:
1 暴露一個服務需要自己定義一個服務handler
2 消費者通過http發(fā)送輸入輸出函數(shù)名稱到服務提供者
3 http獲取結果反序列化獲得服務輸出
概括來說就是http強行rpc,而且復雜的輸入輸出配置還不如直接用http來實現(xiàn)?!爸貥嫛笔强吹竭@個實現(xiàn)的第一反應。
重構之前準備
1 了解一下為什么之前rpc協(xié)議寫成這個樣子,大概得到幾個觀點:
* 之前是demo期,快速實現(xiàn)需求
* 商戶不希望開多個端口,所有盡量復用http服務的端口
* 在運行中注入提供方的url,進行多個不同服務的直連
* 私有化小規(guī)模部署,不想用復雜的注冊中心
* 私有化小規(guī)模部署,簡化安裝依賴
2 團隊情況
* Springboot,java和python語言
* 熟練Dubbo,了解grpc
主流RPC框架
- thrift
- ZeroC Ice
- gRPC
- Dubbo
thrift
參考:https://blog.csdn.net/lupengfei1009/article/details/100934794
Facebook研發(fā)的輕量級RPC,可以配合服務器一起運行。
支持多語言
二進制格式傳輸數(shù)據(jù),比XML和JSON體積小,適合高并發(fā),大數(shù)據(jù)量,多語言的環(huán)境。
C/S結構
不選擇的原因:
1 需要額外的rpc端口
2 c / s結構,明確的服務端和客戶端,但是實際上,rpc更多時候,一個服務既是客戶端又是服務端。
3 springboot集成有一個依賴包,mac下開發(fā)可能需要安裝thrift才能使用代碼生成器(這個點不是核心考慮點)
4 團隊不太熟悉,但是學習成本很低,也不是核心考慮點
Dubbo
Dubbo的強大行業(yè)內(nèi)已經(jīng)眾所周知了,其原理圖如下:

但是:
1 直連模式(無注冊中心)在注解中配置(單個url),無法在運行中動態(tài)指定多個不同節(jié)點。
2 直連模式調試Reference的時候遇到一些bug(中途有一些特殊的空指針異常),提了issue建議升級版本,沒有提供者的時候,注入Reference為空,啟動服務提供者之后,Reference不會重新注入(不知道新版是否解決)。
3 也需要開放特定的Dubbo端口
簡易RPC框架實現(xiàn)
為了迎合項目特定的需求,考慮簡化實現(xiàn)一個RPC,幾乎沒有多余的第三方依賴,使用方式類似Dubbo,實現(xiàn)方式如下:
1 通過 @RPCService 注入 服務提供者,直接利用Springboot的IOC注入
2 通過 @RPCReference 注入 服務消費者,服務通過動態(tài)代理,調用遠程服務
3 通過setNode在運行中配置直連節(jié)點信息,比如節(jié)點端口,ip,httpPath,host等。(因為http直接通信,復用了應用容器的http端口,不需要新的端口)
代碼結構
├── core
│ ├── config
│ │ ├── ConsumerConfig.java 消費者配置
│ │ ├── ProviderConfig.java 提供者配置
│ │ └── annotation
│ │ ├── RPCReference.java 消費者注解
│ │ └── RPCService.java 提供者注解
│ ├── context
│ │ ├── ConsumerContext.java 消費者上下文
│ │ └── ProviderContext.java 提供者上下文
│ ├── domain
│ │ ├── Node.java 節(jié)點信息,用于配置特定的ip,端口 ,token等
│ │ ├── RPCRequest.java 消費者發(fā)送的數(shù)據(jù)結構
│ │ ├── RPCResponse.java 消費者從提供者那得到的返回結果
│ │ ├── RPCSerializer.java 序列化協(xié)議實現(xiàn)
│ │ └── interfaces
│ │ ├── Formater.java
│ │ └── Parser.java
│ ├── exception
│ │ └── RPCException.java 自定義異常
│ └── proxy
│ ├── ConsumerProxyFactory.java (rpc核心邏輯)消費者調用放的動態(tài)代理,是實際執(zhí)行的方法
│ └── ProviderProxyFactory.java (rpc核心邏輯)注入提供者服務,以及服務通過反射實際執(zhí)行
├── spring
│ └── factory
│ ├── RPCClassPathBeanDefinitionScanner.java 利用IOC注入服務
│ ├── RPCReferenceAnnotationBeanPostProcessor.java 利用IOC注入服務并通過反射替換Reference的實際方法
│ └── RPCServiceAnnotationBeanPostProcessor.java 利用IOC注入服務
└── utils
├── HttpClientUtil.java 通信用的Http工具類
└── HttpUtil.java
核心代碼 @RPCServide服務的注入
@Component
@Slf4j
public class RPCServiceAnnotationBeanPostProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {
private final Logger logger = LoggerFactory.getLogger(getClass());
private ApplicationContext applicationContext;
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
RPCClassPathBeanDefinitionScanner scanner = new RPCClassPathBeanDefinitionScanner(registry);
scanner.setResourceLoader(this.applicationContext);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
scanner.addIncludeFilter(new AnnotationTypeFilter(RPCService.class));
scanner.scan("");
}
/**
* It'd better to use BeanNameGenerator instance that should reference
* {@link ConfigurationClassPostProcessor},
* thus it maybe a potential problem on bean name generation.
*
* @param registry {@link BeanDefinitionRegistry}
* @return {@link BeanNameGenerator} instance
* @see SingletonBeanRegistry
* @see AnnotationConfigUtils#CONFIGURATION_BEAN_NAME_GENERATOR
* @see ConfigurationClassPostProcessor#processConfigBeanDefinitions
* @since 2.5.8
*/
private BeanNameGenerator resolveBeanNameGenerator(BeanDefinitionRegistry registry) {
BeanNameGenerator beanNameGenerator = null;
if (registry instanceof SingletonBeanRegistry) {
SingletonBeanRegistry singletonBeanRegistry = SingletonBeanRegistry.class.cast(registry);
beanNameGenerator = (BeanNameGenerator) singletonBeanRegistry.getSingleton(CONFIGURATION_BEAN_NAME_GENERATOR);
}
if (beanNameGenerator == null) {
if (logger.isInfoEnabled()) {
logger.info("BeanNameGenerator bean can't be found in BeanFactory with name ["
+ CONFIGURATION_BEAN_NAME_GENERATOR + "]");
logger.info("BeanNameGenerator will be a instance of " +
AnnotationBeanNameGenerator.class.getName() +
" , it maybe a potential problem on bean name generation.");
}
beanNameGenerator = new AnnotationBeanNameGenerator();
}
return beanNameGenerator;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
核心代碼 @RPCReferenceServide服務的注入
注入過程類似,不過增加了一個注解下方法的判斷
如果被RPCReferenceService注解,動態(tài)代理到新的方法上
核心代碼如下:
private void setField(Object bean, Field field) {
if (!field.isAccessible()) {
field.setAccessible(true);
}
try {
// field.set(bean, applicationContext.getBean(field.getType()));
if (field.getType().isInterface()) {
field.set(bean, applicationContext.getBean(ConsumerProxyFactory.class).create(field.getType()));
} else {
field.set(bean, applicationContext.getBean(ConsumerProxyFactory.class).create(field.getType().getInterfaces()[0]));
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("set field :", field.getName());
}
這里既然用到了動態(tài)代理,補充一下,動態(tài)代理主要有兩種方式:
- JDK動態(tài)代理,jre提供給我們的類庫,代理接口
- cglib動態(tài)代理,通過“繼承”可以繼承父類所有的公開方法,然后可以重寫這些方法,在重寫時對這些方法增強,這就是cglib的思想。
此處用的是JDK動態(tài)代理,不依賴第三方安裝包,但是如果要追求靈活性的話,cglib其實是更好的選擇。其中動態(tài)代理的核心邏輯如下:
@Slf4j
public class ConsumerProxyFactory implements InvocationHandler {
private ConcurrentHashMap<String, ConsumerConfig> consumerConfigConcurrentHashMap = new ConcurrentHashMap<>();
/* create()創(chuàng)建工廠bean speakInterface Class<?> interfaceClass = Class.forName(clazz); */
public Object create(Class<?> interfaceClass) {
return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);
}
/**
* 實現(xiàn)InvocationHandler的接口<br>
* 遇到的問題,Spring容器可能訪問被代理類的的實例的toString()方法
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 這里通過反射的方式寫實際的方法執(zhí)行邏輯
// 發(fā)送實際的HTTP請求,發(fā)送:類名,方法名,參數(shù)名,參數(shù)
}
}
提供方http獲取string反序列化之后,獲取容器中注入的方法
public Object getBeanByClass(Class<?> clazz) throws RPCException {
Object bean = providers.get(clazz);
if (bean != null) {
return bean;
}
throw new RPCException("xxx", clazz);
}
然后通過反射執(zhí)行返回結果http返回給消費者。
消費方定義一個controller來接收請求,作為通信橋梁
@Autowired
private ProviderProxyFactory providerProxyFactory;
@RequestMapping(value = "/rpc", method = RequestMethod.POST)
public String rpc(String resquestString) {
String result = providerProxyFactory.handleHttpContent(resquestString);
return result;
}
使用方式舉例:
1 接口IUser
2 通過RPCService注入服務UserService作為提供者
@RPCService
public class UserService implements IUser{
public String getName(String id) {
return "name:" + id;
}
}
}
3 注入ip和port,調用本地服務和遠程服務
@RequestMapping("/rpc")
@RestController
public class UserController {
@Autowired
private UserService userService; // Springboot的注入方式,本地服務
@RPCReference
private IUser userService2; // 注入遠程服務
@RequestMapping(value = "/getUserName", method = RequestMethod.POST)
public JSONObject add(@RequestBody User user) {
String a = userService.getName("1"); // 輸出“name:1”
Node node = new Node();
node.setIp("172.16.105.142");
node.setPort("8088");
userService2.setNode(node);
String b = userService2.getName("2"); // 遠程服務返回"name:2"
JSONObject result = new JSONObject();
result.put("local", a);
result.put("remote", b);
return result;
}
}
改進
1 有的項目結構是
- root_path
- package1
- package2
- WebApplication.java
在WebApplication中引用package1中內(nèi)容,通常Springboot通過@ComponentScan(root_path)來指定掃描跟路徑。
- WebApplication.java
所以在rpc注入provider的時候,也提供一個掃描根路徑的注解配置(也是仿的Dubbo)
在注解中使用import
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(SimpleRPCComponentScanRegistrar.class) // 這里
public @interface SimpleRPCComponentScan {
String[] value() default {};
String[] basePackages() default {};
}
import的SimpleRPCComponentScanRegistrar繼承ImportBeanDefinitionRegistrar,并覆寫registerBeanDefinitions方法
public class SimpleRPCComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
}
}
使用方式
@SpringBootApplication
@SimpleRPCComponentScan("xxx") // 這里
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
2 在注入提供者的時候注入的類可能被代理過,導致獲取的接口名字不準確,解決方式,使用AopUtils.getTargetClass(obj) 獲取真正的類