
image.png
按照RabbitMQ正常使用流程,生產者會發(fā)送一條消息到RabbitMQ服務器,消費者接收到消息進行消費。但是在實際情況下,生產者很有可能在到達RabbitMQ服務器后,由于服務器的某種原因導致消息丟失(因為RabbitMQ默認是將消息存儲在內存中),一旦丟失就是再也找不到了。
那么我們如何保證消息不丟失呢?
RabbitMQ有相應的持久化機制,可以將Exchange、Queue、Message全部持久化到磁盤。
那如果在將消息持久化到磁盤的過程中服務器掛了呢?
那么則需要通過數(shù)據(jù)保護機制來保證我們的消息一定能存儲到磁盤,如果不成功,消息生產者則一直發(fā)送這條消息。
在RabbitMq中有兩種數(shù)據(jù)保護機制:
1. 事物機制:
當消息達到服務器,開啟事物,只有當消息存儲完畢才提交事物,向生產者發(fā)送成功通知。如果失敗,也會向生產者發(fā)送失敗消息,生產者接收失敗消息則繼續(xù)發(fā)送此消息。在這個過程中生產者需要同步等待,所以,事物機制雖然可以保證消息可靠性,但是采用的是同步方式,會造成性能下降。
2. confirm機制:
一旦消息投遞到隊列,隊列則會向生產者發(fā)送一個通知,如果設置了消息持久化到磁盤,則會等待消息持久化到磁盤之后再發(fā)送通知。生產者在發(fā)送完消息后不會等待回應,所以confirm機制性能相對比事物機制高。
如何開啟RabbtiMQ的confirm模式:
需要在配置文件中配置如下:
rabbitmq:
publisher-confirms: true
如何開啟隊列持久化:
在聲明隊列時,設置durable屬性為true

image
消息默認就是持久化到磁盤的。
具體生產者代碼:
@Component
public class MessageSenderimplements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplaterabbitTemplate;
@Autowired
private RedisTemplateredisTemplate;
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack) {//ack : 成功或失敗的布爾值
//成功
//在消息發(fā)送時,將消息存入redis中,方便消息發(fā)送失敗時,從redis中取值
//correlationData.getId()在發(fā)送消息時,需要生成的唯一標識
redisTemplate.delete(correlationData.getId());
}else {
//失敗
//從redis中獲取參數(shù)
Map map =redisTemplate.opsForHash().entries("message_" + correlationData.getId());
String exchange = map.get("exchange");
String message = map.get("message");
String routingKey = map.get("routingKey");
//重新發(fā)送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
//自定義發(fā)送消息方法
public void sendMessage(String exchange, String routingKey, String message) {
//設置消息的唯一標識,CorrelationData會在confirm方法中作為參數(shù)傳給我們
CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());
//將消息存入redis
redisTemplate.opsForValue().set(correlationData.getId(), message);
//將本次發(fā)送消息的元信息存入redis,方便后面失敗重新發(fā)送
Map metaDataMap =new HashMap<>();
metaDataMap.put("exchange", exchange);
metaDataMap.put("routingKey", routingKey);
metaDataMap.put("message", message);
redisTemplate.opsForHash().putAll("message_" + correlationData.getId(), metaDataMap);
//發(fā)送消息時,需要把correlationData對象帶過去
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}