一、RocketMq簡介
1.1 RocketMq是什么
RcoketMQ是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:
支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型
在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
支持拉(pull)和推(push)兩種消息模式
單一隊列百萬消息的堆積能力
支持多種消息協(xié)議,如JMS、MQTT 等
分布式高可用的部署架構(gòu),滿足至少一次消息傳e遞語義
提供docker鏡像用于隔離測試和云集群部署
提供配置、指標和監(jiān)控等功能豐富的Dashboard
1.2 RocketMq各個角色的介紹
Broker:暫存和傳輸消息,舉例:郵局
NameServer:管理Broker,舉例:各個郵局的管理機構(gòu)
Topic:區(qū)分消息的種類
Message Queue:Topic的分區(qū),用于并行發(fā)送和接受消息
1.5 RocketMQ集群部署結(jié)構(gòu)

1) Name Server
Name Server是一個幾乎無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。
2) Broker
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slave,但是一個Slave只能對應(yīng)一個Master,Master與Slave的對應(yīng)關(guān)系通過指定相同的Broker Name,不同的Broker Id來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。
每個Broker與Name Server集群中的所有節(jié)點建立長連接,定時(每隔30s)注冊Topic信息到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接。
3) Producer
Producer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master建立長連接,且定時向Master發(fā)送心跳。Producer完全無狀態(tài),可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況,這意味著如果Broker不可用,Producer最多30s能夠感知,在此期間內(nèi)發(fā)往Broker的所有消息都會失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s中掃描所有存活的連接,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接。
4) Consumer
Consumer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master、Slave建立長連接,且定時向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定。
Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s掃描所有存活的連接,若某個連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù),則關(guān)閉連接;并向該Consumer Group的所有Consumer發(fā)出通知,Group內(nèi)的Consumer重新分配隊列,然后繼續(xù)消費。
1.6 RocketMq集群的工作流程
1、啟動NameServer,啟動后監(jiān)聽端口,等待Broker、Producer、Consumer連接上,相當于一個路由控制中心
2、Broker啟動,跟所有的NameServer保持長連接,定時發(fā)送心跳包。包含了當前Broker信息以及所有存儲的Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關(guān)系
3、收發(fā)消息前創(chuàng)建Topic,創(chuàng)建topic需要指定該Topic在哪些broker上,也可以發(fā)送消息自動創(chuàng)建topic
4、Producer發(fā)送消息,啟動時先跟NameServer集群中其中一臺建立長連接,并NameServer中獲取當前發(fā)送的topic存在哪些Broker上,輪詢從隊列中選擇一個隊列,然后跟隊列所在的Broker建立長連接從而向Broker發(fā)消息
5、Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱的topic存在哪些Broker上,然后直接Broker建立連接通道,開始消費消息
三、Rocketmq如何支持分布式事務(wù)消息
場景
A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務(wù)一致性,通過引入中間層MQ,A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實現(xiàn)check),B和MQ保證事務(wù)一致(通過重試),從而達到最終事務(wù)一致性。
原理:大事務(wù)=小事務(wù) + 異步
1. MQ與DB一致性原理(兩方事務(wù))
流程圖

上圖是RocketMQ提供的保證MQ消息、DB事務(wù)一致性的方案。
MQ消息、DB操作一致性方案:
1)發(fā)送消息到MQ服務(wù)器,此時消息狀態(tài)為SEND_OK。此消息為consumer不可見。
2)執(zhí)行DB操作;DB執(zhí)行成功Commit DB操作,DB執(zhí)行失敗Rollback DB操作。
3)如果DB執(zhí)行成功,回復MQ服務(wù)器,將狀態(tài)為COMMIT_MESSAGE;如果DB執(zhí)行失敗,回復MQ服務(wù)器,將狀態(tài)改為ROLLBACK_MESSAGE。注意此過程有可能失敗。
4)MQ內(nèi)部提供一個名為“事務(wù)狀態(tài)服務(wù)”的服務(wù),此服務(wù)會檢查事務(wù)消息的狀態(tài),如果發(fā)現(xiàn)消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng),業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài),如果成功,則回復COMMIT_MESSAGE,否則回復ROLLBACK_MESSAGE。
說明:
上面以DB為例,其實此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態(tài),在MQ服務(wù)器內(nèi)部是一個數(shù)字。
TransactionCheckListener是在消息的commit或者rollback消息丟失的情況下才會回調(diào)(上圖中灰色部分)。這種消息丟失只存在于斷網(wǎng)或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內(nèi)數(shù)據(jù)丟失風險,異步刷盤場景下保障事務(wù)沒有意義。所以如果要核心業(yè)務(wù)用Rocketmq解決分布式事務(wù)問題,建議選擇同步刷盤模式。
2.多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))

