vertx 實現(xiàn)動態(tài) RPC

需求:替換ali lightApi 動態(tài)rpc的實現(xiàn),因為api為商業(yè)版,不是開源的,是基于pandora 的EDAS平臺的。那么我們?nèi)绾螌崿F(xiàn)開源的動態(tài)RPC呢?

定義:動態(tài)RPC指的是,可以動態(tài)的讓一個服務(wù)上線和下線,換句話說就是動態(tài)的從注冊中心剔除服務(wù),而不是停止這個服務(wù),啟動這個服務(wù)。這個需求是來自鏈路的原路返回

簡介 vertx:

vertx 是一套全異步的基于netty通信的框架,Vertx中核心組建有 verticle, eventbus,?circuit breaker,service discovery and register, Router等等。后期我會一一的去探索這里面的組件的原理。而本文介紹的vertx 的動態(tài)rpc,就是使用vertx proxy service 這個功能來實現(xiàn)的。另外我們得知道vertx 中的線程模型是基于netty的event loop,一個verticle 是一個微服務(wù),而且具有HA的機制的微服務(wù)。服務(wù)之間的通信是根據(jù)event bus 的netty 通信機制。但是event bus 通信不是100%的可靠的(這點很要命,后期我會寫博客探索的)。

此外vertx需要依賴 分布式緩存建立集群(hazelcast, ignite等),基于gossip協(xié)議的p2p網(wǎng)絡(luò)。

探索過程:

如果要實現(xiàn)需求的話,首先第一想到的還是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是經(jīng)一番折騰之后,沒有可以讓我覺得可以在代碼里面直接讓服務(wù)上線下線,從注冊中心剔除掉或者注冊到注冊中心的。那怎么辦呢?曾經(jīng)也想過會用rabbitmq,這樣的MQ 去做動態(tài)的RPC,但是發(fā)現(xiàn)很復(fù)雜,關(guān)鍵很難做scalablity,如果有上千個鏈路,就需要上千個topic,或者tag 這類的標(biāo)記。感覺很復(fù)雜,也很難維護。

solutions: vertx proxy service?

這個方案上,我們不打算使用verticle的概念,僅僅使用vertx 代理服務(wù)的概念。首先我們要使用vertx 的服務(wù)發(fā)現(xiàn)和注冊,其次我們要使用proxy service。

第一個坑:代理服務(wù)的自動生成。請在pom或者gradle里面加入 生成代理的依賴(對接口的代理)。然后創(chuàng)建 package-info.java,在root package。所謂的root package就是基package,web developer 開的springboot 應(yīng)該都很清楚,需要將app.class 建在基package,以便掃描都可以掃描到

? ? <groupId>io.vertx</groupId>

? ? <artifactId>vertx-service-proxy</artifactId>

? ? <classifier>processor</classifier>

? ? <groupId>io.vertx</groupId>

? ? <artifactId>vertx-codegen</artifactId>

? ? <classifier>processor</classifier>

@ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")

package com.xxxx.xx.xx.vertx;

import io.vertx.codegen.annotations.ModuleGen;

其中g(shù)roupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定義interface 所在的包。完成之后,使用maven clean install, 你就會發(fā)現(xiàn)在target 目錄下面會有 ...Proxy 的class,這兩個class 就是代理類,對于原理,后面的博客我會慢慢分析。

第二個坑是:服務(wù)發(fā)現(xiàn)publish 服務(wù)之后,需要注冊服務(wù)代理,不然注冊的服務(wù),在event bus 上面是找不到的,看看代碼怎么寫吧:

Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());

servicePublishRequestBean.getDiscovery().publish(record, ar -> {

if (ar.succeeded() && ar.result() !=null) {

publishedRecords.add(record);

? ? ? ? recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

? ? }

handler.handle(ar.map(ar.result()));

});

servicePublishRequestBean就是我封裝的bean,里面de屬性有:

private StringserviceName;

private StringeventBusAddress;

private ServiceDiscoverydiscovery;

