消息隊列掃盲
消息隊列顧名思義就是存放消息的隊列,隊列我就不解釋了,別告訴我你連隊列都不知道似啥吧?
所以問題并不是消息隊列是什么,而是 消息隊列為什么會出現(xiàn)?消息隊列能用來干什么?用它來干這些事會帶來什么好處?消息隊列會帶來副作用嗎?
消息隊列為什么會出現(xiàn)?
消息隊列算是作為后端程序員的一個必備技能吧,因為分布式應(yīng)用必定涉及到各個系統(tǒng)之間的通信問題,這個時候消息隊列也應(yīng)運而生了??梢哉f分布式的產(chǎn)生是消息隊列的基礎(chǔ),而分布式怕是一個很古老的概念了吧,所以消息隊列也是一個很古老的中間件了。
消息隊列能用來干什么?
異步
你可能會反駁我,應(yīng)用之間的通信又不是只能由消息隊列解決,好好的通信為什么中間非要插一個消息隊列呢?我不能直接進行通信嗎?
很好??,你又提出了一個概念,同步通信。就比如現(xiàn)在業(yè)界使用比較多的 Dubbo 就是一個適用于各個系統(tǒng)之間同步通信的 RPC 框架。
我來舉個??吧,比如我們有一個購票系統(tǒng),需求是用戶在購買完之后能接收到購買完成的短信。

我們省略中間的網(wǎng)絡(luò)通信時間消耗,假如購票系統(tǒng)處理需要 150ms ,短信系統(tǒng)處理需要 200ms ,那么整個處理流程的時間消耗就是 150ms + 200ms = 350ms。
當然,乍看沒什么問題??墒亲屑氁幌肽憔透杏X有點問題,我用戶購票在購票系統(tǒng)的時候其實就已經(jīng)完成了購買,而我現(xiàn)在通過同步調(diào)用非要讓整個請求拉長時間,而短息系統(tǒng)這玩意又不是很有必要,它僅僅是一個輔助功能增強用戶體驗感而已。我現(xiàn)在整個調(diào)用流程就有點 頭重腳輕 的感覺了,購票是一個不太耗時的流程,而我現(xiàn)在因為同步調(diào)用,非要等待發(fā)送短信這個比較耗時的操作才返回結(jié)果。那我如果再加一個發(fā)送郵件呢?

這樣整個系統(tǒng)的調(diào)用鏈又變長了,整個時間就變成了550ms。
當我們在學生時代需要在食堂排隊的時候,我們和食堂大媽就是一個同步的模型。
我們需要告訴食堂大媽:“姐姐,給我加個雞腿,再加個酸辣土豆絲,幫我澆點汁上去,多打點飯哦” 咦~~~~~ 為了多吃點,真惡心。
然后大媽幫我們打飯配菜,我們看著大媽那顫抖的手和掉落的土豆絲不禁咽了咽口水。
最終我們從大媽手中接過飯菜然后去尋找座位了...
回想一下,我們在給大媽發(fā)送需要的信息之后我們是 同步等待大媽給我配好飯菜 的,上面我們只是加了雞腿和土豆絲,萬一我再加一個番茄牛腩,韭菜雞蛋,這樣是不是大媽打飯配菜的流程就會變長,我們等待的時間也會相應(yīng)的變長。

那后來,我們工作賺錢了有錢去飯店吃飯了,我們告訴服務(wù)員來一碗牛肉面加個荷包蛋 (傳達一個消息) ,然后我們就可以在飯桌上安心的玩手機了 (干自己其他事情) ,等到我們的牛肉面上了我們就可以吃了。這其中我們也就傳達了一個消息,然后我們又轉(zhuǎn)過頭干其他事情了。這其中雖然做面的時間沒有變短,但是我們只需要傳達一個消息就可以看其他事情了,這是一個 異步 的概念。
所以,為了解決這一個問題,聰明的程序員在中間也加了個類似于服務(wù)員的中間件——消息隊列。這個時候我們就可以把模型給改造了。

這樣,我們在將消息存入消息隊列之后我們就可以直接返回了(我們告訴服務(wù)員我們要吃什么然后玩手機),所以整個耗時只是 150ms + 10ms = 160ms。
但是你需要注意的是,整個流程的時長是沒變的,就像你僅僅告訴服務(wù)員要吃什么是不會影響到做面的速度的。
解耦
回到最初同步調(diào)用的過程,我們寫個偽代碼簡單概括一下。

那么第二步,我們又添加了一個發(fā)送郵件,我們就得重新去修改代碼,如果我們又加一個需求:用戶購買完還需要給他加積分,這個時候我們是不是又得改代碼?

如果你覺得還行,那么我這個時候不要發(fā)郵件這個服務(wù)了呢,我是不是又得改代碼,又得重啟應(yīng)用?

