需求:替換ali lightApi 動態(tài)rpc的實現(xiàn),因為api為商業(yè)版,不是開源的,是基于pandora 的EDAS平臺的。那么我們?nèi)绾螌崿F(xiàn)開源的動態(tài)RPC呢?
定義:動態(tài)RPC指的是,可以動態(tài)的讓一個服務(wù)上線和下線,換句話說就是動態(tài)的從注冊中心剔除服務(wù),而不是停止這個服務(wù),啟動這個服務(wù)。這個需求是來自鏈路的原路返回
簡介 vertx:
vertx 是一套全異步的基于netty通信的框架,Vertx中核心組建有 verticle, eventbus,?circuit breaker,service discovery and register, Router等等。后期我會一一的去探索這里面的組件的原理。而本文介紹的vertx 的動態(tài)rpc,就是使用vertx proxy service 這個功能來實現(xiàn)的。另外我們得知道vertx 中的線程模型是基于netty的event loop,一個verticle 是一個微服務(wù),而且具有HA的機制的微服務(wù)。服務(wù)之間的通信是根據(jù)event bus 的netty 通信機制。但是event bus 通信不是100%的可靠的(這點很要命,后期我會寫博客探索的)。
此外vertx需要依賴 分布式緩存建立集群(hazelcast, ignite等),基于gossip協(xié)議的p2p網(wǎng)絡(luò)。
探索過程:
如果要實現(xiàn)需求的話,首先第一想到的還是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是經(jīng)一番折騰之后,沒有可以讓我覺得可以在代碼里面直接讓服務(wù)上線下線,從注冊中心剔除掉或者注冊到注冊中心的。那怎么辦呢?曾經(jīng)也想過會用rabbitmq,這樣的MQ 去做動態(tài)的RPC,但是發(fā)現(xiàn)很復(fù)雜,關(guān)鍵很難做scalablity,如果有上千個鏈路,就需要上千個topic,或者tag 這類的標(biāo)記。感覺很復(fù)雜,也很難維護。
solutions: vertx proxy service?
這個方案上,我們不打算使用verticle的概念,僅僅使用vertx 代理服務(wù)的概念。首先我們要使用vertx 的服務(wù)發(fā)現(xiàn)和注冊,其次我們要使用proxy service。
第一個坑:代理服務(wù)的自動生成。請在pom或者gradle里面加入 生成代理的依賴(對接口的代理)。然后創(chuàng)建 package-info.java,在root package。所謂的root package就是基package,web developer 開的springboot 應(yīng)該都很清楚,需要將app.class 建在基package,以便掃描都可以掃描到
? ? <groupId>io.vertx</groupId>
? ? <artifactId>vertx-service-proxy</artifactId>
? ? <classifier>processor</classifier>
? ? <groupId>io.vertx</groupId>
? ? <artifactId>vertx-codegen</artifactId>
? ? <classifier>processor</classifier>
@ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")
package com.xxxx.xx.xx.vertx;
import io.vertx.codegen.annotations.ModuleGen;
其中g(shù)roupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定義interface 所在的包。完成之后,使用maven clean install, 你就會發(fā)現(xiàn)在target 目錄下面會有 ...Proxy 的class,這兩個class 就是代理類,對于原理,后面的博客我會慢慢分析。
第二個坑是:服務(wù)發(fā)現(xiàn)publish 服務(wù)之后,需要注冊服務(wù)代理,不然注冊的服務(wù),在event bus 上面是找不到的,看看代碼怎么寫吧:
Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());
servicePublishRequestBean.getDiscovery().publish(record, ar -> {
if (ar.succeeded() && ar.result() !=null) {
publishedRecords.add(record);
? ? ? ? recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));
? ? }
handler.handle(ar.map(ar.result()));
});
servicePublishRequestBean就是我封裝的bean,里面de屬性有:
private StringserviceName;
private StringeventBusAddress;
private ServiceDiscoverydiscovery;
private Classclazz;
private T service;
此外我們一定要注冊這樣的proxy service才能夠生效,不然是沒有用的:
ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));
看看里面怎么寫的
public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {
return ServiceBinderUtil.getBinderInstance()
.setAddress(eventBusAddress)
.register(clazz, service);
}
第三個坑是: 和第二個坑一樣的,需要注銷服務(wù),注銷服務(wù)的話,我們也要調(diào)用proxyService 去把服務(wù)注銷了,而不能單純的調(diào)用 service discovery and register unpublish 方法,這個是很惡心的:看看我怎么寫的把,一些很細節(jié)的東西需要自己探索,我不會貼出所有的東西的。比如下面的discover record 有許多的狀態(tài),這里一不小心就會找不到你所發(fā)布的服務(wù),另外注銷代理服務(wù),需要個奇怪的參數(shù),這個參數(shù)我存儲在map里面:recordMessageConsumerMap, 在服務(wù)publish時候,就會生成的。
discovery.getRecord(info -> info.getName().equals(serviceName), res -> {
if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {
Record record = res.result();
? ? ? ? ? ? ? ? discovery.unpublish(record.getRegistration(), ar -> {
if(ar.succeeded()) {
List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());
? ? ? ? ? ? ? ? ? ? ? ? offlineRecord(records);
? ? ? ? ? ? ? ? ? ? }
handler.handle(ar.map((Void)null));
? ? ? ? ? ? ? ? });
? ? ? ? ? ? }else {
handler.handle(res.map((Void)null));
? ? ? ? ? ? }
}
);
OfflineRecord 主要是調(diào)用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的實現(xiàn)如下,是不是比較簡單?
ServiceBinderUtil.getBinderInstance().unregister(consumer);
然后基本上做到這里,vertx 動態(tài)的rpc 就可以實現(xiàn)了,主要是用了vertx 服務(wù)發(fā)現(xiàn)組件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比較多。此外除了上面所描述的坑之外,我還想告訴小伙伴們,event bus 通信exception 會出現(xiàn)一些奇怪的錯誤,所有event bus 通信也是個坑,其實proxy sevice 本質(zhì)就是 event bus,所以event bus 網(wǎng)絡(luò)連接不通的話,確實很頭疼,經(jīng)過我這邊的測試,不管ecs 集群的部署,還是ecs 和docker 的混合部署,網(wǎng)絡(luò)都是可以通的(我使用的是ignite分布式緩存)。暫時我先貼出來關(guān)于網(wǎng)絡(luò)的代碼,如果你們遇到了問題,先暫時按照我這個來,不會讓你很惱火。
@Bean
public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{
IgniteConfiguration igniteConfiguration =new IgniteConfiguration();
? ? igniteConfiguration.setClientMode(false);
? ? igniteConfiguration.setPeerClassLoadingEnabled(true);
? ? igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);
? ? igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);
? ? igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());
? ? igniteConfiguration.setCacheConfiguration(getCacheConfiguration());
? // igniteConfiguration.setLocalHost(IPUtil.getLocalIp());
? ? return igniteConfiguration;
}
@Bean
public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{
TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();
? ? tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());
? ? tcpDiscoverySpi.setNetworkTimeout(10000);
? ? System.out.println("setting success for host");
? ? tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());
? ? return tcpDiscoverySpi;
}
@Bean
public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){
TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();
? ? tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");
? ? return tcpDiscoveryMulticastIpFinder;
}
@Bean
public CacheConfigurationgetCacheConfiguration(){
CacheConfiguration cacheConfiguration =new CacheConfiguration();
? ? cacheConfiguration.setName("myCache");
? ? cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
? ? cacheConfiguration.setBackups(1);
? ? return cacheConfiguration;
}
@Bean
public VertxClusterStartervertxClusterStarter() {
VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();
? ? return vertxClusterStarter;
}
ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);
TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();
TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();
tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));
VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);
ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();
謝謝,希望對大家有所幫助,后面我會嘗試探索event bus 通信的原理。