【20180330】基于Spring cloud stream 實(shí)現(xiàn)的事件驅(qū)動(dòng)處理的流程

一概要

Spring Cloud Stream 是一個(gè)用來(lái)為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動(dòng)能力的框架。它可以基于Spring Boot 來(lái)創(chuàng)建獨(dú)立的,可用于生產(chǎn)的Spring 應(yīng)用程序。他通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn),引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。Spring cloud stream 應(yīng)用模型的結(jié)構(gòu)圖如圖所示:

從圖中可以看出spring cloud stream 構(gòu)建的應(yīng)用程序與消息中間件之間是通過(guò)綁定器Binder相關(guān)聯(lián)的,綁定器對(duì)于應(yīng)用程序而言起到了隔離的作用,它使得不同消息中間件(Rabbitmq、kafka)的實(shí)現(xiàn)細(xì)節(jié)對(duì)于應(yīng)用程序來(lái)說(shuō)是透明的。我們不需要知道消息中間件的通信細(xì)節(jié),只需要知道Binder對(duì)應(yīng)用程序提供的抽象概念來(lái)使用消息中間件實(shí)現(xiàn)業(yè)務(wù)邏輯即可,這個(gè)抽象的概念就是消息通道:channel。如圖中所示的兩條輸入通道和三條輸出通道,綁定器就是作為這些通道和消息中間件之間的橋梁進(jìn)行通信的。

在沒(méi)有綁定器的情況下,Spring boot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各個(gè)消息中間件的構(gòu)建初衷不同,所以它們?cè)趯?shí)現(xiàn)細(xì)節(jié)上也會(huì)有很大的差別,這就會(huì)使得我們的消息交互邏輯會(huì)很笨重。由于對(duì)具體的消息中間件依賴太重,導(dǎo)致消息中間件升級(jí)或者更換的時(shí)候,我們就需要付出很大的代價(jià)。而綁定器作為中間層,通過(guò)向應(yīng)用程序暴露統(tǒng)一的Channel,恰好實(shí)現(xiàn)了隔離的作用。

二RabbitMQ基本概念

2.1 基本概念

?



Broker:即消息隊(duì)列服務(wù)器實(shí)體

Exchange:消息到達(dá)代理服務(wù)器的第一站,根據(jù)分發(fā)規(guī)則進(jìn)行轉(zhuǎn)發(fā)。消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。

Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。

Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái)。

Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。

Binding Key:隊(duì)列需要通過(guò)綁定鍵(默認(rèn)為空)綁定到交換器上,交換器將消息的路由鍵與所綁定隊(duì)列的綁定鍵進(jìn)行匹配,正確匹配的消息將發(fā)送到隊(duì)列中。路由鍵是偏向生產(chǎn)的概念,而綁定鍵是偏向消費(fèi)的概念。

vhost:虛擬主機(jī),一個(gè)broker里可以開(kāi)設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。

producer:消息生產(chǎn)者,就是投遞消息的程序。

consumer:消息消費(fèi)者,就是接受消息的程序。

channel:消息通道,應(yīng)用程序(生產(chǎn)與/或消費(fèi))和代理服務(wù)器之間TCP連接內(nèi)的虛擬連接,解決TCP連接數(shù)量限制及降低TCP連接代價(jià)。每個(gè)信道有一個(gè)ID,其概念與“頻分多路復(fù)用”類似在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

?

?

?

?

2.2 執(zhí)行過(guò)程示意圖

?



三Spring cloud stream 核心概念

3.1 綁定器 Binder

通過(guò)定義綁定器作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。通過(guò)向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件的實(shí)現(xiàn)。當(dāng)需要升級(jí)消息中間件,或者是更換其他消息中間件產(chǎn)品時(shí),我們需要做的就是更換對(duì)應(yīng)的Binder綁定器而不需要修改任何應(yīng)用邏輯 。目前只提供了RabbitMQ和Kafka的Binder實(shí)現(xiàn)

3.2 發(fā)布-訂閱模式

