當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因為消息投遞給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:
- 消息投遞的過程中出現(xiàn)了網(wǎng)絡(luò)故障
- 消費(fèi)者接收到消息后突然宕機(jī)
- 消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異常
- ...
一旦發(fā)生上述情況,消息也會丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。
但問題來了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?
1. 消費(fèi)者確認(rèn)機(jī)制
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:
- ack:成功處理消息,RabbitMQ從隊列中刪除該消息
- nack:消息處理失敗,RabbitMQ需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息
一般reject方式用的較少,除非是消息格式有問題,那就是開發(fā)問題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過try catch機(jī)制捕獲,消息處理成功時返回ack,處理失敗時返回nack.
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實現(xiàn)了消息確認(rèn)。并允許我們通過配置文件設(shè)置ACK處理方式,有三種模式:
- none:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
- manual:手動模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活
- auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時則自動返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時,根據(jù)異常判斷返回不同結(jié)果:
- 如果是業(yè)務(wù)異常,會自動返回nack;
- 如果是消息處理或校驗異常,自動返回reject;
返回Reject的常見異常有:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
- o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
- o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
- o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
- o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
- java.lang.NoSuchMethodException: Added in version 1.6.3.
- java.lang.ClassCastException: Added in version 1.6.3.
通過下面的配置可以修改SpringAMQP的ACK處理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做處理
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個消息處理的異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息處理完成");
}
測試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時,消息依然被RabbitMQ刪除了。
我們再次把確認(rèn)機(jī)制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自動ack
在異常位置打斷點,再次發(fā)送消息,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unacked(未確定狀態(tài)):

放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會自動返回reject,所以消息依然會被刪除:

我們將異常改為RuntimeException類型:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息處理完成");
}
在異常位置打斷點,然后再次發(fā)送消息測試,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unacked(未確定狀態(tài)):

放行以后,由于拋出的是業(yè)務(wù)異常,所以Spring返回ack,最終消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:

當(dāng)我們把配置改為auto時,消息處理失敗后,會回到RabbitMQ,并重新投遞到消費(fèi)者。
如果覺得有收獲,歡迎點贊和評論,更多知識,請點擊關(guān)注查看我的主頁信息哦~