前言
消息隊列是分布式系統(tǒng)中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。目前在生產環(huán)境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。由于每個消息隊列都有它的優(yōu)勢和劣勢,我們公司對于不同的場景使用了不同類型的消息隊列。對于RocketMQ消費端存在消息消費失敗的情況,通常有兩種方式,一種是consumer端知道怎么處理,另一種是consumer不能處理(broker處理),本文對后一種情況進行介紹,consumer獲取到消息但不能正常處理(ack),接下來這個消費失敗的消息在Broker里面如何存儲和重新讓consumer消費,針對這個流程做了深入的分析。本文中的P代表producer,C代表consumer,本文的consumeQueue對應前面的topic下面的隊列。
目錄
- RocketMQ的消費與存儲結構
- RocketMQ的消費失敗消息處理邏輯
- Broker端處理失敗消息任務的啟動
- Consumer發(fā)回消費失敗消息流程
- Broker寫發(fā)回失敗消息的流程
RocketMQ的消費與存儲結構
正常情況下,P發(fā)送消息到broker,消息內容寫到commitlog,消息內容在commitlog的位置信息(索引)寫到consumerQueue,C讀取consumerQueue的內容消費消息。

RocketMq的存儲結構:

本文的內容涉及上面的消費隊列服務(consumerQueue,%RETRY%groupName屬于consumerQueue),定時消息服務(SCHEDULE_TOPIC_XXXX)兩個模塊,C與broker的的消息消費只涉及到consumerQueue,定時消息服務只在broker內部起作用。
RocketMQ的消費失敗消息處理邏輯
consumer消費失敗消息處理流程圖如下:

在下面的代碼和流程分析中請結合這個圖進行分析。
其中SCHEDULE_TOPIC_XXXX和%RETRY%groupName的queue都存儲在目錄 ~/store/consumequeue 里面:
ll ~/store/consumequeue 如下:

ll ~/store/consumequeue/SCHEDULE_TOPIC_XXXX 如下:

從上圖可以看出SCHEDULE_TOPIC_XXXX的隊列名稱是從2開始到17,對應的delayLevel為3到18,3對應10s,18對應2h,在類MessageStoreConfig中這樣定義延時時間:String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。SCHEDULE_TOPIC_XXXX這個topic只對內部使用,對于consumer只能消費到retry隊列的數(shù)據(jù)。
consumer消費失敗的消息發(fā)回broker后總是先寫到SCHEDULE_TOPIC_XXXX里面,然后schedule service讀取SCHEDULE_TOPIC_XXXX里面的數(shù)據(jù)寫到retry隊列,consumer消費retry隊列的數(shù)據(jù),這樣就完成了一個循環(huán),從這個過程也能看到,一個消費失敗的消息體每次發(fā)回broker需要在commitLog里面存儲兩份(topic為SCHEDULE_TOPIC_XXXX的一份這個主要是為schedule service控制延時用的,topic為%RETRY%groupName的一份)。
當我們想查看現(xiàn)在的延時消息數(shù)量,我們可以查看SCHEDULE_TOPIC_XXXX的offset來得知,使用CLI Admin Tool工具輸入命令“sh mqadmin brokerStatus”查看處理進度。如下圖:

其中每行為一個隊列,圖中第一列為隊列的名稱,圖中第二列參數(shù)為當前隊列處理的offset,圖中第三列為當前隊列最大存儲的offset。通過第三列和第二列的值相減能得出當前的隊列的消息數(shù)量。
Broker端處理失敗消息任務的啟動

ScheduleMessageService根據(jù)messageDelayLevel維護了每個延遲level對應的隊列編號,以及每個隊列編號對應的offset。在start方法里面會啟動18個timerTask(DeliverDelayedMessageTimerTask),每個對應一個level,初始offset為0。然后就是定時任務讀取SCHEDULE_TOPIC_XXXX隊列里面的消息進行判斷,如果消息的delayLevel對應的時間滿足重新消費,那么就會忘consumeQueue里面寫這個消息,等待C重新來消費。
Consumer發(fā)回消費失敗消息流程

在ConsumeRequest的run方法里面也就是業(yè)務端處理消息的線程里面,對于status是非success的交給ConsumeMessageConcurrentlyService(本文只討論并行消費的模式,串行模式類似)的sendMessageBack方法處理,這個方法主要設置delayLevel(context.getDelayLevelWhenNextConsume()),然后傳遞給DefaultMQPushConsumerImpl.sendMessageBack找到對應的消息來源queue,把這個消息發(fā)送到這個queue里面,也就是說消費失敗的消息發(fā)回broker還是會在之前的那個queue里面。發(fā)回broker后本地再過5秒重試消費一次,如果這次成功,下次就不再消費。
上面流程的類圖:

在ConsumeRequest的run方法里面會調用ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)來處理消費結果狀態(tài),在cluster(集群模式)下設置新的消息delayLevle值然后把失敗的消息發(fā)回Broker,廣播模式不發(fā)回。注意ConsumeConcurrentlyContext的delayLevelWhenNextConsume屬性說明-1直接放到死信隊列,0又broker每次對重試消費次數(shù)加1來控制重試策略,大于0由consumer控制重試消費策略(在listener的consumeMessage方法里面有個context:context.setDelayLevelWhenNextConsume(4)設置為1分鐘延時消費),默認值為0。
Broker寫發(fā)回失敗消息的流程

broker端收到消費失敗消息后通過consumerSendMsgBack(P發(fā)送的消息不由這個處理,區(qū)分通過消息頭的type)方法設置當前消息的delayTimeLevel,這里計算delayTimeLevel,第一次重試默認consumer發(fā)回為0,延遲為延遲等級為0+3=3;如果第一次不為0表明是consumer控制的情況,直接取出delayTimeLevel,也就是和ConsumeConcurrentlyContext(consumer端控制)的delayLevelWhenNextConsume配置一致。設置好delayLevelTime后就交給DefaultMessageStore的putMessage方法,DefaultMessageStore的putMessage方法通過Commitlog的putMessage來寫入文件,這里需要重點關注的是在這個方法里面通過msg.getDelayTimeLevel() > 0這個條件,修改當前消息topic為SCHEDULE_TOPIC_XXXX,原來的topic保留在property里面,在ScheduleMessageService里面判斷消息滿足條件后會把消息的topic改為真實的topic,通常是retry,接著寫到consumeQueue里面,C對于%RETRY%consumerGroup這個topic在程序里面默認是訂閱的不需用戶指定,然后隊列Id的計算方式為queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),即msg.getDelayTimeLevel()-1,和前面的截圖2到17編號一致,然后消息體寫到commitlog文件和索引寫到SCHEDULE_TOPIC_XXXX隊列。類圖如下:

SendMessageProcessor處理遠程發(fā)來的消息,包括P和C的,方法里面通過RequestCode.CONSUMER_SEND_MSG_BACK來判斷是不是重試發(fā)回的消息。然后會判斷這個消息對應的topic為%RETRY%consumerGroup的是否創(chuàng)建過,沒有則創(chuàng)建;接下來的處理就和上面的流程圖一樣了。
總結
本文圍繞consumer端消費失敗后RocketMQ各個模塊的處理邏輯進行了源碼的深入分析。相信有了以上的知識學習和實踐之后,當業(yè)務應用遇到了類似的問題就可以胸有成竹的應對了。