Spring cloud stream 中的消息通信方式采用發(fā)布訂閱模式,當(dāng)一條消息被投遞到消息中間件之后,它會(huì)通過(guò)共享的Topic主題進(jìn)行廣播,消費(fèi)者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)處理邏輯。Topic主題是spring cloud stream中的一個(gè)抽象概念,用來(lái)代表發(fā)布共享消息給消費(fèi)者的地方。在不同的消息中間件中對(duì)應(yīng)不同的概念,如:Rabbitmq中對(duì)應(yīng)的是Exchange,Kafka中對(duì)應(yīng)的是Topic。綁定到同一個(gè)topic上的訂閱者,都將會(huì)收到相同的消息并根據(jù)自身的需求來(lái)對(duì)這個(gè)消息進(jìn)行處理。

Spring cloud stream采用的發(fā)布訂閱模式,可以有效的降低生產(chǎn)者和消費(fèi)者之間的耦合,當(dāng)需要同一類消息增加一種處理方式的時(shí)候,只需要到已有的topic上綁定一個(gè)通道,就可以實(shí)現(xiàn)功能的擴(kuò)展,而不需要改變?cè)瓉?lái)已經(jīng)實(shí)現(xiàn)了的任何內(nèi)容。


3.3 消費(fèi)組

在微服務(wù)中,我們的每一個(gè)微服務(wù)為了實(shí)現(xiàn)高可用和負(fù)載均衡,實(shí)際上都會(huì)部署多個(gè)實(shí)例。在多數(shù)情況下,當(dāng)生產(chǎn)者發(fā)送消息給某個(gè)具體的微服務(wù)時(shí),我們只希望它被消費(fèi)一次,而不是被重復(fù)消費(fèi)。消費(fèi)組就是解決這一需求的。


3.4 消息分區(qū)

從上一節(jié)知道,消費(fèi)組無(wú)法控制消息消息具體被哪個(gè)實(shí)例消費(fèi),只能保證是組內(nèi)的一個(gè)實(shí)例消費(fèi)。Spring Cloud Stream對(duì)給定應(yīng)用的多個(gè)實(shí)例之間分隔數(shù)據(jù)予以支持。在分隔方案中,物理交流媒介(如:代理主題)被視為分隔成了多個(gè)片(partitions)。一個(gè)或者多個(gè)生產(chǎn)者應(yīng)用實(shí)例給多個(gè)消費(fèi)者應(yīng)用實(shí)例發(fā)送消息并確保相同特征的數(shù)據(jù)被同一消費(fèi)者實(shí)例處理。?

比如一些監(jiān)控服務(wù),為了統(tǒng)計(jì)某一時(shí)間段內(nèi)消息生產(chǎn)者發(fā)送的報(bào)告內(nèi)容,監(jiān)控服務(wù)需要自身聚合這些數(shù)據(jù)。這時(shí)候消息生產(chǎn)者可以為這些消息增加一個(gè)固定的特征來(lái)區(qū)分是,使得擁有這些特征的消息每次都能被發(fā)送發(fā)到一個(gè)特定的實(shí)例上,以達(dá)到統(tǒng)計(jì)效果。

四使用介紹

4.1 基本注解介紹

@Input注釋標(biāo)識(shí)輸入通道,通過(guò)該輸出通道接收到的消息進(jìn)入應(yīng)用程序;@Output注釋標(biāo)識(shí)輸入通道,發(fā)布的消息將通過(guò)該通道離開(kāi)應(yīng)用程序。@Input和@Output注釋可以使用頻道名稱作為參數(shù);?如果未提供名稱,將使用注釋方法的名稱。@EnableBinding注釋添加到應(yīng)用程序,以便立即連接到消息代理。@StreamListener添加到方法中,以使其接收流處理的事件。

4.2 基本用法

我們模擬這樣一個(gè)場(chǎng)景來(lái)對(duì)spring cloud stream的使用進(jìn)行說(shuō)明。生產(chǎn)者A通過(guò)channel 發(fā)送一個(gè)消息,這個(gè)消息中包含了一個(gè)User對(duì)象(比如這個(gè)對(duì)象有兩個(gè)屬性,name,age),消費(fèi)者通過(guò)spring cloud stream 的綁定器通過(guò)消息中間件(這里使用rabbitmq)將這個(gè)消息發(fā)送給了消費(fèi)者B。消費(fèi)者B的功能就是將收到的消息持久化到數(shù)據(jù)庫(kù)中。

步驟一:加入spring cloud stream 依賴

步驟二:定義輸入和輸出通道

