rocketmq客戶端消費(fèi)流程
只關(guān)注于集群模式下并發(fā)消費(fèi)的push模式
組件概述
DefaultMQPushConsumerImpl
- 負(fù)載均衡實(shí)現(xiàn) RebalanceImpl
- 拉取消息. PullAPIWrapper
- 消費(fèi)進(jìn)度存儲(chǔ) OffsetStore
- 消費(fèi)服務(wù) ConsumeMessageService
- MQClientInstance 客戶端核心實(shí)現(xiàn)
MQClientInstance
- netty 客戶端 業(yè)務(wù)線程池和回調(diào)線程池隔離
- 定時(shí)任務(wù)
- 負(fù)載均衡調(diào)度 RebalanceService
- 拉消息任務(wù)調(diào)度 pullMessageService
- 內(nèi)部生產(chǎn)者 defaultMQProducer
MQClientInstance 和 消費(fèi)者為一對(duì)多關(guān)系。使用InstanceName相同的生產(chǎn)者消費(fèi)者都使用同一個(gè)MQClientInstance。
啟動(dòng) DefaultMQPushConsumerImpl.start()
生成InstanceName,如果用戶未設(shè)置則為pid。
-
創(chuàng)建 MQClientInstance,使用InstanceName相同的生產(chǎn)者消費(fèi)者都使用同一個(gè)MQClientInstance。MQClientInstance是客戶端的核心。
就是說(shuō)一個(gè)MQClientInstance下會(huì)與多個(gè)消費(fèi)者。MQClientInstance統(tǒng)一調(diào)度他們。
this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);
//后面會(huì)將消費(fèi)者注冊(cè)到mQClientFactory,讓mQClientFactory有所有同一InstanceName消費(fèi)者的引用。
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- 為負(fù)載均衡實(shí)現(xiàn)rebalanceImpl 賦值
- 創(chuàng)建PullAPIWrapper 負(fù)責(zé)拉取消息
- 根據(jù)消費(fèi)模式創(chuàng)建 OffsetStore
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING://廣播存儲(chǔ)在本地
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING://集群進(jìn)度存儲(chǔ)在遠(yuǎn)程
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
? OffsetStore 負(fù)責(zé)讀取消費(fèi)進(jìn)度和同步消費(fèi)進(jìn)度
-
根據(jù)消費(fèi)模式創(chuàng)建ConsumeMessageService 并啟動(dòng)
并發(fā)消費(fèi)不啟動(dòng)線程。
順序消費(fèi)下啟動(dòng)定時(shí)任務(wù),會(huì)調(diào)用消費(fèi)者的RebalanceImpl的lockAll 方法。向broker發(fā)生請(qǐng)求鎖住分配給他的隊(duì)列。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
{
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
-
啟動(dòng)MQClientInstance,多消費(fèi)者引用同一個(gè)MQClientInstance時(shí)MQClientInstance只會(huì)啟動(dòng)一次
mQClientFactory.start(); -
初始化
//向nameser 拉取所關(guān)心的topic的路由信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //向所有路由信息里的所有broker發(fā)送心跳 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //喚醒mQClientFactory的負(fù)責(zé)均衡服務(wù), this.mQClientFactory.rebalanceImmediately();
啟動(dòng) MQClientInstance.start()
一個(gè)MQClientInstance 只會(huì)啟動(dòng)一次。
1.啟動(dòng)netty 客戶端
this.mQClientAPIImpl.start();//內(nèi)置netty客戶端
2.啟動(dòng)定時(shí)任務(wù)
this.startScheduledTask();//會(huì)啟動(dòng)5個(gè)定時(shí)任務(wù)
- 從遠(yuǎn)程獲取nameServer地址 發(fā)生變動(dòng)時(shí)可以更新本地nameServer
遠(yuǎn)程地址被寫(xiě)死,暫時(shí)沒(méi)有用。
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
-
更新topic路由信息,topic路由發(fā)送變動(dòng)時(shí)可以感知
MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 更新消費(fèi)進(jìn)度到broker 最終調(diào)用 DefaultMQPushConsumerImpl.offsetStore.persistAll
這里可以看出更新消費(fèi)進(jìn)度是異步的,這也是出現(xiàn)重復(fù)消息的原因之一
MQClientInstance.this.persistAllConsumerOffset();
- 向broker發(fā)送心跳
MQClientInstance.this.cleanOfflineBroker();//清理下線的broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();//發(fā)送心跳
- 動(dòng)態(tài)調(diào)整線程池 根據(jù)DefaultMQPushConsumer 的 adjustThreadPoolNumsThreshold 參數(shù)和消息在消費(fèi)者內(nèi)部的堆積調(diào)整
MQClientInstance.this.adjustThreadPool();
- 啟動(dòng)調(diào)度服務(wù)
//拉消息線程
this.pullMessageService.start();
//Start rebalance service
//重負(fù)載線程
this.rebalanceService.start();
//Start push service 內(nèi)部生產(chǎn)者用于消費(fèi)失敗時(shí),發(fā)送重試消息
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
拉消息流程
拉消息的流程是先從負(fù)載均衡開(kāi)始的。MQClientInstance的rebalanceService啟動(dòng)后會(huì)定時(shí)調(diào)用,所有消費(fèi)者的doRebalance 方法。間隔10s
private static long WaitInterval = 1000 * 10;//間隔10s
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);//等待
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
public void doRebalance() {
//調(diào)用所有消費(fèi)者的doRebalance
for (String group : this.consumerTable.keySet()) {//consumerTable 消費(fèi)者引用
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}
//消費(fèi)者最終會(huì)調(diào)用自己的負(fù)載均衡實(shí)現(xiàn)的doRebalance方法
@Override
public void doRebalance() {
if (this.rebalanceImpl != null) { //消費(fèi)者調(diào)用自己的rebalanceImpl
this.rebalanceImpl.doRebalance();
}
}
負(fù)載均衡實(shí)現(xiàn)
先拿到topic路由信息,然后循環(huán)對(duì)topic做負(fù)載
public void doRebalance() {
//取得關(guān)心等待topic
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//對(duì)topic做負(fù)載
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//當(dāng)topic變動(dòng)時(shí),移除多余topic對(duì)應(yīng)的ProcessQueue
this.truncateMessageQueueNotMyTopic();
}
負(fù)載分集群和廣播模式,廣播模式不討論
在rocketmq中一個(gè)topic有多個(gè)隊(duì)列。負(fù)載均衡就是將隊(duì)列合理的分配給一個(gè)消費(fèi)組的所有消費(fèi)者。
有多種分配算法,繼承AllocateMessageQueueStrategy,默認(rèn)為AllocateMessageQueueAveragely
//先獲取負(fù)載所需要的參數(shù)
//topic對(duì)應(yīng)的所有隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//topic對(duì)應(yīng)的所有客戶端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
然后調(diào)用,返回的list就是分配給當(dāng)前消費(fèi)者的隊(duì)列
public List<MessageQueue> allocate(//
final String consumerGroup,//
final String currentCID,//
final List<MessageQueue> mqAll,//
final List<String> cidAll//
);
而區(qū)分不同客戶端的cidAll 就是每個(gè)客戶端的ip@InstanceName ,使用同一ip下不能有相同的InstanceName。
比如AllocateMessageQueueAveragely有這一行
//取自己在客戶端集合的下標(biāo),如果兩個(gè)客戶端InstanceName相同,那么index都一樣,分配的隊(duì)列也相同
int index = cidAll.indexOf(currentCID);
而這個(gè)負(fù)載算法是沒(méi)有同步和校驗(yàn)等操作的,不同客戶端不會(huì)進(jìn)行通信??蛻舳瞬恢绖e人分配了哪些隊(duì)列。全靠“自覺(jué)”,同一組內(nèi)都使用同一策略那么分配是合理的,如果同一組內(nèi)使用不同策略,隊(duì)列的分配就會(huì)發(fā)生混亂。
拉取任務(wù)
rocketmq為每個(gè)分配給它的隊(duì)列生成一個(gè) 拉取任務(wù) ProcessQueue
將其存儲(chǔ)在PullMessageService 的pullRequestQueue中,這是一個(gè)LinkedBlockingQueue
PullMessageService 啟動(dòng)后會(huì)從堵塞隊(duì)列中取出拉取任務(wù),然后進(jìn)行消息的拉取。
分配隊(duì)列完成后
//返回隊(duì)列是否發(fā)生了變化
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) {//mqSet 分配給當(dāng)前消費(fèi)者的隊(duì)列
boolean changed = false;
//存儲(chǔ)上次分配的隊(duì)列和對(duì)應(yīng)的ProcessQueue拉取任務(wù)
//processQueueTable 是ConcurrentHashMap
Iterator<Entry<MessageQueue, ProcessQueue>> it =
this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {//topic 是否相等
if (!mqSet.contains(mq)) { //上次分配隊(duì)列,這次沒(méi)分配給我
pq.setDropped(true);//禁用拉取任務(wù) 修改dropped屬性。是volatile變量
//移除OffsetStore中存儲(chǔ)的隊(duì)列進(jìn)度,移除前先提交進(jìn)度
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}",
consumerGroup, mq);
}
}
//據(jù)上次拉取間隔 120000ms 也移除它
else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error(
"[BUG]doRebalance, {}, remove unnecessary mq, {},
because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
//新隊(duì)列 處理
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//生成拉取任務(wù)
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(new ProcessQueue());
//計(jì)算下次拉取的偏移
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
pullRequest.setNextOffset(nextOffset);
pullRequestList.add(pullRequest);
changed = true;
//記錄下 用于下次對(duì)比
this.processQueueTable.put(mq, pullRequest.getProcessQueue());
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//將拉取任務(wù)壓入堵塞隊(duì)列
//最終調(diào)用
//PullMessageService.executePullRequestImmediately
//this.pullRequestQueue.put(pullRequest);
this.dispatchPullRequest(pullRequestList);
return changed;
}
拉取消息
現(xiàn)在知道一個(gè)隊(duì)列對(duì)應(yīng)一個(gè)拉取任務(wù)ProcessQueue,存放在堵塞隊(duì)列中,如果禁用了會(huì)將dropped屬性修改為true。
誰(shuí)來(lái)執(zhí)行拉取呢,MQClientInstance.PullMessageService。
PullMessageService 啟動(dòng)后從堵塞隊(duì)列取出拉取任務(wù),找到對(duì)應(yīng)的組調(diào)用pullMessage
PullMessageService 為單線程,所有拉取消息時(shí)為單線程拉取
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
//從堵塞隊(duì)列中取出1
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
//找到對(duì)應(yīng)的組調(diào)用pullMessage
final MQConsumerInner consumer =
this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//調(diào)用消費(fèi)者的pullMessage,最終調(diào)用pullAPIWrapper.pullKernelImpl
impl.pullMessage(pullRequest);
}
}
DefaultMQPushConsumerImpl.pullMessage
先進(jìn)行限流等檢查,如果不能通過(guò)會(huì)調(diào)用executePullRequestLater() 將任務(wù)放回隊(duì)列,下次消費(fèi)。
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
//提交到定時(shí)任務(wù)中
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {//待會(huì)在放入隊(duì)列
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
}
也會(huì)檢查是否禁用。正常的任務(wù)拉取完成會(huì)放回隊(duì)列,等待下次拉取。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {//檢查dropped屬性。volatile修飾
log.info("the pull request[{}] is droped.", pullRequest.toString());
//被禁用直接拋棄 沒(méi)被禁用的用完會(huì)放回隊(duì)列
return;
}
都完成后創(chuàng)建一個(gè)回調(diào)函數(shù) PullCallback,然后異步拉取
因?yàn)榫W(wǎng)絡(luò)層是netty,所以其實(shí)所有請(qǐng)求都是異步。同步的操作只是做了異步轉(zhuǎn)同步而已。
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue,// 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);
請(qǐng)求成功后觸發(fā)回調(diào)函數(shù)。主要看 case FOUND,就可以了。其他代表沒(méi)有新消息,偏移量不對(duì)等
//這里有一個(gè)mq自己實(shí)現(xiàn)的性能統(tǒng)計(jì)。我們?cè)谕獠恳部梢阅玫?consumer.getDefaultMQPushConsumerImpl().getConsumerStatsManager()
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult =
DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
//性能統(tǒng)計(jì)
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null ||
pullResult.getMsgFoundList().isEmpty()) {
//空消息,放回隊(duì)列
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
//性能統(tǒng)計(jì)
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullResult.getMsgFoundList().size());
boolean dispathToConsume =
processQueue.putMessage(pullResult.getMsgFoundList());
//開(kāi)始消費(fèi)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
case NO_NEW_MSG:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
}
}
開(kāi)始消費(fèi)
這里有一個(gè)分批消費(fèi)的邏輯,根據(jù)consumeMessageBatchMaxSize拆分
取決于這個(gè)參數(shù)private int consumeMessageBatchMaxSize = 1;
如果設(shè)置大于1那么這批消息消費(fèi)時(shí)只能全部成功或者全部失敗
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
else {
for (int total = 0; total < msgs.size();) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
}
else {
break;
}
}
//創(chuàng)建一個(gè)消費(fèi)job
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue,
messageQueue);
//提交到線程池
this.consumeExecutor.submit(consumeRequest);
}
}
//ConsumeRequest 是Runnable的實(shí)現(xiàn)
ConsumeRequest implements Runnable
ConsumeRequest的run方法
@Override
public void run() {
//又進(jìn)行了隊(duì)列禁用的校驗(yàn)
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped {}",
this.messageQueue);
return;
}
//用戶的消費(fèi)listener 實(shí)現(xiàn)
MessageListenerConcurrently listener =
ConsumeMessageConcurrentlyService.this.messageListener;
//創(chuàng)建Context
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
//這個(gè)Context 用于hook,在4.5的消息追蹤中是借助此hook和Context實(shí)現(xiàn)的
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer
.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
//調(diào)用hook:ConsumeMessageHook
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
try {
//將重試消息的topic替換為原來(lái)的topic
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
//調(diào)用用戶方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
RemotingHelper.exceptionSimpleDesc(e),//
ConsumeMessageConcurrentlyService.this.consumerGroup,//
msgs,//
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {//返回null 或者異常設(shè)置為失敗
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",//
ConsumeMessageConcurrentlyService.this.consumerGroup,//
msgs,//
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// add by fuhaining@yolo24.com
if (consumeMessageLog.isInfoEnabled()) {
StringBuilder keys = new StringBuilder();
for (MessageExt msg : msgs) {
keys.append(msg.getMsgId()).append(",");
}
consumeMessageLog.info("concurrently - " + status.name() + " : " +
keys.deleteCharAt(keys.length() - 1).toString());
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS ==
status);
//調(diào)用hook
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(
ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
consumeRT);
//再次校驗(yàn)
if (!processQueue.isDropped()) {
//處理結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
else {
log.warn(
"processQueue is dropped without process consume result. messageQueue={},
msgTreeMap={}, msgs={}",
new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
}
}
重試消息
//ConsumeMessageConcurrentlyService.processConsumeResult方法
//在前面會(huì)進(jìn)行性能統(tǒng)計(jì)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING://廣播略過(guò)
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//發(fā)送重試消息
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//發(fā)送失敗的進(jìn)定時(shí)任務(wù),重試
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0) {
// 將消費(fèi)進(jìn)度提交到OffsetStore
// OffsetStore 只會(huì)將進(jìn)度記下,由前面說(shuō)的定時(shí)任務(wù)同步給broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
將要重試的消息發(fā)會(huì)broker。只是把原來(lái)的id發(fā)回去。broker在會(huì)根據(jù)id讀取原來(lái)消息的消息體
生成重試消息
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
//消息原來(lái)存哪,發(fā)會(huì)到哪
String brokerAddr =(null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
}
catch (Exception e) {
//如果發(fā)送失敗,使用內(nèi)部生產(chǎn)者發(fā)送
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
// consumerSendMessageBack 方法
ConsumerSendMsgBackRequestHeader requestHeader = new
ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroupWithProjectGroup);
//原來(lái)的topic
requestHeader.setOriginTopic(msg.getTopic());
//原消息的偏移
requestHeader.setOffset(msg.getCommitLogOffset());
//重試級(jí)別
requestHeader.setDelayLevel(delayLevel);
//記錄原來(lái)的id
requestHeader.setOriginMsgId(msg.getMsgId());
//通過(guò)netty發(fā)送
RemotingCommand response = this.remotingClient.invokeSync(addr,
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
流程總結(jié)
MQClientInstance的rebalanceService 線程啟動(dòng)。定時(shí)調(diào)用消費(fèi)者的負(fù)載均衡實(shí)現(xiàn)RebalanceImpl的doRebalance方法。
RebalanceImpl根據(jù)負(fù)載策略AllocateMessageQueueStrategy計(jì)算屬于自己的隊(duì)列
根據(jù)隊(duì)列的變化,生成新的拉取任務(wù) ProcessQueue 或者將原來(lái)的ProcessQueue禁用
將新的 ProcessQueue放入MQClientInstance的PullMessageService的pullRequestQueue這是一個(gè)LinkedBlockingQueue
PullMessageService的線程會(huì)從隊(duì)列中取出,然后調(diào)用對(duì)應(yīng)消費(fèi)者的PullAPIWrapper的pullKernelImpl方法發(fā)送請(qǐng)求拉取
拉取為異步,在回調(diào)中將消息封裝成ConsumeMessageConcurrentlyService.ConsumeRequest任務(wù)提交到ConsumeMessageConcurrentlyService的線程池ScheduledExecutorService
最終調(diào)用用戶的實(shí)現(xiàn)進(jìn)行消費(fèi)
將消費(fèi)失敗消息發(fā)回broker生成重試消息
消費(fèi)成功將進(jìn)度寫(xiě)入消費(fèi)者的OffsetStore 定時(shí)回寫(xiě)broker