@[toc]
延遲消息描述介紹
RocketMQ的定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后,不會(huì)立即被消費(fèi),等待特定時(shí)間投遞給真正的topic。broker有配置項(xiàng)messageDelayLevel,默認(rèn)值為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,18個(gè)level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個(gè)topic。發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)。level有以下三種情況:
- level 為 0,消息為非延遲消息
- 1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1,延遲1s
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId =delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫(xiě)入真實(shí)的topic。
需要注意的是,定時(shí)消息會(huì)在第一次寫(xiě)入和調(diào)度寫(xiě)入真實(shí)topic時(shí)都會(huì)計(jì)數(shù),因此發(fā)送數(shù)量、tps都會(huì)變高。
源碼分析
第一次存儲(chǔ)消息
?第一次存儲(chǔ)延遲消息是在CommitLog的putMessage方法中進(jìn)行的,關(guān)于這部分代碼分析可以看看前面的分析CommitLog文件的文章。這里不重復(fù)分析,只截取部分的代碼片段出來(lái)。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......
//如果不是事務(wù)消息 或者 是事務(wù)消息的提交階段
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 如果設(shè)置了延遲時(shí)間
if (msg.getDelayTimeLevel() > 0) {
//延遲級(jí)別不能超過(guò)最大的延遲級(jí)別,超過(guò)也設(shè)置為最大的延遲級(jí)別
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//設(shè)置延遲消息的topic
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//延遲消息的queueId= 延遲級(jí)別-1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId 備份真正的topic和queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
......
}
?后續(xù)的把消息保存到CommitLog文件的代碼沒(méi)有貼出來(lái),這里是第一次存儲(chǔ)的關(guān)鍵部分。
- 這里會(huì)先判斷消息的標(biāo)志位,如果標(biāo)識(shí)位不是事務(wù)消息或者事務(wù)消息的提交階段。
- 會(huì)進(jìn)一步判斷是不是設(shè)置了延遲時(shí)間。
- 如果設(shè)置的延遲時(shí)間大于最大的延遲時(shí)間則把延遲時(shí)間設(shè)置為最大延遲時(shí)間
- 把消息的
queueId屬性修改為PROPERTY_REAL_QUEUE_ID,對(duì)應(yīng)的topic屬性設(shè)置為PROPERTY_REAL_TOPIC。同時(shí)把真正的queueId和topic保存在property屬性中。然后保存到CommitLog。
?在這里可以看到RocketMQ對(duì)于延遲消息,第一次的消息存儲(chǔ),會(huì)把消息的topic和queueId先修改,然后存放到特定的topic中去進(jìn)行保存。
第二次消息存儲(chǔ)
?RocketMQ中有一個(gè)專(zhuān)門(mén)處理topic為RMQ_SYS_SCHEDULE_TOPIC的服務(wù)類(lèi)ScheduleMessageService。這個(gè)類(lèi)的初始化是在DefaultMessageStore中會(huì)在RocketMQ的Broker啟動(dòng)的時(shí)候初始化。
初始化延遲文件和配置
?ScheduleMessageService在Broker啟動(dòng)的時(shí)候會(huì)先調(diào)用其load方法,加載delayOffset.json文件然后加載對(duì)應(yīng)的延遲級(jí)別配置。
public boolean load() {
//調(diào)用父類(lèi)的加載文件的方法,父類(lèi)會(huì)調(diào)用子類(lèi)實(shí)現(xiàn)的configFilePath方法確定文件
boolean result = super.load();
//加載成功則進(jìn)行解析延遲級(jí)別配置
result = result && this.parseDelayLevel();
return result;
}
@Override
public String configFilePath() {
//獲取`delayOffset.json`文件
return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
.getStorePathRootDir());
}
public boolean parseDelayLevel() {
//不同延遲級(jí)別的基礎(chǔ)時(shí)間長(zhǎng)度,單位為毫秒
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
//獲取延遲配置
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
//進(jìn)行分割解析,分割符號(hào)為空格
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
//獲取對(duì)應(yīng)的延遲毫秒數(shù)
Long tu = timeUnitTable.get(ch);
int level = i + 1;
//尋找最大的延遲等級(jí)
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
//獲取延遲的時(shí)長(zhǎng)
long num = Long.parseLong(value.substring(0, value.length() - 1));
//計(jì)算真正的時(shí)長(zhǎng)
long delayTimeMillis = tu * num;
//保存到延遲級(jí)別緩存中
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
?上面就是對(duì)應(yīng)的延遲級(jí)別的解析和配置,從上面可以看到我們可以通過(guò)修改配置文件的方式來(lái)修改RocketMQ的最大延遲時(shí)間和對(duì)應(yīng)的延遲級(jí)別。
處理延遲消息
?ScheduleMessageService是一個(gè)不停運(yùn)行的線程,在start方法中會(huì)不斷的針對(duì)不同延遲級(jí)別的消息進(jìn)行處理
public void start() {
//設(shè)置運(yùn)行狀態(tài)為開(kāi)始
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
//迭代延遲級(jí)別的緩存
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
//獲取等級(jí)和 延遲時(shí)間長(zhǎng)度
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
//獲取對(duì)應(yīng)延遲級(jí)別的偏移量緩存,這里緩存的是ConsumeQueue文件中的消息的偏移量
Long offset = this.offsetTable.get(level);
//如果偏移量為null,說(shuō)明沒(méi)有消息需要處理,這設(shè)置為0
if (null == offset) {
offset = 0L;
}
//如果延遲級(jí)別不為null,則構(gòu)建DeliverDelayedMessageTimerTask任務(wù)進(jìn)行處理
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
......
}
?這里主要邏輯就是循環(huán)迭代對(duì)應(yīng)的延遲級(jí)別緩存,然后根據(jù)不同的等級(jí)來(lái)獲取對(duì)應(yīng)的偏移量緩存。然后根據(jù)偏移量和延遲級(jí)創(chuàng)建一個(gè)DeliverDelayedMessageTimerTask進(jìn)一步的處理。這里要說(shuō)明的是offsetTable中存的是消息在ConsumeQueue中的偏移量。關(guān)于這部分的可以看看前面的ConsumeQueue相關(guān)的文章
?這里進(jìn)一步看看DeliverDelayedMessageTimerTask。
@Override
public void run() {
try {
if (isStarted()) {
//執(zhí)行檢查消息是否到時(shí)間的邏輯
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
?主要邏輯在executeOnTimeup中。這個(gè)方法的邏輯有點(diǎn)長(zhǎng),這里貼出主要的部分,然后進(jìn)行分析
public void executeOnTimeup() {
//根據(jù) RMQ_SYS_SCHEDULE_TOPIC 和 延遲級(jí)別 找到對(duì)應(yīng)的ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//根據(jù)傳入的 offset 的從ConsumeQueue中獲取對(duì)應(yīng)的消息信息緩沖,這里獲取到的不是真實(shí)的消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
//獲取額外的信息單元
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//從buffer中每次獲取20個(gè)byte長(zhǎng)度的信息,因?yàn)镃onsumeQueue的存儲(chǔ)單元大小為20byte
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//獲取消息在CommitLog中的真實(shí)位置
long offsetPy = bufferCQ.getByteBuffer().getLong();
//獲取消息的大小
int sizePy = bufferCQ.getByteBuffer().getInt();
//消息對(duì)應(yīng)的tag的hashcode
long tagsCode = bufferCQ.getByteBuffer().getLong();
//如果額外的信息不為空,則獲取
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
//從CommitLog中獲取消息的存儲(chǔ)時(shí)間
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
//下一個(gè)消息單元的偏移量
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
//消息的延遲時(shí)間到了
if (countdown <= 0) {
//鎖定消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//構(gòu)建真正的消息,把真實(shí)的Topic和QueueId恢復(fù)
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
//把消息保存到CommitLog,等待消費(fèi)
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
//保存成功,則繼續(xù)處理下一條消息
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
//如果保存失敗,則創(chuàng)建新的延遲任務(wù)。10秒鐘之后重試
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
//更新偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
.......
//剩余的這些部分的邏輯就是重新構(gòu)建DeliverDelayedMessageTimerTask,等待下個(gè)時(shí)間段的重試
}
分析一下主要的邏輯步驟:
- 根據(jù) topic為
RMQ_SYS_SCHEDULE_TOPIC和 延遲級(jí)別為queueId找到對(duì)應(yīng)的ConsumeQueue - 根據(jù)傳入的
offset的從ConsumeQueue中獲取對(duì)應(yīng)的消息信息緩沖,這里獲取到的不是真實(shí)的消息,而是前面分析的重新設(shè)置后的消息 - 從buffer中每次獲取20個(gè)byte長(zhǎng)度的信息,因?yàn)镃onsumeQueue的存儲(chǔ)單元大小為20byte。然后根據(jù)Offset從CommitLog中獲取消息的落盤(pán)時(shí)間
- 計(jì)算當(dāng)前時(shí)間和落盤(pán)時(shí)間的時(shí)間差,檢查延遲時(shí)間是否到了
- 恢復(fù)消息真實(shí)的
Topic和QueueId然后保存到CommitLog中。等待消息的消費(fèi) - 期間如果有存在失敗的,則重新創(chuàng)建
DeliverDelayedMessageTimerTask任務(wù),等待下一次的處理
?這就是整個(gè)延遲消息的處理邏輯。其實(shí)就是先把消息真實(shí)的信息保存在消息屬性中,然后把消息的topic和queueId覆蓋然后保存到延遲消息專(zhuān)用的topic中,其中queueId為延遲級(jí)別。然后等待延遲消息處理的線程處理延遲消息的topic,時(shí)間到了就恢復(fù)消息真實(shí)的topic和queueId然后重新保存到CommitLog中,等待消費(fèi)。
?其中關(guān)于ConsumeQueue和CommitLog的消息獲取和保存的邏輯這里沒(méi)有進(jìn)行分析,需要了解的可以看前面的文章
CommitLog文件的文章
ConsumeQueue相關(guān)的文章