RocketMQ源碼解析(十)-Broker#消息存儲Index

IndexFile作用

MessageStore中存儲的消息除了通過ConsumeQueue提供給consumer消費之外,還支持通過MessageID或者M(jìn)essageKey來查詢消息。使用ID查詢時,因為ID就是用broker+offset生成的,所以很容易就找到對應(yīng)的commitLog文件來讀取消息。對于用MessageKey來查詢消息,MessageStore通過構(gòu)建一個index來提高讀取速度。

Index文件組織方式

在了解源碼之前,最重要的是要知道Index文件的存儲結(jié)構(gòu)是怎么樣的,下面的圖引用自CSDN斬秋的文章RocketMQ原理解析

索引文件結(jié)構(gòu)

上圖中下半部分是索引文件的結(jié)構(gòu),一個索引文件時有文件頭,slotTable,和index List組成的。
Header中存儲的信息有:文件中第一個和最后一個索引對應(yīng)的消息的存儲時間,第一個和最后一個索引對應(yīng)消息的offset的最大和最小值,文件中的索引個數(shù)。
整個slotTable+indexLinkedList可以理解成java的HashMap。每當(dāng)放一個新的消息的index進(jìn)來,首先取MessageKey的hashCode,然后用hashCode對slot總數(shù)取模,得到應(yīng)該放到哪個slot中,slot總數(shù)系統(tǒng)默認(rèn)500W個。只要是取hash就必然面臨hash沖突的問題,跟HashMap一樣,IndexFile也是使用一個鏈表結(jié)構(gòu)來解決hash沖突。只是這里跟HashMap稍微有點區(qū)別的地方是,slot中放的是最新index的指針。這個是因為一般查詢的時候肯定是優(yōu)先查最近的消息。
每個slot中放的指針值是索引在indexFile中的偏移量,如上圖,每個索引大小是20字節(jié),所以根據(jù)當(dāng)前索引是這個文件中的第幾個(偏移量),就很容易定位到索引的位置。然后每個索引都保存了跟它同一個slot的前一個索引的位置,以此類推形成一個鏈表的結(jié)構(gòu)。下面通過代碼來看下新建一個索引的過程:

Index生成過程

上一篇講ConsumeQueue的時候,有一個ReputMessageService在分發(fā)消息的時候還會調(diào)用CommitLogDispatcherBuildIndex用來創(chuàng)建index。這個類實現(xiàn)就是直接調(diào)用的IndexService.buildIndex()

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

直接看IndexService的實現(xiàn)如下:

public void buildIndex(DispatchRequest req) {
        //1、獲取或者新建當(dāng)前可寫入的index file
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            //2、獲取當(dāng)前indexFile中記錄的最大offset
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            //3、新來消息是之前的,不應(yīng)該出現(xiàn)
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE://rollback消息不更新index
                    return;
            }
            //4、單條消息
            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
            //5、多個msg,循環(huán)逐個存入
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }
  • 第1步,跟CommitLogConsumeQueue一樣,IndexFile也是用MappedFile來存儲數(shù)據(jù),每個MappedFile大小也是固定的。所以這里第一步獲取當(dāng)前正在寫入的文件,沒有的話則新建,indexFile是用當(dāng)前時間戳作為文件名的。
  • 第2、3步,判斷是否是新的消息,不是則跳過
  • 第4、5步,這里分為單條消息和批量消息,其實最終的邏輯是一樣的,都是調(diào)用putKey的方法
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }

這里就是調(diào)用IndexFile的putKey的方法,包含了重試邏輯,因為有可能在寫index的時候,上一個文件已經(jīng)寫滿了,需要創(chuàng)建一個新的文件寫入。

IndexFile數(shù)據(jù)寫入

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //1、判斷index是否已滿,已滿返回失敗,由調(diào)用方來處理
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //2、計算key的非負(fù)hashCode,調(diào)用的java String的hashcode方法
            int keyHash = indexKeyHashMethod(key);
            //3、key應(yīng)該放的slot
            int slotPos = keyHash % this.hashSlotNum;
            //4、slot的數(shù)據(jù)存儲位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//獲取slot的position

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                //5、如果存在hash沖突,獲取這個slot存的前一個index的計數(shù),如果沒有則值為0
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //6、計算當(dāng)前msg的存儲時間和第一條msg相差秒數(shù)
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //7、獲取該條index實際存儲position
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //8、生成一個index的unit內(nèi)容
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);//key的hash,不會記錄完整的key
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//消息在commitlog中偏移
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//時間差
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//相同hashcode的前一條index的順序號
                //9、更新slot中的值為本條消息的順序號
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //10、如果是第一條消息,更新header中的起始o(jì)ffset和起始time
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                //11、更新header中的計數(shù)
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();//增加index計數(shù)
                this.indexHeader.setEndPhyOffset(phyOffset);//最后一條消息的offset
                this.indexHeader.setEndTimestamp(storeTimestamp);//最后一個index的時間

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

