
不吃(燒烤)不喝(奶茶可樂)看了好久才概括出這么一點(diǎn)點(diǎn)東西,希望大佬們能夠有耐心看一看,遇到說的不對(duì)的地方,也歡迎在評(píng)論區(qū)或者私信與我交流
另外完整版的代碼注釋,我在我的github上也添加了,感興趣的小伙伴也可以點(diǎn)擊這個(gè)鏈接去看一波 github地址
覺得我講的有那么一點(diǎn)點(diǎn)道理,對(duì)你有那么一丟丟的幫助的,也可以給我一波點(diǎn)贊關(guān)注666喲~

廢話不多說,下面開始我的表演~
RocketMQ全局流程圖

上來就是這么一大張圖片,相信大家肯定完全不想看下去。(那么我為什么還要放在一開始呢?主要是為了能夠讓大家有一個(gè)全局的印象,然后后續(xù)復(fù)習(xí)的時(shí)候也可以根據(jù)這個(gè)流程圖去具體復(fù)習(xí))
那么,下面我們就針對(duì)一些問題來具體描述RocketMQ的工作流程 此處內(nèi)容會(huì)不斷補(bǔ)充,也歡迎大家把遇到的問題在評(píng)論區(qū)留下來
消息消費(fèi)邏輯
消息消費(fèi)可以分為三大模塊
- Rebalance
- 拉取消息
- 消費(fèi)消息
Rebalance

// RebalanceImpl
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍歷每個(gè)主題的隊(duì)列
// subTable 會(huì)在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 時(shí)修改
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 對(duì)隊(duì)列進(jìn)行重新負(fù)載
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// topicSubscribeInfoTable topic訂閱信息緩存表
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 發(fā)送請(qǐng)求到broker獲取topic下該消費(fèi)組內(nèi)當(dāng)前所有的消費(fèi)者客戶端id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序保證了同一個(gè)消費(fèi)組內(nèi)消費(fèi)者看到的視圖保持一致,確保同一個(gè)消費(fèi)隊(duì)列不會(huì)被多個(gè)消費(fèi)者分配
Collections.sort(mqAll);
Collections.sort(cidAll);
// 分配算法 (盡量使用前兩種)
// 默認(rèn)有5種 1)平均分配 2)平均輪詢分配 3)一致性hash
// 4)根據(jù)配置 為每一個(gè)消費(fèi)者配置固定的消息隊(duì)列 5)根據(jù)broker部署機(jī)房名,對(duì)每個(gè)消費(fèi)者負(fù)責(zé)不同的broker上的隊(duì)列
// 但是如果消費(fèi)者數(shù)目大于消息隊(duì)列數(shù)量,則會(huì)有些消費(fèi)者無法消費(fèi)消息
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
// 當(dāng)前消費(fèi)者分配到的隊(duì)列
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新消息消費(fèi)隊(duì)列,如果是新增的消息消費(fèi)隊(duì)列,則會(huì)創(chuàng)建一個(gè)消息拉取請(qǐng)求并立即執(zhí)行拉取
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 當(dāng)前分配到的隊(duì)列中不包含原先的隊(duì)列(說明當(dāng)前隊(duì)列被分配給了其他消費(fèi)者)
if (!mqSet.contains(mq)) {
// 丟棄 processQueue
pq.setDropped(true);
// 移除當(dāng)前消息隊(duì)列
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 消息消費(fèi)隊(duì)列緩存中不存在當(dāng)前隊(duì)列 本次分配新增的隊(duì)列
if (!this.processQueueTable.containsKey(mq)) {
// 向broker發(fā)起鎖定隊(duì)列請(qǐng)求 (向broker端請(qǐng)求鎖定MessageQueue,同時(shí)在本地鎖定對(duì)應(yīng)的ProcessQueue)
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
// 加鎖失敗,跳過,等待下一次隊(duì)列重新負(fù)載時(shí)再嘗試加鎖
continue;
}
// 從內(nèi)存中移除該消息隊(duì)列的消費(fèi)進(jìn)度
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 首次添加,構(gòu)建拉取消息的請(qǐng)求
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 立即拉取消息(對(duì)新增的隊(duì)列)
this.dispatchPullRequest(pullRequestList);
return changed;
}
由流程圖和代碼,我們可以得知,集群模式下消息負(fù)載主要有以下幾個(gè)步驟:
- 從Broker獲取訂閱當(dāng)前Topic的消費(fèi)者列表
- 根據(jù)具體的策略進(jìn)行負(fù)載均衡
- 對(duì)當(dāng)前消費(fèi)者分配到的隊(duì)列進(jìn)行處理
- 原來有,現(xiàn)在沒有:丟棄對(duì)應(yīng)的消息處理隊(duì)列(ProcessQueue)
- 原來沒有,現(xiàn)在有:添加消息處理隊(duì)列(ProcessQueue),如果是第一次新增,還會(huì)創(chuàng)建一個(gè)消息拉取請(qǐng)求
拉取消息

