【本文內(nèi)容】
- 本文基于Spring Cloud Stream 3.2.4版本。
-
介紹了官方的示例,定義
Consumer接口來(lái)接收消息:- 使用的是RabbitMQ作為消息中間件:
spring-cloud-stream-binder-rabbit -
application.properties中加入RabbitMQ相關(guān)配置 - 定義bean類(lèi)型為
Consumer接口的方法log()
- 使用的是RabbitMQ作為消息中間件:
-
一些遺留問(wèn)題,如
@EnableBInding、@StreamListener都已經(jīng)不再使用(deprecated)。 -
配置相關(guān)的學(xué)習(xí):
-
自動(dòng)配置:當(dāng)一個(gè)bean類(lèi)型為
Supplier或Function或Consumer時(shí)自動(dòng)配置bindings。如上述官方示例方法log(),會(huì)定義exchange=log-in-0,queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ,并通過(guò)routingKey=#綁定到exchange上。 -
手動(dòng)配置:bean類(lèi)型為
Function時(shí)方法uppercase():
spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out -
加上binding name的手動(dòng)配置:
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in
spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out -
多個(gè)functional時(shí)的binding:遇到超過(guò)1個(gè)functional的時(shí)候,需要顯性指定function,如同時(shí)存在
log()和uppercase():
spring.cloud.function.definition=log;uppercase -
沒(méi)有functional的時(shí)候的binding(配合
StreamBridge使用):
spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout
則會(huì)創(chuàng)建2個(gè)in和2個(gè)out,其中in:fooin-in-0,barin-in-0。
-
自動(dòng)配置:當(dāng)一個(gè)bean類(lèi)型為
1. 前言
官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html
在數(shù)據(jù)整合方面,Spring提供了自己的解決方案,即Spring Integration (https://spring.io/projects/spring-integration/),它提供了和外部系統(tǒng)連接的模式,外部系統(tǒng)如數(shù)據(jù)庫(kù),消息中間件,郵件等等。
后來(lái)隨著微服務(wù)的流行,Spring Boot架構(gòu)成為主流,Spring Integration和Spring Boot集成,形成了一個(gè)新的項(xiàng)目——Spring Cloud Stream。
Spring Cloud Stream可以幫助app從event事件中解藕出來(lái),如在某個(gè)時(shí)間節(jié)點(diǎn)某個(gè)事件會(huì)發(fā)生,再通知給下游的app。總而言之,Spring Cloud Stream試圖提供一種通用的抽象來(lái)支持不同的消息中間件,如RabbitMQ, Apache Kafka, Amazon Kinesis。換句話說(shuō),如果我們的系統(tǒng)在某個(gè)時(shí)候用的消息中間件是RabbitMQ,如果使用Spring Cloud Stream來(lái)處理消息,那么某個(gè)版本后想要切換到Kafka,那么不需要改很多代碼,因?yàn)槲覀兪敲嫦騍pring Cloud Stream的API在開(kāi)發(fā),而不是具體的跟RabbitMQ或是Kafka的API打交道。
2. 官方簡(jiǎn)單的例子
我用的是消息中間件是RabbitMQ。首先確保本地RabbitMQ已經(jīng)安裝。
2.1 依賴
除了Spring Boot相關(guān),還需要加入:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>3.2.4</version>
</dependency>
2.2 配置
新建application.properties,配置RabbitMQ相關(guān):
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=stream-test
2.3 相關(guān)代碼
- 使用了java8新接口
Consumer來(lái)作為message的handler。關(guān)于Consumer,可以參考網(wǎng)友寫(xiě)的文章:【Java 8 新特性】Java Consumer示例 - 定義了這個(gè)handler,Spring Cloud Stream會(huì)自動(dòng)的定義相關(guān)的input destination。
- 另外這個(gè)handler會(huì)自動(dòng)的將消息轉(zhuǎn)為
Person對(duì)象,即消息為Json格式。
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
2.4 測(cè)試
啟動(dòng)項(xiàng)目后,可以看到日志:
嘗試在RabbitMQ console中publish一條消息:2023-01-23 14:02:22.760 INFO 77705 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ, bound to: log-in-0
2023-01-23 14:02:22.762 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-01-23 14:02:22.848 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2accaec2:0/SimpleConnection@3869a6e5 [delegate=amqp://guest@127.0.0.1:5672/stream-test, localPort= 58498]
...
2023-01-23 14:02:22.949 INFO 77705 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ'
2023-01-23 14:02:22.969 INFO 77705 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-01-23 14:02:22.978 INFO 77705 --- [ main] com.faj.StreamApplication : Started StreamApplication in 3.301 seconds (JVM running for 3.778)


3. 一些遺留的問(wèn)題
經(jīng)??吹揭恍﹉ello world例子(如https://www.baeldung.com/spring-cloud-stream),會(huì)用一些annotation:@EnableBInding、@StreamListener,目前在3.2.4版本中都已經(jīng)不推薦使用了(即deprecated了)。推薦使用的是基于functional的編程模式,具體看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring_cloud_function。
-
Reactive模塊目前已停更,如果想要繼續(xù)使用,可以使用老的版本的spring-cloud-stream-reactive。 - 測(cè)試相關(guān)的support
spring-cloud-stream-test-support,具體查看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_testing。 -
StreamMessageConverter不再使用(deprecated)。 -
original-content-type不再使用(deprecated)。 -
BinderAwareChannelResolver不再使用(deprecated)。
4. 配置解釋
4.1 自動(dòng)創(chuàng)建exchange和queue并binding
在#2中的例子,我們創(chuàng)建了一個(gè)返回Java Consumer接口的bean:
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
打開(kāi)RabbitMQ console page:http://localhost:15672/#/exchanges
可以看到自動(dòng)創(chuàng)建了類(lèi)型為topic的exchange=log-in-0:



可以看到基于Spring Cloud Stream在啟動(dòng)時(shí)幫我們做了很多自動(dòng)化的配置:
當(dāng)一個(gè)bean類(lèi)型為Supplier或Function或Consumer時(shí)(這三個(gè)接口皆為java 8引入),就會(huì)當(dāng)作是Spring Cloud Stream的message handlers,并會(huì)找尋相關(guān)的destination的binding。如果沒(méi)有配置,則會(huì)按照遵循特定的規(guī)則。
4.2 手動(dòng)配置的規(guī)則
基于functional編程binding的規(guī)則說(shuō)明,以返回類(lèi)型是Function為例:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
在#4.1中的定義的Comsumer,關(guān)心的是如何消費(fèi),所以app中會(huì)自動(dòng)創(chuàng)建input相關(guān)的配置。而本例中bean的類(lèi)型為Function接口,它會(huì)有input和output,即接收消息,然后再將消息發(fā)送到下游。
input和output相關(guān)的bindings如下:
- input:
<方法名> + -in- + <index>
-output:<方法名> + -out- + <index>
in或是out表示binding的類(lèi)型(即input和output)。index表示序列號(hào),如果是單個(gè)input/output,那么就是0。
如果是手動(dòng)配置,我們希望uppercase()的接收queue為my-queue,那么input的完整的配置如下:
spring.cloud.stream.bindings.uppercase-in-0.destination=my-queue
output的配置也類(lèi)似,使用的是:uppercase-out-0。
4.3 Binding名
Descriptive Binding Names
為了增加可讀性,可以給binding一個(gè)描述性的名字,可以通過(guò)配置spring.cloud.stream.function.bindings.<binding-name>來(lái)實(shí)現(xiàn)。如:
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
即,通過(guò)上述配置,我們把uppercase-in-0重命名為myInput,那么我們?cè)诙xdetination的時(shí)候,就可以用myInput來(lái)定義了:
spring.cloud.stream.bindings.myInput.destination=my-queue
4.4 測(cè)試
基于#4.2代碼的測(cè)試:
case-1:使用自動(dòng)配置,創(chuàng)建的exchange如下:



output沒(méi)有binding的queue,很好理解,因?yàn)楫?dāng)前的app只需要關(guān)心發(fā)送到哪個(gè)exchange,至于下游誰(shuí)在接收,它并不關(guān)心。
case-2:如果我們手動(dòng)配置:
spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
或用binding name來(lái)配置(和上面的配置等價(jià)):
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in
spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out
那么創(chuàng)建的exchange如下:

