RocketMQ消息消費(fèi)以及進(jìn)度管理解析

最近 ONS 消息堆積的很嚴(yán)重,并且經(jīng)常發(fā)現(xiàn)部分幾乎沒(méi)有消息消費(fèi)的消費(fèi)者也提示堆積,所以有必要深入了解一下
RocketMQ 的設(shè)計(jì)思路,來(lái)看看堆積量如何計(jì)算,以及如何正確的使用 Topic 以及 Consumer 等組件。

產(chǎn)生的問(wèn)題背景在于,由于一開(kāi)始對(duì)于RocketMQ不夠了解,同時(shí)足夠懶得原因,導(dǎo)致我們所有業(yè)務(wù)都僅適用了一個(gè)topic,所有業(yè)務(wù)線通過(guò)訂閱不同的tag來(lái)進(jìn)行消費(fèi),本次深入了解后將進(jìn)行業(yè)務(wù)重構(gòu),以正確的姿勢(shì)使用RocketMQ。

本次要排查的問(wèn)題包括:
1、消息拉取時(shí)模型,是否會(huì)將非該消費(fèi)者消息的消息也拉取到客戶端?
2、如何計(jì)算堆積?

問(wèn)題1的本質(zhì)問(wèn)題是消息拉取的過(guò)濾模型在于客戶端,還是在服務(wù)端?問(wèn)題2的本質(zhì)問(wèn)題是消息如何存儲(chǔ)計(jì)算?欲探究該問(wèn)題則需要明確RocketMQ的底層存儲(chǔ)模型設(shè)計(jì),從頂層設(shè)計(jì)俯瞰消息隊(duì)列整個(gè)框架。

底層存儲(chǔ)模型

摘自RocketMQ技術(shù)內(nèi)幕.png

commitlog 是整個(gè)消息隊(duì)列存儲(chǔ)的核心文件,而consumerquque是邏輯消息隊(duì)列,主要存儲(chǔ)commitlog offset消息長(zhǎng)度,tag的hashcode,用于在消息消費(fèi)時(shí)快速定位消息在commit log文件位置,便于讀取消息。IndexFile俗稱索引文件,主要存儲(chǔ)消息key的hashcode以及commitlog offset,用于通過(guò)key快速定位到消息在commit log文件位置,便于讀取消息。

消息拉取模型分析

找到問(wèn)題1的答案之前,先思考消息隊(duì)列投遞時(shí)做了什么?

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
    //Create a message instance, specifying topic, tag and message body.
    Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " +
            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    );
    //Call send message to deliver message to one of brokers.
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

以上是代碼是從官網(wǎng)的地址copy而來(lái),雖簡(jiǎn)單但是從其中足以找到消息投遞時(shí)所需要的基本條件包括namesrvAddr、topic 、tag。

消息投遞

// DefaultProducerImpl#sendDefaultImpl()
// 省略大部分代碼,關(guān)鍵看備注部分
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 從本地緩存或namesrv遠(yuǎn)程讀取topic信息
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();‘
                // 根據(jù)某種策略選擇一個(gè)邏輯消息隊(duì)列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

從文中可以看到,在消息投遞的過(guò)程中,已經(jīng)在客戶端通過(guò)某種策略找到指定的topic下的邏輯隊(duì)列,邏輯隊(duì)列具體指的是consumerqueue文件,服務(wù)端對(duì)應(yīng)的處理主要是寫入,具體有興趣可以了解SendMessageProcessor類,最終通過(guò)DefaultMessageStore實(shí)現(xiàn)了數(shù)據(jù)的寫入,但是并未看到寫入consumerqueue,因?yàn)閷?shí)現(xiàn)consumerqueue文件寫入是通過(guò)另外的線程實(shí)現(xiàn)的,具體實(shí)現(xiàn)請(qǐng)參考ReputMessageService,本文不再深入。

我們主要知道,在客戶端除了上傳基本屬性數(shù)據(jù)之外,同時(shí)還在客戶端選擇好了將要寫入的邏輯消息隊(duì)列。

消息拉取

消息的拉取在客戶端就不進(jìn)行贅述了,主要看服務(wù)端的實(shí)現(xiàn)。有興趣可以了解PullMessageService#run()。服務(wù)端則重點(diǎn)查閱PullMessageProcessor#processRequest()


MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
         // 構(gòu)建消息過(guò)濾
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}

// 消息過(guò)濾的核心源碼在ExpressionMessageFilter#isMatchedByConsumeQueue方法
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    // by tags code.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

        if (tagsCode == null) {
            return true;
        }

        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
                // tagecode其實(shí)就是tag的hashcode
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }
  /// ....
}


// 接著PullMessageProcessor#processRequest()往下看
final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// 注意該消息讀取的參數(shù),包括topic, queueid, queueoffset, 已經(jīng)消息最大條數(shù)

