1. 下載RabbitMQ
可以使用docker鏡像:
https://python.iitter.com/other/170428.html
2. 添加依賴
生產(chǎn)者和消費(fèi)者都需要添加以下依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. 生產(chǎn)者
@Service
@Slf4j
public class Producer {
@Autowired
private StreamBridge streamBridge;
public void send(Object message) {
log.info("sent: {}", message);
streamBridge.send("testMessage-out-0", message);
}
}
4. 消費(fèi)者
@Slf4j
@Service
public class Consumer {
@Autowired
private ConsumerService consumerService;
@Bean
public Consumer<Object> testMessage() {
return request -> {
log.info("received: {}", request);
consumerService.testConsumer(request);
};
}
}
在默認(rèn)情況下,框架會使用消費(fèi)者方法的 method name 作為當(dāng)前消費(fèi)者的標(biāo)識,如果消費(fèi)者標(biāo)識和配置文件中的名稱不一致,那么 Spring 應(yīng)用就不知道該把當(dāng)前的消費(fèi)者綁定到哪一個 Stream 信道上去。
5. 配置文件
- Binder
如果程序中只使用了單一的中間件,比如只接入了 RabbitMQ,那么可以直接在 spring.rabbitmq 節(jié)點(diǎn)下配置連接串,不需要特別指定 binders 配置。
spring:
cloud:
stream:
# 如果你項目里只對接一個中間件,那么不用定義binders
# 當(dāng)系統(tǒng)要定義多個不同消息中間件的時候,使用binders定義
binders:
my-rabbit:
type: rabbit # 消息中間件類型
environment: # 連接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
- bindings
配置了生產(chǎn)者、消費(fèi)者、binder 和 RabbitMQ 四方的關(guān)聯(lián)關(guān)系。
spring:
cloud:
stream:
bindings:
# 添加 Producer
testMessage-out-0:
destination: testMessage-topic
content-type: application/json
binder: my-rabbit
# Consumer
testMessage-in-0:
destination: testMessage-topic
content-type: application/json
# 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
group: add-coupon-group
binder: my-rabbit
function:
definition: testMessage
如上就可以生成和消費(fèi)消息了。
6. 異常處理
- 重試
消息重試是一種簡單高效的異?;謴?fù)手段,當(dāng) Consumer 端拋出異常的時候,Stream 會自動執(zhí)行 2 次重試。
spring:
cloud:
stream:
bindings:
testMessage-in-0:
destination: testMessage-topic
content-type: application/json
# 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
group: testMessage-group
binder: my-rabbit
consumer:
# 如果最大嘗試次數(shù)為1,即不重試
# 默認(rèn)是做3次嘗試
max-attempts: 5
# 兩次重試之間的初始間隔
backOffInitialInterval: 2000
# 重試最大間隔
backOffMaxInterval: 10000
# 每次重試后,間隔時間乘以的系數(shù)
backOffMultiplier: 2
# 如果某個異常你不想重試,寫在這里
retryableExceptions:
java.lang.IllegalArgumentException: false
除了本地重試以外,還可以把這個失敗的消息丟回到原始隊列中,做一個 requeue 的操作。
spring:
cloud:
stream:
rabbit:
bindings:
# requeue重試
testMessage-in-0:
consumer:
requeue-rejected: true
- 降級
通過 spring-integration 的注解 @ServiceActivator 做了一個橋接,將指定 Channel 的異常錯誤轉(zhuǎn)到本地方法里。
@ServiceActivator(inputChannel = "testMessage-topic.testMessage-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
log.info("consumer error: {}", errorMessage);
// 實現(xiàn)自己的邏輯
}
- 死信隊列
如果想要保留這條出錯的 Message,可以選擇將它發(fā)送到另一個 Queue 里。這個特殊的 Queue 就叫做死信隊列。
開啟
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
配置
spring:
cloud:
stream:
rabbit:
bindings:
testMessage-in-0:
consumer:
auto-bind-dlq: true
死信隊列的名稱和第一個隊列幾乎一樣,唯一區(qū)別就是末尾多了一個.dlq,這個 dlq 就是死信隊列的標(biāo)志。
7.延遲消息
下載rabbitmq_delayed_message_exchange插件
https://blog.csdn.net/u010375456/article/details/106323962/
- 生產(chǎn)者
// 使用延遲消息發(fā)送
public void sendInDelay(Object message) {
log.info("sent: {}", coupon);
streamBridge.send("testMessageDelay-out-0"
MessageBuilder.withPayload(message)
.setHeader("x-delay", 10 * 1000)
.build());
}
- 消費(fèi)者
@Bean
public Consumer<Object> testMessageDelay() {
return request -> {
log.info("received: {}", request);
service.consumer(request);
};
}
- 配置文件
spring:
cloud:
stream:
bindings:
# 延遲發(fā)券 - producer
testMessageDelay-out-0:
destination: testMessage-delayed-topic
content-type: application/json
binder: my-rabbit
# 延遲發(fā)券 - Consumer
testMessageDelay-in-0:
destination: testMessage-delayed-topic
content-type: application/json
# 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
group: testMessage-group
binder: my-rabbit
consumer:
# 如果最大嘗試次數(shù)為1,即不重試
# 默認(rèn)是做3次嘗試
max-attempts: 1
function:
definition: testMessageDelay
rabbit:
bindings:
testMessageDelay-out-0:
producer:
delayed-exchange: true
testMessageDelay-in-0:
consumer:
delayed-exchange: true