這樣改來改去是不是很麻煩,那么 此時我們就用一個消息隊列在中間進行解耦 。你需要注意的是,我們后面的發(fā)送短信、發(fā)送郵件、添加積分等一些操作都依賴于上面的 result ,這東西抽象出來就是購票的處理結(jié)果呀,比如訂單號,用戶賬號等等,也就是說我們后面的一系列服務(wù)都是需要同樣的消息來進行處理。既然這樣,我們是不是可以通過 “廣播消息” 來實現(xiàn)。
我上面所講的“廣播”并不是真正的廣播,而是接下來的系統(tǒng)作為消費者去 訂閱 特定的主題。比如我們這里的主題就可以叫做 訂票 ,我們購買系統(tǒng)作為一個生產(chǎn)者去生產(chǎn)這條消息放入消息隊列,然后消費者訂閱了這個主題,會從消息隊列中拉取消息并消費。就比如我們剛剛畫的那張圖,你會發(fā)現(xiàn),在生產(chǎn)者這邊我們只需要關(guān)注 生產(chǎn)消息到指定主題中 ,而 消費者只需要關(guān)注從指定主題中拉取消息 就行了。

如果沒有消息隊列,每當一個新的業(yè)務(wù)接入,我們都要在主系統(tǒng)調(diào)用新接口、或者當我們?nèi)∠承I(yè)務(wù),我們也得在主系統(tǒng)刪除某些接口調(diào)用。有了消息隊列,我們只需要關(guān)心消息是否送達了隊列,至于誰希望訂閱,接下來收到消息如何處理,是下游的事情,無疑極大地減少了開發(fā)和聯(lián)調(diào)的工作量。
削峰
我們再次回到一開始我們使用同步調(diào)用系統(tǒng)的情況,并且思考一下,如果此時有大量用戶請求購票整個系統(tǒng)會變成什么樣?

如果,此時有一萬的請求進入購票系統(tǒng),我們知道運行我們主業(yè)務(wù)的服務(wù)器配置一般會比較好,所以這里我們假設(shè)購票系統(tǒng)能承受這一萬的用戶請求,那么也就意味著我們同時也會出現(xiàn)一萬調(diào)用發(fā)短信服務(wù)的請求。而對于短信系統(tǒng)來說并不是我們的主要業(yè)務(wù),所以我們配備的硬件資源并不會太高,那么你覺得現(xiàn)在這個短信系統(tǒng)能承受這一萬的峰值么,且不說能不能承受,系統(tǒng)會不會 直接崩潰 了?
短信業(yè)務(wù)又不是我們的主業(yè)務(wù),我們能不能 折中處理 呢?如果我們把購買完成的信息發(fā)送到消息隊列中,而短信系統(tǒng) 盡自己所能地去消息隊列中取消息和消費消息 ,即使處理速度慢一點也無所謂,只要我們的系統(tǒng)沒有崩潰就行了。
留得江山在,還怕沒柴燒?你敢說每次發(fā)送驗證碼的時候是一發(fā)你就收到了的么?
消息隊列能帶來什么好處?
其實上面我已經(jīng)說了。異步、解耦、削峰。 哪怕你上面的都沒看懂也千萬要記住這六個字,因為他不僅是消息隊列的精華,更是編程和架構(gòu)的精華。
消息隊列會帶來副作用嗎?
沒有哪一門技術(shù)是“銀彈”,消息隊列也有它的副作用。
比如,本來好好的兩個系統(tǒng)之間的調(diào)用,我中間加了個消息隊列,如果消息隊列掛了怎么辦呢?是不是 降低了系統(tǒng)的可用性 ?
那這樣是不是要保證HA(高可用)?是不是要搞集群?那么我 整個系統(tǒng)的復(fù)雜度是不是上升了 ?
拋開上面的問題不講,萬一我發(fā)送方發(fā)送失敗了,然后執(zhí)行重試,這樣就可能產(chǎn)生重復(fù)的消息。
或者我消費端處理失敗了,請求重發(fā),這樣也會產(chǎn)生重復(fù)的消息。
對于一些微服務(wù)來說,消費重復(fù)消息會帶來更大的麻煩,比如增加積分,這個時候我加了多次是不是對其他用戶不公平?
那么,又 如何解決重復(fù)消費消息的問題 呢?
如果我們此時的消息需要保證嚴格的順序性怎么辦呢?比如生產(chǎn)者生產(chǎn)了一系列的有序消息(對一個id為1的記錄進行刪除增加修改),但是我們知道在發(fā)布訂閱模型中,對于主題是無順序的,那么這個時候就會導(dǎo)致對于消費者消費消息的時候沒有按照生產(chǎn)者的發(fā)送順序消費,比如這個時候我們消費的順序為修改刪除增加,如果該記錄涉及到金額的話是不是會出大事情?
那么,又 如何解決消息的順序消費問題 呢?
就拿我們上面所講的分布式系統(tǒng)來說,用戶購票完成之后是不是需要增加賬戶積分?在同一個系統(tǒng)中我們一般會使用事務(wù)來進行解決,如果用 Spring 的話我們在上面?zhèn)未a中加入 @Transactional 注解就好了。但是在不同系統(tǒng)中如何保證事務(wù)呢?總不能這個系統(tǒng)我扣錢成功了你那積分系統(tǒng)積分沒加吧?或者說我這扣錢明明失敗了,你那積分系統(tǒng)給我加了積分。
那么,又如何 解決分布式事務(wù)問題 呢?
我們剛剛說了,消息隊列可以進行削峰操作,那如果我的消費者如果消費很慢或者生產(chǎn)者生產(chǎn)消息很快,這樣是不是會將消息堆積在消息隊列中?
那么,又如何 解決消息堆積的問題 呢?
可用性降低,復(fù)雜度上升,又帶來一系列的重復(fù)消費,順序消費,分布式事務(wù),消息堆積的問題,這消息隊列還怎么用啊???

