背景
我們現(xiàn)在有兩個主要的系統(tǒng)一個是活動系統(tǒng)一個是獎品系統(tǒng),活動系統(tǒng)會調(diào)用獎品系統(tǒng)發(fā)放獎勵。
最開始兩個之間只通過http直接調(diào)用,優(yōu)點:開發(fā)成本低,沒有多余組件引入;發(fā)放獎勵實時返回;活動系統(tǒng)不需要管獎品是否還有剩余庫存。缺點:這樣就導(dǎo)致上游活動系統(tǒng)強依賴于下游的獎品系統(tǒng),如果一旦獎品系統(tǒng)掛掉,我們活動系統(tǒng)也就不可用了;這里還有個bug,在調(diào)用獎品系統(tǒng)發(fā)放獎勵,獎品系統(tǒng)發(fā)放成功了,但是活動系統(tǒng)請求超時了,就會導(dǎo)致提示客戶的沒有獎品了,但是實際獎品又發(fā)放了。
訪問量上來后發(fā)直接走http肯定是不行的,所以引入了MQ將將兩個系統(tǒng)隔離開,優(yōu)點:所有發(fā)放流程異步執(zhí)行,活動系統(tǒng)響應(yīng)更快了;這兩個系統(tǒng)就變成弱引用關(guān)系,即使獎品系統(tǒng)掛掉,活動系統(tǒng)仍能正常運行;不會出現(xiàn)上面說的bug了;缺點:發(fā)放獎勵將會有延遲;引入MQ增加了項目復(fù)雜度,我們必須去考慮消息的丟失,重復(fù)消費等問題;活動系統(tǒng)需要知道獎品的庫存情況。
解決方案
針對上面使用MQ發(fā)放獎勵會遇到的問題,我們可以通過面的方案來解決。
消息的丟失問題
在數(shù)據(jù)庫創(chuàng)建一張異常消息表。
- 在發(fā)消息的時候如果出現(xiàn)異常,直接將消息記錄到異常消息表,等待后臺跑批,進行補償發(fā)放。
- 在發(fā)消息的時候,如果發(fā)送消息的ack回調(diào)沒沒有發(fā)送成功,將進行消息重發(fā),如果重發(fā)3次還是失敗,該消息就記錄到異常消息表,等待后臺跑批,進行補償發(fā)放。消息的重復(fù)發(fā)送可以使用RabbitMQ的ConfirmCallback、ReturnCallback機制來實現(xiàn)。
- 在消費端處理消息(調(diào)用獎品系統(tǒng)發(fā)放獎勵)的時候,如果出現(xiàn)異常也將消息放到異常消息表中,等待后臺跑批,進行補償發(fā)放。如果將異常消息保存到數(shù)據(jù)庫時發(fā)生了異常,則將消息放到死信隊列,等待后臺跑批,進行補償發(fā)放。
這樣子雖然還是不能完全杜絕消息丟失,但是絕大部分情況下是沒有問題的。
重復(fù)消費問題
為每個消息生成業(yè)務(wù)流水號,將流水號和發(fā)放里的參數(shù)一起發(fā)送到獎品系統(tǒng),獎品系統(tǒng)在發(fā)放獎勵的時候先判斷這個流水號是否存在,存在就表示該獎品已經(jīng)發(fā)過來直接返回發(fā)放成功,如果沒有就進行發(fā)放獎勵操作。
活動系統(tǒng)需要知道獎品的庫存情況。
我們在配置活動的時候會將獎品的庫存放到我們活動系統(tǒng),在發(fā)MQ消息之前回去判斷是否有剩余庫存,如果沒有直接返回獎勵領(lǐng)完了,如果有才回去發(fā)MQ消息。扣減庫存可以參考基于redis實現(xiàn)的扣減庫存。
活動流程圖
下面是引入MQ過后我們系統(tǒng)的流程圖



