閑著研究了下RocketMQ消費失敗消息的處理邏輯這里記錄下,更細化說這里只討論Push模式(其實實現(xiàn)還是Pull的模式)非順序消費的情況Pull和順序消息這里暫時不做討論哈~(還沒研究- -)
消費失敗處理邏輯
- 消費成功的情況RockeMQ會通過移動消費offset位點向前來標示消息已被處理
- 而對于業(yè)務(wù)處里失敗的消息采用的策略是將消息回發(fā)回Broker(并存放到一個
%RETRY%XX的topic中), 大家一聽可以想到的是回發(fā)broker這時候broker掛了怎么辦? - 回發(fā)失敗的時候會在本機啟動個task來重試..恩 然后這時候consumer機器掛了了怎么辦重試沒了,難道丟消息?
- 所以為了保證consumer掉電不丟消費失敗且回發(fā)失敗的消息,代碼里保證offsetManage(local or remote)中offset不會前移超過重發(fā)失敗消息的offset,這樣可以保證在下次需要,如果下次consumer活過來時(這時一定會從offsetManage中取offset, 恩其實正常運行中不會每次都取, 順序消息除外...),一定可以重拉到消費失敗的消息(后面會提到這個的代價是會重復拉到很多上次已經(jīng)消費的消息,不過業(yè)務(wù)同學的代碼都是冪等的,所以逃~)
- 對于消費失敗但回發(fā)成功的消息,會直接更新offset假裝認為那幾條消息已經(jīng)被消費成功,因為他們已經(jīng)轉(zhuǎn)生在
%RETRY%XXtopic里作為新消息等待消費了~當前消費者可以專注于干其他事情.(補充: RERTRY topic實際會帶上delay所以實際是先SCHEDULE_TOPIC然后再%RETRY%XX, 這個具體見其他同學關(guān)于delay消息的解析~)
可以看到,重發(fā)這種模式是不會丟消息的,即使broker掛了,consumer掛了,一定會消費到,雖然可能獲得很多不想要的重復消息- -
為啥這么搞
寫本文的原因就是我組幾個小伙伴都覺得這個很奇怪,為啥這么弄呢~?個人研究了下理解是這樣的....(其實自己剛開始研究RocketMQ很多理解可能有問題,歡迎大家一塊討論學習 哈哈哈)
可以冷靜看下當前消息消費場景特點:
- 給了我們一個Queue,訪問的時候需要通過一層網(wǎng)絡(luò)
- 為了希望消費者能盡快的獲得大量消息,結(jié)合上條希望consumer更好是一次獲取一批而不是一條消息
- 因為順序并不重要,consumer本身應(yīng)該可以并發(fā)消費這批消息
- 因為并發(fā)消費消息,不是等上條ok才能消費下一條,就會有ack先后順序問題
- 為了server端ack應(yīng)該是高效的,每條記錄一個狀態(tài) vs 已成功offset?
- 更進一步,獲取獲取第二批消息能否需要等待上一批消費完成? 其實沒必要只要消費者有空閑線程可以先抓過來消費第二批,雖然第一批里某幾條處理比較慢,但多數(shù)情況下應(yīng)該能不會一會就恢復,其他線程先干第二批的活, 所以拉取和處理應(yīng)該分離
- 其實從上條可以看出對于抓取的速度應(yīng)該根據(jù)消費者處理能力來控制~如果消費還有閑的可以瘋狂的從Queue中先抓過來,只要不把沒處理成功的給ack掉;如果消費者已經(jīng)嚴重delay無力處理則需要降低抓取速度
完整處理
感覺RocketMQ處理這部分的代碼挺巧妙...幾個核心參與類:
-
ConsumeQueue: Consumer角度消費的一個Queue(有些類似kafka里partition的概念, 一個Queue只會被一個consumer消費,雖然一個consumer可以消費多個Q) -
PullRequest: 當前consumer下已分配的每個ConsumeQueue消費者端都會新建一個PullRequest,里面記錄nextOffset即從Server拉取offset,拉取offset和消費offset是兩個offset才能第一批沒消費完就拉第二批; 這個Request會在rebalanceService中創(chuàng)建,并被多次更新nextOffset多次進入PullRequestQueue來達到持續(xù)拉取的循環(huán)效果- -(也會被延遲丟Q來控制速率) -
PullRequestQueue: 一個內(nèi)存隊列,充當PullMessageService的入?yún)?/li> -
PullMessageService: 負責拉取消息的拉取線程,不停的讀取PullRequestQueue根據(jù)request拉取消息,然后將消息丟到ProcessQueue中并新建ConsumeRequest提交到ConsumeService處理, 然后生成下一批的PullRequest丟到PullRequestQueue:繼續(xù)消費下一批,達到持續(xù)循環(huán)拉取的作用 -
ConsumeRequest: 雖然叫request但除了要consume的消息數(shù)據(jù)外,還有具體的消費邏輯(是個Runnable- -); 關(guān)鍵元素就是這批要處理msg列表對這批消息的處理邏輯,run里會調(diào)用用戶注冊的listener,并根據(jù)處理情況,對失敗消息回發(fā),并根據(jù)失敗和回發(fā)結(jié)果, 更新ProcessQueue以及OffsetStore -
ProcessQueue: 又一個內(nèi)存隊列保存實現(xiàn)是TreeMap,在處理中的消息,處理處理成功或處理失敗但回發(fā)成功都會從這個Queue中移除,消費offset的上報基于ProcessQueue中最小的offset來完成(所以失敗未回發(fā)成功的不會被移除);另外在ProcessQueue里最大offset和最小offset過大(MaxSpan)時,前面的PullMessageService會減速等一會在基于運行抓取(等一會兒再往PullRequestQueue里扔消息). -
ConsumeService: 一個ConsumeRequest的Executor可以理解為一個線程池 -
OffsetStore: 維護消費offset(即offset之前都處理完成) -
RebalanceService: 負責給consume分配queue,而對于目前討論過程他的作用是初始化了了對應(yīng)的PullRequestQueue和首次的PullRequest, offset從offsetStore獲取
簡單畫了個圖說明上面幾個類的關(guān)系~(手指在ipad上畫得沒有筆所以特別難看- -先將就吧)