別急,辦法總是有的。
RocketMQ是什么?

哇,你個混蛋!上面給我拋出那么多問題,你現(xiàn)在又講 RocketMQ ,還讓不讓人活了?!
別急別急,話說你現(xiàn)在清楚 MQ 的構(gòu)造嗎,我還沒講呢,我們先搞明白 MQ 的內(nèi)部構(gòu)造,再來看看如何解決上面的一系列問題吧,不過你最好帶著問題去閱讀和了解喔。
RocketMQ 是一個 隊列模型 的消息中間件,具有高性能、高可靠、高實時、分布式 的特點。它是一個采用 Java 語言開發(fā)的分布式的消息系統(tǒng),由阿里巴巴團隊開發(fā),在2016年底貢獻給 Apache,成為了 Apache 的一個頂級項目。 在阿里內(nèi)部,RocketMQ 很好地服務(wù)了集團大大小小上千個應(yīng)用,在每年的雙十一當天,更有不可思議的萬億級消息通過 RocketMQ 流轉(zhuǎn)。
廢話不多說,想要了解 RocketMQ 歷史的同學可以自己去搜尋資料。聽完上面的介紹,你只要知道 RocketMQ 很快、很牛、而且經(jīng)歷過雙十一的實踐就行了!
隊列模型和主題模型
在談 RocketMQ 的技術(shù)架構(gòu)之前,我們先來了解一下兩個名詞概念——隊列模型 和 主題模型 。
首先我問一個問題,消息隊列為什么要叫消息隊列?
你可能覺得很弱智,這玩意不就是存放消息的隊列嘛?不叫消息隊列叫什么?
的確,早期的消息中間件是通過 隊列 這一模型來實現(xiàn)的,可能是歷史原因,我們都習慣把消息中間件成為消息隊列。
但是,如今例如 RocketMQ 、Kafka 這些優(yōu)秀的消息中間件不僅僅是通過一個 隊列 來實現(xiàn)消息存儲的。
隊列模型
就像我們理解隊列一樣,消息中間件的隊列模型就真的只是一個隊列。。。我畫一張圖給大家理解。

在一開始我跟你提到了一個 “廣播” 的概念,也就是說如果我們此時我們需要將一個消息發(fā)送給多個消費者(比如此時我需要將信息發(fā)送給短信系統(tǒng)和郵件系統(tǒng)),這個時候單個隊列即不能滿足需求了。
當然你可以讓 Producer 生產(chǎn)消息放入多個隊列中,然后每個隊列去對應(yīng)每一個消費者。問題是可以解決,創(chuàng)建多個隊列并且復(fù)制多份消息是會很影響資源和性能的。而且,這樣子就會導(dǎo)致生產(chǎn)者需要知道具體消費者個數(shù)然后去復(fù)制對應(yīng)數(shù)量的消息隊列,這就違背我們消息中間件的 解耦 這一原則。
主題模型
那么有沒有好的方法去解決這一個問題呢?有,那就是 主題模型 或者可以稱為 發(fā)布訂閱模型 。
感興趣的同學可以去了解一下設(shè)計模式里面的觀察者模式并且手動實現(xiàn)一下,我相信你會有所收獲的。
在主題模型中,消息的生產(chǎn)者稱為 發(fā)布者(Publisher) ,消息的消費者稱為 訂閱者(Subscriber) ,存放消息的容器稱為 主題(Topic) 。
其中,發(fā)布者將消息發(fā)送到指定主題中,訂閱者需要 提前訂閱主題 才能接受特定主題的消息。

RocketMQ中的消息模型
RockerMQ 中的消息模型就是按照 主題模型 所實現(xiàn)的。你可能會好奇這個 主題 到底是怎么實現(xiàn)的呢?你上面也沒有講到呀!
其實對于主題模型的實現(xiàn)來說每個消息中間件的底層設(shè)計都是不一樣的,就比如 Kafka 中的 分區(qū) ,RocketMQ 中的 隊列 ,RabbitMQ 中的 Exchange 。我們可以理解為 主題模型/發(fā)布訂閱模型 就是一個標準,那些中間件只不過照著這個標準去實現(xiàn)而已。
所以,RocketMQ 中的 主題模型 到底是如何實現(xiàn)的呢?首先我畫一張圖,大家嘗試著去理解一下。

我們可以看到在整個圖中有 Producer Group 、Topic 、Consumer Group 三個角色,我來分別介紹一下他們。
-
Producer Group生產(chǎn)者組: 代表某一類的生產(chǎn)者,比如我們有多個秒殺系統(tǒng)作為生產(chǎn)者,這多個合在一起就是一個Producer Group生產(chǎn)者組,它們一般生產(chǎn)相同的消息。 -
Consumer Group消費者組: 代表某一類的消費者,比如我們有多個短信系統(tǒng)作為消費者,這多個合在一起就是一個Consumer Group消費者組,它們一般消費相同的消息。 -
Topic主題: 代表一類消息,比如訂單消息,物流消息等等。
你可以看到圖中生產(chǎn)者組中的生產(chǎn)者會向主題發(fā)送消息,而 主題中存在多個隊列,生產(chǎn)者每次生產(chǎn)消息之后是指定主題中的某個隊列發(fā)送消息的。
每個主題中都有多個隊列(這里還不涉及到 Broker),集群消費模式下,一個消費者集群多臺機器共同消費一個 topic 的多個隊列,一個隊列只會被一個消費者消費。如果某個消費者掛掉,分組內(nèi)其它消費者會接替掛掉的消費者繼續(xù)消費。就像上圖中 Consumer1 和 Consumer2 分別對應(yīng)著兩個隊列,而 Consuer3 是沒有隊列對應(yīng)的,所以一般來講要控制 消費者組中的消費者個數(shù)和主題中隊列個數(shù)相同 。
當然也可以消費者個數(shù)小于隊列個數(shù),只不過不太建議。如下圖。