生產(chǎn)者端實現(xiàn)
/**
* Rabbit 發(fā)送消息
*
* @author yuhao.wang
*/
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);
/**
* Rabbit MQ 客戶端
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 系統(tǒng)配置
*/
@Autowired
private SystemConfig systemConfig;
/**
* 發(fā)送MQ消息
*
* @param exchangeName 交換機名稱
* @param routingKey 路由名稱
* @param message 發(fā)送消息體
*/
public void sendMessage(String exchangeName, String routingKey, Object message) {
Assert.notNull(message, "message 消息體不能為NULL");
Assert.notNull(exchangeName, "exchangeName 不能為NULL");
Assert.notNull(routingKey, "routingKey 不能為NULL");
// 獲取CorrelationData對象
CorrelationData correlationData = this.correlationData(message);
correlationData.setExchange(exchangeName);
correlationData.setRoutingKey(routingKey);
correlationData.setMessage(message);
logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
// 發(fā)送消息
this.convertAndSend(exchangeName, routingKey, message, correlationData);
}
/**
* 用于實現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)。
* 如果消息發(fā)送確認失敗就進行重試。
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
// 消息回調(diào)確認失敗處理
if (!ack && correlationData instanceof CorrelationData) {
CorrelationData correlationDataExtends = (CorrelationData) correlationData;
//消息發(fā)送失敗,就進行重試,重試過后還不能成功就記錄到數(shù)據(jù)庫
if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) {
logger.info("MQ消息發(fā)送失敗,消息重發(fā),消息ID:{},重發(fā)次數(shù):{},消息體:{}", correlationDataExtends.getId(),
correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
// 將重試次數(shù)加一
correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
// 重發(fā)發(fā)消息
this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
correlationDataExtends.getMessage(), correlationDataExtends);
} else {
//消息重試發(fā)送失敗,將消息放到數(shù)據(jù)庫等待補發(fā)
logger.warn("MQ消息重發(fā)失敗,消息入庫,消息ID:{},消息體:{}", correlationData.getId(),
JSON.toJSONString(correlationDataExtends.getMessage()));
// TODO 保存消息到數(shù)據(jù)庫
}
} else {
logger.info("消息發(fā)送成功,消息ID:{}", correlationData.getId());
}
}
/**
* 用于實現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊列與交換器綁定時的回調(diào)。
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.error("MQ消息發(fā)送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}",
replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));
// TODO 保存消息到數(shù)據(jù)庫
}
/**
* 消息相關(guān)數(shù)據(jù)(消息ID)
*
* @param message
* @return
*/
private CorrelationData correlationData(Object message) {
return new CorrelationData(UUID.randomUUID().toString(), message);
}
/**
* 發(fā)送消息
*
* @param exchange 交換機名稱
* @param routingKey 路由key
* @param message 消息內(nèi)容
* @param correlationData 消息相關(guān)數(shù)據(jù)(消息ID)
* @throws AmqpException
*/
private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} catch (Exception e) {
logger.error("MQ消息發(fā)送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
// TODO 保存消息到數(shù)據(jù)庫
}
}
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
}
生產(chǎn)者端使用ConfirmCallback和ReturnCallback回調(diào)機制,最大限度的保證消息不丟失,對原有CorrelationData類進行擴展,來實現(xiàn)消息的重發(fā),具體請看源碼。
消費者端實現(xiàn)
/**
* 發(fā)放優(yōu)惠券的MQ處理
*
* @author yuhao.wang
*/
@Service
@ConditionalOnClass({RabbitTemplate.class})
public class SendMessageListener {
private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);
@RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)
public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
logger.info("[{}]處理發(fā)放優(yōu)惠券獎勵消息隊列接收數(shù)據(jù),消息ID:{},消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON,
message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage));
try {
// 參數(shù)校驗
Assert.notNull(sendMessage, "sendMessage 消息體不能為NULL");
// TODO 處理消息
// 確認消息已經(jīng)消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
logger.error("MQ消息處理異常,消息ID:{},消息體:{}", message.getMessageProperties().getCorrelationIdString(),
JSON.toJSONString(sendMessage), e);
try {
// TODO 保存消息到數(shù)據(jù)庫
// 確認消息已經(jīng)消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e1) {
logger.error("保存異常MQ消息到數(shù)據(jù)庫異常,放到死性隊列,消息ID:{}", message.getMessageProperties().getCorrelationIdString());
// 確認消息將消息放到死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
}
消費者端主要做了消息消費失敗的容錯處理。
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程