消息總線: Spring Cloud Bus
1.什么是BUS?
spring cloud是按照spring的配置對一系列微服務(wù)框架的集成,spring cloud bus是其中一個(gè)微服務(wù)框架,用于實(shí)現(xiàn)微服務(wù)之間的通信。spring cloud bus整合 java的事件處理機(jī)制和消息中間件消息的發(fā)送和接受,主要由發(fā)送端、接收端和事件組成。針對不同的業(yè)務(wù)需求,可以設(shè)置不同的事件,發(fā)送端發(fā)送事件,接收端接受相應(yīng)的事件,并進(jìn)行相應(yīng)的處理。
Spring cloud Bus將分布式系統(tǒng)的節(jié)點(diǎn)與輕量級消息代理鏈接。這可以用于廣播狀態(tài)更改(例如配置更改)或其他管理指令。一個(gè)關(guān)鍵的想法是,Bus就像一個(gè)擴(kuò)展的Spring Boot應(yīng)用程序的分布式執(zhí)行器,但也可以用作應(yīng)用程序之間的通信渠道。
如下架構(gòu)圖所示:

2.原理
spring cloud bus整合了java的事件處理機(jī)制和消息中間件,所以下面就從這兩個(gè)方面來說明spring cloud bus的原理。

如圖所示,作如下解釋:
(1)完整流程:發(fā)送端(endpoint)構(gòu)造事件event,將其publish到context上下文中(
spring cloud bus有一個(gè)父上下文,bootstrap),然后將事件發(fā)送到channel中(json串message),接收端從channel中獲取到message,將message轉(zhuǎn)為事件event(轉(zhuǎn)換過程這一塊沒有深究),然后將event事件publish到context上下文中,最后接收端(Listener)收到event,調(diào)用服務(wù)進(jìn)行處理。整個(gè)流程中,只有發(fā)送/接收端從context上下文中取事件和發(fā)送事件是需要我們在代碼中明確寫出來的,其它部分都由框架封裝完成。
(2)先大致描述了一下流程,關(guān)于封裝的部分流程,我們基本上可以在BusAutoConfiguration.class中找到,下面的代碼都是這個(gè)類中的代碼:
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)) {
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}
}
這是封裝了java事件處理機(jī)制,當(dāng)收到RemoteApplicationEvent時(shí),如果這個(gè)event是從這個(gè)服務(wù)發(fā)出的,而且不是ack事件,那么就會(huì)把這個(gè)事件發(fā)送到channel中。
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
if (event instanceof AckRemoteApplicationEvent) {
if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
// If it's an ACK we are finished processing at this point
return;
}
if (this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher != null) {
if (!this.serviceMatcher.isFromSelf(event)) {
this.applicationEventPublisher.publishEvent(event);
}
if (this.bus.getAck().isEnabled()) {
AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
this.serviceMatcher.getServiceId(),
this.bus.getAck().getDestinationService(),
event.getDestinationService(), event.getId(), event.getClass());
this.cloudBusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventPublisher.publishEvent(ack);
}
}
if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
event.getOriginService(), event.getDestinationService(),
event.getId(), event.getClass()));
}
}
@StreamListener標(biāo)簽,這個(gè)方法就是從channel中取出事件進(jìn)行處理的過程(message轉(zhuǎn)事件部分需要自行了解,我沒有深入研究),根據(jù)事件的類型、發(fā)送方和接收方來處理這個(gè)事件:如果是ack事件,發(fā)送到context上下文中;如果自己是接收端且不是發(fā)送端,就會(huì)將事件發(fā)送到context上下文。
3.如何整合BUS?
消息總線可支持的有:

ActiveMQ是比較老牌的消息系統(tǒng),當(dāng)然了不一定是大家第一個(gè)熟知的消息系統(tǒng),因?yàn)楝F(xiàn)在電商、互聯(lián)網(wǎng)規(guī)模越來越大,不斷進(jìn)入程序員眼簾的大多是Kafka和RocketMQ。ActiveMQ出現(xiàn)的要比他們早,而且涵蓋的功能也特別全,路由、備份、查詢、事務(wù)、集群等等。他的美中不足是不能支撐超大規(guī)模、超高并發(fā)的互聯(lián)網(wǎng)應(yīng)用,ActiveMQ的并發(fā)承受能力在百萬級別,大概500次/s的消息頻率。

