RabbitMQ如何保證消息不丟失?

image.png

按照RabbitMQ正常使用流程,生產者會發(fā)送一條消息到RabbitMQ服務器,消費者接收到消息進行消費。但是在實際情況下,生產者很有可能在到達RabbitMQ服務器后,由于服務器的某種原因導致消息丟失(因為RabbitMQ默認是將消息存儲在內存中),一旦丟失就是再也找不到了。

那么我們如何保證消息不丟失呢?

RabbitMQ有相應的持久化機制,可以將Exchange、Queue、Message全部持久化到磁盤。

那如果在將消息持久化到磁盤的過程中服務器掛了呢?

那么則需要通過數(shù)據(jù)保護機制來保證我們的消息一定能存儲到磁盤,如果不成功,消息生產者則一直發(fā)送這條消息。

在RabbitMq中有兩種數(shù)據(jù)保護機制:

1. 事物機制:

當消息達到服務器,開啟事物,只有當消息存儲完畢才提交事物,向生產者發(fā)送成功通知。如果失敗,也會向生產者發(fā)送失敗消息,生產者接收失敗消息則繼續(xù)發(fā)送此消息。在這個過程中生產者需要同步等待,所以,事物機制雖然可以保證消息可靠性,但是采用的是同步方式,會造成性能下降。

2. confirm機制:

一旦消息投遞到隊列,隊列則會向生產者發(fā)送一個通知,如果設置了消息持久化到磁盤,則會等待消息持久化到磁盤之后再發(fā)送通知。生產者在發(fā)送完消息后不會等待回應,所以confirm機制性能相對比事物機制高。

如何開啟RabbtiMQ的confirm模式:

需要在配置文件中配置如下:

rabbitmq:
  publisher-confirms: true

如何開啟隊列持久化:

在聲明隊列時,設置durable屬性為true

image

消息默認就是持久化到磁盤的。

具體生產者代碼:

@Component

public class MessageSenderimplements RabbitTemplate.ConfirmCallback {

@Autowired

    private RabbitTemplaterabbitTemplate;

    @Autowired

    private RedisTemplateredisTemplate;

    public MessageSender(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

        rabbitTemplate.setConfirmCallback(this);

    }

@Override

    public void confirm(CorrelationData correlationData, boolean ack, String s) {

if (ack) {//ack : 成功或失敗的布爾值

            //成功

            //在消息發(fā)送時,將消息存入redis中,方便消息發(fā)送失敗時,從redis中取值

            //correlationData.getId()在發(fā)送消息時,需要生成的唯一標識

            redisTemplate.delete(correlationData.getId());

        }else {

//失敗

            //從redis中獲取參數(shù)

            Map map =redisTemplate.opsForHash().entries("message_" + correlationData.getId());

            String exchange = map.get("exchange");

            String message = map.get("message");

            String routingKey = map.get("routingKey");

            //重新發(fā)送消息

            rabbitTemplate.convertAndSend(exchange, routingKey, message);

        }

}

//自定義發(fā)送消息方法

    public void sendMessage(String exchange, String routingKey, String message) {

//設置消息的唯一標識,CorrelationData會在confirm方法中作為參數(shù)傳給我們

        CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());

        //將消息存入redis

        redisTemplate.opsForValue().set(correlationData.getId(), message);

        //將本次發(fā)送消息的元信息存入redis,方便后面失敗重新發(fā)送

        Map metaDataMap =new HashMap<>();

        metaDataMap.put("exchange", exchange);

        metaDataMap.put("routingKey", routingKey);

        metaDataMap.put("message", message);

        redisTemplate.opsForHash().putAll("message_" + correlationData.getId(), metaDataMap);

        //發(fā)送消息時,需要把correlationData對象帶過去

        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

    }

}

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容