IndexFile作用
MessageStore中存儲的消息除了通過ConsumeQueue提供給consumer消費之外,還支持通過MessageID或者M(jìn)essageKey來查詢消息。使用ID查詢時,因為ID就是用broker+offset生成的,所以很容易就找到對應(yīng)的commitLog文件來讀取消息。對于用MessageKey來查詢消息,MessageStore通過構(gòu)建一個index來提高讀取速度。
Index文件組織方式
在了解源碼之前,最重要的是要知道Index文件的存儲結(jié)構(gòu)是怎么樣的,下面的圖引用自CSDN斬秋的文章RocketMQ原理解析

上圖中下半部分是索引文件的結(jié)構(gòu),一個索引文件時有文件頭,slotTable,和index List組成的。
Header中存儲的信息有:文件中第一個和最后一個索引對應(yīng)的消息的存儲時間,第一個和最后一個索引對應(yīng)消息的offset的最大和最小值,文件中的索引個數(shù)。
整個
slotTable+indexLinkedList可以理解成java的HashMap。每當(dāng)放一個新的消息的index進(jìn)來,首先取MessageKey的hashCode,然后用hashCode對slot總數(shù)取模,得到應(yīng)該放到哪個slot中,slot總數(shù)系統(tǒng)默認(rèn)500W個。只要是取hash就必然面臨hash沖突的問題,跟HashMap一樣,IndexFile也是使用一個鏈表結(jié)構(gòu)來解決hash沖突。只是這里跟HashMap稍微有點區(qū)別的地方是,slot中放的是最新index的指針。這個是因為一般查詢的時候肯定是優(yōu)先查最近的消息。每個slot中放的指針值是索引在indexFile中的偏移量,如上圖,每個索引大小是20字節(jié),所以根據(jù)當(dāng)前索引是這個文件中的第幾個(偏移量),就很容易定位到索引的位置。然后每個索引都保存了跟它同一個slot的前一個索引的位置,以此類推形成一個鏈表的結(jié)構(gòu)。下面通過代碼來看下新建一個索引的過程:
Index生成過程
上一篇講ConsumeQueue的時候,有一個ReputMessageService在分發(fā)消息的時候還會調(diào)用CommitLogDispatcherBuildIndex用來創(chuàng)建index。這個類實現(xiàn)就是直接調(diào)用的IndexService.buildIndex()
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
直接看IndexService的實現(xiàn)如下:
public void buildIndex(DispatchRequest req) {
//1、獲取或者新建當(dāng)前可寫入的index file
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
//2、獲取當(dāng)前indexFile中記錄的最大offset
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
//3、新來消息是之前的,不應(yīng)該出現(xiàn)
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE://rollback消息不更新index
return;
}
//4、單條消息
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
//5、多個msg,循環(huán)逐個存入
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
- 第1步,跟
CommitLog和ConsumeQueue一樣,IndexFile也是用MappedFile來存儲數(shù)據(jù),每個MappedFile大小也是固定的。所以這里第一步獲取當(dāng)前正在寫入的文件,沒有的話則新建,indexFile是用當(dāng)前時間戳作為文件名的。 - 第2、3步,判斷是否是新的消息,不是則跳過
- 第4、5步,這里分為單條消息和批量消息,其實最終的邏輯是一樣的,都是調(diào)用putKey的方法
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
這里就是調(diào)用IndexFile的putKey的方法,包含了重試邏輯,因為有可能在寫index的時候,上一個文件已經(jīng)寫滿了,需要創(chuàng)建一個新的文件寫入。
IndexFile數(shù)據(jù)寫入
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//1、判斷index是否已滿,已滿返回失敗,由調(diào)用方來處理
if (this.indexHeader.getIndexCount() < this.indexNum) {
//2、計算key的非負(fù)hashCode,調(diào)用的java String的hashcode方法
int keyHash = indexKeyHashMethod(key);
//3、key應(yīng)該放的slot
int slotPos = keyHash % this.hashSlotNum;
//4、slot的數(shù)據(jù)存儲位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//獲取slot的position
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//5、如果存在hash沖突,獲取這個slot存的前一個index的計數(shù),如果沒有則值為0
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//6、計算當(dāng)前msg的存儲時間和第一條msg相差秒數(shù)
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//7、獲取該條index實際存儲position
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//8、生成一個index的unit內(nèi)容
this.mappedByteBuffer.putInt(absIndexPos, keyHash);//key的hash,不會記錄完整的key
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//消息在commitlog中偏移
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//時間差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//相同hashcode的前一條index的順序號
//9、更新slot中的值為本條消息的順序號
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//10、如果是第一條消息,更新header中的起始o(jì)ffset和起始time
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//11、更新header中的計數(shù)
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();//增加index計數(shù)
this.indexHeader.setEndPhyOffset(phyOffset);//最后一條消息的offset
this.indexHeader.setEndTimestamp(storeTimestamp);//最后一個index的時間
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
以上的邏輯中,第5步獲取slot之前是否已經(jīng)有value,如果有代表hash沖突了,則在第8步中會把value設(shè)置成當(dāng)前index的前一個index,同時將slot中的value更新成當(dāng)前消息的序號。這樣整個索引的生成就結(jié)束了,我們看下使用MsgKey來查詢消息的時候是怎么使用索引文件的。
通過IndexFile查詢消息
MessageStore中提供了根據(jù)MessageKey查詢消息的接口,接口如下:
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
for (int i = 0; i < 3; i++) {
//1、從indexService中查詢所有offset
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
Collections.sort(queryOffsetResult.getPhyOffsets());
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
try {
boolean match = true;
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
}
if (match) {
//2、根據(jù)offset,到CommitLog讀取消息詳情
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}
} catch (Exception e) {
log.error("queryMessage exception", e);
}
}
...
}
return queryMessageResult;
}
以上的接口輸入?yún)?shù)如下:
topic:只能按topic維度來查詢消息,因為索引生成的時候key是用的topic+MessageKey
key: messageKey
maxNum : 最多返回的消息數(shù),因為key是由用戶設(shè)置的,并不保證唯一,所以可能取到多個消息;同時index中只存儲了hash,所以hash相同的消息也會取出來
begin,end:起始和結(jié)束時間,只會查詢指定時間段的消息
這個接口的具體實現(xiàn)就是,先從IndexService中讀取到offset,然后到CommitLog中讀取消息詳情。下面主要看下indexService中的實現(xiàn):
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<Long>(maxNum);
long indexLastUpdateTimestamp = 0;
long indexLastUpdatePhyoffset = 0;
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
try {
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
//1、從最新的index文件開始向前找
for (int i = this.indexFileList.size(); i > 0; i--) {
IndexFile f = this.indexFileList.get(i - 1);
boolean lastFile = i == this.indexFileList.size();
if (lastFile) {
indexLastUpdateTimestamp = f.getEndTimestamp();
indexLastUpdatePhyoffset = f.getEndPhyOffset();
}
//2、index文件的時間包含了begin和end的全部或者部分
if (f.isTimeMatched(begin, end)) {
//3、從文件中讀取index中的offset
f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
}
if (f.getBeginTimestamp() < begin) {
break;
}
if (phyOffsets.size() >= maxNum) {
break;
}
}
}
} catch (Exception e) {
log.error("queryMsg exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
- 第1步,循環(huán)掃描indexFile,先掃描最近的
- 第2步,從header中讀出最小和最大的timestamp,然后看是否符合請求的時間范圍
- 第3步,從符合條件的index文件中獲取offset。下面看下indexFile的讀取步驟
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {//根據(jù)key來獲取消息offset
if (this.mappedFile.hold()) {
//1、跟生成索引時一樣,找到key的slot
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
}
//2、獲取該槽位上的最后一條索引的序號
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {//到達(dá)最大返回條數(shù)
break;
}
//3、找到index的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
//4、在commitlog中偏移
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
//5、相同hashcode的前一條消息的序號
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
//6、Hash和time都符合條件,加入返回列表
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
//7、前一條不等于0,繼續(xù)讀入前一條
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
- 第1步,首先根據(jù)Key獲取索引對應(yīng)的slot,這里邏輯跟存入索引時時一樣的
- 第2步,slot中value存儲的是最后一個index的序號
- 第3-6步,將符合條件的offset加入返回列表
- 第7步,如果存在相同hash的前一條index,并且返回列表沒到最大值,則繼續(xù)向前搜索
總結(jié)
從通過index查詢消息的邏輯可以看出,相同的hashCode的message都會返回客戶端,如果調(diào)用這個接口通過key來查詢消息,需要在客戶端再做一次過濾。為了提高查詢效率,在發(fā)送消息時應(yīng)該在保證便于查詢的同時,盡量在一段時間內(nèi)讓消息有不同key。