RPC框架和簡易RPC實現(xiàn)

項目背景

中途加入一個團隊,團隊實現(xiàn)一個私有化的分布式服務,因為是私有化,不用大規(guī)模集群,服務已經(jīng)有個簡易RPC,
實現(xiàn)思路大致是:
1 暴露一個服務需要自己定義一個服務handler
2 消費者通過http發(fā)送輸入輸出函數(shù)名稱到服務提供者
3 http獲取結果反序列化獲得服務輸出

概括來說就是http強行rpc,而且復雜的輸入輸出配置還不如直接用http來實現(xiàn)?!爸貥嫛笔强吹竭@個實現(xiàn)的第一反應。

重構之前準備

1 了解一下為什么之前rpc協(xié)議寫成這個樣子,大概得到幾個觀點:
* 之前是demo期,快速實現(xiàn)需求
* 商戶不希望開多個端口,所有盡量復用http服務的端口
* 在運行中注入提供方的url,進行多個不同服務的直連
* 私有化小規(guī)模部署,不想用復雜的注冊中心
* 私有化小規(guī)模部署,簡化安裝依賴
2 團隊情況
* Springboot,java和python語言
* 熟練Dubbo,了解grpc

主流RPC框架

  • thrift
  • ZeroC Ice
  • gRPC
  • Dubbo

thrift

參考:https://blog.csdn.net/lupengfei1009/article/details/100934794

Facebook研發(fā)的輕量級RPC,可以配合服務器一起運行。
支持多語言
二進制格式傳輸數(shù)據(jù),比XML和JSON體積小,適合高并發(fā),大數(shù)據(jù)量,多語言的環(huán)境。
C/S結構

不選擇的原因:
1 需要額外的rpc端口
2 c / s結構,明確的服務端和客戶端,但是實際上,rpc更多時候,一個服務既是客戶端又是服務端。
3 springboot集成有一個依賴包,mac下開發(fā)可能需要安裝thrift才能使用代碼生成器(這個點不是核心考慮點)
4 團隊不太熟悉,但是學習成本很低,也不是核心考慮點

Dubbo

Dubbo的強大行業(yè)內(nèi)已經(jīng)眾所周知了,其原理圖如下:


image.png

但是:
1 直連模式(無注冊中心)在注解中配置(單個url),無法在運行中動態(tài)指定多個不同節(jié)點。
2 直連模式調試Reference的時候遇到一些bug(中途有一些特殊的空指針異常),提了issue建議升級版本,沒有提供者的時候,注入Reference為空,啟動服務提供者之后,Reference不會重新注入(不知道新版是否解決)。
3 也需要開放特定的Dubbo端口

簡易RPC框架實現(xiàn)

為了迎合項目特定的需求,考慮簡化實現(xiàn)一個RPC,幾乎沒有多余的第三方依賴,使用方式類似Dubbo,實現(xiàn)方式如下:
1 通過 @RPCService 注入 服務提供者,直接利用Springboot的IOC注入
2 通過 @RPCReference 注入 服務消費者,服務通過動態(tài)代理,調用遠程服務
3 通過setNode在運行中配置直連節(jié)點信息,比如節(jié)點端口,ip,httpPath,host等。(因為http直接通信,復用了應用容器的http端口,不需要新的端口)

代碼結構