Kafka是新一代的消息系統(tǒng),相對于ActiveMQ來說增加了分片功能,類似于數(shù)據(jù)庫分庫分表,一臺Broker僅負(fù)責(zé)一部分?jǐn)?shù)據(jù)收發(fā),從而使得他的伸縮性特別好,通過增加Broker就可以不斷增加處理能力。一般來說,Kafka被用來處理日志流,作為流計(jì)算的接入點(diǎn)。在電商的訂單、庫存等系統(tǒng)里邊一般不用,主要顧慮的是Kafka異步刷盤機(jī)制可能導(dǎo)致數(shù)據(jù)丟失。當(dāng)然,對于數(shù)據(jù)丟失這一點(diǎn)不同的工程師也有不同的看法,認(rèn)為Kafka的Master-Slave的多寫機(jī)制,完全能夠避免數(shù)據(jù)丟失。

RocketMQ是阿里開源的一款消息系統(tǒng),開發(fā)的初衷就是要支撐阿里龐大的電商系統(tǒng)。RocketMQ和Kafka有很多相似之處,由于RocketMQ開發(fā)中很大程度上參考了Kafka的實(shí)現(xiàn)。RocketMQ同樣提供了優(yōu)秀的分片機(jī)制,RocketMQ的分片比Kafka的分片有所增強(qiáng),區(qū)分了絕對有序和非絕對有序兩種選項(xiàng)。另外RocketMQ采用的是同步刷盤,一般認(rèn)為不會(huì)造成數(shù)據(jù)丟失。

RabbitMQ類似于ActiveMQ也是一個(gè)相對小型的消息系統(tǒng),他的優(yōu)勢在于靈活的路由機(jī)制,可以進(jìn)行自由配置。

Redis的pub/sub功能,由于Redis是內(nèi)存級的系統(tǒng),所以速度和單機(jī)的并發(fā)能力是上述四個(gè)消息系統(tǒng)不能比擬的,但是也是由于內(nèi)存存儲(chǔ)的緣故,在消息的保障上就更弱一些。
Kafka為例:
Kafak架構(gòu)圖如下:

Kafka是基于消息發(fā)布/訂閱模式實(shí)現(xiàn)的消息系統(tǒng),其主要設(shè)計(jì)目標(biāo)如下:
1.消息持久化:以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問性能。
2.高吞吐:在廉價(jià)的商用機(jī)器上也能支持單機(jī)每秒100K條以上的吞吐量
3.分布式:支持消息分區(qū)以及分布式消費(fèi),并保證分區(qū)內(nèi)的消息順序
4.跨平臺:支持不同技術(shù)平臺的客戶端(如:Java、PHP、Python等)
5.實(shí)時(shí)性:支持實(shí)時(shí)數(shù)據(jù)處理和離線數(shù)據(jù)處理
6.伸縮性:支持水平擴(kuò)展
Kafka中涉及的一些基本概念:
1.Broker:Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這些服務(wù)器被稱為Broker。
2.Topic:邏輯上同Rabbit的Queue隊(duì)列相似,每條發(fā)布到Kafka集群的消息都必須有一個(gè)Topic。(物理上不同Topic的消息分開存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)Broker上,但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
3.Partition:Partition是物理概念上的分區(qū),為了提供系統(tǒng)吞吐率,在物理上每個(gè)Topic會(huì)分成一個(gè)或多個(gè)Partition,每個(gè)Partition對應(yīng)一個(gè)文件夾(存儲(chǔ)對應(yīng)分區(qū)的消息內(nèi)容和索引文件)。
4.Producer:消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息并發(fā)送到Kafka Broker。
5.Consumer:消息消費(fèi)者,向Kafka Broker讀取消息并處理的客戶端。
6.Consumer Group:每個(gè)Consumer屬于一個(gè)特定的組(可為每個(gè)Consumer指定屬于一個(gè)組,若不指定則屬于默認(rèn)組),組可以用來實(shí)現(xiàn)一條消息被組內(nèi)多個(gè)成員消費(fèi)等功能。
可以從kafka的架構(gòu)圖看到Kafka是需要Zookeeper支持的,你需要在你的Kafka配置里面指定Zookeeper在哪里,它是通過Zookeeper做一些可靠性的保證,做broker的主從,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發(fā)送topic形式的消息,
Consumer是按照組來分的,所以,一組Consumers都會(huì)都要同樣的topic形式的消息。在服務(wù)端,它還做了一些分片,那么一個(gè)Topic可能分布在不同的分片上面,方便我們拓展部署多個(gè)機(jī)器,Kafka是天生分布式的。
4.什么時(shí)候用cloud bus
spring cloud bus在整個(gè)后端服務(wù)中起到聯(lián)通的作用,聯(lián)通后端的多臺服務(wù)器。我們?yōu)槭裁葱枰雎?lián)通呢?
后端服務(wù)器一般都做了集群化,很多臺服務(wù)器,而且在大促活動(dòng)期經(jīng)常發(fā)生服務(wù)的擴(kuò)容、縮容、上線、下線。這樣,后端服務(wù)器的數(shù)量、IP就會(huì)變來變?nèi)ィ绻覀兿脒M(jìn)行一些線上的管理和維護(hù)工作,就需要維護(hù)服務(wù)器的IP。
比如我們需要更新配置、比如我們需要同時(shí)失效所有服務(wù)器上的某個(gè)緩存,都需要向所有的相關(guān)服務(wù)器發(fā)送命令,也就是調(diào)用一個(gè)接口。
你可能會(huì)說,我們一般會(huì)采用zookeeper的方式,統(tǒng)一存儲(chǔ)服務(wù)器的ip地址,需要的時(shí)候,向?qū)?yīng)服務(wù)器發(fā)送命令。這是一個(gè)方案,但是他的解耦性、靈活性、實(shí)時(shí)性相比消息總線都差那么一點(diǎn)。
總的來說,就是在我們需要把一個(gè)操作散發(fā)到所有后端相關(guān)服務(wù)器的時(shí)候,就可以選擇使用cloud bus了。
spring cloud config 配合spring cloud bus實(shí)現(xiàn)配置信息更新
spring cloud config 配置更新有兩種方式:1.配置git倉庫的web hook,當(dāng)git倉庫有更新時(shí)自動(dòng)調(diào)用bus提供的刷新接口,刷新緩存;2.手工調(diào)用bus提供的刷新接口。
不論一方案還是二方案區(qū)別僅在于是不同的人觸發(fā)了刷新接口。實(shí)際上,線上服務(wù)器一般很少采用自動(dòng)刷新的機(jī)制,都會(huì)在修改后,確認(rèn)無誤后再執(zhí)行刷新。
關(guān)鍵的修改點(diǎn)是把所有的后端服務(wù)器連接到同一個(gè)消息系統(tǒng)上,然后監(jiān)聽配置更新消息。
安裝RabbitMQ
安裝方法很簡單,直接在官網(wǎng)下載對應(yīng)的安裝文件就可以了。
因?yàn)镽abbitMQ是Erlang語言寫的,所以如果你的機(jī)器上沒有安裝Erlang,那么需要先安裝Erlang。
增加bus包的引用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
增加RabbitMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
增加@RefreshScope注解
@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigClientApplication.class, args);
}
@Value("${app-name}")
private String app_name;
@RequestMapping("hi")
public String hi(){
return "hello "+ app_name;
}
}
spring cloud擴(kuò)展消息總線方法
可以參考spring cloud bus 擴(kuò)展消息總線方法
5.Spring Cloud 集成Kafka
pom.xml加入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
加入以上依賴后,Spring Cloud 消息總線 Kafka 已經(jīng)集成完成,使用的配置則是默認(rèn)啟動(dòng) Kafka 和 ZooKeeper 時(shí)的配置。確定 ZooKeeper 和 Kafka 已經(jīng)啟動(dòng),然后再啟動(dòng)有上面依賴的 Spring Boot 應(yīng)用 ,這時(shí) Kafka 會(huì)新增一個(gè)名為 springCloudBus 的 Topic,可以使用命令 kafka-topics --list --zookeeper localhost:2181 來查看當(dāng)前 Kafka 中的 Topic。
集成后Kafka 配置
以上的例子中 Kafka、ZooKeeper 均運(yùn)行于本地,但實(shí)際應(yīng)用中,Kafka 和 ZooKeeper 一般會(huì)獨(dú)立部署,所以需要為Kafka 和 ZooKeeper 配置一些連接信息,Spring Boot 1.3.7 沒有為 Kafka 直接提供 Starter 模塊,而是使用 Spring Cloud Stream 的 Kafka 模塊,配置的時(shí)候則采用 spring.cloud.stream.kafka 前綴
spring.cloud.stream.kafka 配置
spring:
cloud:
stream:
binders:
#binderName
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
#kafka地址
brokers: localhost:9092
#zookeeper節(jié)點(diǎn)地址
zk-nodes: localhost:2181
bindings:
#channelName
channelKafka:
#binderName
binder: kafka1
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1
spring.kafka 配置
在啟動(dòng)Kafka的時(shí)候有這樣一個(gè)配置 config/server.properties 的 zookeeper.connect 指定 ZooKeeper連接地址,但是在 spring.kafka 中并沒有看到可以配置 ZooKeeper 連接地址 的地方
spring:
kafka:
consumer:
#消費(fèi)者服務(wù)器地址
bootstrap-servers: localhost:9092
producer:
#生產(chǎn)者服務(wù)器地址
bootstrap-servers: localhost:9092
cloud:
stream:
bindings:
channel1:
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1
Less is more.