拉取消息的代碼太多了,我就不再這里貼出來了。
我在這里說一下大致流程,然后有幾個(gè)需要注意的地方
流程:在我們Rebalance第一次添加負(fù)責(zé)的隊(duì)列和后續(xù)拉取消息后,都會(huì)再提交一個(gè)拉取請(qǐng)求到拉取請(qǐng)求隊(duì)列(pullRequestQueue)中,然后有一個(gè)線程不停的去里面獲取拉取請(qǐng)求,去執(zhí)行拉取的操作
這里說一個(gè)RocketMQ消費(fèi)者這邊設(shè)計(jì)的一個(gè)亮點(diǎn)
它將拉取消息,消費(fèi)消息通過兩個(gè)任務(wù)隊(duì)列的方式進(jìn)行解耦,然后每一個(gè)模塊僅需要負(fù)責(zé)它自己的功能。(雖然大佬們覺得很常見,但是當(dāng)時(shí)我看的時(shí)候還是感覺妙呀~)
另外還有一點(diǎn)需要注意的是:拉取消息的時(shí)候broker和consumer都會(huì)對(duì)消息進(jìn)行過濾,只不過broker是根據(jù)tag的hash進(jìn)行過濾的,而consumer是根據(jù)具體的tag字符串匹配過濾的。這也是有的時(shí)候,明明拉取到了消息,但是卻沒有需要消費(fèi)的消息產(chǎn)生的原因
既然說到了消息過濾,這邊先簡單提一下RocketMQ消息過濾的幾種方式
- 表達(dá)式過濾
- tag
- SQL92
- 類過濾
消費(fèi)消息

這邊也先說幾個(gè)注意點(diǎn)吧,后面再單獨(dú)出篇文章。
(一)順序消費(fèi)和非順序消費(fèi)消費(fèi)失敗的處理
(二)消費(fèi)失敗偏移量的更新:只有當(dāng)前這批消息全部消費(fèi)成功后,才會(huì)將偏移量更新成為這批消息最后一條的偏移量
(三)廣播消息失敗不會(huì)重試,僅打印失敗日志
補(bǔ)充:為什么同一個(gè)消費(fèi)組下消費(fèi)者的訂閱信息要相同
首先,先說一下什么叫做同一個(gè)消費(fèi)組下消費(fèi)者的訂閱信息要相同
即:在相同的GroupId下,每一個(gè)消費(fèi)者他們的訂閱內(nèi)容(Topic+Tag)要保持一致,否則會(huì)導(dǎo)致消息無法被正常消費(fèi)
參考文檔:阿里云:訂閱關(guān)系一致

我們?cè)诳创@個(gè)問題的時(shí)候,可以把它分為兩類情況考慮
- topic不一致
- tag不一致
(一)topic不一致的問題
首先先說一個(gè)場景,消費(fèi)者A監(jiān)聽了TopicA,消費(fèi)者B監(jiān)聽了TopicB,但是消費(fèi)者A和消費(fèi)者B同屬一個(gè)groupTest
在Rebalance階段,消費(fèi)者A對(duì)TopicA進(jìn)行負(fù)載均衡時(shí),會(huì)去查詢groupTest下的所有消費(fèi)者信息。獲取到了消費(fèi)者A和消費(fèi)者B。此時(shí)就會(huì)將TopicA的隊(duì)列對(duì)消費(fèi)者A和消費(fèi)者B進(jìn)行負(fù)載均衡(例如消費(fèi)者A分配到了1234四個(gè)隊(duì)列,消費(fèi)者B分配到了5678四個(gè)隊(duì)列)。此時(shí)消費(fèi)者B沒有針對(duì)TopicA的處理邏輯,就會(huì)導(dǎo)致推送到5678這幾個(gè)隊(duì)列里面的消息沒有辦法得到處理。
(二)tag不一致的問題
隨著消費(fèi)者A,消費(fèi)者B負(fù)載均衡的不斷進(jìn)行,會(huì)不斷把最新的訂閱信息(消息過濾規(guī)則)上報(bào)給broker。broker就會(huì)不斷的覆蓋更新,導(dǎo)致tag信息不停地變化,而tag的變化在消費(fèi)者拉取消息時(shí)broker的過濾就會(huì)產(chǎn)生影響,會(huì)導(dǎo)致一些本來要被消費(fèi)者拉取到的消息被broker過濾掉
消費(fèi)者總結(jié)
講了這么多的消費(fèi)者的內(nèi)容,出現(xiàn)了好多名詞,也把消費(fèi)者的一些比較核心的內(nèi)容逐個(gè)講了一遍。
那么,在這里,我們將消費(fèi)者這個(gè)模塊里面的所有東西,在進(jìn)行一個(gè)完整的串聯(lián)。然后消費(fèi)者這一方面的介紹就要告一段落了

