消息重復被消費
今天日常開始統(tǒng)計系統(tǒng)信息(rabbitmq),被業(yè)務提醒數(shù)據(jù)出現(xiàn)異常,查找日志發(fā)現(xiàn)某條訂單的消息被消費了兩次。
以前我以為消息隊列中的消息不會被重復消費,今天被打臉了,根據(jù)日志,消息在20秒內(nèi)被消費了兩次。在我系統(tǒng)中,我手動配置了消息確認。因該是返回確認的消息延遲了,還是怎么回事,導致出現(xiàn)這種結果。
上網(wǎng)查找解決方案,有兩種方案:
- 消費端處理消息的業(yè)務邏輯保持冪等性
- 保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現(xiàn)
第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。
第2條原理就是利用一張日志表來記錄已經(jīng)處理成功的消息的ID,如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。
第1條解決方案,很明顯應該在消費端實現(xiàn),不屬于消息系統(tǒng)要實現(xiàn)的功能。第2條可以消息系統(tǒng)實現(xiàn),也可以業(yè)務端實現(xiàn)。正常情況下出現(xiàn)重復消息的概率其實很小,如果由消息系統(tǒng)來實現(xiàn)的話,肯定會對消息系統(tǒng)的吞吐量和高可用有影響,所以最好還是由業(yè)務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。
因為業(yè)務邏輯是直接更新數(shù)據(jù)庫,所以采用第二中方案。
實際項目不斷摸索逐漸摸索出一套自認為安全的流程,先記錄下
/**
* 系統(tǒng)通用消息隊列
*
* @param connectionFactory 鏈接工廠
* @return mq監(jiān)聽容器
*/
@Bean
public SimpleMessageListenerContainer messageContainer5(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(commonQueue);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(10);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
// 設置確認模式手工確認
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
List<String> messageIds = new ArrayList<>();
StatisticMessage statisticMessage = null;
Integer dealNum = null;
try {
byte[] body = message.getBody();
String str = new String(body);
str = str.substring(1, str.length() - 1);
str = StringEscapeUtils.unescapeJava(str);
//消息隊列的集合
messageIds.addAll((List<String>) redisManager.getRedisObj(messageRecode));
statisticMessage = JSONObject.parseObject(str, StatisticMessage.class);
//獲取處理當前消息的次數(shù)
String dealNumString = redisManager.getObjByDB(5, statisticMessage.getMessageId());
if (CheckUtil.isEmpty(dealNumString)) {
redisManager.setObjDBAndTime(5, statisticMessage.getMessageId(), Integer.toString(0), 60 * 60 * 12);
dealNumString = String.valueOf(0);
}
dealNum = Integer.parseInt(dealNumString);
//超出重復處理次數(shù),舍棄當前消息體
if (dealNum > MESSAGE_AGAIN_NUM) {
//確認消息成功消費,刪除消息隊列中的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//刪除redis中信息
redisManager.delObjDB(5, statisticMessage.getMessageId());
}
//正式處理消息相關邏輯
if (messageIds.contains(statisticMessage.getMessageId())) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
String str2 = statisticMessage.getJsonMessage();
str2 = StringEscapeUtils.unescapeJava(str2);
//刷新用戶緩存
if (statisticMessage.getSratisticType() == StatisticType.FreshScondToken.getCode()) {
List<String> userIds = JSONObject.parseObject(str2, List.class);
//do somethings
secondTokenService.refreshRepairShopTokenInfo(userIds);
messageIds.add(statisticMessage.getMessageId());
redisManager.createRedisObjNoValid(messageRecode, messageIds);
//手工發(fā)送確認。
if (channel != null) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
} catch (Exception e) {
messageIds.remove(statisticMessage == null ? null : statisticMessage.getMessageId());
redisManager.createRedisObjNoValid(messageRecode, messageIds);
//添加一次處理次數(shù)
dealNum++;
redisManager.setObjDBAndTime(5, statisticMessage.getMessageId(), Integer.toString(dealNum), 60 * 60 * 24 * 12);
// ack返回false,并重新回到隊列,api里面解釋得很清楚
if (channel != null) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
logger.error(ExceptionUtil.stacktraceToString(e));
}
}
});
return container;
}
上述代碼邏輯規(guī)整
graph LR
接受到消息,并進行相關處理,轉(zhuǎn)換成業(yè)務對象-->根據(jù)消息中的唯一的messageid判斷是否處理完成當前消息
根據(jù)消息中的唯一的messageid判斷是否處理完成當前消息--> 處理過
處理過-->舍棄
根據(jù)消息中的唯一的messageid判斷是否處理完成當前消息--> 未處理過
未處理過--> 判斷當前消息被處理的次數(shù)
判斷當前消息被處理的次數(shù) --> 處理次數(shù)大于5
處理次數(shù)大于5--> 序列化該消息,短期放棄處理,交由人工處理
判斷當前消息被處理的次數(shù) --> 處理次數(shù)小于5
處理次數(shù)小于5 --> 繼續(xù)處理
繼續(xù)處理 -->處理中出現(xiàn)異常則處理次數(shù)加一,讓消息返回隊列
繼續(xù)處理 -->處理中出現(xiàn)正常則在messageid的集合中添加當前messageid