RocketMq broker 重試和死信隊列

系列

開篇

  • 這個系列的主要目的是介紹RocketMq broker的原理和用法,在這個系列當(dāng)中會介紹 broker 配置文件、broker 啟動流程、broker延遲消息、broker消息存儲、broker的重試和死信隊列。

  • 這篇文章主要介紹broker 重試和死信隊列,本質(zhì)上所有的數(shù)據(jù)都是存在commitLog文件的,只是consumeQueue根據(jù)topic的不同進(jìn)行了區(qū)分,所以數(shù)據(jù)存儲過程可以參考 RocketMq broker CommitLog介紹RocketMq broker consumeQueue介紹

  • 重試隊列和死信隊列本質(zhì)上進(jìn)入到了對應(yīng)topic下的consumeQueue而已。


重試和死信隊列topic替換

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";

    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                      RemotingCommand request,
                                      MessageExt msg, TopicConfig topicConfig) {
        // 獲取topic進(jìn)行判斷邏輯
        String newTopic = requestHeader.getTopic();
        // 重試隊列是以%RETRY%+consumerGroup作為維度的
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return false;
            }
            // 獲取最大重試次數(shù)
            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
            // 超過最大重試次數(shù)之后發(fā)送到死信隊列
            if (reconsumeTimes >= maxReconsumeTimes) {
                // 死信隊列是以% DLQ%+consumerGroup作為維度的
                newTopic = MixAll.getDLQTopic(groupName);
                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0
                );
                msg.setTopic(newTopic);
                msg.setQueueId(queueIdInt);
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return false;
                }
            }
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }
        msg.setSysFlag(sysFlag);
        return true;
    }
}
  • 重試隊列是以%RETRY%+consumerGroup作為維度的生成consumeQueue。
  • 死信隊列是以%DLQ%+consumerGroup作為維度的生成consumeQueue。
  • 進(jìn)入死信隊列的條件是重試次數(shù)超過了最大重試次數(shù)。
  • 死信隊列的topic是在消息發(fā)送過程中判斷對應(yīng)的topic是否存在,不存在就動態(tài)進(jìn)行創(chuàng)建。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容