本章節(jié)是《rocket mq 底層存儲(chǔ)源碼分析》系列的最后一章,我們結(jié)合【邏輯位移索引】以及【key查詢索引】,從低層接口分析如何利用這兩類索引,為上層業(yè)務(wù)接口提供查詢業(yè)務(wù)消息的實(shí)現(xiàn)。因此,這里并不涉及Consumer客戶端是如何發(fā)起拉取消息請(qǐng)求,以及broker端接收請(qǐng)求后,根據(jù)客戶端的查詢條件,查詢出指定的業(yè)務(wù)消息并,最后返回給客戶端的 整個(gè)流程。該章節(jié)只是分析如何通過指定的查詢參數(shù),獲取指定的業(yè)務(wù)消息。
在開始之前,我們先回顧一下java 中ArrayList是如何訪問一個(gè)指定的元素?相信不少讀者可以立刻給出答案,通過index下標(biāo)就可以訪問了。
那rmq是如何設(shè)計(jì)底層接口來訪問具體的業(yè)務(wù)消息呢?帶著這個(gè)疑問,我們來一起分析查詢業(yè)務(wù)消息的底層核心接口:
/**
* @param group 消費(fèi)客戶端所制定的ConsumeGroup
* @param topic
* @param queueId 指定的消費(fèi)隊(duì)列
* @param offset 消息的邏輯位移
* @param maxMsgNums 拉取的最大消息數(shù)量,PULL模式的客戶端由用戶自行制定,PUSH模式一般使用默認(rèn)值32
* @param subscriptionData
* @return
*/
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
final SubscriptionData subscriptionData) {
...
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
//this.mappedFileQueue.getMaxOffset() 獲取最大的已commit 的物理offset。
final long maxOffsetPy = this.commitLog.getMaxOffset();
//step1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
//邏輯消費(fèi)隊(duì)列目前最小的【邏輯位移索引】
minOffset = consumeQueue.getMinOffsetInQueue();
//邏輯消費(fèi)隊(duì)列目前最大的【邏輯位移索引】
maxOffset = consumeQueue.getMaxOffsetInQueue();
//nextOffsetCorrection 根據(jù)consumer指定消費(fèi)的offset修正nextOffset
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
}
else {
//step2
//獲取該索引位置之后的【邏輯位移索引】字節(jié),這里有可能獲取多個(gè) 【邏輯位移索引】(20字節(jié)一個(gè))
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = 16000;
...
//start rolling get 【邏輯位移索引】
//step3
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//step3.1 讀取完整的【邏輯位移索引】?jī)?nèi)容
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); //業(yè)務(wù)消息的開始 物理存儲(chǔ)位移
int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); //業(yè)務(wù)消息的實(shí)際大小
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); //業(yè)務(wù)消息的 標(biāo)識(shí)hash值
//已經(jīng)獲取的最大的具體消息的物理位移
maxPhyOffsetPulling = offsetPy;
...
//step3.2這里表明如果當(dāng)前查詢出的具體消息的物理位移落后于已CommitLog的maxOffsetPy即最大已提交到緩存的消息物理位移
//總內(nèi)存的40%,則表明該消息時(shí)在磁盤中
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
//該邏輯判斷循環(huán)內(nèi),該條消息是否達(dá)到了批次滿的條件
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
//step3.3
if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (selectResult != null) {
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
} else {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
}
} else {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
}
} //end for
...
//step3.4
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//記錄目前的消費(fèi)進(jìn)度差值
long diff = maxOffsetPy - maxPhyOffsetPulling;
//系統(tǒng)總存儲(chǔ)的40% accessMessageInMemoryMaxRatio = 40,可以配置
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
//換言之,如果目前的消費(fèi)進(jìn)度差值 超過40%,則建議消費(fèi)者端下次拉取的目標(biāo)為slave
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
//如果該索引文件找不到相對(duì)應(yīng)的【邏輯位移索引】字節(jié)內(nèi)容,則滾動(dòng)到下一個(gè)索引文件的的開始位置所對(duì)應(yīng)的邏輯位移
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
}
}
} else {
//找不到對(duì)應(yīng)的消費(fèi)邏輯隊(duì)列
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
...
//step4記錄拉取消息的消耗時(shí)間,并返回結(jié)果
...
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
接下來我們主要從4個(gè)步驟分析整個(gè)查詢流程::
1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue
2、獲取根據(jù)查詢位移,通過具體ConsumeQueue實(shí)例獲取多條【邏輯位移索引】字節(jié)內(nèi)容
3、根據(jù)【邏輯位移索引】列表滾動(dòng)獲取業(yè)務(wù)消息
4、返回查詢結(jié)果
1、通過topic和queueId找到指定消費(fèi)的ConsumeQueue。
之前在【rocket mq 底層存儲(chǔ)源碼分析(4)-索引構(gòu)建】章節(jié)中,已經(jīng)分析過,如何通過topic即queueId獲取對(duì)應(yīng)的ConsumeQueue實(shí)例,這里就不在詳細(xì)展開。
獲取ConsumeQueue實(shí)例后,先得到該邏輯消費(fèi)隊(duì)列目前的可消費(fèi)范圍:
minOffset = consumeQueue.getMinOffsetInQueue()
getMinOffsetInQueue() = this.minLogicOffset / CQ_STORE_UNIT_SIZE
maxOffset = consumeQueue.getMaxOffsetInQueue()
getMaxOffsetInQueue() = this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE
字段minLogicOffset以及 this.mappedFileQueue.getMaxOffset()分別代表目前的 邏輯消費(fèi)隊(duì)列實(shí)例中,【邏輯索引位移】所對(duì)應(yīng)的【最小物理位移起始地址】以及【最大物理位移起始地址】 ,通過【邏輯索引位移】的物理位移起始位置除以其大小(CQ_STORE_UNIT_SIZE),即可換算出【下標(biāo)位移】(例如java中查詢 ArrayList中的index)。
接著,通過 查詢參數(shù)中的offset (客戶端指定的【下標(biāo)位移】)與 邏輯消費(fèi)隊(duì)列可查詢范圍[minOffset ,maxOffset] 比較,如果offset不在[minOffset ,maxOffset] 的范圍內(nèi),則跳過查詢業(yè)務(wù)消息的步驟。否則,在根據(jù)所在范圍,返回查詢結(jié)果。例如,如果offset < minOffset,則設(shè)置查詢狀態(tài)為status = GetMessageStatus.OFFSET_TOO_SMALL, 以及下次可查詢的下標(biāo)位移nextBeginOffset = nextOffsetCorrection(offset, minOffset),其業(yè)務(wù)含義為,該次查詢的消息位移過小,并要求客戶端下一次查詢下標(biāo)位移為minOffset。如果offset在[minOffset ,maxOffset] 范圍內(nèi),則代碼走到步驟2。
2、獲取根據(jù)查詢位移,通過具體ConsumeQueue實(shí)例獲取多條【邏輯位移索引】字節(jié)內(nèi)容。
接著步驟1中的, 如果offset在查詢范圍[minOffset ,maxOffset],接著分析獲取多條【邏輯位移索引】字節(jié)內(nèi)容。
對(duì)應(yīng)代碼
consumeQueue.getIndexBuffer(offset):
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
//300000 * 20 ,即每一個(gè)mappedFile存放30萬條位置索引消息
int mappedFileSize = this.mappedFileSize;
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
分析一下該方法,首先是this.mappedFileQueue.findMappedFileByOffset(offset)想必讀者對(duì)該方法不陌生,即通過指定的物理位移,找出該物理位移所在的映射文件。
然后,在通過SelectMappedBufferResult bufferConsumeQueue = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)),將映射文件大于該物理位移的【邏輯位移索引】字節(jié)內(nèi)容一并查詢出來。
3、根據(jù)【邏輯位移索引】列表滾動(dòng)獲取業(yè)務(wù)消息。
在上一步分析中,我們知道bufferConsumeQueue存放著連續(xù)遞增的【邏輯位移索引】字節(jié)內(nèi)容,接下來,我們將分析如何使用bufferConsumeQueue查詢出符合條件的業(yè)務(wù)消息字節(jié)。
在for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) 的for 循環(huán)里,滾動(dòng)讀取bufferConsumeQueue中的【邏輯位移索引】。我們?cè)敿?xì)分析讀取流程:
首先是step3.1,根據(jù)【邏輯位移索引】的存儲(chǔ)結(jié)構(gòu),讀取完整的【邏輯位移索引】?jī)?nèi)容,maxPhyOffsetPulling則記錄目前的最大的業(yè)務(wù)消息的物理位移,主要是用來記錄消費(fèi)進(jìn)度的。
然后到step3.2,checkInDiskByCommitOffset(offsetPy, maxOffsetPy),該方法主要是判斷,目前我們所查詢的業(yè)務(wù)消息是否在磁盤中,因?yàn)閞mq只將部分最新的業(yè)務(wù)消息放在pagecache,而大部分的消息還是存放在磁盤中。判斷的依據(jù)是,當(dāng)前查詢出的具體消息的物理位移 與 已CommitLog的maxOffsetPy即最大已提交到緩存的消息物理位移的差值,是否超過總內(nèi)存的40%,如果超過,則表明該消息在磁盤中。接著,判斷循環(huán)內(nèi),該次讀取是否達(dá)到了批次滿的條件,對(duì)應(yīng)代碼
this.isTheBatchFull(...):
private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
if (0 == bufferTotal || 0 == messageTotal) {
return false;
}
if ((messageTotal + 1) >= maxMsgNums) {
return true;
}
if (isInDisk) {
//如果當(dāng)前已獲取的總消息大小大于1024 * 64,則停止獲取
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
return true;
}
//MaxTransferCountOnMessageInDisk : 8
if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
return true;
}
} else {
//如果當(dāng)前已獲取的總消息大小大于1024 * 256
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
return true;
}
//MaxTransferBytesOnMessageInMemory:32
if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
return true;
}
}
return false;
}
總結(jié)一下下面的判斷邏輯:
如果isInDisk為true,說明目前消費(fèi)進(jìn)度不到系統(tǒng)總存儲(chǔ)的60%,則批次量滿的條件為:目前的傳輸字節(jié)量不能大于64k或者總條數(shù)不能大于8, 否則,目前的傳輸字節(jié)量不能大于256k或者總條數(shù)不能大于32條。
如果該次讀取未到達(dá)批次滿的條件,則繼續(xù)判斷目前的業(yè)務(wù)想消息的tagsCode是否符合查詢所指定的訂閱條件,this.messageFilter.isMessageMatched(...):
public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
if (tagsCode == null) {
return true;
}
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode())
return true;
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
總結(jié)一下上述判斷邏輯,如果該【邏輯位移索引】所對(duì)應(yīng)的tagsCode或者上層業(yè)務(wù)調(diào)用方?jīng)]有指定subscriptionData,則認(rèn)為這條消息符合訂閱條件。否則,則根據(jù)上層業(yè)務(wù)所指定的訂閱數(shù)據(jù),判斷訂閱數(shù)據(jù)是否包含tagsCode。這里在說明一下,tagsCode是由生產(chǎn)者方指定,而subscriptionData則由消費(fèi)者端指定。
如果消息符合查詢條件,即this.messageFilter.isMessageMatched(...)為true,則通過【邏輯位移索引】所存儲(chǔ)的offsetPy以及sizePy字段,查詢出具體的業(yè)務(wù)消息字節(jié),對(duì)應(yīng)代碼SelectMappedBufferResult selectResult = this.commitLog.getMessage(...),之前有分析過該方法,這里就不在展開分析了。
如果查詢子結(jié)果selectResult不為空,則說明該次查詢正常,繼而,往拉取總結(jié)果getResult添加子結(jié)果,getResult.addMessage(selectResult),我們跟進(jìn)該方法:
public void addMessage(final SelectMappedBufferResult mapedBuffer) {
this.messageMapedList.add(mapedBuffer);
this.messageBufferList.add(mapedBuffer.getByteBuffer());
this.bufferTotalSize += mapedBuffer.getSize();
this.msgCount4Commercial += (int) Math.ceil(
mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
可以看出,每次添加子結(jié)果后,會(huì)往結(jié)果列表messageMapedList添加業(yè)務(wù)消息字節(jié),而消費(fèi)者客戶端所得到的所有消息元數(shù)據(jù)就在messageMapedList中,并且由客戶端負(fù)責(zé)反序列化業(yè)務(wù)消息;以及記錄當(dāng)已獲取消息的總大小bufferTotalSize 和總條數(shù)msgCount4Commercial。這些結(jié)果就是作用與下一次 批次滿判斷的 條件,就是步驟step3.2中的邏輯判斷。
如果查詢子結(jié)果selectResult為空,說明該【邏輯位移索引】所對(duì)應(yīng)的【業(yè)務(wù)消息】不在該映射文件中,需要滾動(dòng)到下一個(gè)存儲(chǔ)映射文件。
當(dāng)拉取的消息達(dá)到了批次滿判斷的條件或者是到達(dá)了【邏輯位移索引】所在的映射文件尾部時(shí),則結(jié)束該次的循環(huán)流程。
for循環(huán)結(jié)束后,接著step3.4,更新nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE),在說一下nextBeginOffset ,該字段所代表的業(yè)務(wù)含義是,希望消費(fèi)者客戶端下一次拉取消息下標(biāo)索引位置。
并且判斷消費(fèi)者客戶端拉取消息的broker角色,如果目前的消費(fèi)進(jìn)度差值 超過40%,則建議消費(fèi)者端下次拉取的目標(biāo)為slave,否則繼續(xù)向master拉取,這部分內(nèi)容我們會(huì)在分析消費(fèi)者客戶端拉取消息時(shí),詳細(xì)分析的,這里就不在展開討論。
4、返回查詢結(jié)果。
最后一步step4比較簡(jiǎn)單,往結(jié)果getResult填充相對(duì)應(yīng)的查詢結(jié)果字段。
總結(jié)一下上述查找流程,首先,根據(jù)查詢條件topic 與 queueId 確定該broker唯一一條邏輯消費(fèi)隊(duì)列實(shí)例(ConsumeQueue),然后,獲取該消費(fèi)隊(duì)列目前可以消費(fèi)的索引范圍[minOffset ,maxOffset]。換言之,業(yè)務(wù)調(diào)用方所指定的請(qǐng)求查詢參數(shù)中的索引字段offset一定要在[minOffset ,maxOffset]這個(gè)范圍內(nèi),如果不在該范圍內(nèi),則會(huì)給業(yè)務(wù)調(diào)用方返回相對(duì)應(yīng)的響應(yīng)碼。例如offset比最小的可消費(fèi)索引minOffset還要小,則返回響應(yīng)碼GetMessageStatus.OFFSET_OVERFLOW_ONE,以及下一次查詢的開始索引位置nextBeginOffset = minOffset。如果offset在指定的范圍內(nèi),則根據(jù)offset,定位出offset第一個(gè)【邏輯位移索引】所在的存儲(chǔ)映射文件及所在物理位移,然后在根據(jù)業(yè)務(wù)方所指定的最大拉取數(shù)量maxMsgNums(當(dāng)然,實(shí)際值還要結(jié)合目前的消費(fèi)進(jìn)度及可傳輸?shù)膶?shí)際總字節(jié)量來確定maxMsgNums的最終值),結(jié)合篩選條件subscriptionData與tagsCode匹配,如果符合條件,在根據(jù)【邏輯位移索引】存儲(chǔ)的的業(yè)務(wù)消息物理位移offsetPy及其大小sizePy,查詢出具體的業(yè)務(wù)消息字節(jié)內(nèi)容,將業(yè)務(wù)消息字節(jié)結(jié)果加入結(jié)果列表中,知道結(jié)果列表到達(dá)maxMsgNums或者是【邏輯位移索引】存儲(chǔ)文件尾部,則結(jié)束查詢。
到這里,《rocket mq 底層存儲(chǔ)源碼分析》系列就結(jié)束了。