使用我們上文提到的@Input ?和 ?@Output來(lái)定義。其中@Input對(duì)應(yīng)的通道由消費(fèi)者監(jiān)聽(tīng)的時(shí)候使用,@Output由生產(chǎn)者發(fā)送消息的時(shí)候使用。我們只需要將他們定義在一個(gè)接口中,spring cloud stream 會(huì)自動(dòng)幫我們實(shí)現(xiàn)這個(gè)接口。

步驟三:書寫生產(chǎn)者發(fā)送消息的邏輯

注入我們?cè)诓襟E二中定義的通道接口,直接使用output()中的send(Message payload) 方法將消息發(fā)送出去。

?

?

?

?

?

?

步驟四:書寫消費(fèi)者邏輯


將@EnableBinding寫到我們的消費(fèi)者所在的類上,使用@StreamListener寫到我們消費(fèi)者方法上面,對(duì)通道中的消息進(jìn)行監(jiān)聽(tīng),把得到的消息處理后放入數(shù)據(jù)庫(kù),這里為了演示存入數(shù)據(jù)庫(kù)就直接用LOG輸出來(lái)模擬了,沒(méi)有直接操作數(shù)據(jù)庫(kù)。這里值得注意的是,由于我們的生產(chǎn)者和消費(fèi)者都在同一工程中,所以我們只在消費(fèi)者所在的類上加了@EnableBinding注解,而生產(chǎn)者所在的類上并沒(méi)有加該注解。在實(shí)際的生產(chǎn)中,我們可能會(huì)遇到生產(chǎn)者和消費(fèi)者不在同一工程中的情況,這個(gè)時(shí)候就要將生產(chǎn)者和消費(fèi)者所在的類上都使用@EnableBinding注解。

步驟五:外部配置

外部配置我將它分為了兩個(gè)個(gè)部分:spring cloud stream 配置、Rabbitmq配置。其中spring cloud stream的配置分為:應(yīng)用級(jí)別的配置,inputChannel配置,outputChannel配置以及Binder配置四個(gè)部分。Rabbitmq分為:應(yīng)用級(jí)別的配置,生產(chǎn)者配置,消費(fèi)者配置,默認(rèn)的rabbitmq配置(Rabbitmq默認(rèn)使用了Spring boot的ConnectionFactory)四個(gè)部分。

Spring cloud stream 應(yīng)用級(jí)別的配置以spring.cloud.stream.為前綴。

通道的通用配置以spring.cloud.stream.bindings..為前綴。

outputChannle以spring.cloud.stream.bindings..producer.為前綴。

inputChannel以spring.cloud.stream.bindings..consumer.為前綴。

綁定器的配置以spring.cloud.stream.binders..為前綴。

Rabbitmq應(yīng)用級(jí)別的配置以spring.rabbitmq.為前綴。

生產(chǎn)者配置以

spring.cloud.stream.rabbit.bindings..producer.為前綴。

消費(fèi)者配置以

Spring.cloud.stream.tabbit.bindings..consumer.為前綴

下圖就是我們?cè)谶@個(gè)例子中的配置:




步驟六:容錯(cuò)處理

當(dāng)消費(fèi)者中處理邏輯出現(xiàn)異?;蛘咛幚硎?,我們希望消息重新被消費(fèi)者消費(fèi)時(shí),我們就需要配置死信隊(duì)列。如下圖,當(dāng)消費(fèi)者中的消息被拒絕或者是在消費(fèi)過(guò)程中出現(xiàn)異常的時(shí)候,消息就會(huì)進(jìn)入這個(gè)死信隊(duì)列。


從上面的邏輯我們可以看出,死信隊(duì)列中為失敗的消息設(shè)置了重試的次數(shù),和每次重投遞時(shí)的延時(shí),即每一次重投遞都會(huì)根據(jù)投遞次數(shù)的增加而增加延時(shí)時(shí)長(zhǎng)。


接上面的截圖片段,要使得重投遞時(shí)候的延時(shí)起效,我們需要定義一個(gè)延時(shí)交換機(jī),如圖中的DirectExchange類型的交換機(jī),將原隊(duì)列綁定到延時(shí)交換機(jī)上。

當(dāng)重投遞次數(shù)超過(guò)我們所設(shè)定的閾值的時(shí)候,我們就把消息投遞到一個(gè)parking-lot中,以備排錯(cuò)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容