以上的邏輯中,第5步獲取slot之前是否已經(jīng)有value,如果有代表hash沖突了,則在第8步中會把value設(shè)置成當(dāng)前index的前一個index,同時將slot中的value更新成當(dāng)前消息的序號。這樣整個索引的生成就結(jié)束了,我們看下使用MsgKey來查詢消息的時候是怎么使用索引文件的。

通過IndexFile查詢消息

MessageStore中提供了根據(jù)MessageKey查詢消息的接口,接口如下:

public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();

        long lastQueryMsgTime = end;

        for (int i = 0; i < 3; i++) {
            //1、從indexService中查詢所有offset
            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                break;
            }

            Collections.sort(queryOffsetResult.getPhyOffsets());

            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());

            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
                long offset = queryOffsetResult.getPhyOffsets().get(m);

                try {

                    boolean match = true;
                    MessageExt msg = this.lookMessageByOffset(offset);
                    if (0 == m) {
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }
                    if (match) {
                        //2、根據(jù)offset,到CommitLog讀取消息詳情
                        SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                        if (result != null) {
                            int size = result.getByteBuffer().getInt(0);
                            result.getByteBuffer().limit(size);
                            result.setSize(size);
                            queryMessageResult.addMessage(result);
                        }
                    } else {
                        log.warn("queryMessage hash duplicate, {} {}", topic, key);
                    }
                } catch (Exception e) {
                    log.error("queryMessage exception", e);
                }
            }
            ...
        }

        return queryMessageResult;
    }

以上的接口輸入?yún)?shù)如下:
topic:只能按topic維度來查詢消息,因為索引生成的時候key是用的topic+MessageKey
key: messageKey
maxNum : 最多返回的消息數(shù),因為key是由用戶設(shè)置的,并不保證唯一,所以可能取到多個消息;同時index中只存儲了hash,所以hash相同的消息也會取出來
begin,end:起始和結(jié)束時間,只會查詢指定時間段的消息
這個接口的具體實現(xiàn)就是,先從IndexService中讀取到offset,然后到CommitLog中讀取消息詳情。下面主要看下indexService中的實現(xiàn):

public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<Long>(maxNum);

        long indexLastUpdateTimestamp = 0;
        long indexLastUpdatePhyoffset = 0;
        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
        try {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                //1、從最新的index文件開始向前找
                for (int i = this.indexFileList.size(); i > 0; i--) {
                    IndexFile f = this.indexFileList.get(i - 1);
                    boolean lastFile = i == this.indexFileList.size();
                    if (lastFile) {
                        indexLastUpdateTimestamp = f.getEndTimestamp();
                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
                    }
                    //2、index文件的時間包含了begin和end的全部或者部分
                    if (f.isTimeMatched(begin, end)) {
                        //3、從文件中讀取index中的offset
                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                    }

                    if (f.getBeginTimestamp() < begin) {
                        break;
                    }

                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("queryMsg exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
    }
  • 第1步,循環(huán)掃描indexFile,先掃描最近的
  • 第2步,從header中讀出最小和最大的timestamp,然后看是否符合請求的時間范圍
  • 第3步,從符合條件的index文件中獲取offset。下面看下indexFile的讀取步驟
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {//根據(jù)key來獲取消息offset
        if (this.mappedFile.hold()) {
            //1、跟生成索引時一樣,找到key的slot
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                }
                //2、獲取該槽位上的最后一條索引的序號
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {//到達(dá)最大返回條數(shù)
                            break;
                        }
                        //3、找到index的位置
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        //4、在commitlog中偏移
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //5、相同hashcode的前一條消息的序號
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //6、Hash和time都符合條件,加入返回列表
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
                        //7、前一條不等于0,繼續(xù)讀入前一條
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }
  • 第1步,首先根據(jù)Key獲取索引對應(yīng)的slot,這里邏輯跟存入索引時時一樣的
  • 第2步,slot中value存儲的是最后一個index的序號
  • 第3-6步,將符合條件的offset加入返回列表
  • 第7步,如果存在相同hash的前一條index,并且返回列表沒到最大值,則繼續(xù)向前搜索

總結(jié)

從通過index查詢消息的邏輯可以看出,相同的hashCode的message都會返回客戶端,如果調(diào)用這個接口通過key來查詢消息,需要在客戶端再做一次過濾。為了提高查詢效率,在發(fā)送消息時應(yīng)該在保證便于查詢的同時,盡量在一段時間內(nèi)讓消息有不同key。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容