├── core
│ ├── config
│ │ ├── ConsumerConfig.java 消費者配置
│ │ ├── ProviderConfig.java 提供者配置
│ │ └── annotation
│ │ ├── RPCReference.java 消費者注解
│ │ └── RPCService.java 提供者注解
│ ├── context
│ │ ├── ConsumerContext.java 消費者上下文
│ │ └── ProviderContext.java 提供者上下文
│ ├── domain
│ │ ├── Node.java 節(jié)點信息,用于配置特定的ip,端口 ,token等
│ │ ├── RPCRequest.java 消費者發(fā)送的數(shù)據(jù)結構
│ │ ├── RPCResponse.java 消費者從提供者那得到的返回結果
│ │ ├── RPCSerializer.java 序列化協(xié)議實現(xiàn)
│ │ └── interfaces
│ │ ├── Formater.java
│ │ └── Parser.java
│ ├── exception
│ │ └── RPCException.java 自定義異常
│ └── proxy
│ ├── ConsumerProxyFactory.java (rpc核心邏輯)消費者調用放的動態(tài)代理,是實際執(zhí)行的方法
│ └── ProviderProxyFactory.java (rpc核心邏輯)注入提供者服務,以及服務通過反射實際執(zhí)行
├── spring
│ └── factory
│ ├── RPCClassPathBeanDefinitionScanner.java 利用IOC注入服務
│ ├── RPCReferenceAnnotationBeanPostProcessor.java 利用IOC注入服務并通過反射替換Reference的實際方法
│ └── RPCServiceAnnotationBeanPostProcessor.java 利用IOC注入服務
└── utils
├── HttpClientUtil.java 通信用的Http工具類
└── HttpUtil.java

核心代碼 @RPCServide服務的注入

@Component
@Slf4j
public class RPCServiceAnnotationBeanPostProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {


    private final Logger logger = LoggerFactory.getLogger(getClass());

    private ApplicationContext applicationContext;

    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        RPCClassPathBeanDefinitionScanner scanner = new RPCClassPathBeanDefinitionScanner(registry);

        scanner.setResourceLoader(this.applicationContext);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
        scanner.setBeanNameGenerator(beanNameGenerator);

        scanner.addIncludeFilter(new AnnotationTypeFilter(RPCService.class));