當需要保證多方(超過2方)的分布式一致性,上面的兩方事務(wù)一致性(通過Rocketmq的事務(wù)性消息解決)已經(jīng)無法支持。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
以上圖交易系統(tǒng)為例:
1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄),同時發(fā)送訂單創(chuàng)建消息。通過RocketMq事務(wù)性消息保證一致性
2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理,處理結(jié)果不會影響交易狀態(tài))。執(zhí)行成功更改訂單狀態(tài),同時發(fā)送MQ消息。
3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息,通過定時調(diào)度系統(tǒng)創(chuàng)建延時回滾任務(wù)(或者使用RocketMq的重試功能,設(shè)置第二次發(fā)送時間為定時任務(wù)的延遲創(chuàng)建時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執(zhí)行完,訂單狀態(tài)還未設(shè)置為完成,第二次消費時間可以指定)。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成,完成則不創(chuàng)建回滾任務(wù),否則創(chuàng)建。 PS:多個RPC可以創(chuàng)建一個回滾任務(wù),通過一個消費組接受一次消息就可以;也可以通過創(chuàng)建多個消費組,一個消息消費多次,每次消費創(chuàng)建一個RPC的回滾任務(wù)。 回滾任務(wù)失敗,通過MQ的重發(fā)來重試。
以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案。
3.案例分析
1)單機環(huán)境下的事務(wù)示意圖
如下為A給B轉(zhuǎn)賬的例子
四、順序消息
1.順序消息缺陷
發(fā)送順序消息無法利用集群Fail Over特性消費順序消息的并行度依賴于隊列數(shù)量隊列熱點問題,個別隊列由于哈希不均導致消息過多,消費速度跟不上,產(chǎn)生消息堆積問題遇到消息失敗的消息,無法跳過,當前隊列消費暫停。
2.原理
produce在發(fā)送消息的時候,把消息發(fā)到同一個隊列(queue)中,消費者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息。
注意:把消息發(fā)到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue
3.擴展
可以通過實現(xiàn)發(fā)送消息的對列選擇器方法,實現(xiàn)部分順序消息。
舉例:比如一個數(shù)據(jù)庫通過MQ來同步,只需要保證每個表的數(shù)據(jù)是同步的就可以。解析binlog,將表名作為對列選擇器的參數(shù),這樣就可以保證每個表的數(shù)據(jù)到同一個對列里面,從而保證表數(shù)據(jù)的順序消費
五、最佳實踐
1. Producer
1) Topic
一個應(yīng)用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。
2) key
每個消息在業(yè)務(wù)層面的唯一標識碼,要設(shè)置到keys字段,方便將來定位消息丟失問題。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過 topic,key來查詢這條消息內(nèi)容,以及消息被誰消費。由于是哈希索引,請務(wù)必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突。
//訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
3)日志
消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印send result和key 字段。
4) send
send消息方法,只要不拋異常,就代表發(fā)送成功。但是發(fā)送成功會有多個狀態(tài),在sendResult里定義。
SEND_OK:消息發(fā)送成功
FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但是服務(wù)器刷盤超時,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失
FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功,但是服務(wù)器同步到Slave時超時,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失
SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但是此時slave不可用,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失
2. Consumer
1)冪等
RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個消息,此時務(wù)必做好冪等。
2)日志
消費時記錄日志,以便后續(xù)定位問題。
3)批量消費
盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。