系列
rocketMq消費(fèi)過程包括兩種,分別是并發(fā)消費(fèi)和有序消費(fèi),每個(gè)消費(fèi)方式都可以單獨(dú)拿出來進(jìn)行分享,這篇文章單獨(dú)用來分析并發(fā)消費(fèi)問題。
并發(fā)消費(fèi)需要理解的幾個(gè)核心點(diǎn):并發(fā)消費(fèi)的消息拉取,并發(fā)消費(fèi)的消息重試,并發(fā)消息的ack機(jī)制,消費(fèi)進(jìn)度的持久化,這篇分享會(huì)就這幾個(gè)問題分解展開。
其他邏輯
? ? 1、consumer會(huì)定期向broker同步ack消息偏移量,也就是已經(jīng)消費(fèi)的位置。
? ? 2、極端情況下consumer會(huì)因?yàn)橐粋€(gè)消息一直失敗導(dǎo)致ack消息偏移量無法前進(jìn),但是因?yàn)闀?huì)有定時(shí)任務(wù)去清楚過期消息,所以ack進(jìn)度正常便宜。
并發(fā)消費(fèi)整體流程

說明:
? ? 1、Rebalance負(fù)責(zé)生成pullRequest放置到pullRequestQueue當(dāng)中。
? ? 2、PullMessageService負(fù)責(zé)消費(fèi)pullRequest來完成數(shù)據(jù)的拉取。
? ? 3、數(shù)據(jù)拉取后生成ConsumeRequest對象投遞到consumeExecutor的線程池當(dāng)中
? ? 4、ConsumeRequest是一個(gè)線程實(shí)例,負(fù)責(zé)消費(fèi)拉取的消息。
? ? 5、消費(fèi)消息成功就從ConsumeRequest的ProcessQueue中刪除,消費(fèi)失敗就投遞到broker的重試隊(duì)列中,重試次數(shù)和延遲粒度在broker端處理。
? ? 6、consumeRequest內(nèi)部維持的processQueue作為一個(gè)TreeMap對象可以維持消息的有序性,用于判斷消費(fèi)進(jìn)度。
? ? 7、pullRequest在消費(fèi)完以后還是再次投遞到pullRequestQueue當(dāng)中。
pullRequest執(zhí)行過程

說明:參見PullMessageService類
? ? 1、單線程循環(huán)消費(fèi)pullRequest。

說明:參見PullMessageService類
? ? 1、消費(fèi)過程中進(jìn)行一些狀態(tài)判斷以及流速控制

說明:參見DefaultMQPushConsumerImpl類
? ? 1、區(qū)分有序消費(fèi)和無須消費(fèi)
? ? 2、無序消費(fèi)會(huì)判斷消費(fèi)偏移量是否差別過大

說明:參見DefaultMQPushConsumerImpl類
? ? 1、處理拉取消息的后續(xù)操作
? ? 2、處理完以后再次投遞pullRequest請求

說明:參見PullAPIWrapper類

說明:參見PullAPIWrapper類

說明:參見ConsumeMessageConcurrentlyService類。
? ? 1、拉取消息成功后設(shè)置下一次拉取的偏移量。
? ? 2、更新拉取的消息到processQueue當(dāng)中。
? ? 3、再次投遞pullRequest發(fā)起下一次拉取。

說明:參見ConsumeMessageConcurrentlyService類
? ? 1、分一次能夠處理完成和分多次能夠處理完成。

說明:
? ? 1、processQueue是待處理消息保存位置,里面核心數(shù)據(jù)結(jié)構(gòu)之一為TreeMap
? ? 2、messageQueue就是這個(gè)ConsumeRequest負(fù)責(zé)處理的messageQueue

說明:參見ConsumeMessageConcurrentlyService類
? ? 1、consumer消費(fèi)拉取消息的邏輯及后續(xù)處理

說明:參見ConsumeMessageConcurrentlyService類
? ? 1、消費(fèi)成功就刪除所有拉取的消息

說明:參見SendMessageProcessor類
? ? 1、處理邏輯在consumerSendMsgBack方法中
? ? 2、里面涉及到延遲粒度和重試次數(shù)的設(shè)置
? ? 3、消息是被投遞到延遲隊(duì)列當(dāng)中的

說明:參見MQClientInstance類
? ? 1、在persistAllConsumerOffset定期持久化消費(fèi)偏移量
? ? 2、消費(fèi)偏移量由ConsumerRequest請求在處理的過程中變更的

說明:參見DefaultMQPushConsumerImpl類
? ? 1、處理沒有從broker拉取消息的過程
? ? 2、再次投遞pullRequest請求