RabbitMQ的主要作用基本上可以用8個(gè)字概括,削峰填谷異步解耦。但是引入MQ我們也不得不考慮引入MQ后帶來的一些問題,如消息丟失。
在一些業(yè)務(wù)場景不一樣,處理方式也就不一樣,比如發(fā)短信,日志收集我們主要看吞吐量所以對消息丟失容忍度較高,這類場景基本上不用花太多時(shí)間在消息丟失問題上。另外一種,如我們用MQ來做分布式事務(wù),續(xù)保計(jì)算,提成的計(jì)算,這類業(yè)務(wù)對消息丟失容忍度較底,所以我們一定要考慮消息丟失的問題。這次分享的內(nèi)容是怎么來最大限制的防止消息丟失,順帶提一下消息的重發(fā)和重復(fù)消費(fèi)。
RabbitMQ 模型圖

ConfirmCallback和ReturnCallback
在這個(gè)里我們主要實(shí)現(xiàn)了ConfirmCallback和ReturnCallback兩個(gè)接口。這兩個(gè)接口主要是用來發(fā)送消息后回調(diào)的。因?yàn)閞abbit發(fā)送消息是只管發(fā),至于發(fā)沒發(fā)成功,發(fā)送方法不管。
- ConfirmCallback:當(dāng)消息成功到達(dá)exchange的時(shí)候觸發(fā)的ack回調(diào)。
- ReturnCallback:當(dāng)消息成功到達(dá)exchange,但是沒有隊(duì)列與之綁定的時(shí)候觸發(fā)的ack回調(diào)。發(fā)生網(wǎng)絡(luò)分區(qū)會(huì)出現(xiàn)這種情況。
在這里一定要把這兩個(gè)開關(guān)打開, publisher-confirms="true" publisher-returns="true"。
生產(chǎn)者端使用ConfirmCallback和ReturnCallback回調(diào)機(jī)制,最大限度的保證消息不丟失,對原有CorrelationData類進(jìn)行擴(kuò)展,來實(shí)現(xiàn)消息的重發(fā),具體請看源碼。
消息的日志鏈路跟蹤
使用MQ來解耦服務(wù),異步化處理一些復(fù)雜耗時(shí)邏輯,但是也帶來了一個(gè)問題。由于異步化以后,排查問題就很不方便了,根本不知道這個(gè)消息什么時(shí)候消費(fèi),消費(fèi)的日志也很不好排查。所以引入了Slf4j MDC機(jī)制將主線程的日志鏈路和消息的日志鏈路連起來,方便MQ問題的排查。
RabbitSender
import com.alibaba.fastjson.JSON;
import com.wlqq.insurance.common.enums.MetricNameEnum;
import com.wlqq.insurance.common.enums.SystemTypeEnum;
import com.wlqq.insurance.common.log.core.FisLoggerFactory;
import com.wlqq.insurance.common.mq.CorrelationData;
import com.wlqq.insurance.common.service.AlertService;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.UUID;
/**
* Rabbit 發(fā)送消息
*
* @author yuhao.wang
*/
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);
@Value("${mq.retry.count}")
private int mqRetryCount;
/**
* 告警服務(wù)
*/
@Autowired
private AlertService alertService;
/**
* Rabbit MQ 客戶端
*/
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送MQ消息,異步
*
* @param exchangeName 交換機(jī)名稱
* @param routingKey 路由名稱
* @param message 發(fā)送消息體
*/
public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
Assert.notNull(message, "message 消息體不能為NULL");
Assert.notNull(exchangeName, "exchangeName 不能為NULL");
Assert.notNull(routingKey, "routingKey 不能為NULL");
// 獲取CorrelationData對象
CorrelationData correlationData = this.correlationData(message, message.getMessageId());
correlationData.setExchange(exchangeName);
correlationData.setRoutingKey(routingKey);
correlationData.setMessage(message);
logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
// 發(fā)送消息
this.convertAndSend(exchangeName, routingKey, message, correlationData);
}
/**
* RPC方式,發(fā)送MQ消息
*
* @param exchangeName 交換機(jī)名稱
* @param routingKey 路由名稱
* @param message 發(fā)送消息體
*/
public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
Assert.notNull(message, "message 消息體不能為NULL");
Assert.notNull(exchangeName, "exchangeName 不能為NULL");
Assert.notNull(routingKey, "routingKey 不能為NULL");
// 獲取CorrelationData對象
CorrelationData correlationData = this.correlationData(message, message.getMessageId());
correlationData.setExchange(exchangeName);
correlationData.setRoutingKey(routingKey);
correlationData.setMessage(message);
logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
}
/**
* 用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)。
* 如果消息發(fā)送確認(rèn)失敗就進(jìn)行重試。
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
CorrelationData correlationDataExtends = null;
if (correlationData instanceof CorrelationData) {
correlationDataExtends = (CorrelationData) correlationData;
if (correlationDataExtends.getMdcContainer() != null) {
// 日志鏈路跟蹤
MDC.setContextMap(correlationDataExtends.getMdcContainer());
}
}
// 消息回調(diào)確認(rèn)失敗處理
if (!ack) {
if (correlationDataExtends != null) {
//消息發(fā)送失敗,就進(jìn)行重試,重試過后還不能成功就記錄到數(shù)據(jù)庫
if (correlationDataExtends.getRetryCount() < mqRetryCount) {
logger.info("MQ消息發(fā)送失敗,消息重發(fā),消息ID:{},重發(fā)次數(shù):{},消息體:{}", correlationDataExtends.getId(),
correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
// 將重試次數(shù)加一
correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
// 重發(fā)發(fā)消息
this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
correlationDataExtends.getMessage(), correlationDataExtends);
} else {
//消息重試發(fā)送失敗,將消息放到數(shù)據(jù)庫等待補(bǔ)發(fā)
logger.error("MQ消息重發(fā)失敗,消息ID:{},消息體:{}", correlationData.getId(),
JSON.toJSONString(correlationDataExtends.getMessage()));
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
correlationDataExtends.getExchange(), null);
}
}
} else {
logger.info("消息發(fā)送成功,消息ID:{}", correlationData.getId());
}
}
/**
* 用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊(duì)列與交換器綁定時(shí)的回調(diào)。
* 在腦裂的情況下會(huì)出現(xiàn)這種情況。
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化消息
Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
// 日志鏈路跟蹤
MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
}
logger.error("MQ消息發(fā)送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}",
replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
}
/**
* 消息相關(guān)數(shù)據(jù)(消息ID)
*
* @param message 消息體
* @param messageId 消息ID
* @return
*/
private CorrelationData correlationData(Object message, String messageId) {
// 消息ID默認(rèn)使用UUID
if (StringUtils.isEmpty(messageId)) {
messageId = UUID.randomUUID().toString();
}
return new CorrelationData(messageId, message);
}
/**
* 發(fā)送消息
*
* @param exchange 交換機(jī)名稱
* @param routingKey 路由key
* @param message 消息內(nèi)容
* @param correlationData 消息相關(guān)數(shù)據(jù)(消息ID)
* @throws AmqpException
*/
private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} catch (Exception e) {
logger.error("MQ消息發(fā)送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
}
}
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
}
CorrelationData
import lombok.Data;
import org.slf4j.MDC;
import java.util.Map;
/**
* 發(fā)送消息的相關(guān)數(shù)據(jù)
*
* @author yuhao.wang
*/
@Data
public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {
/**
* MDC容器
* 獲取父線程MDC中的內(nèi)容,做日志鏈路
*/
private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
/**
* 消息體
*/
private volatile Object message;
/**
* 交換機(jī)名稱
*/
private String exchange;
/**
* 路由key
*/
private String routingKey;
/**
* 重試次數(shù)
*/
private int retryCount = 0;
public CorrelationData(String id) {
super(id);
}
public CorrelationData(String id, Object data) {
this(id);
this.message = data;
}
}
Message
/**
* MQ消息的父類消息體
*
* @author yuhao.wang
*/
@Data
public class Message implements Serializable {
private static final long serialVersionUID = -4731326195678504565L;
/**
* MDC容器
* 獲取父線程MDC中的內(nèi)容,做日志鏈路
*/
private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
/**
* 消息ID(消息的唯一標(biāo)示)
*/
private String messageId;
}
AbstractConsumer
/**
* 默認(rèn)消費(fèi)者
*
* @author yuhao.wang3
*/
public abstract class AbstractConsumer implements MessageListener {
private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);
@Override
public void onMessage(Message msg) {
String body = null;
try {
// 日志鏈路跟蹤邏輯
body = new String(msg.getBody(), "utf-8");
DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
Map<String, String> container = message.getMdcContainer();
if (container != null) {
// 日志鏈路跟蹤
MDC.setContextMap(message.getMdcContainer());
}
} catch (Exception e) {
LOGGER.warn("沒有找到MQ消息日志鏈路數(shù)據(jù),無法做日志鏈路追蹤");
}
try {
// 處理消息邏輯
doMessage(msg);
LOGGER.info("成功處理MQ消息, 消息體:{}", body);
} catch (Exception e) {
LOGGER.error("處理MQ消息異常 {}, 消息體:{}", JSON.toJSONString(msg), body, e);
}
}
/**
* 處理消息的實(shí)現(xiàn)方法
*
* @param msg
*/
public abstract void doMessage(Message msg);
}
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程