// 通過(guò)DefaultMessageStore#getMessage()繼續(xù)查看
// 注意,這里的offset是queueoffset,而不是commitlog offset
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    
    // ...
    // 查找consumerqueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    
    // 
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    if (bufferConsumeQueue != null) {
        try {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;

            long nextPhyFileStartOffset = Long.MIN_VALUE;
            long maxPhyOffsetPulling = 0;

            int i = 0;
            final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                                /// .....
                                // 消息匹配,這個(gè)對(duì)象由前文的MessageFilter定義
                if (messageFilter != null
                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                    if (getResult.getBufferTotalSize() == 0) {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                    }

                    continue; //不匹配的消息則繼續(xù)往下來(lái)讀取
                }
 
                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);// offsetPy與sizePy查找commitlog上存儲(chǔ)的消息內(nèi)容
                
  ///....
}

以上源碼閱讀完后,問(wèn)題1 不攻自破,在服務(wù)端上過(guò)濾好消息,但是很明顯,查閱完整地源碼可以清晰地確定,并非是每一次拉取消息都可以過(guò)濾到自己想要的消息,即該消費(fèi)者拉取消息時(shí)可能在某一個(gè)comsumerqueue上拉取不到消息,因?yàn)槌涑庵粋€(gè)topic下的其他tag的消息,也就意味著不是每次拉取都有意義,而阿里云ONS的計(jì)費(fèi)上明顯提示拉取消息是要計(jì)算費(fèi)用的。

消息堆積

消息堆積意為著服務(wù)端要維護(hù)消息的消費(fèi)進(jìn)度。

先來(lái)看一張圖,圖中的brokerOffset - consumerOffset = diffTotal, 而diffTotal就是指堆積量,而描述堆積量的指標(biāo)是消息條數(shù)。

image.png

從commitlog中來(lái)看,由于存儲(chǔ)了大量的消息文件,并且消息消費(fèi)是非順序消費(fèi),繼而很難從commitlog中看出哪個(gè)
哪個(gè)consumer堆積量。

那么哪里可以描述清楚消息條數(shù)呢?先來(lái)深入了解Consumer Queue的設(shè)計(jì)

ConsumerQueue

consumerqueue的設(shè)計(jì)以topic作為邏輯分區(qū),每個(gè)topic下分多個(gè)消息隊(duì)列進(jìn)行,具體多少消息隊(duì)列存儲(chǔ)參照broker的配置參數(shù),隊(duì)列名稱以數(shù)組0開(kāi)始,比如配置0,1,2,3 四個(gè)消息隊(duì)列。

配置參數(shù)請(qǐng)參考BrokerConfig,其中有一個(gè)參數(shù)private int defaultTopicQueueNums = 8;

從語(yǔ)義上理解,堆積量應(yīng)該指未被消費(fèi)的存在broker上的消息數(shù)量,這是基本認(rèn)知。

commitlog存儲(chǔ)著broker上所有的消息,設(shè)想一下如果每次要查詢消息并消費(fèi)需要從該文件遍歷查詢,性能之差可想
而知,為了提高查詢的消息,優(yōu)先想到的是諸如MySQL上的索引設(shè)計(jì)。同理,consumerqueue的設(shè)計(jì)之初就是為了
快速定位到對(duì)應(yīng)的消費(fèi)者可以消費(fèi)的消息,當(dāng)然RocketMQ也提供了indexfile,俗稱索引文件,主要是解決通過(guò)key
快速定位消息的方式。

consumerqueue 消息結(jié)構(gòu)

摘自RocketMQ技術(shù)內(nèi)幕.png

consumerqueue的結(jié)構(gòu)設(shè)計(jì),在consumequeue的條目設(shè)計(jì)是固定的,并且它整好對(duì)應(yīng)一條消息。consumerqueue單個(gè)文件默認(rèn)是30w個(gè)條目,單個(gè)文件長(zhǎng)度30w * 20字節(jié)。從文件的存儲(chǔ)模型可以看出,consumerqueue存儲(chǔ)維度是topic,并非是consumer。那么如何找到consumer的堆積量?

假設(shè)

假設(shè)一個(gè)topic對(duì)應(yīng)一個(gè)consumertopic的堆積量即consumer的堆積量。從這個(gè)維度來(lái)推理,前文提到部分consumer是幾乎沒(méi)有消息,但是卻提示消息堆積即合理,因?yàn)槎逊e的消息并非是該consumer的需要消費(fèi)的消息,而是該consumerqueue對(duì)應(yīng)的topic的堆積

論證過(guò)程

rocketmq console后臺(tái)看到的消費(fèi)者的堆積數(shù)量,看到AdminBrokerProcess#getConsumeStats()。