        scanner.scan("");
    }


    /**
     * It'd better to use BeanNameGenerator instance that should reference
     * {@link ConfigurationClassPostProcessor},
     * thus it maybe a potential problem on bean name generation.
     *
     * @param registry {@link BeanDefinitionRegistry}
     * @return {@link BeanNameGenerator} instance
     * @see SingletonBeanRegistry
     * @see AnnotationConfigUtils#CONFIGURATION_BEAN_NAME_GENERATOR
     * @see ConfigurationClassPostProcessor#processConfigBeanDefinitions
     * @since 2.5.8
     */
    private BeanNameGenerator resolveBeanNameGenerator(BeanDefinitionRegistry registry) {

        BeanNameGenerator beanNameGenerator = null;

        if (registry instanceof SingletonBeanRegistry) {
            SingletonBeanRegistry singletonBeanRegistry = SingletonBeanRegistry.class.cast(registry);
            beanNameGenerator = (BeanNameGenerator) singletonBeanRegistry.getSingleton(CONFIGURATION_BEAN_NAME_GENERATOR);
        }

        if (beanNameGenerator == null) {

            if (logger.isInfoEnabled()) {

                logger.info("BeanNameGenerator bean can't be found in BeanFactory with name ["
                        + CONFIGURATION_BEAN_NAME_GENERATOR + "]");
                logger.info("BeanNameGenerator will be a instance of " +
                        AnnotationBeanNameGenerator.class.getName() +
                        " , it maybe a potential problem on bean name generation.");
            }
            beanNameGenerator = new AnnotationBeanNameGenerator();

        }

        return beanNameGenerator;
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

核心代碼 @RPCReferenceServide服務的注入

注入過程類似,不過增加了一個注解下方法的判斷
如果被RPCReferenceService注解,動態(tài)代理到新的方法上
核心代碼如下:

private void setField(Object bean, Field field) {
        if (!field.isAccessible()) {
            field.setAccessible(true);
        }
        try {
            // field.set(bean, applicationContext.getBean(field.getType()));
            if (field.getType().isInterface()) {
                field.set(bean, applicationContext.getBean(ConsumerProxyFactory.class).create(field.getType()));
            } else {
                field.set(bean, applicationContext.getBean(ConsumerProxyFactory.class).create(field.getType().getInterfaces()[0]));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("set field :", field.getName());
    }

這里既然用到了動態(tài)代理,補充一下,動態(tài)代理主要有兩種方式:

  • JDK動態(tài)代理,jre提供給我們的類庫,代理接口
  • cglib動態(tài)代理,通過“繼承”可以繼承父類所有的公開方法,然后可以重寫這些方法,在重寫時對這些方法增強,這就是cglib的思想。

此處用的是JDK動態(tài)代理,不依賴第三方安裝包,但是如果要追求靈活性的話,cglib其實是更好的選擇。其中動態(tài)代理的核心邏輯如下:

@Slf4j
public class ConsumerProxyFactory implements InvocationHandler {

    private ConcurrentHashMap<String, ConsumerConfig> consumerConfigConcurrentHashMap = new ConcurrentHashMap<>();

    /* create()創(chuàng)建工廠bean speakInterface Class<?> interfaceClass = Class.forName(clazz); */
    public Object create(Class<?> interfaceClass) {
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);
    }

    /**
     * 實現(xiàn)InvocationHandler的接口<br>
     * 遇到的問題,Spring容器可能訪問被代理類的的實例的toString()方法
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
     // 這里通過反射的方式寫實際的方法執(zhí)行邏輯
     // 發(fā)送實際的HTTP請求,發(fā)送:類名,方法名,參數(shù)名,參數(shù)
    }
}

提供方http獲取string反序列化之后,獲取容器中注入的方法

    public Object getBeanByClass(Class<?> clazz) throws RPCException {
        Object bean = providers.get(clazz);
        if (bean != null) {
            return bean;
        }
        throw new RPCException("xxx", clazz);
    }

然后通過反射執(zhí)行返回結果http返回給消費者。

消費方定義一個controller來接收請求,作為通信橋梁

@Autowired
  private ProviderProxyFactory providerProxyFactory;

  @RequestMapping(value = "/rpc", method = RequestMethod.POST)
  public String rpc(String resquestString) {
    String result = providerProxyFactory.handleHttpContent(resquestString);
    return result;
  }

使用方式舉例:

1 接口IUser
2 通過RPCService注入服務UserService作為提供者

@RPCService
public class UserService implements IUser{
  public String getName(String id) {
    return "name:" + id;
  }
 }
}

3 注入ip和port,調用本地服務和遠程服務

@RequestMapping("/rpc")
@RestController
public class UserController {

  @Autowired
  private UserService userService; // Springboot的注入方式,本地服務

  @RPCReference
  private IUser userService2; // 注入遠程服務

  @RequestMapping(value = "/getUserName", method = RequestMethod.POST)
  public JSONObject add(@RequestBody User user) {
    String a = userService.getName("1");  // 輸出“name:1”

    Node node = new Node();
    node.setIp("172.16.105.142");
    node.setPort("8088");
    userService2.setNode(node);
    String b = userService2.getName("2");  // 遠程服務返回"name:2"

    JSONObject result = new JSONObject();
    result.put("local", a);
    result.put("remote", b);
    return result;
  }
}

改進

1 有的項目結構是

  • root_path
    • package1
    • package2
      • WebApplication.java
        在WebApplication中引用package1中內(nèi)容,通常Springboot通過@ComponentScan(root_path)來指定掃描跟路徑。

所以在rpc注入provider的時候,也提供一個掃描根路徑的注解配置(也是仿的Dubbo)

在注解中使用import

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(SimpleRPCComponentScanRegistrar.class) // 這里
public @interface SimpleRPCComponentScan {

    String[] value() default {};

    String[] basePackages() default {};
}

import的SimpleRPCComponentScanRegistrar繼承ImportBeanDefinitionRegistrar,并覆寫registerBeanDefinitions方法

public class SimpleRPCComponentScanRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);

        registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
    }
}

使用方式

@SpringBootApplication
@SimpleRPCComponentScan("xxx") // 這里
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

2 在注入提供者的時候注入的類可能被代理過,導致獲取的接口名字不準確,解決方式,使用AopUtils.getTargetClass(obj) 獲取真正的類

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容