《RabbitMQ》如何保證消息的可靠性


一條消費成功被消費經(jīng)歷了生產(chǎn)者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。

一 消息生產(chǎn)者沒有把消息成功發(fā)送到MQ

1.1 事務機制

AMQP協(xié)議提供了事務機制,在投遞消息時開啟事務支持,如果消息投遞失敗,則回滾事務。

自定義事務管理器

@Configuration public class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); } }

修改yml

spring: rabbitmq: # 消息在未被隊列收到的情況下返回 publisher-returns: true

開啟事務支持

rabbitTemplate.setChannelTransacted(true);

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產(chǎn)者投遞消息

@Service public class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 設置channel開啟事務 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }

但是,很少有人這么干,因為這是同步操作,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應,之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會大大降低。

1.2 發(fā)送方確認機制

發(fā)送消息時將信道設置為confirm模式,消息進入該信道后,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后,RabbitMQ就會發(fā)送給生產(chǎn)者一個確認。

開啟消息確認機制

spring: rabbitmq: # 消息在未被隊列收到的情況下返回 publisher-returns: true # 開啟消息確認機制 publisher-confirm-type: correlated

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產(chǎn)者投遞消息

@Service public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("確認了這條消息:"+correlationData); }else{ System.out.println("確認失敗了:"+correlationData+";出現(xiàn)異常:"+cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?); } public void publisMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }

如果消息確認失敗后,我們可以進行消息補償,也就是消息的重試機制。當未收到確認信息時進行消息的重新投遞。設置如下配置即可完成。

spring: rabbitmq: # 支持消息發(fā)送失敗后重返隊列 publisher-returns: true # 開啟消息確認機制 publisher-confirm-type: correlated listener: simple: retry: # 開啟重試 enabled: true # 最大重試次數(shù) max-attempts: 5 # 重試時間間隔 initial-interval: 3000

二 消息發(fā)送到MQ后,MQ宕機導致內(nèi)存中的消息丟失

消息在MQ中有可能發(fā)生丟失,這時候我們就需要將隊列和消息都進行持久化。

@Queue注解為我們提供了隊列相關的一些屬性,具體如下:

  1. name: 隊列的名稱;
  2. durable: 是否持久化;
  3. exclusive: 是否獨享、排外的;
  4. autoDelete: 是否自動刪除;
  5. arguments:隊列的其他屬性參數(shù),有如下可選項,可參看圖2的arguments:
    1. x-message-ttl:消息的過期時間,單位:毫秒;
    2. x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
    3. x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
    4. x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊列頭部開始刪除消息;
    5. x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;
    6. x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
    7. x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值
    8. x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內(nèi)只有一個消費者消費消息,其他被忽略,false時消息循環(huán)分發(fā)給所有消費者(默認false)
    9. x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設置,隊列將不支持消息優(yōu)先級;
    10. x-queue-mode(Lazy mode):將隊列設置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設置,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息;
    11. x-queue-master-locator:在集群模式下設置鏡像隊列的主節(jié)點信息。

持久化隊列

創(chuàng)建隊列的時候將持久化屬性durable設置為true,同時要將autoDelete設置為false

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

持久化消息

發(fā)送消息的時候將消息的deliveryMode設置為2,在Spring Boot中消息默認就是持久化的。

三 消費者消費消息的時候,未消費完畢就出現(xiàn)了異常

消費者剛消費了消息,還沒有處理業(yè)務,結果發(fā)生異常。這時候就需要關閉自動確認,改為手動確認消息。

修改yml為手動簽收模式

spring: rabbitmq: listener: simple: # 手動簽收模式 acknowledge-mode: manual # 每次簽收一條消息 prefetch: 1

消費者手動簽收

@Component @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true")) public class Consumer { @RabbitHandler public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{ System.out.println(message); // 唯一的消息ID Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 確認該條消息 if(...){ channel.basicAck(deliverTag,false); }else{ // 消費失敗,消息重返隊列 channel.basicNack(deliverTag,false,true); } } }

四 總結

消息丟失的原因?

生產(chǎn)者、MQ、消費者都有可能造成消息丟失

如何保證消息的可靠性?

  1. 發(fā)送方采取發(fā)送者確認模式
  2. MQ進行隊列及消息的持久化
  3. 消費者消費成功后手動確認消息
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容