@[toc]
作用
前面介紹了RocketMQ的一些主要的日志文件,CommitLog,ConsumeQueue,IndexFile的結(jié)構(gòu)和存儲(chǔ)操作原理。這些文件的處理類都在不同的類中處理的。RocketMQ中提供了DefaultMessageStore來(lái)對(duì)這些類進(jìn)行一個(gè)封裝聚合和額外的擴(kuò)展。比如過期消息的清理,新消息的保存,消息的查詢,消息的刷盤等。除此之外也提供一些服務(wù)啟動(dòng)時(shí)候的一些邏輯,比如從磁盤加載元數(shù)據(jù),服務(wù)停止時(shí)候的消息保存邏輯等。
分析
構(gòu)造方法DefaultMessageStore
?DefaultMessageStore的構(gòu)造方法,主要是給該類的一些成員變量進(jìn)行賦值或者初始化。這些成員變量都是DefaultMessageStore間接操作CommitLog文件或者是ConsumeQueue的對(duì)象以及持久化等相關(guān)的操作類。
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
//消息到達(dá)監(jiān)聽器,這個(gè)跟PullRequestHoldService一起使用的,消息到達(dá)后調(diào)用PullRequestHoldService中的notifyMessageArriving方法
this.messageArrivingListener = messageArrivingListener;
//broker配置
this.brokerConfig = brokerConfig;
//消息存儲(chǔ)配置
this.messageStoreConfig = messageStoreConfig;
//broker狀態(tài)管理類,統(tǒng)計(jì)broker相關(guān)信息,比如消息數(shù)量,topic數(shù)量等
this.brokerStatsManager = brokerStatsManager;
//提前主動(dòng)申請(qǐng)內(nèi)存文件服務(wù)類,用于CommitLog
this.allocateMappedFileService = new AllocateMappedFileService(this);
//
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
//管理ConsumeQueue文件的集合
this.consumeQueueTable = new ConcurrentHashMap<>(32);
//將ConsumeQueue邏輯隊(duì)列刷盤的服務(wù)類,1秒刷一次,最多嘗試3次,
this.flushConsumeQueueService = new FlushConsumeQueueService();
//清理物理文件服務(wù),定期清理72小時(shí)之前的物理文件。
this.cleanCommitLogService = new CleanCommitLogService();
//清理邏輯文件服務(wù),定期清理在邏輯隊(duì)列中的物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù),同時(shí)也清理Index中物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù)
this.cleanConsumeQueueService = new CleanConsumeQueueService();
//存儲(chǔ)層內(nèi)部統(tǒng)計(jì)服務(wù)
this.storeStatsService = new StoreStatsService();
//index文件的存儲(chǔ)
this.indexService = new IndexService(this);
//用于commitlog數(shù)據(jù)的主備同步
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
//用于轉(zhuǎn)發(fā)CommitLog中消息到ConsumeQueue中
this.reputMessageService = new ReputMessageService();
//用于監(jiān)控延遲消息,并到期后執(zhí)行,吧延遲消息寫入CommitLog
this.scheduleMessageService = new ScheduleMessageService(this);
//臨時(shí)日志文件存儲(chǔ)池
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//啟動(dòng)allocateMappedFileService
this.allocateMappedFileService.start();
//啟動(dòng)indexService
this.indexService.start();
//消息轉(zhuǎn)存任務(wù)的集合
this.dispatcherList = new LinkedList<>();
//負(fù)責(zé)把CommitLog消息轉(zhuǎn)存到ConsumeQueue文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
//負(fù)責(zé)吧CommitLog消息轉(zhuǎn)存到Index文件
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
//在保存日志數(shù)據(jù)文件的根目錄 {user.home}/store 路徑下 創(chuàng)建一個(gè)一個(gè)名字為lock 的文件
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
//創(chuàng)建根目錄文件的隨機(jī)文件流,權(quán)限是讀寫
lockFile = new RandomAccessFile(file, "rw");
}
?這里對(duì)一些重要的變量進(jìn)行講解
| 變量對(duì)象 | 作用描述 |
|---|---|
messageArrivingListener |
消息到達(dá)監(jiān)聽器,這個(gè)跟PullRequestHoldService一起使用的,消息到達(dá)后調(diào)用PullRequestHoldService中的notifyMessageArriving方法 ,在push模式下會(huì)調(diào)用 |
brokerConfig |
broker配置 |
messageStoreConfig |
消息存儲(chǔ)相關(guān)配置 |
brokerStatsManager |
broker狀態(tài)管理類,統(tǒng)計(jì)broker相關(guān)信息,比如消息數(shù)量,topic數(shù)量等 |
allocateMappedFileService |
提前主動(dòng)申請(qǐng)內(nèi)存文件服務(wù)類,用于CommitLog |
commitLog |
CommitLog日志文件的生成和處理類,根據(jù)不同的情況可能是普通的CommitLog頁(yè)可能是基于Raft協(xié)議擴(kuò)展實(shí)現(xiàn)的CommitLog
|
consumeQueueTable |
管理ConsumeQueue文件的集合 |
flushConsumeQueueService |
將ConsumeQueue邏輯隊(duì)列刷盤的服務(wù)類,1秒刷一次,最多嘗試3次, |
cleanCommitLogService |
清理物理文件CommitLog服務(wù),定期清理72小時(shí)之前的物理文件?;蛘逤ommitLog日志文件的在磁盤占比達(dá)到了75以上就會(huì)進(jìn)行清理 |
cleanConsumeQueueService |
清理邏輯文件服務(wù),定期清理在邏輯隊(duì)列中的物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù),同時(shí)也清理Index中物理偏移量小于commitlog中的最小物理偏移量的數(shù)據(jù) |
storeStatsService |
存儲(chǔ)層內(nèi)部統(tǒng)計(jì)服務(wù) |
indexService |
index文件的存儲(chǔ)和讀取類 |
haService |
高可用的服務(wù)類,如果是基于Raft協(xié)議的高可用那么這個(gè)類就是null |
reputMessageService |
用于轉(zhuǎn)發(fā)CommitLog中消息到ConsumeQueue中 |
scheduleMessageService |
用于監(jiān)控延遲消息,并到期后執(zhí)行,吧延遲消息真實(shí)的信息寫入CommitLog |
transientStorePool |
臨時(shí)日志文件存儲(chǔ)池,在CommitLog使用的臨時(shí)存儲(chǔ)池的時(shí)候會(huì)用到 |
dispatcherList |
消息轉(zhuǎn)存任務(wù)的集合 , 用于把CommitLog的消息日志同時(shí)轉(zhuǎn)存一份到ConsumeQueue中和Index中,用于構(gòu)建索引和消費(fèi)隊(duì)列 |
內(nèi)部方法分析
?這里分析Broker按照啟動(dòng)的前后順序來(lái)進(jìn)行分析。
Broker啟動(dòng)后服務(wù)于日志相關(guān)的類啟動(dòng)的start方法
?Broker在啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)DefaultMessageStore,利用前面說(shuō)的構(gòu)造器進(jìn)行創(chuàng)建,然后會(huì)調(diào)用start方法對(duì)成員變量中的一些類進(jìn)行啟動(dòng)。
public void start() throws Exception {
//獲取lock文件的唯一FileChannel對(duì)象,然后獲取文件鎖
lock = lockFile.getChannel().tryLock(0, 1, false);
//如果鎖是null 或者 鎖是共享類型的 或者 鎖不是有效的,則拋出異常
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
//寫入lock 四個(gè)字符到lock文件
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
//獲取CommitLog的最小物理偏移量
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
//比較ConsumeQueue文件記錄的最大物理偏移量和實(shí)際的物理偏移量的大小,取ConsumeQueue文件記錄的
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
//如果ConsumeQueue記錄的最大物理偏移量 大于 CommitLog的最小偏移量,則選用ConsumeQueue記錄的偏移量,
// 因?yàn)镃onsumeQueue記錄下來(lái)的表示已經(jīng)換發(fā)到了ConsumeQueue了,不需要處理,只需要處理沒有轉(zhuǎn)發(fā)的
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
//記錄的小于0,則置位0
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
/**
* 如果邏輯文件記錄的物理偏移量 小于 實(shí)際的最小物理偏移量,則置位實(shí)際的。
* 1. 如果有人移除了ConsumeQueue文件 或者 保存文件的磁盤損壞了
* 2. 從別的broker上面復(fù)制CommitLog到一個(gè)新啟動(dòng)的broker機(jī)器中
* 這幾種情況都可能使得邏輯日志ConsumeQueue文件記錄的最小物理偏移量為0
*
*/
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
//啟動(dòng)reputMessageService 來(lái)按照CommitLog 生成ConsumeQueue
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
//等待同步完成
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
//如果沒有使用DLegerCommitLog,就使用HaService來(lái)做高可用
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
//如果是master,則啟動(dòng)延遲消息的檢測(cè)任務(wù)
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
//刷新ConsumeQueue的服務(wù)啟動(dòng)
this.flushConsumeQueueService.start();
//CommitLog刷新的服務(wù)啟動(dòng)
this.commitLog.start();
//存儲(chǔ)狀態(tài)檢測(cè)的服務(wù)啟動(dòng)
this.storeStatsService.start();
//創(chuàng)建臨時(shí)文件,來(lái)表示是否正常關(guān)機(jī)
this.createTempFile();
//啟動(dòng)其他服務(wù)。比如清除過期日志的服務(wù)等
this.addScheduleTask();
this.shutdown = false;
}
?這里發(fā)現(xiàn)有很大的一段邏輯實(shí)在判斷CommitLog文件的最小物理偏移量和ConsumeQueue記錄的最大物理移量的比較,這里比較的原因是。reputMessageService這個(gè)類,會(huì)對(duì)CommitLog中的消息進(jìn)行轉(zhuǎn)存到ConsumeQueue中。這里可以參考前面的ConsumeQueue的分析的文章和CommitLog的分析文章。還有一些別的任務(wù)也會(huì)同時(shí)啟動(dòng),這里就不仔細(xì)分析。
Broker啟動(dòng)加載文件的 load方法
?Broker在啟動(dòng)后,會(huì)對(duì)機(jī)器中的可能存在的日志先關(guān)的文件比如CommitLog,ConsumeQueue,IndexFile等先進(jìn)行恢復(fù)處理。這個(gè)恢復(fù)的處理邏輯就在 load方法中。
public boolean load() {
boolean result = true;
try {
//是否存在abort文件,如果存在說(shuō)明上次服務(wù)關(guān)閉時(shí)異常關(guān)閉的
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
//加載定時(shí)任務(wù)的配置文件,解析延遲級(jí)別配置信息
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 加載 Commit Log文件
result = result && this.commitLog.load();
// 加載 Consume Queue文件
result = result && this.loadConsumeQueue();
//檢查前面3個(gè)文件是不是加載成功
if (result) {
//加載成功則繼續(xù)加載checkpoint文件
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//加載indexFile
this.indexService.load(lastExitOK);
//進(jìn)行文件的恢復(fù)邏輯
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
private void recover(final boolean lastExitOK) {
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
//上次服務(wù)關(guān)閉是不是正常關(guān)閉
if (lastExitOK) {
//正常關(guān)閉情況關(guān)閉
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
//異常情況關(guān)閉
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
//恢復(fù)topic消費(fèi)相關(guān)相關(guān)的緩存
this.recoverTopicQueueTable();
}
?這里可以看到文件的恢復(fù)存在一種情況,就是上次Broker的關(guān)閉時(shí)異常關(guān)閉的情況。這種情況下對(duì)應(yīng)的CommitLog的恢復(fù)與正常情況的恢復(fù)是不一樣的。整體的 load方法邏輯如下:
- 檢查
abort文件是不是存在,如果存在表示上次是異常關(guān)閉,這個(gè)文件是一個(gè)空文件,在啟動(dòng)之后會(huì)創(chuàng)建,正常關(guān)閉的情況會(huì)刪除掉。 - 加載延遲消息相關(guān)的配置,加載 Commit Log文件,加載Consume Queue文件
- 如果步驟2成功加載,則加載checkpoint文件,加載indexFile然后進(jìn)行文件的恢復(fù)邏輯
- 對(duì)于文件的恢復(fù)邏輯在
recover方法中,會(huì)調(diào)用CommitLog類中的方法
?這里有幾個(gè)點(diǎn)沒有進(jìn)行分析,可以看前面的文章。第一個(gè)就是CommitLog文件正?;謴?fù)和異?;謴?fù)相關(guān)邏輯在前面的CommitLog的文章中有分析。第二個(gè)是延遲消息相關(guān)的邏輯在延遲消息的原理ScheduleMessageService中。第三個(gè)IndexFile的邏輯在IndexFile消息索引日志文件相關(guān)的IndexService類
存儲(chǔ)消息的putMessage方法
?DefaultMessageStore中的putMessage方法其實(shí)主要是檢查存儲(chǔ)狀態(tài),校驗(yàn)消息和記錄消息存儲(chǔ)的耗時(shí)和次數(shù),主要的邏輯還是在CommitLog類中。這里不對(duì)CommitLog類中的邏輯進(jìn)行分析
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
//檢查存儲(chǔ)狀態(tài),檢查操作系統(tǒng)是不是繁忙
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
//校驗(yàn)消息的topic長(zhǎng)度和屬性長(zhǎng)度
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);
//計(jì)算耗時(shí)
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
//記錄消息的存儲(chǔ)時(shí)間,分為13個(gè)耗時(shí)區(qū)間段來(lái)存儲(chǔ),每個(gè)區(qū)間段統(tǒng)計(jì)放入的消息個(gè)
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
//如果消息存儲(chǔ)失敗,則統(tǒng)計(jì)失敗的次數(shù)
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
private PutMessageStatus checkStoreStatus() {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
//salve 機(jī)器不能存儲(chǔ)消息
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("broke role is slave, so putMessage is forbidden");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
//檢查是不是可寫的狀態(tài)
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
this.printTimes.set(0);
}
//如果寫入CommitLog時(shí)間超過1m,則認(rèn)為操作系統(tǒng)的pageCache繁忙
if (this.isOSPageCacheBusy()) {
return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;
}
獲取消息的getMessage方法
?獲取消息的邏輯比較長(zhǎng),主要復(fù)雜點(diǎn)在于計(jì)算消息的偏移量信息。主要邏輯如下
- 檢查服務(wù)的狀態(tài)
- 獲取CommitLog的最大物理偏移量,根據(jù)要獲取的消息的topic和queueId找到對(duì)應(yīng)的ConsumeQueue,并獲取最大邏輯偏移量和最小邏輯偏移量
- 根據(jù)獲取的邏輯偏移量進(jìn)行一系列的校驗(yàn),校驗(yàn)通過則根據(jù)ConsumeQueue的消息單元中記錄的消息物理偏移量和傳入的偏移量進(jìn)行對(duì)比尋找
- 找到對(duì)應(yīng)的消息的物理偏移量,然后進(jìn)行封裝最后返回
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
}
//檢查服務(wù)的狀態(tài)是否是可讀
if (!this.runningFlags.isReadable()) {
log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
return null;
}
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
//獲取存儲(chǔ)的消息的最大的物理偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
//根據(jù)topic和queueId 先找到對(duì)應(yīng)的邏輯日志ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
//如果邏輯日志不為空,獲取最小和最大的邏輯偏移量
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//如果最大邏輯日志為0,說(shuō)明沒有存任何邏輯消息記錄
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
//消息偏移量小于最小的偏移量
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
//偏移等于最大偏移量,則表示消息溢出了
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
//大于最大邏輯偏移量,則嚴(yán)重溢出
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
//根據(jù)偏移量獲取對(duì)應(yīng)的邏輯消息
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
//計(jì)算最大獲取的消息長(zhǎng)度 = 單個(gè)邏輯單元長(zhǎng)度 * 獲取的消息個(gè)數(shù) = 20*maxMsgNums
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
//是否開啟磁盤降幅
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//依次獲取消息
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//獲取消息的物理偏移量 在CommitLog中的位置
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
//獲取消息的大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
//獲取消息的tagcode
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
//當(dāng)前消息的CommitLog的偏移量
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
/**
* 檢查獲取的消息是不是在磁盤上
* 1. 計(jì)算系統(tǒng)的內(nèi)存大小,然后?? 對(duì)應(yīng)的訪問消息在內(nèi)存中的最大比率參數(shù)(accessMessageInMemoryMaxRatio)得到存儲(chǔ)消息的內(nèi)存大小memory
* 2. 比較CommitLog的最大偏移量和當(dāng)前消息的偏移量差,如果小于memory 則在內(nèi)存中獲取消息。否則從磁盤獲取消息
*/
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
/**
* 消息是否獲取完畢。如果已經(jīng)獲取的消息的大小,或者消息的數(shù)量,達(dá)到了設(shè)置的上限也會(huì)直接返回
* - 從磁盤獲取時(shí)
* maxTransferBytesOnMessageInDisk : 一批次從磁盤獲取消息的最大允許大小
* maxTransferCountOnMessageInDisk : 一次從磁盤獲取消息的最大允許數(shù)量
*
* - 從內(nèi)存獲取
* maxTransferBytesOnMessageInMemory:一批次從內(nèi)存獲取消息的最大允許大小
* maxTransferCountOnMessageInMemory:一次從內(nèi)存獲取消息的最大允許數(shù)量
*/
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
//
boolean extRet = false, isTagsCodeLegal = true;
//檢查是否有ConsumeQueue擴(kuò)展文件,是的則從擴(kuò)展文件獲取
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}
//如果有ConsumeQueue對(duì)應(yīng)的過濾器,則進(jìn)行過濾
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
//從CommitLog獲取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
//如果有CommitLog先關(guān)的過濾器則進(jìn)行處理
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
//增加消息存儲(chǔ)服務(wù)的相關(guān)消息獲數(shù)量記錄
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
//開啟磁盤降幅,
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
//計(jì)算下一個(gè)開始的位置
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//當(dāng)前消息的偏移量和最大偏移量的差距
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
//如果剩余的偏移差 大于 設(shè)置的內(nèi)存百分比的大小 則建議從 salve節(jié)點(diǎn)拉取
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
//如果在ConsumeQueue中沒有對(duì)應(yīng)的消息,那么
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
//如果ConsumeQueue沒有則返回
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
//無(wú)論是否找到,都增加對(duì)應(yīng)的記錄
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
//設(shè)置返回的信息
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
?在DefaultMessageStore中還有一些別的方法,基本都是跟獲取消息有關(guān)系的,但是最最后幾乎都是調(diào)用上面說(shuō)的getMessage方法。因此這里就不在進(jìn)行分析了。