rabbitMQ如何保證如果消息發(fā)送失敗,保證其消息不丟失、怎么設(shè)置消息過期時間以及死信隊列是如何在消息消費失敗時保證消息不丟失的、如何使用過期時間來實現(xiàn)延遲隊列以及rabbitMQ的持久化、消息確認(rèn)的機制是怎樣的?本博文將具體介紹上述內(nèi)容
本博文中的代碼實現(xiàn)實在SpringBoot整合RabbitMQ——消息的發(fā)送和接收的基礎(chǔ)上實現(xiàn)了,完整的代碼可以查看Gitee上的項目rabbitmq
rabbitMQ如何保證消息的不丟失
消息的丟失有以下四種情況:
- 消息發(fā)送到RabbitMQ服務(wù)器,交換機根據(jù)自身的類型和路由鍵無法匹配到隊列,導(dǎo)致消息丟失
- 消息設(shè)置了過期時間,消息過期了導(dǎo)致消息丟失
- 消息不能被正確的消費,導(dǎo)致消息的丟失
- 因為服務(wù)器的崩潰導(dǎo)致消息的丟失
針對以上的情況,rabbitMQ提供了不同的解決方案
消息設(shè)置mandatory參數(shù)和使用備份交換機
mandatory參數(shù)
在上篇博文中我們其實已經(jīng)在配置文件中配置了mandatory的參數(shù),并且在connectionFactory中設(shè)置了其中的參數(shù)
spring:
rabbitmq:
template:
mandatory: true
publisher-confirms: true
publisher-returns: true
然后再發(fā)送消息的時候設(shè)置回調(diào)函數(shù)
/**
* 確認(rèn)后回調(diào)方法
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public final void confirm(CorrelationData correlationData, boolean ack, String cause) {
this.logger.info("confirm-----correlationData:" + correlationData.toString() + "---ack:" + ack + "----cause:" + cause);
// TODO 記錄日志(數(shù)據(jù)庫或者es)
this.handleConfirmCallback(correlationData.getId(), ack, cause);
}
/**
* 失敗后回調(diào)方法
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public final void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
this.logger.info("return-----message:" + message.toString() + "---replyCode:" + replyCode + "----replyText:" + replyText + "----exchange:" + exchange + "----routingKey:" + routingKey);
// TODO 記錄日志(數(shù)據(jù)庫或者es)
this.handleReturnCallback(message, replyCode, replyText, routingKey);
}
用戶可以在這里對消息進(jìn)行本地持久化,其實上面的也叫消息確認(rèn)模式,發(fā)送端將消息發(fā)送給RabbitMQ,rabbitMQ會異步回調(diào)Confirm方法,告訴發(fā)送方,RabbitMQ服務(wù)端有沒有收到消息,如果沒有收到消息的話,原因是什么。同時會異步回調(diào)設(shè)置的returnedMessage將發(fā)送的消息返回
備份交換機
除了上面的消息確認(rèn)模式,還有一種備份交換機的方案也是可以解決消息的丟失問題,具體的邏輯如下:
- 聲明一個交換機A,其類型為fanout類型
- 聲明一個交換機B,設(shè)置其
alternate-exchange屬性為交換機A - 聲明一個隊列a,并且與交換機A綁定
- 聲明一個隊列b,并且與交換機B綁定,路由鍵為rb
這樣我們就實現(xiàn)了備份交換機功能,其業(yè)務(wù)實現(xiàn)邏輯如下:
我們發(fā)送一個消息到交換機B上,當(dāng)路由鍵等于rb時,消息會正確發(fā)送到隊列b上,當(dāng)路由鍵不等于rb時,即消息不能正確的發(fā)送到隊列b上,此時就會發(fā)送給交換機A,由于交換機A是fanout類型的,所以消息會被進(jìn)一步發(fā)送到隊列a上。這樣我們就實現(xiàn)了發(fā)送方消息的不丟失。
[圖片上傳失敗...(image-5a7baf-1562419602501)]
代碼實現(xiàn)這里給出alternaate-exchange的實現(xiàn)
Map<String,Object> map = new HashMap<String,Object>();
map.put("alternate-exchange","A");
MqExchange exchange = new MqExchange().arguments(map).name("B").type(ExchangeTypeEnum.DIRECT.getCode());
amExchangeDeclare.declareExchange(exchange);
過期時間和死信隊列
過期時間
我們常見的購物車訂單,一般有這樣的需求,在規(guī)定的時間內(nèi)沒有付款的話,該訂單就會失效,這里常見就是使用消息的過期時間來控制的,在rabbitMQ中實現(xiàn)過期時間有兩種方式
- 設(shè)置隊列的過期時間,則該隊列中所有的消息的過期時間都是一樣的
// 設(shè)置隊列的過期時間
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-message-ttl",6000);
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
如果不設(shè)置ttl(Time To Live),則這個消息不會過期,如果將TTL設(shè)置為0,則表示除非此時可以直接將消息投遞到消費者,否則消息會立即丟棄
- 設(shè)置消息的過期時間,只有這個消息存在過期時間,設(shè)置消息的過期時間如下:
// 設(shè)置消息的過期時間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(6000);
return message;
}
};
sendService.send(exchangeName,routingKey,data, messagePostProcessor, messageId);
注意:使用第一種方式來設(shè)置過期時間,一旦消息過期,就會從隊列中消除,而采用第二種方式,即使消息過期,也不會馬上從隊列中消除,因為每條消息是否過期是在即將投遞到消費者之期間進(jìn)行判斷的
隊列也是有過期時間的,通過x-expires屬性來設(shè)置的
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-expires",6000);
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
死信隊列
DLX:Dead-Letter-Exchange:當(dāng)一個消息在一個隊列中變成死信之后,他能被重新發(fā)送到另一個交換機中,這個交換機就是DLX。
那么消息滿足什么條件就會成為死信呢?
- 消息被拒絕,并且設(shè)置requeue參數(shù)為false
- 消息過期
- 隊列達(dá)到最大長度
那么如何使用死信隊列?
死信隊列一般都是作為其他隊列的一個屬性來用的,當(dāng)這個隊列中存在死信時,RabbitMQ就會自動將這個消息重新發(fā)布到設(shè)置的DLX上,進(jìn)而被路由到另一個隊列中
// 定義一個交換機
MqExchange dlxExchange = new MqExchange().name("B").type(ExchangeTypeEnum.DIRECT.getCode());
amExchangeDeclare.declareExchange(exchange);
// 聲明一個隊列時,設(shè)置他的屬性 x-dead-letter-exchange 為上面定義的交換機
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-dead-letter-exchange ","dlxExchange");
// 也可以為DLX指定路由鍵 這個不是必須的,如果沒有設(shè)置路由鍵,則使用原隊列的路由鍵
map.put("x-dead-letter-routing-key","dlx-routing-key");
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
這樣就初步完成了死信隊列的聲明。其業(yè)務(wù)流程圖如下:

延時隊列(定時隊列)
上述的購物車訂單的示例,其實最優(yōu)的方案設(shè)計是使用TTL+DLX來實現(xiàn),如果用戶沒有在規(guī)定的時間來支付,則這個訂單就進(jìn)行一場處理。
延時隊列的具體使用方法如下:
方案一:
- 聲明一個設(shè)置死信隊列的隊列,該隊列沒有消費者
- 給每一個發(fā)送該隊列的消息設(shè)置過期時間
- 消息一旦過期就會被死信隊列消費,這樣就能實現(xiàn)延時隊列的效果
方案二:
- 聲明多個設(shè)置死信隊列、不同過期時間的隊列,該隊列沒有消費者
- 然后通過不同的的routingKey來將消息發(fā)送到不同的隊列上
- 等隊列過期時間一到,消息就會匹配到死信隊列上,這樣也能實現(xiàn)延時隊列的效果
方案一和方案二的實現(xiàn)原理基本相同,不同的是一個是消息的過期時間,一個是隊列的過期時間,方案二聲明多個不同過期時間的隊列,而方案一只聲明一個隊列,這樣的話有優(yōu)點也有缺點,優(yōu)點是減少了RabbitMQ的隊列數(shù)量,缺點是降低了RabbitMQ隊列消息消費的速度,而使用哪種方案可以根據(jù)業(yè)務(wù)和流量來衡量使用
具體的業(yè)務(wù)流程圖如下:
[圖片上傳失敗...(image-1f73c4-1562419602501)]
消息持久化
RabbitMQ中交換機、隊列和消息都可以持久化,其中交換機和隊列的持久化只需要在聲明時,其屬性durable為true即可,而消息的持久化是建立在隊列持久化的基礎(chǔ)上,因為在RabbitMQ中,消息時存儲在隊列上的,隊列都沒有了,消息肯定也是存儲不了的。
消息的持久化在上面的內(nèi)容已經(jīng)介紹過了,不同版本的SpringBoot的RabbitMQ集成,實現(xiàn)的方式可能 不一樣,但是本質(zhì)都是一樣的,最終操作的都是RabbitMQ服務(wù)器
消息和隊列的持久化都是比較簡單的,但是我們這里要清楚的知道,如果我們將所有的消息都設(shè)置了持久化了,會嚴(yán)重影響RabbitMQ的性能,畢竟消息寫入到磁盤的速度比寫入內(nèi)存的數(shù)據(jù)慢的不是一點點的。
這個就要求我們在設(shè)計時需要注意,對于可靠性不是那么高的消息,可以不采用持久化處理來提高吞吐量。在選擇是否將消息持久化時,需要在可靠性和吞吐量之間做一個權(quán)衡
消息消費處理
參數(shù)isAck
這里需要注意一點,在我們上一篇博文SpringBoot整合RabbitMQ——消息的發(fā)送和接收中,我們有介紹在為隊列設(shè)置監(jiān)聽時,有個參數(shù)isAck,這里如果設(shè)置成true,則隊列在接收到消息后,不管業(yè)務(wù)方有沒有完全消費消息,都會給RabbitMQ返回個消息已經(jīng)消費成功的結(jié)果,RabbitMQ在判斷消息已經(jīng)成功消費了則會刪除隊列中的消息,但是業(yè)務(wù)方其實沒有真正完成消息的消費,這樣就會導(dǎo)致數(shù)據(jù)的丟失。
那我們怎么來處理這個問題呢?
其實很簡單,我們只需要將isAck字段的參數(shù)設(shè)置成false即可,這樣的話就需要業(yè)務(wù)方手動去操作該消息有沒有被成功消費。如果沒有消費的話就拒絕這個消息,是的拒絕這個消息,還記得我們之前介紹過的死信隊列,隊列設(shè)置了死信隊列,消息一旦被拒絕的話,消息就會進(jìn)入死信交換機,進(jìn)而匹配到響應(yīng)的隊列中,可以在隊列中將消息持久化到本地,這是一種解決方法。
具體的流程圖可以參考死信隊列的流程圖:

消息分發(fā)
在實際的生產(chǎn)過程中,可能一個隊列存在多個消費者,那么隊列此時收到的消息就會以輪詢的方式發(fā)送給消費者。
一般情況下,rabbitMQ會將第m條消息發(fā)送給第m%n個消費者。這里其實有個隱患,在消費者任務(wù)非常繁重的情況下,來不及消費那么多的消息,而其他的消費者,由于某些原因,很快的處理完消息,這種情況就很容易出現(xiàn)某個消費者承受的壓力就比較大,造成整體應(yīng)用的吞吐量下降。
為了解決這個問題, 我們其實可以設(shè)置Channel信道上的最大處理消息的個數(shù),代碼設(shè)置如下:
// 在系統(tǒng)初始化啟動時,加載Connection,創(chuàng)建Channel,然后設(shè)置BasicQos的個數(shù)
Channel channel = cachingConnectionFactory.createConnection().createChannel();
channel.basicQos(1000);
basicQos具體的作用是設(shè)置允許限制Channel上所有消費者所能保持的最大未確認(rèn)的消息的數(shù)量
注意如果這里設(shè)置了最大的未確認(rèn)的消息的數(shù)量,那么所有的消費者都會生效