JGroupsConnector
JGroupsConnector使用(正如它的名字已經(jīng)說(shuō)明)JGroups作為底層發(fā)現(xiàn)和調(diào)度機(jī)制。描述JGroups的特性有太多的參考指南,請(qǐng)參閱JGroups用戶指南以了解更多的細(xì)節(jié)。
因?yàn)镴Groups既處理節(jié)點(diǎn)的發(fā)現(xiàn)又處理它們之間的通信,所以JGroupsConnector既充當(dāng)CommandBusConnector也充當(dāng)CommandRouter。
注意
你可以在axon-distributed-commandbus-jgroups模塊中,為DistributedCommandBus找到JGroups特定組件。
JGroupsConnector有四個(gè)強(qiáng)制性配置元素:
- 第一種是JChannel,它定義了JGroups協(xié)議棧。一般來(lái)說(shuō),用JGroups配置文件的引用構(gòu)造JChannel。JGroups附帶的默認(rèn)配置,可以用作自己配置的依據(jù)。請(qǐng)記住,IP多路廣播一般不工作在云服務(wù)中,像亞馬遜。中這種類(lèi)型的環(huán)境中, TCP Gossip通常是一個(gè)好的開(kāi)端。
- 集群名稱定義了每個(gè)segment應(yīng)登記到的集群的名稱。具有相同的集群名稱的Segment最終會(huì)探測(cè)到彼此,并在彼此間分發(fā)命令。
- “本地segment”是命令總線實(shí)現(xiàn),分發(fā)命令去往本地的JVM。這些命令可能已通過(guò)其他JVM或從本地的一個(gè)實(shí)例分發(fā)。
- 最后,序列化器是用來(lái)序列化之前通過(guò)線路發(fā)送的命令消息。
注意
當(dāng)使用緩存時(shí),當(dāng)ConsistentHash更改以避免潛在的數(shù)據(jù)損壞時(shí),它應(yīng)該被清空(例如,當(dāng)命令沒(méi)有指定一個(gè)@TargetAggregateVersion和新成員快速加入和離開(kāi)JGroup,修改聚合然而它還要緩存到其他地方)。
最終,JGroupsConnector需要實(shí)際連接,按順序分發(fā)消息到其他segment。這樣做,調(diào)用connect()方法。
JChannel channel = new JChannel("path/to/channel/config.xml");
CommandBus localSegment = new SimpleCommandBus();
Serializer serializer = new XStreamSerializer();
JGroupsConnector connector = new JGroupsConnector(channel, "myCommandBus", localSegment, serializer);
DistributedCommandBus commandBus = new DistributedCommandBus(connector, connector);
// on one node:
commandBus.subscribe(CommandType.class.getName(), handler);
connector.connect();
// on another node, with more CPU:
commandBus.subscribe(CommandType.class.getName(), handler);
commandBus.subscribe(AnotherCommandType.class.getName(), handler2);
commandBus.updateLoadFactor(150); // defaults to 100
connector.connect();
// from now on, just deal with commandBus as if it is local...
注意
注意,并非所有的segment都必需具有相同類(lèi)型的命令的命令處理器。你完全可以為不同的命令類(lèi)型使用不同的segment。分布式命令總線總是選擇一個(gè)節(jié)點(diǎn)分發(fā)命令到那個(gè)支持特定類(lèi)型的命令。
如果你使用Spring,你可能需要考慮使用JGroupsConnectorFactoryBean。它自動(dòng)連接連接器當(dāng)ApplicationContext啟動(dòng)后,并且在ApplicationContext關(guān)閉時(shí)完全的斷開(kāi)。此外,它為測(cè)試環(huán)境使用合理的默認(rèn)值(但不應(yīng)被視為生產(chǎn)準(zhǔn)備)和自動(dòng)裝配配置。
Spring Cloud Connector
Spring Cloud連接器裝置,用Spring Cloud來(lái)描述使用服務(wù)注冊(cè)和發(fā)現(xiàn)機(jī)制來(lái)分配命令總線。因此,你可以自由選擇使用哪一個(gè)Spring Cloud實(shí)現(xiàn)用來(lái)分發(fā)你的命令。實(shí)現(xiàn)的一個(gè)例子是 Eureka Discovery/Eureka 服務(wù)器組合。
注意
當(dāng)前版本(Axon 3.0.4)SpringCloudCommandRouter使用ServiceInstance。Metadata 字段來(lái)通知所有系統(tǒng)中的節(jié)點(diǎn),通過(guò)CommandNameFilter它可以處理命令。這是很重要的,Spring Cloud實(shí)現(xiàn)選擇支持ServiceInstance.Metadata字段的使用。例如Spring Cloud Consul目前不支持該字段,因此SpringCloudCommandRouter并不是一個(gè)可行的解決方案。我們正在研究一個(gè)額外的解決方案,從中檢索CommandNameFilter 。
提供每個(gè)SpringCloud實(shí)現(xiàn)的描述將推動(dòng)本參考指南。因此,我們參考他們各自的文件以獲得進(jìn)一步的信息。
Spring Cloud連接器裝置是一個(gè)SpringCloudCommandRouter和SpringHttpCommandBusConnector的組合,分別填充CommandRouter的地點(diǎn)和 DistributedCommandBus的CommandBusConnector。
注意
Spring Cloud連接器特定的組件DistributedCommandBus可以在axon-distributed-commandbus-springcloud模塊中找到 。
SpringCloudCommandRouter必須由以下提供的來(lái)創(chuàng)建:
- 一個(gè)DiscoveryClient類(lèi)型“discovery client”。這可以通過(guò)用@EnableDiscoveryClient注解你的Spring Boot應(yīng)用程序來(lái)提供,將在你的類(lèi)路徑中尋找Spring Cloud 的實(shí)現(xiàn)。
- 一個(gè)RoutingStrategy類(lèi)型的"routing strategy"。目前axon-core模塊提供了一些實(shí)現(xiàn),但是一函數(shù)調(diào)用也可以滿足要求。例如,如果你想路由命令基于“聚合標(biāo)識(shí)符”,你可以使用AnnotationRoutingStrategy和注解有效載荷的字段,用@TargetAggregateIdentifier識(shí)別聚合。
SpringHttpCommandBusConnector需要?jiǎng)?chuàng)建三個(gè)參數(shù):
- 一個(gè)CommandBus類(lèi)型的“l(fā)ocal command bus”。這是命令總線實(shí)現(xiàn),它將分發(fā)命令到本地的JVM。這些命令可能是由其他JVM上的或本地的實(shí)例分發(fā)。
- RestOperations對(duì)象來(lái)執(zhí)行一個(gè)命令消息的發(fā)布到另一個(gè)實(shí)例。
- 最后一個(gè)Serializer類(lèi)型的“序列serializer”。序列化器用于在命令發(fā)送到網(wǎng)絡(luò)之前序列化命令消息。
SpringCloudCommandRouter和SpringHttpCommandBusConnector應(yīng)該都被用于創(chuàng)建DistributedCommandsBus。在Spring Java 配置中,看起來(lái)如下:
// Simple Spring Boot App providing the `DiscoveryClient` bean
@EnableDiscoveryClient
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
// Example function providing a Spring Cloud Connector
@Bean
public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
return new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy());
}
@Bean
public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
RestOperations restOperations,
Serializer serializer) {
return new SpringHttpCommandBusConnector(localSegment, restOperations, serializer);
}
@Primary // to make sure this CommandBus implementation is used for autowiring
@Bean
public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
CommandBusConnector commandBusConnector) {
return new DistributedCommandBus(commandRouter, commandBusConnector);
}
}
// if you don't use Spring Boot Autoconfiguration, you will need to explicitly define the local segment:
@Bean
@Qualifier("localSegment")
public CommandBus localSegment() {
return new SimpleCommandBus();
}
注意
注意,并非所有的segment都必需具有相同類(lèi)型的命令的命令處理器。你完全可以為不同的命令類(lèi)型使用不同的segment。分布式命令總線總是選擇一個(gè)節(jié)點(diǎn)分發(fā)命令到那個(gè)支持特定類(lèi)型的命令。