每個消費組在每個隊列上維護一個消費位置 ,為什么呢?
因為我們剛剛畫的僅僅是一個消費者組,我們知道在發(fā)布訂閱模式中一般會涉及到多個消費者組,而每個消費者組在每個隊列中的消費位置都是不同的。如果此時有多個消費者組,那么消息被一個消費者組消費完之后是不會刪除的(因為其它消費者組也需要呀),它僅僅是為每個消費者組維護一個 消費位移(offset) ,每次消費者組消費完會返回一個成功的響應(yīng),然后隊列再把維護的消費位移加一,這樣就不會出現(xiàn)剛剛消費過的消息再一次被消費了。

可能你還有一個問題,為什么一個主題中需要維護多個隊列 ?
答案是 提高并發(fā)能力 。的確,每個主題中只存在一個隊列也是可行的。你想一下,如果每個主題中只存在一個隊列,這個隊列中也維護著每個消費者組的消費位置,這樣也可以做到 發(fā)布訂閱模式 。如下圖。

但是,這樣我生產(chǎn)者是不是只能向一個隊列發(fā)送消息?又因為需要維護消費位置所以一個隊列只能對應(yīng)一個消費者組中的消費者,這樣是不是其他的 Consumer 就沒有用武之地了?從這兩個角度來講,并發(fā)度一下子就小了很多。
所以總結(jié)來說,RocketMQ 通過使用在一個 Topic 中配置多個隊列并且每個隊列維護每個消費者組的消費位置 實現(xiàn)了 主題模式/發(fā)布訂閱模式 。
RocketMQ的架構(gòu)圖
講完了消息模型,我們理解起 RocketMQ 的技術(shù)架構(gòu)起來就容易多了。
RocketMQ 技術(shù)架構(gòu)中有四大角色 NameServer 、Broker 、Producer 、Consumer 。我來向大家分別解釋一下這四個角色是干啥的。
-
Broker: 主要負責消息的存儲、投遞和查詢以及服務(wù)高可用保證。說白了就是消息隊列服務(wù)器嘛,生產(chǎn)者生產(chǎn)消息到Broker,消費者從Broker拉取消息并消費。這里,我還得普及一下關(guān)于
Broker、Topic和 隊列的關(guān)系。上面我講解了Topic和隊列的關(guān)系——一個Topic中存在多個隊列,那么這個Topic和隊列存放在哪呢?一個
Topic分布在多個Broker上,一個Broker可以配置多個Topic,它們是多對多的關(guān)系。如果某個
Topic消息量很大,應(yīng)該給它多配置幾個隊列(上文中提到了提高并發(fā)能力),并且 盡量多分布在不同Broker上,以減輕某個Broker的壓力 。Topic消息量都比較均勻的情況下,如果某個broker上的隊列越多,則該broker壓力越大。所以說我們需要配置多個Broker。
NameServer: 不知道你們有沒有接觸過ZooKeeper和Spring Cloud中的Eureka,它其實也是一個 注冊中心 ,主要提供兩個功能:Broker管理 和 路由信息管理 。說白了就是Broker會將自己的信息注冊到NameServer中,此時NameServer就存放了很多Broker的信息(Broker的路由表),消費者和生產(chǎn)者就從NameServer中獲取路由表然后照著路由表的信息和對應(yīng)的Broker進行通信(生產(chǎn)者和消費者定期會向NameServer去查詢相關(guān)的Broker的信息)。Producer: 消息發(fā)布的角色,支持分布式集群方式部署。說白了就是生產(chǎn)者。Consumer: 消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制。說白了就是消費者。
聽完了上面的解釋你可能會覺得,這玩意好簡單。不就是這樣的么?

嗯?你可能會發(fā)現(xiàn)一個問題,這老家伙 NameServer 干啥用的,這不多余嗎?直接 Producer 、Consumer 和 Broker 直接進行生產(chǎn)消息,消費消息不就好了么?
但是,我們上文提到過 Broker 是需要保證高可用的,如果整個系統(tǒng)僅僅靠著一個 Broker 來維持的話,那么這個 Broker 的壓力會不會很大?所以我們需要使用多個 Broker 來保證 負載均衡 。
如果說,我們的消費者和生產(chǎn)者直接和多個 Broker 相連,那么當 Broker 修改的時候必定會牽連著每個生產(chǎn)者和消費者,這樣就會產(chǎn)生耦合問題,而 NameServer 注冊中心就是用來解決這個問題的。
如果還不是很理解的話,可以去看我介紹
Spring Cloud的那篇文章,其中介紹了Eureka注冊中心。
當然,RocketMQ 中的技術(shù)架構(gòu)肯定不止前面那么簡單,因為上面圖中的四個角色都是需要做集群的。我給出一張官網(wǎng)的架構(gòu)圖,大家嘗試理解一下。

其實和我們最開始畫的那張乞丐版的架構(gòu)圖也沒什么區(qū)別,主要是一些細節(jié)上的差別。聽我細細道來??。
第一、我們的 Broker 做了集群并且還進行了主從部署 ,由于消息分布在各個 Broker 上,一旦某個 Broker 宕機,則該Broker 上的消息讀寫都會受到影響。所以 Rocketmq 提供了 master/slave 的結(jié)構(gòu), salve 定時從 master 同步數(shù)據(jù)(同步刷盤或者異步刷盤),如果 master 宕機,則 slave 提供消費服務(wù),但是不能寫入消息 (后面我還會提到哦)。
第二、為了保證 HA ,我們的 NameServer 也做了集群部署,但是請注意它是 去中心化 的。也就意味著它沒有主節(jié)點,你可以很明顯地看出 NameServer 的所有節(jié)點是沒有進行 Info Replicate 的,在 RocketMQ 中是通過 單個Broker和所有NameServer保持長連接 ,并且在每隔30秒 Broker 會向所有 Nameserver 發(fā)送心跳,心跳包含了自身的 Topic 配置信息,這個步驟就對應(yīng)這上面的 Routing Info 。
第三、在生產(chǎn)者需要向 Broker 發(fā)送消息的時候,需要先從 NameServer 獲取關(guān)于 Broker 的路由信息,然后通過 輪詢 的方法去向每個隊列中生產(chǎn)數(shù)據(jù)以達到 負載均衡 的效果。
第四、消費者通過 NameServer 獲取所有 Broker 的路由信息后,向 Broker 發(fā)送 Pull 請求來獲取消息數(shù)據(jù)。Consumer 可以以兩種模式啟動—— 廣播(Broadcast)和集群(Cluster)。廣播模式下,一條消息會發(fā)送給 同一個消費組中的所有消費者 ,集群模式下消息只會發(fā)送給一個消費者。
如何解決 順序消費、重復(fù)消費
其實,這些東西都是我在介紹消息隊列帶來的一些副作用的時候提到的,也就是說,這些問題不僅僅掛鉤于 RocketMQ ,而是應(yīng)該每個消息中間件都需要去解決的。
在上面我介紹 RocketMQ 的技術(shù)架構(gòu)的時候我已經(jīng)向你展示了 它是如何保證高可用的 ,這里不涉及運維方面的搭建,如果你感興趣可以自己去官網(wǎng)上照著例子搭建屬于你自己的 RocketMQ 集群。
其實
Kafka的架構(gòu)基本和RocketMQ類似,只是它注冊中心使用了Zookeeper、它的 分區(qū) 就相當于RocketMQ中的 隊列 。還有一些小細節(jié)不同會在后面提到。
順序消費
在上面的技術(shù)架構(gòu)介紹中,我們已經(jīng)知道了 RocketMQ 在主題上是無序的、它只有在隊列層面才是保證有序 的。
這又扯到兩個概念——普通順序 和 嚴格順序 。
所謂普通順序是指 消費者通過 同一個消費隊列收到的消息是有順序的 ,不同消息隊列收到的消息則可能是無順序的。普通順序消息在 Broker 重啟情況下不會保證消息順序性 (短暫時間) 。
所謂嚴格順序是指 消費者收到的 所有消息 均是有順序的。嚴格順序消息 即使在異常情況下也會保證消息的順序性 。
但是,嚴格順序看起來雖好,實現(xiàn)它可會付出巨大的代價。如果你使用嚴格順序模式,Broker 集群中只要有一臺機器不可用,則整個集群都不可用。你還用啥?現(xiàn)在主要場景也就在 binlog 同步。
一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式。
那么,我們現(xiàn)在使用了 普通順序模式 ,我們從上面學習知道了在 Producer 生產(chǎn)消息的時候會進行輪詢(取決你的負載均衡策略)來向同一主題的不同消息隊列發(fā)送消息。那么如果此時我有幾個消息分別是同一個訂單的創(chuàng)建、支付、發(fā)貨,在輪詢的策略下這 三個消息會被發(fā)送到不同隊列 ,因為在不同的隊列此時就無法使用 RocketMQ 帶來的隊列有序特性來保證消息有序性了。

那么,怎么解決呢?
其實很簡單,我們需要處理的僅僅是將同一語義下的消息放入同一個隊列(比如這里是同一個訂單),那我們就可以使用 Hash取模法 來保證同一個訂單在同一個隊列中就行了。
重復(fù)消費
emmm,就兩個字—— 冪等 。在編程中一個冪等 操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。比如說,這個時候我們有一個訂單的處理積分的系統(tǒng),每當來一個消息的時候它就負責為創(chuàng)建這個訂單的用戶的積分加上相應(yīng)的數(shù)值??墒怯幸淮危㈥犃邪l(fā)送給訂單系統(tǒng) FrancisQ 的訂單信息,其要求是給 FrancisQ 的積分加上 500。但是積分系統(tǒng)在收到 FrancisQ 的訂單信息處理完成之后返回給消息隊列處理成功的信息的時候出現(xiàn)了網(wǎng)絡(luò)波動(當然還有很多種情況,比如Broker意外重啟等等),這條回應(yīng)沒有發(fā)送成功。
那么,消息隊列沒收到積分系統(tǒng)的回應(yīng)會不會嘗試重發(fā)這個消息?問題就來了,我再發(fā)這個消息,萬一它又給 FrancisQ 的賬戶加上 500 積分怎么辦呢?
所以我們需要給我們的消費者實現(xiàn) 冪等 ,也就是對同一個消息的處理結(jié)果,執(zhí)行多少次都不變。
那么如何給業(yè)務(wù)實現(xiàn)冪等呢?這個還是需要結(jié)合具體的業(yè)務(wù)的。你可以使用 寫入 Redis 來保證,因為 Redis 的 key 和 value 就是天然支持冪等的。當然還有使用 數(shù)據(jù)庫插入法 ,基于數(shù)據(jù)庫的唯一鍵來保證重復(fù)數(shù)據(jù)不會被插入多條。
不過最主要的還是需要 根據(jù)特定場景使用特定的解決方案 ,你要知道你的消息消費是否是完全不可重復(fù)消費還是可以忍受重復(fù)消費的,然后再選擇強校驗和弱校驗的方式。畢竟在 CS 領(lǐng)域還是很少有技術(shù)銀彈的說法。
而在整個互聯(lián)網(wǎng)領(lǐng)域,冪等不僅僅適用于消息隊列的重復(fù)消費問題,這些實現(xiàn)冪等的方法,也同樣適用于,在其他場景中來解決重復(fù)請求或者重復(fù)調(diào)用的問題 。比如將HTTP服務(wù)設(shè)計成冪等的,解決前端或者APP重復(fù)提交表單數(shù)據(jù)的問題 ,也可以將一個微服務(wù)設(shè)計成冪等的,解決 RPC 框架自動重試導(dǎo)致的 重復(fù)調(diào)用問題 。
分布式事務(wù)
如何解釋分布式事務(wù)呢?事務(wù)大家都知道吧?要么都執(zhí)行要么都不執(zhí)行 。在同一個系統(tǒng)中我們可以輕松地實現(xiàn)事務(wù),但是在分布式架構(gòu)中,我們有很多服務(wù)是部署在不同系統(tǒng)之間的,而不同服務(wù)之間又需要進行調(diào)用。比如此時我下訂單然后增加積分,如果保證不了分布式事務(wù)的話,就會出現(xiàn)A系統(tǒng)下了訂單,但是B系統(tǒng)增加積分失敗或者A系統(tǒng)沒有下訂單,B系統(tǒng)卻增加了積分。前者對用戶不友好,后者對運營商不利,這是我們都不愿意見到的。
那么,如何去解決這個問題呢?
如今比較常見的分布式事務(wù)實現(xiàn)有 2PC、TCC 和事務(wù)消息(half 半消息機制)。每一種實現(xiàn)都有其特定的使用場景,但是也有各自的問題,都不是完美的解決方案。
在 RocketMQ 中使用的是 事務(wù)消息加上事務(wù)反查機制 來解決分布式事務(wù)問題的。我畫了張圖,大家可以對照著圖進行理解。

在第一步發(fā)送的 half 消息 ,它的意思是 在事務(wù)提交之前,對于消費者來說,這個消息是不可見的 。
那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后 改變主題 為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
你可以試想一下,如果沒有從第5步開始的 事務(wù)反查機制 ,如果出現(xiàn)網(wǎng)路波動第4步?jīng)]有發(fā)送成功,這樣就會產(chǎn)生 MQ 不知道是不是需要給消費者消費的問題,他就像一個無頭蒼蠅一樣。在 RocketMQ 中就是使用的上述的事務(wù)反查來解決的,而在 Kafka 中通常是直接拋出一個異常讓用戶來自行解決。
你還需要注意的是,在 MQ Server 指向系統(tǒng)B的操作已經(jīng)和系統(tǒng)A不相關(guān)了,也就是說在消息隊列中的分布式事務(wù)是——本地事務(wù)和存儲消息到消息隊列才是同一個事務(wù)。這樣也就產(chǎn)生了事務(wù)的最終一致性,因為整個過程是異步的,每個系統(tǒng)只要保證它自己那一部分的事務(wù)就行了。
消息堆積問題
在上面我們提到了消息隊列一個很重要的功能——削峰 。那么如果這個峰值太大了導(dǎo)致消息堆積在隊列中怎么辦呢?
其實這個問題可以將它廣義化,因為產(chǎn)生消息堆積的根源其實就只有兩個——生產(chǎn)者生產(chǎn)太快或者消費者消費太慢。
我們可以從多個角度去思考解決這個問題,當流量到峰值的時候是因為生產(chǎn)者生產(chǎn)太快,我們可以使用一些 限流降級 的方法,當然你也可以增加多個消費者實例去水平擴展增加消費能力來匹配生產(chǎn)的激增。如果消費者消費過慢的話,我們可以先檢查 是否是消費者出現(xiàn)了大量的消費錯誤 ,或者打印一下日志查看是否是哪一個線程卡死,出現(xiàn)了鎖資源不釋放等等的問題。
當然,最快速解決消息堆積問題的方法還是增加消費者實例,不過 同時你還需要增加每個主題的隊列數(shù)量 。
別忘了在
RocketMQ中,一個隊列只會被一個消費者消費 ,如果你僅僅是增加消費者實例就會出現(xiàn)我一開始給你畫架構(gòu)圖的那種情況。

回溯消費
回溯消費是指 Consumer 已經(jīng)消費成功的消息,由于業(yè)務(wù)上需求需要重新消費,在RocketMQ 中, Broker 在向Consumer 投遞成功消息后,消息仍然需要保留 。并且重新消費一般是按照時間維度,例如由于 Consumer 系統(tǒng)故障,恢復(fù)后需要重新消費1小時前的數(shù)據(jù),那么 Broker 要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒。
這是官方文檔的解釋,我直接照搬過來就當科普了??????。
RocketMQ 的刷盤機制
上面我講了那么多的 RocketMQ 的架構(gòu)和設(shè)計原理,你有沒有好奇
在 Topic 中的 隊列是以什么樣的形式存在的?
隊列中的消息又是如何進行存儲持久化的呢?
我在上文中提到的 同步刷盤 和 異步刷盤 又是什么呢?它們會給持久化帶來什么樣的影響呢?
下面我將給你們一一解釋。
同步刷盤和異步刷盤

如上圖所示,在同步刷盤中需要等待一個刷盤成功的 ACK ,同步刷盤對 MQ 消息可靠性來說是一種不錯的保障,但是 性能上會有較大影響 ,一般地適用于金融等特定業(yè)務(wù)場景。
而異步刷盤往往是開啟一個線程去異步地執(zhí)行刷盤操作。消息刷盤采用后臺異步線程提交的方式進行, 降低了讀寫延遲 ,提高了 MQ 的性能和吞吐量,一般適用于如發(fā)驗證碼等對于消息保證要求不太高的業(yè)務(wù)場景。
一般地,異步刷盤只有在 Broker 意外宕機的時候會丟失部分數(shù)據(jù),你可以設(shè)置 Broker 的參數(shù) FlushDiskType 來調(diào)整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。
同步復(fù)制和異步復(fù)制
上面的同步刷盤和異步刷盤是在單個結(jié)點層面的,而同步復(fù)制和異步復(fù)制主要是指的 Borker 主從模式下,主節(jié)點返回消息給客戶端的時候是否需要同步從節(jié)點。
- 同步復(fù)制: 也叫 “同步雙寫”,也就是說,只有消息同步雙寫到主從結(jié)點上時才返回寫入成功 。
- 異步復(fù)制: 消息寫入主節(jié)點之后就直接返回寫入成功 。
然而,很多事情是沒有完美的方案的,就比如我們進行消息寫入的節(jié)點越多就更能保證消息的可靠性,但是隨之的性能也會下降,所以需要程序員根據(jù)特定業(yè)務(wù)場景去選擇適應(yīng)的主從復(fù)制方案。
那么,異步復(fù)制會不會也像異步刷盤那樣影響消息的可靠性呢?
答案是不會的,因為兩者就是不同的概念,對于消息可靠性是通過不同的刷盤策略保證的,而像異步同步復(fù)制策略僅僅是影響到了 可用性 。為什么呢?其主要原因是 RocketMQ 是不支持自動主從切換的,當主節(jié)點掛掉之后,生產(chǎn)者就不能再給這個主節(jié)點生產(chǎn)消息了。
比如這個時候采用異步復(fù)制的方式,在主節(jié)點還未發(fā)送完需要同步的消息的時候主節(jié)點掛掉了,這個時候從節(jié)點就少了一部分消息。但是此時生產(chǎn)者無法再給主節(jié)點生產(chǎn)消息了,消費者可以自動切換到從節(jié)點進行消費(僅僅是消費),所以在主節(jié)點掛掉的時間只會產(chǎn)生主從結(jié)點短暫的消息不一致的情況,降低了可用性,而當主節(jié)點重啟之后,從節(jié)點那部分未來得及復(fù)制的消息還會繼續(xù)復(fù)制。
在單主從架構(gòu)中,如果一個主節(jié)點掛掉了,那么也就意味著整個系統(tǒng)不能再生產(chǎn)了。那么這個可用性的問題能否解決呢?一個主從不行那就多個主從的唄,別忘了在我們最初的架構(gòu)圖中,每個 Topic 是分布在不同 Broker 中的。

但是這種復(fù)制方式同樣也會帶來一個問題,那就是無法保證 嚴格順序 。在上文中我們提到了如何保證的消息順序性是通過將一個語義的消息發(fā)送在同一個隊列中,使用 Topic 下的隊列來保證順序性的。如果此時我們主節(jié)點A負責的是訂單A的一系列語義消息,然后它掛了,這樣其他節(jié)點是無法代替主節(jié)點A的,如果我們?nèi)我夤?jié)點都可以存入任何消息,那就沒有順序性可言了。
而在 RocketMQ 中采用了 Dledger 解決這個問題。他要求在寫入消息的時候,要求至少消息復(fù)制到半數(shù)以上的節(jié)點之后,才給客?端返回寫?成功,并且它是?持通過選舉來動態(tài)切換主節(jié)點的。這里我就不展開說明了,讀者可以自己去了解。
也不是說
Dledger是個完美的方案,至少在Dledger選舉過程中是無法提供服務(wù)的,而且他必須要使用三個節(jié)點或以上,如果多數(shù)節(jié)點同時掛掉他也是無法保證可用性的,而且要求消息復(fù)制板書以上節(jié)點的效率和直接異步復(fù)制還是有一定的差距的。
存儲機制
還記得上面我們一開始的三個問題嗎?到這里第三個問題已經(jīng)解決了。
但是,在 Topic 中的 隊列是以什么樣的形式存在的?隊列中的消息又是如何進行存儲持久化的呢? 還未解決,其實這里涉及到了 RocketMQ 是如何設(shè)計它的存儲結(jié)構(gòu)了。我首先想大家介紹 RocketMQ 消息存儲架構(gòu)中的三大角色——CommitLog 、ConsumeQueue 和 IndexFile 。
-
CommitLog: 消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?/strong>,當文件滿了,寫入下一個文件。 -
ConsumeQueue: 消息消費隊列,引入的目的主要是提高消息消費的性能(我們再前面也講了),由于RocketMQ是基于主題Topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據(jù)Topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設(shè)計,每一個條目共20個字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長度、8字節(jié)taghashcode,單個文件由30W個條目組成,可以像數(shù)組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M; -
IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。這里只做科普不做詳細介紹。
總結(jié)來說,整個消息存儲的結(jié)構(gòu),最主要的就是 CommitLoq 和 ConsumeQueue 。而 ConsumeQueue 你可以大概理解為 Topic 中的隊列。

RocketMQ 采用的是 混合型的存儲結(jié)構(gòu) ,即為 Broker 單個實例下所有的隊列共用一個日志數(shù)據(jù)文件來存儲消息。有意思的是在同樣高并發(fā)的 Kafka 中會為每個 Topic 分配一個存儲文件。這就有點類似于我們有一大堆書需要裝上書架,RockeMQ 是不分書的種類直接成批的塞上去的,而 Kafka 是將書本放入指定的分類區(qū)域的。
而 RocketMQ 為什么要這么做呢?原因是 提高數(shù)據(jù)的寫入效率 ,不分 Topic 意味著我們有更大的幾率獲取 成批 的消息進行數(shù)據(jù)寫入,但也會帶來一個麻煩就是讀取消息的時候需要遍歷整個大文件,這是非常耗時的。
所以,在 RocketMQ 中又使用了 ConsumeQueue 作為每個隊列的索引文件來 提升讀取消息的效率。我們可以直接根據(jù)隊列的消息序號,計算出索引的全局位置(索引序號*索引固定?度20),然后直接讀取這條索引,再根據(jù)索引中記錄的消息的全局位置,找到消息。
講到這里,你可能對 RockeMQ 的存儲架構(gòu)還有些模糊,沒事,我們結(jié)合著圖來理解一下。

emmm,是不是有一點復(fù)雜??,看英文圖片和英文文檔的時候就不要慫,硬著頭皮往下看就行。
如果上面沒看懂的讀者一定要認真看下面的流程分析!
首先,在最上面的那一塊就是我剛剛講的你現(xiàn)在可以直接 把 ConsumerQueue 理解為 Queue。
在圖中最左邊說明了 紅色方塊 代表被寫入的消息,虛線方塊代表等待被寫入的。左邊的生產(chǎn)者發(fā)送消息會指定 Topic 、QueueId 和具體消息內(nèi)容,而在 Broker 中管你是哪門子消息,他直接 **全部順序存儲到了 CommitLog **。而根據(jù)生產(chǎn)者指定的 Topic 和 QueueId 將這條消息本身在 CommitLog 的偏移(offset),消息本身大小,和tag的hash值存入對應(yīng)的 ConsumeQueue 索引文件中。而在每個隊列中都保存了 ConsumeOffset 即每個消費者組的消費位置(我在架構(gòu)那里提到了,忘了的同學可以回去看一下),而消費者拉取消息進行消費的時候只需要根據(jù) ConsumeOffset 獲取下一個未被消費的消息就行了。
上述就是我對于整個消息存儲架構(gòu)的大概理解(這里不涉及到一些細節(jié)討論,比如稀疏索引等等問題),希望對你有幫助。
因為有一個知識點因為寫嗨了忘講了,想想在哪里加也不好,所以我留給大家去思考一下吧。

為什么 CommitLog 文件要設(shè)計成固定大小的長度呢?提醒:內(nèi)存映射機制。
總結(jié)
總算把這篇博客寫完了。我講的你們還記得嗎???
這篇文章中我主要想大家介紹了
- 消息隊列出現(xiàn)的原因
- 消息隊列的作用(異步,解耦,削峰)
- 消息隊列帶來的一系列問題(消息堆積、重復(fù)消費、順序消費、分布式事務(wù)等等)
- 消息隊列的兩種消息模型——隊列和主題模式
- 分析了
RocketMQ的技術(shù)架構(gòu)(NameServer、Broker、Producer、Comsumer) - 結(jié)合
RocketMQ回答了消息隊列副作用的解決方案 - 介紹了
RocketMQ的存儲機制和刷盤策略。
等等。。。
作者:Francis
鏈接:RabbitMQ 入門
來源:gitee
