rocket mq 底層存儲(chǔ)源碼分析(7)-業(yè)務(wù)消息查詢

本章節(jié)是《rocket mq 底層存儲(chǔ)源碼分析》系列的最后一章,我們結(jié)合【邏輯位移索引】以及【key查詢索引】,從低層接口分析如何利用這兩類索引,為上層業(yè)務(wù)接口提供查詢業(yè)務(wù)消息的實(shí)現(xiàn)。因此,這里并不涉及Consumer客戶端是如何發(fā)起拉取消息請(qǐng)求,以及broker端接收請(qǐng)求后,根據(jù)客戶端的查詢條件,查詢出指定的業(yè)務(wù)消息并,最后返回給客戶端的 整個(gè)流程。該章節(jié)只是分析如何通過指定的查詢參數(shù),獲取指定的業(yè)務(wù)消息。


在開始之前,我們先回顧一下java 中ArrayList是如何訪問一個(gè)指定的元素?相信不少讀者可以立刻給出答案,通過index下標(biāo)就可以訪問了。

那rmq是如何設(shè)計(jì)底層接口來訪問具體的業(yè)務(wù)消息呢?帶著這個(gè)疑問,我們來一起分析查詢業(yè)務(wù)消息的底層核心接口:

    /**
     * @param group 消費(fèi)客戶端所制定的ConsumeGroup
     * @param topic
     * @param queueId 指定的消費(fèi)隊(duì)列
     * @param offset 消息的邏輯位移
     * @param maxMsgNums 拉取的最大消息數(shù)量,PULL模式的客戶端由用戶自行制定,PUSH模式一般使用默認(rèn)值32
     * @param subscriptionData
     * @return
     */
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
        final SubscriptionData subscriptionData) {
        ...

        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        long nextBeginOffset = offset;
        long minOffset = 0;
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();

        //this.mappedFileQueue.getMaxOffset() 獲取最大的已commit 的物理offset。
        final long maxOffsetPy = this.commitLog.getMaxOffset();

        //step1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            //邏輯消費(fèi)隊(duì)列目前最小的【邏輯位移索引】
            minOffset = consumeQueue.getMinOffsetInQueue();
            //邏輯消費(fèi)隊(duì)列目前最大的【邏輯位移索引】
            maxOffset = consumeQueue.getMaxOffsetInQueue();


            //nextOffsetCorrection   根據(jù)consumer指定消費(fèi)的offset修正nextOffset
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } 
            else {
                //step2
                //獲取該索引位置之后的【邏輯位移索引】字節(jié),這里有可能獲取多個(gè)  【邏輯位移索引】(20字節(jié)一個(gè))
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        final int maxFilterMessageCount = 16000;
                        ...

                        //start rolling get 【邏輯位移索引】
                        //step3
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //step3.1 讀取完整的【邏輯位移索引】?jī)?nèi)容
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); //業(yè)務(wù)消息的開始  物理存儲(chǔ)位移
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); //業(yè)務(wù)消息的實(shí)際大小
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); //業(yè)務(wù)消息的 標(biāo)識(shí)hash值

                            //已經(jīng)獲取的最大的具體消息的物理位移
                            maxPhyOffsetPulling = offsetPy;

                            ...
                            //step3.2這里表明如果當(dāng)前查詢出的具體消息的物理位移落后于已CommitLog的maxOffsetPy即最大已提交到緩存的消息物理位移
                            //總內(nèi)存的40%,則表明該消息時(shí)在磁盤中
                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            //該邏輯判斷循環(huán)內(nèi),該條消息是否達(dá)到了批次滿的條件
                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                isInDisk)) {
                                break;
                            }

                             //step3.3
                            if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                if (selectResult != null) {
                                    this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                    getResult.addMessage(selectResult);
                                    status = GetMessageStatus.FOUND;
                                    nextPhyFileStartOffset = Long.MIN_VALUE;
                                } else {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }

                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                }
                            } else {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                            }
                        } //end for

                        ...

                        //step3.4
                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        //記錄目前的消費(fèi)進(jìn)度差值
                        long diff = maxOffsetPy - maxPhyOffsetPulling;

                        //系統(tǒng)總存儲(chǔ)的40%  accessMessageInMemoryMaxRatio = 40,可以配置
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        //換言之,如果目前的消費(fèi)進(jìn)度差值  超過40%,則建議消費(fèi)者端下次拉取的目標(biāo)為slave
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    //如果該索引文件找不到相對(duì)應(yīng)的【邏輯位移索引】字節(jié)內(nèi)容,則滾動(dòng)到下一個(gè)索引文件的的開始位置所對(duì)應(yīng)的邏輯位移
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                    
                }
            }
        } else {
            //找不到對(duì)應(yīng)的消費(fèi)邏輯隊(duì)列
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        ...
        //step4記錄拉取消息的消耗時(shí)間,并返回結(jié)果
        ...
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

接下來我們主要從4個(gè)步驟分析整個(gè)查詢流程::

1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue
2、獲取根據(jù)查詢位移,通過具體ConsumeQueue實(shí)例獲取多條【邏輯位移索引】字節(jié)內(nèi)容
3、根據(jù)【邏輯位移索引】列表滾動(dòng)獲取業(yè)務(wù)消息
4、返回查詢結(jié)果

1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue。

之前在【rocket mq 底層存儲(chǔ)源碼分析(4)-索引構(gòu)建】章節(jié)中,已經(jīng)分析過,如何通過topic即queueId獲取對(duì)應(yīng)的ConsumeQueue實(shí)例,這里就不在詳細(xì)展開。

獲取ConsumeQueue實(shí)例后,先得到該邏輯消費(fèi)隊(duì)列目前的可消費(fèi)范圍:

minOffset = consumeQueue.getMinOffsetInQueue()
getMinOffsetInQueue() = this.minLogicOffset / CQ_STORE_UNIT_SIZE

maxOffset = consumeQueue.getMaxOffsetInQueue()
getMaxOffsetInQueue() = this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE

字段minLogicOffset以及 this.mappedFileQueue.getMaxOffset()分別代表目前的 邏輯消費(fèi)隊(duì)列實(shí)例中,【邏輯索引位移】所對(duì)應(yīng)的【最小物理位移起始地址】以及【最大物理位移起始地址】 ,通過【邏輯索引位移】的物理位移起始位置除以其大小(CQ_STORE_UNIT_SIZE),即可換算出【下標(biāo)位移】(例如java中查詢 ArrayList中的index)。

接著,通過 查詢參數(shù)中的offset (客戶端指定的【下標(biāo)位移】)與 邏輯消費(fèi)隊(duì)列可查詢范圍[minOffset ,maxOffset] 比較,如果offset不在[minOffset ,maxOffset] 的范圍內(nèi),則跳過查詢業(yè)務(wù)消息的步驟。否則,在根據(jù)所在范圍,返回查詢結(jié)果。例如,如果offset < minOffset,則設(shè)置查詢狀態(tài)為status = GetMessageStatus.OFFSET_TOO_SMALL, 以及下次可查詢的下標(biāo)位移nextBeginOffset = nextOffsetCorrection(offset, minOffset),其業(yè)務(wù)含義為,該次查詢的消息位移過小,并要求客戶端下一次查詢下標(biāo)位移為minOffset。如果offset[minOffset ,maxOffset] 范圍內(nèi),則代碼走到步驟2。


2、獲取根據(jù)查詢位移,通過具體ConsumeQueue實(shí)例獲取多條【邏輯位移索引】字節(jié)內(nèi)容。

接著步驟1中的, 如果offset在查詢范圍[minOffset ,maxOffset],接著分析獲取多條【邏輯位移索引】字節(jié)內(nèi)容。

對(duì)應(yīng)代碼consumeQueue.getIndexBuffer(offset)

    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        //300000 * 20 ,即每一個(gè)mappedFile存放30萬條位置索引消息
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {

                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
   }

分析一下該方法,首先是this.mappedFileQueue.findMappedFileByOffset(offset)想必讀者對(duì)該方法不陌生,即通過指定的物理位移,找出該物理位移所在的映射文件。

然后,在通過SelectMappedBufferResult bufferConsumeQueue = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)),將映射文件大于該物理位移的【邏輯位移索引】字節(jié)內(nèi)容一并查詢出來。