延時(shí)隊(duì)列是如何工作的

由流程圖中我們不難看出,RocketMQ對(duì)延時(shí)消息的處理,是交由Timer去完成的(相關(guān)類ScheduleMessageService)。在Timer的任務(wù)隊(duì)列中讀取需要處理的延遲任務(wù),將消息從延遲隊(duì)列轉(zhuǎn)發(fā)到具體的業(yè)務(wù)隊(duì)列中
此處補(bǔ)充一點(diǎn):此處提到的Timer為java工具類包(java.util.Timer)下的一個(gè)定時(shí)任務(wù)工具。它主要由兩個(gè)部分:TaskQueue queue(任務(wù)隊(duì)列)和TimerThread thread(工作線程)。這邊我把它簡單的類比為一個(gè)單線程的工作線程池
另外在ScheduleMessageService中使用到了Timer的兩個(gè)方法,我在這里先單獨(dú)列出來下
- this.timer.schedule :在任務(wù)執(zhí)行成功后,再加上對(duì)應(yīng)的周期,然后再執(zhí)行
- this.timer.scheduleAtFixedRate :每隔指定時(shí)間就執(zhí)行一次,與任務(wù)執(zhí)行時(shí)間無關(guān)
話不多少,貼上源碼(源碼雖然枯燥,但希望可以耐心的看完)
// ScheduleMessageService
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 根據(jù)延時(shí)隊(duì)列創(chuàng)建對(duì)應(yīng)的定時(shí)任務(wù)
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 第一次,延遲一秒執(zhí)行任務(wù),后續(xù)根據(jù)對(duì)應(yīng)延時(shí)時(shí)間來執(zhí)行
// 延時(shí)級(jí)別和消息隊(duì)列id對(duì)應(yīng)關(guān)系 : 消息隊(duì)列id = 延時(shí)級(jí)別 - 1
// shedule 在任務(wù)執(zhí)行成功后,再加上對(duì)應(yīng)的周期,然后再執(zhí)行
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// scheduleAtFixedRate 每隔指定時(shí)間就執(zhí)行一次,與任務(wù)執(zhí)行時(shí)間無關(guān)
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) {
// 每個(gè)十秒持久化一次延遲隊(duì)列的處理進(jìn)度
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
// DeliverDelayedMessageTimerTask
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
// 根據(jù) 延時(shí)隊(duì)列topic 和 延時(shí)隊(duì)列id 查找消費(fèi)隊(duì)列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// 遍歷ConsumeQueue,每一個(gè)標(biāo)準(zhǔn)的ConsumeQueue條目為20字節(jié)
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// > 0 未到消息消費(fèi)時(shí)間
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 放到對(duì)應(yīng)的 %RETRY%+gid 重試topic下進(jìn)行消費(fèi)(轉(zhuǎn)發(fā)消息)
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
}
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
}
}
} else {
// 會(huì)將下次任務(wù)執(zhí)行時(shí)間設(shè)置為countdown 即 消息的延時(shí)轉(zhuǎn)發(fā)時(shí)間-當(dāng)前時(shí)間
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 更新延時(shí)隊(duì)列拉取任務(wù)進(jìn)度
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// 消費(fèi)隊(duì)列不存在,默認(rèn)為沒有需要消費(fèi)的任務(wù),跳過本次消費(fèi)
long cqMinOffset = cq.getMinOffsetInQueue();
long cqMaxOffset = cq.getMaxOffsetInQueue();
if (offset < cqMinOffset) {
// 下次拉取任務(wù)進(jìn)度更新
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (offset > cqMaxOffset) {
failScheduleOffset = cqMaxOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
}
} // end of if (cq != null)
// 根據(jù)延時(shí)等級(jí)創(chuàng)建一個(gè)任務(wù)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}