private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,   
    // ...
    for (String topic : topics) {
        // ...
        for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(topic);
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setQueueId(i);

            OffsetWrapper offsetWrapper = new OffsetWrapper();
            // 核心的問(wèn)題在于要確定brokerOffset 以及consumerOffset的語(yǔ)義
            long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
            if (brokerOffset < 0)
                brokerOffset = 0;

            long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
                requestHeader.getConsumerGroup(),
                topic,
                i);
            if (consumerOffset < 0)
                consumerOffset = 0;

    // ....
}

// 隊(duì)列最大索引
public long getMaxOffsetInQueue(String topic, int queueId) {
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        long offset = logic.getMaxOffsetInQueue();
        return offset;
    }
    return 0;
}
public long getMaxOffsetInQueue() {
    return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
  // 總的邏輯偏移量 / 20 = 總的消息條數(shù)
}

public static final int CQ_STORE_UNIT_SIZE = 20;// 前文提到每個(gè)條目固定20個(gè)字節(jié)

// 當(dāng)前消費(fèi)者的消費(fèi)進(jìn)度
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic,i);
if (consumerOffset < 0)
    consumerOffset = 0;
    public long queryOffset(final String group, final String topic, final int queueId) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);// 從offsetTable中讀取
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }
    return -1;
}

核心的問(wèn)題在于從offset緩存中讀取出來(lái)的,那么offset的數(shù)據(jù) 又是哪里來(lái)的?


// 通過(guò)IDE快速可以很快找到如下代碼
@Override
public String configFilePath() {
    return
    BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().
        getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
    if (jsonString != null) {
        ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString,ConsumerOffsetManager.class);
        if (obj != null) {
            this.offsetTable = obj.offsetTable;
        }
    }
}
public static String getConsumerOffsetPath(final String rootDir) {
    return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}

也就是說(shuō)offset的數(shù)據(jù)是從json文件中加載進(jìn)來(lái)的。

image.png

這個(gè)文件描述的是topic與消費(fèi)者的關(guān)系,每一個(gè)隊(duì)列對(duì)應(yīng)的消費(fèi)進(jìn)度。但是消費(fèi)是實(shí)時(shí)更新的,所以必須實(shí)時(shí)更新消費(fèi)進(jìn)度,消費(fèi)進(jìn)度的更新是從消息的拉取得到的。

DefaultStoreMessage

前文看過(guò)該類的部分代碼,主要是拉取的部分,這里補(bǔ)充拉取時(shí)的offset的值得語(yǔ)義。


ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

    // ...
        // offsetPy 是commitlog的邏輯偏移量
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    if (null == selectResult) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
        }

        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
        continue;
    }
        // 消息過(guò)濾
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        // release...
        selectResult.release();
        continue;
    }
    // ....
}

// ...
//
// 計(jì)算下一次開(kāi)始的offset,是前文的offset
// i 是ConsumeQueue.CQ_STORE_UNIT_SIZE的倍數(shù)
// ConsumeQueue.CQ_STORE_UNIT_SIZE是每一條consumerqueue中的條目的大小,20字節(jié)
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

看到此處,可以明確消費(fèi)者拉取消息時(shí)的nextBeginOffset就是consumerqueue的偏移量/20,意味著類似下標(biāo)數(shù)組index。
到此處還要再確認(rèn)拉取的這個(gè)消費(fèi)進(jìn)度是不是會(huì)更新到到offsetTable?核心看RemoteBrokerOffsetStore

消息消費(fèi)

貼幾張圖簡(jiǎn)單了解客戶端上報(bào)消費(fèi)進(jìn)度的過(guò)程

image.png
image.png
image.png

至此,可以看到堆積量的實(shí)際是根據(jù)topic來(lái)算,按照前文最開(kāi)始的假設(shè)推斷其實(shí)是成立的,那么現(xiàn)在那些沒(méi)有消息堆積的消息為何還會(huì)顯示堆積就可以理解了。

總結(jié)

消息消費(fèi)屬于服務(wù)端過(guò)濾模式,不過(guò)其實(shí)還要其他的消息過(guò)濾模式,只是本文并未提及(Class)。但是由于topic使用的不合理導(dǎo)致消息可能存在拉取不到數(shù)據(jù),但是ONS是計(jì)算收費(fèi)的。同時(shí)消息的堆積意義明朗,那么使用RocketMQ的姿勢(shì)也就不言而喻,按照業(yè)務(wù)合理使用topic以及tag等。

參考資料

源碼:https://github.com/apache/rocketmq
官網(wǎng):http://rocketmq.apache.org/docs/rmq-deployment/
書籍:《RocketMQ技術(shù)內(nèi)幕》,特別推薦該書,讓你對(duì)RocketMQ的架構(gòu)設(shè)計(jì),代碼有更深的了解

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容