使用 Spring Cloud Stream 構(gòu)建消息驅(qū)動微服務(wù)

相關(guān)源碼: spring cloud demo

微服務(wù)的目的: 松耦合

事件驅(qū)動的優(yōu)勢:高度解耦

Spring Cloud Stream 的幾個概念

Spring Cloud Stream is a framework for building message-driven microservice applications.

官方定義 Spring Cloud Stream 是一個構(gòu)建消息驅(qū)動微服務(wù)的框架。

Spring Cloud Stream Application

應(yīng)用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負(fù)責(zé)與中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動的方式

Binder

Binder 是 Spring Cloud Stream 的一個抽象概念,是應(yīng)用與消息中間件之間的粘合劑。目前 Spring Cloud Stream 實(shí)現(xiàn)了 Kafka 和 Rabbit MQ 的binder。

通過 binder ,可以很方便的連接中間件,可以動態(tài)的改變消息的
destinations(對應(yīng)于 Kafka 的topic,Rabbit MQ 的 exchanges),這些都可以通過外部配置項(xiàng)來做到。

甚至可以任意的改變中間件的類型而不需要修改一行代碼。

Publish-Subscribe

消息的發(fā)布(Publish)和訂閱(Subscribe)是事件驅(qū)動的經(jīng)典模式。Spring Cloud Stream 的數(shù)據(jù)交互也是基于這個思想。生產(chǎn)者把消息通過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務(wù),通過訂閱特定 topic 來獲取廣播出來的消息來觸發(fā)業(yè)務(wù)的進(jìn)行。

這種模式,極大的降低了生產(chǎn)者與消費(fèi)者之間的耦合。即使有新的應(yīng)用的引入,也不需要破壞當(dāng)前系統(tǒng)的整體結(jié)構(gòu)。

Consumer Groups

“Group”,如果使用過 Kafka 的童鞋并不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。

微服務(wù)中動態(tài)的縮放同一個應(yīng)用的數(shù)量以此來達(dá)到更高的處理能力是非常必須的。對于這種情況,同一個事件防止被重復(fù)消費(fèi),只要把這些應(yīng)用放置于同一個 “group” 中,就能夠保證消息只會被其中一個應(yīng)用消費(fèi)一次。

Durability

消息事件的持久化是必不可少的。Spring Cloud Stream 可以動態(tài)的選擇一個消息隊(duì)列是持久化,還是 present。

Bindings

bindings 是我們通過配置把應(yīng)用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binding 的配置來達(dá)到動態(tài)修改topic、exchange、type等一系列信息而不需要修改一行代碼。

基于 RabbitMQ 使用

以下內(nèi)容源碼: spring cloud demo

消息接收

Spring Cloud Stream 基本用法,需要定義一個接口,如下是內(nèi)置的一個接口。

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

注釋__ @Input__ 對應(yīng)的方法,需要返回 __ SubscribableChannel __ ,并且參入一個參數(shù)值。

這就接口聲明了一個__ binding __命名為 “input” 。

其他內(nèi)容通過配置指定:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: mqTestDefault   

destination:指定了消息獲取的目的地,對應(yīng)于MQ就是 exchange,這里的exchange就是 mqTestDefault

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    // 監(jiān)聽 binding 為 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("一般監(jiān)聽收到:" + message.getPayload());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

定義一個 class (這里直接在啟動類),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同時定義一個方法(此處是 input)標(biāo)明注解為 __ @StreamListener(Processor.INPUT) __,方法參數(shù)為 Message 。

啟動后,默認(rèn)是會創(chuàng)建一個臨時隊(duì)列,臨時隊(duì)列綁定的exchange為 “mqTestDefault”,routing key為 “#”。

所有發(fā)送 exchange 為“mqTestDefault” 的MQ消息都會被投遞到這個臨時隊(duì)列,并且觸發(fā)上述的方法。

以上代碼就完成了最基本的消費(fèi)者部分。

消息發(fā)送

消息的發(fā)送同消息的接受,都需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel,下面是 Spring Cloud Stream 內(nèi)置的接口:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

這就接口聲明了一個 binding 命名為 “output” ,不同于上述的 “input”,這個binding 聲明了一個消息輸出流,也就是消息的生產(chǎn)者。

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: mqTestDefault
          contentType: text/plain

contentType:用于指定消息的類型。具體可以參考 spring cloud stream docs

destination:指定了消息發(fā)送的目的地,對應(yīng) RabbitMQ,會發(fā)送到 exchange 是 mqTestDefault 的所有消息隊(duì)列中。

代碼中調(diào)用:

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {

    @Autowired
    @Qualifier("output")
    MessageChannel output;

    @Override
    public void run(String... strings) throws Exception {
        // 字符串類型發(fā)送MQ
        System.out.println("字符串信息發(fā)送");
        output.send(MessageBuilder.withPayload("大家好").build());
    }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
    
}

通過注入MessageChannel的方式,發(fā)送消息。

通過注入Source 接口的方式,發(fā)送消息。 具體可以查看樣例

以上代碼就完成了最基本的生產(chǎn)者部分。

自定義消息發(fā)送接收

自定義接口

Spring Cloud Stream 內(nèi)置了兩種接口,分別定義了 binding 為 “input” 的輸入流,和 “output” 的輸出流,而在我們實(shí)際使用中,往往是需要定義各種輸入輸出流。使用方法也很簡單。

interface OrderProcessor {

    String INPUT_ORDER = "inputOrder";
    String OUTPUT_ORDER = "outputOrder";

    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();

    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}

一個接口中,可以定義無數(shù)個輸入輸出流,可以根據(jù)實(shí)際業(yè)務(wù)情況劃分。上述的接口,定義了一個訂單輸入,和訂單輸出兩個 binding。

使用時,需要在 @EnableBinding 注解中,添加自定義的接口。
使用 @StreamListener 做監(jiān)聽的時候,需要指定 OrderProcessor.INPUT_ORDER

spring:
  cloud:
    stream:
      defaultBinder: defaultRabbit
      bindings:
        inputOrder:
          destination: mqTestOrder
        outputOrder:
          destination: mqTestOrder

如上配置,指定了 destination 為 mqTestOrder 的輸入輸出流。

分組與持久化

上述自定義的接口配置中,Spring Cloud Stream 會在 RabbitMQ 中創(chuàng)建一個臨時的隊(duì)列,程序關(guān)閉,對應(yīng)的連接關(guān)閉的時候,該隊(duì)列也會消失。而在實(shí)際使用中,我們需要一個持久化的隊(duì)列,并且指定一個分組,用于保證應(yīng)用服務(wù)的縮放。

只需要在消費(fèi)者端的 binding 添加配置項(xiàng) spring.cloud.stream.bindings.[channelName].group = XXX 。對應(yīng)的隊(duì)列就是持久化,并且名稱為:mqTestOrder.XXX

rabbitMQ routing key 綁定

用慣了 rabbitMQ 的童鞋,在使用的時候,發(fā)現(xiàn) Spring Cloud Stream 的消息投遞,默認(rèn)是根據(jù) destination + group 進(jìn)行區(qū)分,所有的消息都投遞到 routing key 為 “#‘’ 的消息隊(duì)列里。

如果我們需要進(jìn)一步根據(jù) routing key 來進(jìn)行區(qū)分消息投遞的目的地,或者消息接受,需要進(jìn)一步配,Spring Cloud Stream 也提供了相關(guān)配置:

spring:
  cloud:
    stream:
      bindings:
        inputProductAdd:
          destination: mqTestProduct
          group: addProductHandler      # 擁有 group 默認(rèn)會持久化隊(duì)列
        outputProductAdd:
          destination: mqTestProduct
      rabbit:
        bindings:
          inputProductAdd:
            consumer:
              bindingRoutingKey: addProduct.*       # 用來綁定消費(fèi)者的 routing key
          outputProductAdd:
            producer:
              routing-key-expression: '''addProduct.*'''  # 需要用這個來指定 RoutingKey

spring.cloud.stream.rabbit.bindings.[channelName].consumer.bindingRoutingKey
指定了生成的消息隊(duì)列的routing key

spring.cloud.stream.rabbit.bindings.[channelName].producer.routing-key-expression 指定了生產(chǎn)者消息投遞的routing key

DLX 隊(duì)列

DLX 作用

DLX:Dead-Letter-Exchange(死信隊(duì)列)。利用DLX, 當(dāng)消息在一個隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有一下幾種情況:

消息被拒絕(basic.reject/ basic.nack)并且requeue=false
消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間)
隊(duì)列達(dá)到最大長度

DLX也是一個正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個隊(duì)列的屬性,當(dāng)這個隊(duì)列中有死信時,RabbitMQ就會自動的將這個消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個隊(duì)列,可以監(jiān)聽這個隊(duì)列中消息做相應(yīng)的處理。

Spring Cloud Stream 中使用

spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true

spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true

配置說明,可以參考 spring cloud stream rabbitmq consumer properties

結(jié)論

Spring Cloud Stream 最大的方便之處,莫過于抽象了事件驅(qū)動的一些概念,對于消息中間件的進(jìn)一步封裝,可以做到代碼層面對中間件的無感知,甚至于動態(tài)的切換中間件,切換topic。使得微服務(wù)開發(fā)的高度解耦,服務(wù)可以關(guān)注更多自己的業(yè)務(wù)流程。

相關(guān)文檔

spring cloud stream 文檔

spring cloud stream 項(xiàng)目

spring cloud stream 樣例

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容