系列
- RocketMq broker 配置文件
- RocketMq broker 啟動流程
- RocketMq broker CommitLog介紹
- RocketMq broker consumeQueue介紹
- RocketMq broker 重試和死信隊列
- RocketMq broker 延遲消息
- RocketMq IndexService介紹
- RocketMq 讀寫分離機(jī)制
- RocketMq Client管理
- RocketMq Broker線程模型及快速失敗機(jī)制
- RocketMq broker過期文件刪除
開篇
這個系列的主要目的是介紹RocketMq broker的原理和用法,在這個系列當(dāng)中會介紹 broker 配置文件、broker 啟動流程、broker延遲消息、broker消息存儲、broker的重試和死信隊列。
這篇文章主要介紹broker 重試和死信隊列,本質(zhì)上所有的數(shù)據(jù)都是存在commitLog文件的,只是consumeQueue根據(jù)topic的不同進(jìn)行了區(qū)分,所以數(shù)據(jù)存儲過程可以參考 RocketMq broker CommitLog介紹 和 RocketMq broker consumeQueue介紹。
重試隊列和死信隊列本質(zhì)上進(jìn)入到了對應(yīng)topic下的consumeQueue而已。
重試和死信隊列topic替換
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
// 獲取topic進(jìn)行判斷邏輯
String newTopic = requestHeader.getTopic();
// 重試隊列是以%RETRY%+consumerGroup作為維度的
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
// 獲取最大重試次數(shù)
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
// 超過最大重試次數(shù)之后發(fā)送到死信隊列
if (reconsumeTimes >= maxReconsumeTimes) {
// 死信隊列是以% DLQ%+consumerGroup作為維度的
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}
}
- 重試隊列是以%RETRY%+consumerGroup作為維度的生成consumeQueue。
- 死信隊列是以%DLQ%+consumerGroup作為維度的生成consumeQueue。
- 進(jìn)入死信隊列的條件是重試次數(shù)超過了最大重試次數(shù)。
- 死信隊列的topic是在消息發(fā)送過程中判斷對應(yīng)的topic是否存在,不存在就動態(tài)進(jìn)行創(chuàng)建。