4.5 多個(gè)functional時(shí)的binding
如果只有一個(gè)functional bean,如#2的log()或是#4.2中的uppercase(),那么不需要配置spring.cloud.function.definition(依賴說(shuō)Spring Cloud Stream的auto-discovered機(jī)制),反之則需要配置。
比如我們配置了兩個(gè)bean:
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
如果我們?cè)?code>application.properties文件中沒(méi)有任何配置,打開(kāi)RabbitMQ console會(huì)發(fā)現(xiàn)相關(guān)的exchange/queue并沒(méi)有創(chuàng)建。加入配置:
spring.cloud.function.definition=log;uppercase
那么functional=log和uppercase會(huì)創(chuàng)建相關(guān)配置,而another()并不會(huì):
ps,一個(gè)bean時(shí)auto-detect機(jī)制會(huì)起作用,想要關(guān)閉這個(gè)功能,可以用spring.cloud.stream.function.autodetect=false。
4.6 沒(méi)有functional的時(shí)候的binding
我們上述都是基于functional的配置(即三個(gè)接口:Function,Supplier,Consumer),有時(shí)候我們需要明確的binding,但不需要綁定到具體的function上。如需要支持Spring的其它框架(如Spring Integration),這時(shí)候可能會(huì)用MessageChannel來(lái)操作。如果是這樣,那么可以直接定義input/output,多個(gè)input可以用;來(lái)區(qū)分:
spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout
上述的配置,不需要定義額外的function,創(chuàng)建的exchange如下:


那么沒(méi)有functional的時(shí)候,誰(shuí)負(fù)責(zé)消息的發(fā)送呢?Spring Cloud Stream提供了StreamBridge,也可以發(fā)送消息,具體看官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources