Producer延遲容錯策略:
- 生產者在發(fā)送message時會根據一定的策略選取一個消息通道,然后將消息發(fā)送到對應的broker上,默認情況下(RoundRobin),TopicPublishInfo會維護一個計數器(ThreadLocal中),當producer需要發(fā)送時,會根據計數器(mod通道數量)選擇一個queue,消息發(fā)送失敗時會以當前計數器的位置開始找到下一個可用的queue,如果當前topic的queue在不同的broker上會優(yōu)先選擇其他broker上的queue。
- 其次生產者還有一個延遲容錯的策略默認時關閉的,需要設置sendLatencyFaultEnable為true手動開啟,在每次生產者發(fā)送消息時會記錄從開始發(fā)送到收到broker的響應間的延遲,隨后會根據該延遲判斷當前broker是否需要隔離及重置下次可用的開始時間,比如延遲600ms則對應的broker在30s內不再能被選擇,50ms對應的時間就是0s也就是不影響下次的選擇,再次選擇通道時,會優(yōu)先選擇可用的沒有被隔離的通道,其次是延遲最低的,最后是距離隔離結束時間最短的。
延遲消息:
- 目前開源版本的rocketmq只支持按級別延遲消息,共18個級別,每個級別的時間分別是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,延遲消息的大概過程如下:
- 生產者在發(fā)出消息時設定延遲級別delayTimeLevel 比如級別為3那么就是延遲10s后發(fā)出消息
- broker在接收到消息寫入commitlog前會判斷消息的延遲級別是否大于0,大于0就代表是個延遲消息,根據當前的延遲級別獲取對應的queueId。
- 替換消息的topic為延遲隊列的topic(SCHEDULE_TOPIC_XXXX),替換queueId為第二步獲得的queueId,同時將原先的topic與queueId記在消息屬性中,然后寫入commitlog中
- broker在啟動的時候會為每個延遲隊列起一個線程來處理延遲消息,線程會每隔100ms取出對應延遲隊列當前處理到的offset的msg描述,描述中包含消息在commitlog中的offset、消息長度等(20k),以此獲得commitlog中的msg
- 獲得到消息后將之前記錄在消息中的原始topic、queueId重新設回消息,再重新投遞到commitlog中,最后更新對應延遲隊列的offset
- 思考:
根據rocketmq 的設計 broker在收到生產者的消息時會順序寫入commitlog,在消費者消費時由于不同消費者的進度并不一致,所以從commitlog中取消息時可能會涉及到隨機查找,但從整體來看,消息消費的走向總是先到的消息先被消費,所以消費過程整體上也是一個順序讀取的過程,結合零拷貝(mmap+write)以及PageCache的讀寫分離(需要手動開啟),這是rocketmq高性能的原因之一。
結合broker延時消息的實現方案,實際上在看源碼的過程中考慮了下在broker上實現延時消息需要解決的幾個問題:
- 如何給延時消息排序(因為延遲時間不同,后來的消息也可能被先投遞)
- 排序過程中如何避免隨機讀寫(如果要進行排序不可避免的要在消息隊列中進行插入操作)
實際上rocketmq采用的多級延遲隊列方案,就沒有進行排序也就不存在第二個問題,因為是按照延遲級別劃分的,分到同一個隊列中的消息延遲都是相同的天然有序
付費版的rocketmq支持任意時間的延遲消息,考慮該如何實現?(多級時間輪)
順序消息:
TODO
Consumer的負載均衡方案
TODO