private Classclazz;

private T service;

此外我們一定要注冊這樣的proxy service才能夠生效,不然是沒有用的:

ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

看看里面怎么寫的

public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {

return ServiceBinderUtil.getBinderInstance()

.setAddress(eventBusAddress)

.register(clazz, service);

}

第三個坑是: 和第二個坑一樣的,需要注銷服務(wù),注銷服務(wù)的話,我們也要調(diào)用proxyService 去把服務(wù)注銷了,而不能單純的調(diào)用 service discovery and register unpublish 方法,這個是很惡心的:看看我怎么寫的把,一些很細節(jié)的東西需要自己探索,我不會貼出所有的東西的。比如下面的discover record 有許多的狀態(tài),這里一不小心就會找不到你所發(fā)布的服務(wù),另外注銷代理服務(wù),需要個奇怪的參數(shù),這個參數(shù)我存儲在map里面:recordMessageConsumerMap, 在服務(wù)publish時候,就會生成的。

discovery.getRecord(info -> info.getName().equals(serviceName), res -> {

if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {

Record record = res.result();

? ? ? ? ? ? ? ? discovery.unpublish(record.getRegistration(), ar -> {

if(ar.succeeded()) {

List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());

? ? ? ? ? ? ? ? ? ? ? ? offlineRecord(records);

? ? ? ? ? ? ? ? ? ? }

handler.handle(ar.map((Void)null));

? ? ? ? ? ? ? ? });

? ? ? ? ? ? }else {

handler.handle(res.map((Void)null));

? ? ? ? ? ? }

}

);

OfflineRecord 主要是調(diào)用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的實現(xiàn)如下,是不是比較簡單?

ServiceBinderUtil.getBinderInstance().unregister(consumer);

然后基本上做到這里,vertx 動態(tài)的rpc 就可以實現(xiàn)了,主要是用了vertx 服務(wù)發(fā)現(xiàn)組件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比較多。此外除了上面所描述的坑之外,我還想告訴小伙伴們,event bus 通信exception 會出現(xiàn)一些奇怪的錯誤,所有event bus 通信也是個坑,其實proxy sevice 本質(zhì)就是 event bus,所以event bus 網(wǎng)絡(luò)連接不通的話,確實很頭疼,經(jīng)過我這邊的測試,不管ecs 集群的部署,還是ecs 和docker 的混合部署,網(wǎng)絡(luò)都是可以通的(我使用的是ignite分布式緩存)。暫時我先貼出來關(guān)于網(wǎng)絡(luò)的代碼,如果你們遇到了問題,先暫時按照我這個來,不會讓你很惱火。

@Bean

public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{

IgniteConfiguration igniteConfiguration =new IgniteConfiguration();

? ? igniteConfiguration.setClientMode(false);

? ? igniteConfiguration.setPeerClassLoadingEnabled(true);

? ? igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);

? ? igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);

? ? igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());

? ? igniteConfiguration.setCacheConfiguration(getCacheConfiguration());

? // igniteConfiguration.setLocalHost(IPUtil.getLocalIp());

? ? return igniteConfiguration;

}

@Bean

public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{

TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();

? ? tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());

? ? tcpDiscoverySpi.setNetworkTimeout(10000);

? ? System.out.println("setting success for host");

? ? tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());

? ? return tcpDiscoverySpi;

}

@Bean

public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();

? ? tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");

? ? return tcpDiscoveryMulticastIpFinder;

}

@Bean

public CacheConfigurationgetCacheConfiguration(){

CacheConfiguration cacheConfiguration =new CacheConfiguration();

? ? cacheConfiguration.setName("myCache");

? ? cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);

? ? cacheConfiguration.setBackups(1);

? ? return cacheConfiguration;

}

@Bean

public VertxClusterStartervertxClusterStarter() {

VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();

? ? return vertxClusterStarter;

}

ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);

TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();

tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));

VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);

ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();

謝謝,希望對大家有所幫助,后面我會嘗試探索event bus 通信的原理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容