3、根據(jù)【邏輯位移索引】列表滾動(dòng)獲取業(yè)務(wù)消息。

在上一步分析中,我們知道bufferConsumeQueue存放著連續(xù)遞增的【邏輯位移索引】字節(jié)內(nèi)容,接下來,我們將分析如何使用bufferConsumeQueue查詢出符合條件的業(yè)務(wù)消息字節(jié)。

for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) 的for 循環(huán)里,滾動(dòng)讀取bufferConsumeQueue中的【邏輯位移索引】。我們?cè)敿?xì)分析讀取流程:

首先是step3.1,根據(jù)【邏輯位移索引】的存儲(chǔ)結(jié)構(gòu),讀取完整的【邏輯位移索引】?jī)?nèi)容,maxPhyOffsetPulling則記錄目前的最大的業(yè)務(wù)消息的物理位移,主要是用來記錄消費(fèi)進(jìn)度的。

然后到step3.2,checkInDiskByCommitOffset(offsetPy, maxOffsetPy),該方法主要是判斷,目前我們所查詢的業(yè)務(wù)消息是否在磁盤中,因?yàn)閞mq只將部分最新的業(yè)務(wù)消息放在pagecache,而大部分的消息還是存放在磁盤中。判斷的依據(jù)是,當(dāng)前查詢出的具體消息的物理位移 與 已CommitLog的maxOffsetPy即最大已提交到緩存的消息物理位移的差值,是否超過總內(nèi)存的40%,如果超過,則表明該消息在磁盤中。接著,判斷循環(huán)內(nèi),該次讀取是否達(dá)到了批次滿的條件,對(duì)應(yīng)代碼
this.isTheBatchFull(...)

    private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {

        if (0 == bufferTotal || 0 == messageTotal) {
            return false;
        }

        if ((messageTotal + 1) >= maxMsgNums) {
            return true;
        }

        if (isInDisk) {
            //如果當(dāng)前已獲取的總消息大小大于1024 * 64,則停止獲取
            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
                return true;
            }
            //MaxTransferCountOnMessageInDisk : 8
            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
                return true;
            }
        } else {
            //如果當(dāng)前已獲取的總消息大小大于1024 * 256
            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
                return true;
            }
            //MaxTransferBytesOnMessageInMemory:32
            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
                return true;
            }
        }

        return false;
    }