Notes - Page 1.png
失敗重試的細節(jié)好像沒畫出來,= = 畫圖不好畫。。結(jié)合上面描述看代碼哈~- -
最終達到的效果
- 從Server是批量拉取的
- 拉取線程不需要等待上一批被處理就能開始拉取下一批,只要ProcessQueue沒超
MaxSpan(也就是消費某幾條卡主太久), 就可以一直拉取 - 消費listener可以并發(fā)消費,并各自返回完成狀態(tài), 部分消費者卡一段時間不影響其他消費者消費
- Consumer保證實際消費端offset保證offset之前必須是已處理成功或處理失敗但已回發(fā)成功
- 回發(fā)不成功會本地重試且遠端offset不會前移
- 如果重啟或被新分配隊列會從offsetStrore獲取初始offset,所以可能會有不必要的重復消息,所以消息處理需要做好冪等
總結(jié)
- 可以批量,且拉取和處理分離,同時保證不丟數(shù)據(jù),提升效率同時代價是重復消息
- 通過只記錄offset提升ack效率(可以一次ack一批,并且不用每條記錄記錄狀態(tài))
- 通過分離拉取offset(in PullRequest)和消費offset(in OffsetStore)分離拉取和處理進度,提升拉取效率,并根據(jù)消費者處理卡主情況做拉取閥值控制
- 通過回發(fā)消息加速批次中其他已處理成功消息的ack消費offset,如果不回發(fā)那消費offset不能遷移,重啟之類會導致更多的消息重發(fā), 而有回發(fā)后只要回發(fā)成功了就可以前移offset(反正不關(guān)心順序只要求效率)
- 對于回發(fā)失敗,只能不前移消費offset。。。然后通過本地重試做非consumer宕機情況的優(yōu)化