【前置文章】
- 【Spring Cloud Stream】入門,并簡單的配置Consumer進(jìn)行消息接收
- 【Spring Cloud Stream】發(fā)送和接收消息
- 【RabbitMQ的那點(diǎn)事】Exchange類型(超詳細(xì))
- 【RabbitMQ的那點(diǎn)事】訂閱/發(fā)布模式下,動(dòng)態(tài)新建queue,實(shí)現(xiàn)所有節(jié)點(diǎn)都能收到消息
- 【RabbitMQ的那點(diǎn)事】死信列隊(duì)(Dead Letter Queue)
【官方文檔】
【本文內(nèi)容】
主要是基于官方文檔進(jìn)行的整合。
- 默認(rèn)生成的exchange為
topic類型。 - 解釋了
group的配置,不配即為publish/subscribe模式,配了即為Work Queues模式。 - 使用已經(jīng)存在的exchange和queue。
- RabbitMQ Binder的重試機(jī)制,以及如何配置死信隊(duì)列。
- 消費(fèi)端設(shè)置并發(fā)數(shù)。
- 消費(fèi)端手動(dòng)應(yīng)答模式。
另外,沒有介紹到Error Channels,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-error-channels
1. Rabbit Binder

默認(rèn)情況下RabbitMQ Binder創(chuàng)建的是TopicExchange。每個(gè)consumer group中,會(huì)有一個(gè)queue綁定到這個(gè)TopicExchange上。


默認(rèn)情況下routingKey=#,即匹配所有:



2. 有無配置group導(dǎo)致的兩種模式
2.1 Publish/Subscribe模式(fanout)
官方的Publish/Subscribe模式:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
Publish/Subscribe模式,如需要更新所有節(jié)點(diǎn)上的基于內(nèi)存的緩存:
如果用Spring Boot實(shí)現(xiàn),可以參考上述第4個(gè)前置文章(即:【RabbitMQ的那點(diǎn)事】訂閱/發(fā)布模式下,動(dòng)態(tài)新建queue,實(shí)現(xiàn)所有節(jié)點(diǎn)都能收到消息)。
用Spring Cloud Stream實(shí)現(xiàn)的話,只要不聲明group即可(也就是第1章截圖的匿名的consumers)。
2.2 Work Queues模式
官方的Work Queues模式:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
用圖表示,可以看到多個(gè)Consumer監(jiān)聽同一個(gè)Queue,依次拿到消息:
用Spring Boot寫的話,可以新建DirectExchange,再新建Queue,這時(shí)候的程序可以布署兩個(gè)instance節(jié)點(diǎn),那么在實(shí)際環(huán)境中兩個(gè)節(jié)點(diǎn)就會(huì)依次收到消息。
用Spring Cloud Stream寫的話,只需要加上group屬性即可,比如我們有一個(gè)Consumer:
@Bean
public Consumer<String> log() {
return person -> {
System.out.println("Received: " + person);
};
}
那么在application.properties中定義:
spring.cloud.stream.bindings.log-in-0.destination=log-in-0
spring.cloud.stream.bindings.log-in-0.group=log
此時(shí)自動(dòng)創(chuàng)建的exchange=log-in-0,queue=log-in-0.log,這時(shí)候的queue則為durable的queue,即程序down后queue還是存在,并不會(huì)被刪除。
定義的方式也可按binding name進(jìn)行配置,即先給log-in-0的綁定起個(gè)描述性的name,再根據(jù)該name進(jìn)行定義group,這樣自動(dòng)創(chuàng)建的exchange則會(huì)=mylog,queue=mylog.log:
spring.cloud.stream.function.bindings.log-in-0=mylog
spring.cloud.stream.bindings.mylog.group=log
3. 使用已經(jīng)存在的exchange和queue
我們先在RabbitMQ Broker中新建以下關(guān)系:
-
exchange=
order.in,queue=order,routingKey=order.in.routing,截圖如下:image.png -
exchange=
order.out,queue=order.out.test,routingKey=order.out.routing,截圖如下:image.png
我們?cè)诖a是創(chuàng)建了一個(gè)Function,用來接收來自queue=order,處理完成后發(fā)送給exchange=order.out:
@Bean
public Function<String, String> handleOrder() {
return order -> {
System.out.println("Received: " + order);
return "finished for " + order;
};
}
那么,在application.yaml中配置如下:
spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn
spring.cloud.stream.bindings.orderIn.destination=order.in
spring.cloud.stream.bindings.orderIn.group=order
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindingRoutingKey=order.in.routing
spring.cloud.stream.function.bindings.handleOrder-out-0=orderOut
spring.cloud.stream.bindings.orderOut.destination=order.out
spring.cloud.stream.rabbit.bindings.orderOut.producer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.routingKeyExpression='order.out.routing'
【解釋】
首先是通用配置,以spring.cloud.stream.bindings開頭:
-
spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn:這個(gè)在第1篇前置文章#4.3有講到,為了配置方便,我們特定加了bindings的命名。 -
<通用配置>.<bindingsName>.destination表示exchnge的名字。是Spring Cloud Stream的通用配置。 -
<通用配置>.<bindingsName>.group,在#2.2有解釋。
其次是RabbitMQ的專有配置,以spring.cloud.stream.rabbit.bindings開頭,consumer端所有的配置,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties:
-
<專有配置>.<bindingsName>.consumer.bindQueue:表示是否要?jiǎng)?chuàng)建queue并綁定到相應(yīng)的exchange上。默認(rèn)為true,即會(huì)創(chuàng)建并綁定。false表示不會(huì)新建。 -
<專有配置>.<bindingsName>.consumer.declareExchange:表示是否按destination的值創(chuàng)建exchange(默認(rèn)為true,并與destination同名)。false表示不會(huì)新建。 -
<專有配置>.<bindingsName>.consumer.queueNameGroupOnly:默認(rèn)為false。當(dāng)值為true表示在consumer端的queue的名字等于我們定義的group的名字,即當(dāng)我們想要使用已經(jīng)存在的queue時(shí),我們可以設(shè)置這個(gè)值為true,并把group的名字定義為queue的名字。 -
<專有配置>.<bindingsName>.consumer.bindingRoutingKey:默認(rèn)為#(即all match,匹配所有的字符)。自定義的時(shí)候可以設(shè)置多個(gè)值,想要多個(gè)值時(shí)可配合bindingRoutingKeyDelimiter使用。
在Producer端的配置,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-prod-props
-
<專有配置>.<bindingsName>.producer.routingKeyExpression:值得注意的是這個(gè)配置,它是SpEL表達(dá)式,對(duì)于已經(jīng)確定的routingKey,需要用單引號(hào)標(biāo)記,以表示是字符串,如我們的例子中的'order.out.routing'。
4. RabbitMQ Binder的重試機(jī)制
當(dāng)開啟了重試機(jī)制,如果有消息被退回,那么將阻塞消費(fèi)端(會(huì)一直重復(fù)消費(fèi)該消息),通常情況下,可以使用Dead Letter Queue(死信隊(duì)列)來解決這個(gè)問題。
#disable binder retries
spring.cloud.stream.bindings.orderIn.consumer.max-attempts=5
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dlq-ttl=50000
spring.cloud.stream.rabbit.bindings.orderIn.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dead-letter-queue-name=myDlq
【解釋】
-
max-attempts表示重試次數(shù),默認(rèn)為3。如果要關(guān)閉重試,可以將value置為1。如果值>1,在遇到錯(cuò)誤時(shí),會(huì)先嘗試重試,然后才是將信息放到Dead Letter Queue中。以下是max-attempts=5時(shí)的日志:image.png
關(guān)于max-attempts,每重試一次,會(huì)在message的header中deliveryAttempt的值+1,比如第一次:image.png
第二次重試時(shí):image.png -
dlq-ttl表示queue中的message的time-to-live的時(shí)間,即過期時(shí)間,這里的意思是生成的死信隊(duì)列myDlx中的消息過期時(shí)間為5秒。 -
auto-bind-dlq=true表示會(huì)創(chuàng)建一個(gè)Dead Letter Queue并進(jìn)行綁定,默認(rèn)情況下生成的死信隊(duì)列名為{destination}.dlq,我們上述的例子,默認(rèn)的DLQ為order.dlq。也可通過dead-letter-queue-name進(jìn)行重新命名,即我們的myDlq。 - 如果沒有特別定義,那么默認(rèn)生成的Dead Letter Exchange名字則為DLX。也可通過
dead-letter-exchange進(jìn)行定義。另外默認(rèn)生成的DLX類型為direct,可通過dead-letter-exchange-type配置成其它類型如fanout或topic。 - 也可定義
dead-letter-routing-key,即DLX和DLQ進(jìn)行綁定時(shí)的routingKey,默認(rèn)為{destination},我們的case中為order。截圖如下:image.png
ps. 上述的DLQ和DLX默認(rèn)的命名可以再通過配置prefix來加上前綴,如我們定義了prefix=apple,那么如果我們沒有自己定義命名,默認(rèn)的Queue則為apple.{destination}.dlq,默認(rèn)的Exchange則為apple.DLX。
生成的DLX和DLQ如下:


【測試】
想要測試消息到達(dá)死信隊(duì)列,那么可以在正常消費(fèi)端的代碼中拋出異常:AmqpRejectAndDontRequeueException。這個(gè)異常的名字已經(jīng)很直白了,即否定應(yīng)答(basic.reject)以及requeue=false。
還有一種方式即先設(shè)置requeue-rejected值等于false(默認(rèn)即為false),然后拋出任意錯(cuò)誤即可。
spring.cloud.stream.rabbit.bindings.orderIn.consumer.requeue-rejected=false
比如我們定義Function如下,即收到消息等于字符串error,則報(bào)異常。
@Bean
public Function<String, String> handleOrder() {
return order -> {
log.info("handleOrder Received: " + order);
if (order.equals("error")) {
throw new AmqpRejectAndDontRequeueException("error");
}
return "finished for " + order;
};
}
同時(shí)我們給DLQ也設(shè)置了監(jiān)聽:
@RabbitListener(queues = {"myDlq"})
public void deadLetterListener(String message) {
log.info("Received Dead Letter message: {}", message);
}
運(yùn)行程序,在RabbitMQ Console中發(fā)送消息=error:
查看日志,首先是5次retry嘗試,然后由于消費(fèi)端拋出異常AmqpRejectAndDontRequeueException,消息轉(zhuǎn)而通過DLX發(fā)送到了我們的死信隊(duì)列中,即myDlq:

5. 消費(fèi)端設(shè)置并發(fā)數(shù)
在消費(fèi)端設(shè)置并發(fā)數(shù),這個(gè)并不是RabbitMQ的參數(shù),而是Spring Cloud Stream的通用參數(shù),詳見:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_consumer_properties
比如我們?cè)O(shè)置了并發(fā)數(shù)為5:
spring.cloud.stream.bindings.orderIn.consumer.concurrency=5
那么消費(fèi)端則會(huì)開5個(gè)線程來消費(fèi)消息:

6. 消費(fèi)端手動(dòng)應(yīng)答模式
可通過設(shè)置acknowledge-mode,默認(rèn)為AUTO。
設(shè)置手動(dòng)應(yīng)答模式:
spring.cloud.stream.rabbit.bindings.orderIn.consumer.acknowledge-mode=MANUAL
消費(fèi)端代碼,代碼還是通過channel和delivertyTag進(jìn)行應(yīng)答,和Spring Boot的代碼差不多。
@Bean
public Function<Message<String>, Message<String>> handleOrder() {
return message -> {
log.info("handleOrder Received: " + message.getPayload());
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
try {
if (message.getPayload().equals("error")) {
channel.basicNack(deliveryTag, false, false);
} else {
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return new GenericMessage<>("finished for " + message, message.getHeaders());
};
}