總結(jié)一下下面的判斷邏輯:

如果isInDisk為true,說明目前消費(fèi)進(jìn)度不到系統(tǒng)總存儲(chǔ)的60%,則批次量滿的條件為:目前的傳輸字節(jié)量不能大于64k或者總條數(shù)不能大于8, 否則,目前的傳輸字節(jié)量不能大于256k或者總條數(shù)不能大于32條。

如果該次讀取未到達(dá)批次滿的條件,則繼續(xù)判斷目前的業(yè)務(wù)想消息的tagsCode是否符合查詢所指定的訂閱條件,this.messageFilter.isMessageMatched(...)

    public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
        if (tagsCode == null) {
            return true;
        }

        if (null == subscriptionData) {
            return true;
        }

        if (subscriptionData.isClassFilterMode())
            return true;

        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }

        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }

總結(jié)一下上述判斷邏輯,如果該【邏輯位移索引】所對(duì)應(yīng)的tagsCode或者上層業(yè)務(wù)調(diào)用方?jīng)]有指定subscriptionData,則認(rèn)為這條消息符合訂閱條件。否則,則根據(jù)上層業(yè)務(wù)所指定的訂閱數(shù)據(jù),判斷訂閱數(shù)據(jù)是否包含tagsCode。這里在說明一下,tagsCode是由生產(chǎn)者方指定,而subscriptionData則由消費(fèi)者端指定。

如果消息符合查詢條件,即this.messageFilter.isMessageMatched(...)為true,則通過【邏輯位移索引】所存儲(chǔ)的offsetPy以及sizePy字段,查詢出具體的業(yè)務(wù)消息字節(jié),對(duì)應(yīng)代碼SelectMappedBufferResult selectResult = this.commitLog.getMessage(...),之前有分析過該方法,這里就不在展開分析了。

