【Spring Cloud Stream】入門(mén),并簡(jiǎn)單的配置Consumer進(jìn)行消息接收

【本文內(nèi)容】

  • 本文基于Spring Cloud Stream 3.2.4版本。
  • 介紹了官方的示例,定義Consumer接口來(lái)接收消息:
    • 使用的是RabbitMQ作為消息中間件:spring-cloud-stream-binder-rabbit
    • application.properties中加入RabbitMQ相關(guān)配置
    • 定義bean類(lèi)型為Consumer接口的方法log()
  • 一些遺留問(wèn)題,如@EnableBInding、@StreamListener都已經(jīng)不再使用(deprecated)。
  • 配置相關(guān)的學(xué)習(xí):
    • 自動(dòng)配置:當(dāng)一個(gè)bean類(lèi)型為SupplierFunctionConsumer時(shí)自動(dòng)配置bindings。如上述官方示例方法log(),會(huì)定義exchange=log-in-0,queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ,并通過(guò)routingKey=#綁定到exchange上。
    • 手動(dòng)配置:bean類(lèi)型為Function時(shí)方法uppercase():
      spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
      spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
    • 加上binding name的手動(dòng)配置
      spring.cloud.stream.function.bindings.uppercase-in-0=myInput
      spring.cloud.stream.bindings.myInput.destination=my-topic-in
      spring.cloud.stream.function.bindings.uppercase-out-0=output
      spring.cloud.stream.bindings.output.destination=my-topic-out
    • 多個(gè)functional時(shí)的binding:遇到超過(guò)1個(gè)functional的時(shí)候,需要顯性指定function,如同時(shí)存在log()uppercase()
      spring.cloud.function.definition=log;uppercase
    • 沒(méi)有functional的時(shí)候的binding(配合StreamBridge使用)
      spring.cloud.stream.input-bindings=fooin;barin
      spring.cloud.stream.output-bindings=fooout;barout
      則會(huì)創(chuàng)建2個(gè)in和2個(gè)out,其中in:fooin-in-0,barin-in-0

1. 前言

官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html

在數(shù)據(jù)整合方面,Spring提供了自己的解決方案,即Spring Integration (https://spring.io/projects/spring-integration/),它提供了和外部系統(tǒng)連接的模式,外部系統(tǒng)如數(shù)據(jù)庫(kù),消息中間件,郵件等等。

后來(lái)隨著微服務(wù)的流行,Spring Boot架構(gòu)成為主流,Spring Integration和Spring Boot集成,形成了一個(gè)新的項(xiàng)目——Spring Cloud Stream。

Spring Cloud Stream可以幫助app從event事件中解藕出來(lái),如在某個(gè)時(shí)間節(jié)點(diǎn)某個(gè)事件會(huì)發(fā)生,再通知給下游的app。總而言之,Spring Cloud Stream試圖提供一種通用的抽象來(lái)支持不同的消息中間件,如RabbitMQ, Apache Kafka, Amazon Kinesis。換句話說(shuō),如果我們的系統(tǒng)在某個(gè)時(shí)候用的消息中間件是RabbitMQ,如果使用Spring Cloud Stream來(lái)處理消息,那么某個(gè)版本后想要切換到Kafka,那么不需要改很多代碼,因?yàn)槲覀兪敲嫦騍pring Cloud Stream的API在開(kāi)發(fā),而不是具體的跟RabbitMQ或是Kafka的API打交道。

2. 官方簡(jiǎn)單的例子

參考:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-creating-sample-application

我用的是消息中間件是RabbitMQ。首先確保本地RabbitMQ已經(jīng)安裝。

2.1 依賴

除了Spring Boot相關(guān),還需要加入:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            <version>3.2.4</version>
        </dependency>
2.2 配置

新建application.properties,配置RabbitMQ相關(guān):

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=stream-test
2.3 相關(guān)代碼
  • 使用了java8新接口Consumer來(lái)作為message的handler。關(guān)于Consumer,可以參考網(wǎng)友寫(xiě)的文章:【Java 8 新特性】Java Consumer示例
  • 定義了這個(gè)handler,Spring Cloud Stream會(huì)自動(dòng)的定義相關(guān)的input destination。
  • 另外這個(gè)handler會(huì)自動(dòng)的將消息轉(zhuǎn)為Person對(duì)象,即消息為Json格式。
@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String toString() {
            return this.name;
        }
    }
}
2.4 測(cè)試

啟動(dòng)項(xiàng)目后,可以看到日志:

2023-01-23 14:02:22.760 INFO 77705 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ, bound to: log-in-0
2023-01-23 14:02:22.762 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-01-23 14:02:22.848 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2accaec2:0/SimpleConnection@3869a6e5 [delegate=amqp://guest@127.0.0.1:5672/stream-test, localPort= 58498]
...
2023-01-23 14:02:22.949 INFO 77705 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ'
2023-01-23 14:02:22.969 INFO 77705 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-01-23 14:02:22.978 INFO 77705 --- [ main] com.faj.StreamApplication : Started StreamApplication in 3.301 seconds (JVM running for 3.778)

嘗試在RabbitMQ console中publish一條消息:
image.png

發(fā)送成功后,app端可以收到消息:
image.png

3. 一些遺留的問(wèn)題

經(jīng)??吹揭恍﹉ello world例子(如https://www.baeldung.com/spring-cloud-stream),會(huì)用一些annotation:@EnableBInding、@StreamListener,目前在3.2.4版本中都已經(jīng)不推薦使用了(即deprecated了)。推薦使用的是基于functional的編程模式,具體看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring_cloud_function。

4. 配置解釋

4.1 自動(dòng)創(chuàng)建exchange和queue并binding

在#2中的例子,我們創(chuàng)建了一個(gè)返回Java Consumer接口的bean:

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

打開(kāi)RabbitMQ console page:http://localhost:15672/#/exchanges
可以看到自動(dòng)創(chuàng)建了類(lèi)型為topic的exchange=log-in-0:

image.png

并且創(chuàng)建了queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ:
image.png

綁定信息:
image.png

可以看到基于Spring Cloud Stream在啟動(dòng)時(shí)幫我們做了很多自動(dòng)化的配置:
當(dāng)一個(gè)bean類(lèi)型為SupplierFunctionConsumer時(shí)(這三個(gè)接口皆為java 8引入),就會(huì)當(dāng)作是Spring Cloud Stream的message handlers,并會(huì)找尋相關(guān)的destination的binding。如果沒(méi)有配置,則會(huì)按照遵循特定的規(guī)則。

4.2 手動(dòng)配置的規(guī)則

基于functional編程binding的規(guī)則說(shuō)明,以返回類(lèi)型是Function為例:

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

在#4.1中的定義的Comsumer,關(guān)心的是如何消費(fèi),所以app中會(huì)自動(dòng)創(chuàng)建input相關(guān)的配置。而本例中bean的類(lèi)型為Function接口,它會(huì)有inputoutput,即接收消息,然后再將消息發(fā)送到下游。

input和output相關(guān)的bindings如下:

  • input:<方法名> + -in- + <index>
    -output:<方法名> + -out- + <index>

in或是out表示binding的類(lèi)型(即inputoutput)。index表示序列號(hào),如果是單個(gè)input/output,那么就是0。

如果是手動(dòng)配置,我們希望uppercase()的接收queue為my-queue,那么input的完整的配置如下:

spring.cloud.stream.bindings.uppercase-in-0.destination=my-queue

output的配置也類(lèi)似,使用的是:uppercase-out-0。

4.3 Binding名

Descriptive Binding Names
為了增加可讀性,可以給binding一個(gè)描述性的名字,可以通過(guò)配置spring.cloud.stream.function.bindings.<binding-name>來(lái)實(shí)現(xiàn)。如:

spring.cloud.stream.function.bindings.uppercase-in-0=myInput

即,通過(guò)上述配置,我們把uppercase-in-0重命名為myInput,那么我們?cè)诙xdetination的時(shí)候,就可以用myInput來(lái)定義了:

spring.cloud.stream.bindings.myInput.destination=my-queue
4.4 測(cè)試

基于#4.2代碼的測(cè)試:
case-1:使用自動(dòng)配置,創(chuàng)建的exchange如下:

image.png

queue如下:
image.png

input的exchange會(huì)binding到queue上:
image.png

output沒(méi)有binding的queue,很好理解,因?yàn)楫?dāng)前的app只需要關(guān)心發(fā)送到哪個(gè)exchange,至于下游誰(shuí)在接收,它并不關(guān)心。

case-2:如果我們手動(dòng)配置:

spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out

或用binding name來(lái)配置(和上面的配置等價(jià)):

spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in

spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out

那么創(chuàng)建的exchange如下:
image.png

binding如下:
image.png
4.5 多個(gè)functional時(shí)的binding

如果只有一個(gè)functional bean,如#2的log()或是#4.2中的uppercase(),那么不需要配置spring.cloud.function.definition(依賴說(shuō)Spring Cloud Stream的auto-discovered機(jī)制),反之則需要配置。

比如我們配置了兩個(gè)bean:

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

如果我們?cè)?code>application.properties文件中沒(méi)有任何配置,打開(kāi)RabbitMQ console會(huì)發(fā)現(xiàn)相關(guān)的exchange/queue并沒(méi)有創(chuàng)建。加入配置:

spring.cloud.function.definition=log;uppercase

那么functional=log和uppercase會(huì)創(chuàng)建相關(guān)配置,而another()并不會(huì):
image.png

ps,一個(gè)bean時(shí)auto-detect機(jī)制會(huì)起作用,想要關(guān)閉這個(gè)功能,可以用spring.cloud.stream.function.autodetect=false。

4.6 沒(méi)有functional的時(shí)候的binding

我們上述都是基于functional的配置(即三個(gè)接口:Function,SupplierConsumer),有時(shí)候我們需要明確的binding,但不需要綁定到具體的function上。如需要支持Spring的其它框架(如Spring Integration),這時(shí)候可能會(huì)用MessageChannel來(lái)操作。如果是這樣,那么可以直接定義input/output,多個(gè)input可以用;來(lái)區(qū)分:

spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout

上述的配置,不需要定義額外的function,創(chuàng)建的exchange如下:
image.png

創(chuàng)建的queue如下:
image.png

同樣的,input exchange會(huì)綁定到相應(yīng)的queue上,而output exchange不會(huì)有queue綁定。如:
image.png

那么沒(méi)有functional的時(shí)候,誰(shuí)負(fù)責(zé)消息的發(fā)送呢?Spring Cloud Stream提供了StreamBridge,也可以發(fā)送消息,具體看官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容