1 Overview
2 Entry Method
3 ConsumeQueue Index Structure
4 Index Building
1 Overview
In RocketMQ, a single Broker can host multiple Topics, and each Topic can have multiple queues. When the Broker receives messages from producers, it appends them to the same file (the CommitLog) in arrival order. Each file has a default maximum size of 1 GB; once a file reaches that limit, a new file is created and writing continues from where the previous file left off.
Because the data of all queues of all Topics is stored together, querying or consuming messages would otherwise involve a large amount of random reads, and a query would have to scan all of the data from beginning to end. To avoid a full scan every time a message is queried or a consumer pulls data, RocketMQ builds two kinds of indexes on top of the message data: one is the global Index described in my article RocketMQ源碼-Index索引介紹, and the other is the per-queue ConsumeQueue index described in this article.
There are three main differences between the Index and the ConsumeQueue. First, the Index is built from the message's MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX property, while the ConsumeQueue is built from the hash code of the message tags. Second, the Index is a global index that does not distinguish topics or queues (all message indexes live in one set of files), while a ConsumeQueue corresponds to a single queue of a single topic, so every queue of every topic has its own ConsumeQueue index. Third, the Index is mainly used for message queries, while the ConsumeQueue is used when consumers pull messages during consumption. This also explains why the Index is designed as a global index and the ConsumeQueue as a per-queue index: a query usually searches all messages for those matching given criteria, whereas during consumption a consumer generally pulls messages only from the specific queue of the topic it subscribes to (or is assigned to after rebalancing).
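To make the append-only file layout concrete, here is a small sketch (not RocketMQ source) of how a global physical offset maps to a CommitLog file. RocketMQ names each file after its 20-digit, zero-padded starting offset; the 1 GB default file size here is an assumption taken from the default configuration.

```java
public class CommitLogOffsetSketch {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB default

    // Name of the file that contains the given global offset:
    // the 20-digit zero-padded starting offset of that file
    static String fileNameFor(long globalOffset) {
        long fileStart = globalOffset - (globalOffset % FILE_SIZE);
        return String.format("%020d", fileStart);
    }

    // Position of the message inside that file
    static long positionInFile(long globalOffset) {
        return globalOffset % FILE_SIZE;
    }
}
```

So a message written at global offset 1 GB + 100 lands 100 bytes into the second file, whose name encodes its starting offset 1073741824.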
2 Entry Method
As in my article RocketMQ源碼-Index索引介紹, the ConsumeQueue build is also triggered by the reput operation in the run method of the ReputMessageService. The class that builds the ConsumeQueue is CommitLogDispatcherBuildConsumeQueue, which is likewise an inner class of DefaultMessageStore. Its source is as follows:
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // Only ordinary non-transactional messages and
            // committed transactional messages are indexed
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // Prepared (uncommitted) and rolled-back transactional
            // messages are not indexed
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
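The switch above can be reduced to a small sketch of how the transaction type is extracted from sysFlag and used to decide whether to index. The constants mirror RocketMQ's MessageSysFlag; treat the exact bit values as an assumption of this sketch.

```java
public class SysFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // The transaction type occupies two bits of sysFlag; masking with
    // ROLLBACK (both bits set) extracts just those bits
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Mirrors the dispatch() switch: only NOT and COMMIT types are indexed
    static boolean shouldIndex(int sysFlag) {
        int t = getTransactionValue(sysFlag);
        return t == TRANSACTION_NOT_TYPE || t == TRANSACTION_COMMIT_TYPE;
    }
}
```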
Before going into how the ConsumeQueue is built, let's first look at the structure of the ConsumeQueue index.
3 ConsumeQueue Index Structure
The structure of the ConsumeQueue is fairly simple, as shown below:

[ CommitLog Offset (8 bytes) | Size (4 bytes) | Message Tags HashCode (8 bytes) ]

Each index entry occupies a fixed 20 bytes in the file; the fields are:
- CommitLog Offset: the starting physical offset of the message in the CommitLog, long type, 8 bytes;
- Size: the size of the message, int type, 4 bytes;
- Message Tags HashCode: the hash code of the message tags, long type, 8 bytes.
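The fixed 20-byte layout above can be reproduced with a ByteBuffer. This is a standalone illustration, not RocketMQ source; only the field order and sizes are taken from the entry layout just described.

```java
import java.nio.ByteBuffer;

public class CqUnitSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Encode one entry: 8-byte offset + 4-byte size + 8-byte tags hash code
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // bytes 0..7
        buf.putInt(size);             // bytes 8..11
        buf.putLong(tagsCode);        // bytes 12..19
        return buf.array();
    }

    static long decodeOffset(byte[] unit)   { return ByteBuffer.wrap(unit).getLong(0); }
    static int  decodeSize(byte[] unit)     { return ByteBuffer.wrap(unit).getInt(8); }
    static long decodeTagsCode(byte[] unit) { return ByteBuffer.wrap(unit).getLong(12); }
}
```

Because every entry is exactly 20 bytes, entry N of a queue always lives at byte position N * 20, which is what makes random access by logical offset cheap.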
Note that each ConsumeQueue also has a ConsumeQueueExt instance that records extended index information. If the ConsumeQueue extension is enabled in the configuration, the Message Tags HashCode field in the ConsumeQueue does not record the hash code of the message tags; instead it records the physical offset of this message's index entry in the ConsumeQueueExt file, and the real Message Tags HashCode is recorded in the ConsumeQueueExt file. So when reading a ConsumeQueue, how do we tell whether Message Tags HashCode holds a tags hash code or an extension offset? The ConsumeQueue method isExtAddr(long tagsCode) implements this check:
//ConsumeQueue
/**
 * Check {@code tagsCode} is address of extend file or tags code.
 */
public boolean isExtAddr(long tagsCode) {
    return ConsumeQueueExt.isExtAddr(tagsCode);
}
//ConsumeQueueExt
/**
 * Check whether {@code address} point to extend file.
 * <p>
 * Just test {@code address} is less than 0.
 * </p>
 */
public static boolean isExtAddr(final long address) {
    //MAX_ADDR = Integer.MIN_VALUE - 1L;
    //i.e. if the message tagsCode is less than or equal to
    //Integer.MIN_VALUE - 1, it is an extension offset address
    //rather than the tags hash code
    return address <= MAX_ADDR;
}
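Why is this check unambiguous? A real tags code comes from String.hashCode(), which is an int, so when widened to a long it always lies in [Integer.MIN_VALUE, Integer.MAX_VALUE]. Extension addresses are decorated to lie at or below MAX_ADDR = Integer.MIN_VALUE - 1, a range no int hash code can reach. A minimal demonstration:

```java
public class ExtAddrSketch {
    // Same constant as in ConsumeQueueExt
    static final long MAX_ADDR = Integer.MIN_VALUE - 1L;

    static boolean isExtAddr(long tagsCode) {
        return tagsCode <= MAX_ADDR;
    }
}
```

Even the most negative possible tags hash, (long) Integer.MIN_VALUE, is still one greater than MAX_ADDR, so the two value ranges never overlap.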
Besides the message tags code, the extended index ConsumeQueueExt also records the message's filter bitmap and store time. The bitmap is mainly used for message filtering and is not covered here. The basic storage unit of ConsumeQueueExt is ConsumeQueueExt.CqExtUnit.
4 Index Building
Continuing from the entry method in section 2, the entry builds the index by calling DefaultMessageStore.this.putMessagePositionInfo(request), which is implemented as follows:
//DefaultMessageStore
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // First locate the ConsumeQueue by topic and queue id
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}
Within ConsumeQueue, putMessagePositionInfoWrapper is implemented as follows:
//ConsumeQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    // On write failure, retry up to 30 times
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        // If the extended index is enabled, first build the extension
        // unit CqExtUnit, filling in the bitmap, store time and the
        // real message tags hash code
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());
            // put returns the offset address the extension unit was just written at
            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                // Reset tagsCode to the extension index offset address
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // Perform the actual write
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }
    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    // Write the message's physical offset, size and tagsCode;
    // tagsCode may be an extension index offset or the real tags hash code
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }
        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
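The expectLogicOffset arithmetic above also explains how a consumer later maps its logical queue offset back to a byte position for reading. Each entry is 20 bytes, and by default a ConsumeQueue file holds 300,000 entries, i.e. 6,000,000 bytes; treat that default file size as an assumption of this sketch (it is not shown in the code above).

```java
public class CqSeekSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;
    // Assumed default: 300,000 entries per ConsumeQueue file
    static final long FILE_SIZE = 300_000L * CQ_STORE_UNIT_SIZE;

    // Same computation as expectLogicOffset in putMessagePositionInfo
    static long byteOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    // Starting byte offset of the file that holds the entry
    // (ConsumeQueue files, like CommitLog files, are named by starting offset)
    static long fileStart(long cqOffset) {
        long b = byteOffset(cqOffset);
        return b - (b % FILE_SIZE);
    }

    // Read position of the entry inside that file
    static long positionInFile(long cqOffset) {
        return byteOffset(cqOffset) % FILE_SIZE;
    }
}
```

Because the entry size is fixed, this lookup is pure arithmetic: no scanning is needed to find entry N, which is exactly the property the ConsumeQueue index exists to provide.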