如果查詢子結(jié)果selectResult不為空,則說明該次查詢正常,繼而,往拉取總結(jié)果getResult添加子結(jié)果,getResult.addMessage(selectResult),我們跟進(jìn)該方法:

    public void addMessage(final SelectMappedBufferResult mapedBuffer) {
        this.messageMapedList.add(mapedBuffer);
        this.messageBufferList.add(mapedBuffer.getByteBuffer());
        this.bufferTotalSize += mapedBuffer.getSize();
        this.msgCount4Commercial += (int) Math.ceil(
            mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
    }

可以看出,每次添加子結(jié)果后,會(huì)往結(jié)果列表messageMapedList添加業(yè)務(wù)消息字節(jié),而消費(fèi)者客戶端所得到的所有消息元數(shù)據(jù)就在messageMapedList中,并且由客戶端負(fù)責(zé)反序列化業(yè)務(wù)消息;以及記錄當(dāng)已獲取消息的總大小bufferTotalSize 和總條數(shù)msgCount4Commercial。這些結(jié)果就是作用與下一次 批次滿判斷的 條件,就是步驟step3.2中的邏輯判斷。

如果查詢子結(jié)果selectResult為空,說明該【邏輯位移索引】所對(duì)應(yīng)的【業(yè)務(wù)消息】不在該映射文件中,需要滾動(dòng)到下一個(gè)存儲(chǔ)映射文件。

當(dāng)拉取的消息達(dá)到了批次滿判斷的條件或者是到達(dá)了【邏輯位移索引】所在的映射文件尾部時(shí),則結(jié)束該次的循環(huán)流程。

for循環(huán)結(jié)束后,接著step3.4,更新nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE),在說一下nextBeginOffset ,該字段所代表的業(yè)務(wù)含義是,希望消費(fèi)者客戶端下一次拉取消息下標(biāo)索引位置。

并且判斷消費(fèi)者客戶端拉取消息的broker角色,如果目前的消費(fèi)進(jìn)度差值 超過40%,則建議消費(fèi)者端下次拉取的目標(biāo)為slave,否則繼續(xù)向master拉取,這部分內(nèi)容我們會(huì)在分析消費(fèi)者客戶端拉取消息時(shí),詳細(xì)分析的,這里就不在展開討論。


4、返回查詢結(jié)果。

最后一步step4比較簡(jiǎn)單,往結(jié)果getResult填充相對(duì)應(yīng)的查詢結(jié)果字段。



總結(jié)一下上述查找流程,首先,根據(jù)查詢條件topic 與 queueId 確定該broker唯一一條邏輯消費(fèi)隊(duì)列實(shí)例(ConsumeQueue),然后,獲取該消費(fèi)隊(duì)列目前可以消費(fèi)的索引范圍[minOffset ,maxOffset]。換言之,業(yè)務(wù)調(diào)用方所指定的請(qǐng)求查詢參數(shù)中的索引字段offset一定要在[minOffset ,maxOffset]這個(gè)范圍內(nèi),如果不在該范圍內(nèi),則會(huì)給業(yè)務(wù)調(diào)用方返回相對(duì)應(yīng)的響應(yīng)碼。例如offset比最小的可消費(fèi)索引minOffset還要小,則返回響應(yīng)碼GetMessageStatus.OFFSET_OVERFLOW_ONE,以及下一次查詢的開始索引位置nextBeginOffset = minOffset。如果offset在指定的范圍內(nèi),則根據(jù)offset,定位出offset第一個(gè)【邏輯位移索引】所在的存儲(chǔ)映射文件及所在物理位移,然后在根據(jù)業(yè)務(wù)方所指定的最大拉取數(shù)量maxMsgNums(當(dāng)然,實(shí)際值還要結(jié)合目前的消費(fèi)進(jìn)度及可傳輸?shù)膶?shí)際總字節(jié)量來確定maxMsgNums的最終值),結(jié)合篩選條件subscriptionDatatagsCode匹配,如果符合條件,在根據(jù)【邏輯位移索引】存儲(chǔ)的的業(yè)務(wù)消息物理位移offsetPy及其大小sizePy,查詢出具體的業(yè)務(wù)消息字節(jié)內(nèi)容,將業(yè)務(wù)消息字節(jié)結(jié)果加入結(jié)果列表中,知道結(jié)果列表到達(dá)maxMsgNums或者是【邏輯位移索引】存儲(chǔ)文件尾部,則結(jié)束查詢。


到這里,《rocket mq 底層存儲(chǔ)源碼分析》系列就結(jié)束了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容