java中間件之rocketmq

一、RocketMq簡介

1.1 RocketMq是什么

RcoketMQ是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:

支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型

在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞

支持拉(pull)和推(push)兩種消息模式

單一隊列百萬消息的堆積能力

支持多種消息協(xié)議,如JMS、MQTT 等

分布式高可用的部署架構(gòu),滿足至少一次消息傳e遞語義

提供docker鏡像用于隔離測試和云集群部署

提供配置、指標和監(jiān)控等功能豐富的Dashboard

1.2 RocketMq各個角色的介紹

Broker:暫存和傳輸消息,舉例:郵局

NameServer:管理Broker,舉例:各個郵局的管理機構(gòu)

Topic:區(qū)分消息的種類

Message Queue:Topic的分區(qū),用于并行發(fā)送和接受消息

1.5 RocketMQ集群部署結(jié)構(gòu)

1) Name Server

Name Server是一個幾乎無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。

2) Broker

Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slave,但是一個Slave只能對應(yīng)一個Master,Master與Slave的對應(yīng)關(guān)系通過指定相同的Broker Name,不同的Broker Id來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。

每個Broker與Name Server集群中的所有節(jié)點建立長連接,定時(每隔30s)注冊Topic信息到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接。

3) Producer

Producer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master建立長連接,且定時向Master發(fā)送心跳。Producer完全無狀態(tài),可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況,這意味著如果Broker不可用,Producer最多30s能夠感知,在此期間內(nèi)發(fā)往Broker的所有消息都會失敗。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s中掃描所有存活的連接,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接。

4) Consumer

Consumer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master、Slave建立長連接,且定時向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定。

Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s掃描所有存活的連接,若某個連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù),則關(guān)閉連接;并向該Consumer Group的所有Consumer發(fā)出通知,Group內(nèi)的Consumer重新分配隊列,然后繼續(xù)消費。

1.6 RocketMq集群的工作流程

1、啟動NameServer,啟動后監(jiān)聽端口,等待Broker、Producer、Consumer連接上,相當于一個路由控制中心

2、Broker啟動,跟所有的NameServer保持長連接,定時發(fā)送心跳包。包含了當前Broker信息以及所有存儲的Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關(guān)系

3、收發(fā)消息前創(chuàng)建Topic,創(chuàng)建topic需要指定該Topic在哪些broker上,也可以發(fā)送消息自動創(chuàng)建topic

4、Producer發(fā)送消息,啟動時先跟NameServer集群中其中一臺建立長連接,并NameServer中獲取當前發(fā)送的topic存在哪些Broker上,輪詢從隊列中選擇一個隊列,然后跟隊列所在的Broker建立長連接從而向Broker發(fā)消息

5、Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱的topic存在哪些Broker上,然后直接Broker建立連接通道,開始消費消息


三、Rocketmq如何支持分布式事務(wù)消息

場景

A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務(wù)一致性,通過引入中間層MQ,A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實現(xiàn)check),B和MQ保證事務(wù)一致(通過重試),從而達到最終事務(wù)一致性。


原理:大事務(wù)=小事務(wù) + 異步

1. MQ與DB一致性原理(兩方事務(wù))

流程圖


上圖是RocketMQ提供的保證MQ消息、DB事務(wù)一致性的方案。


MQ消息、DB操作一致性方案:

1)發(fā)送消息到MQ服務(wù)器,此時消息狀態(tài)為SEND_OK。此消息為consumer不可見。

2)執(zhí)行DB操作;DB執(zhí)行成功Commit DB操作,DB執(zhí)行失敗Rollback DB操作。

3)如果DB執(zhí)行成功,回復MQ服務(wù)器,將狀態(tài)為COMMIT_MESSAGE;如果DB執(zhí)行失敗,回復MQ服務(wù)器,將狀態(tài)改為ROLLBACK_MESSAGE。注意此過程有可能失敗。

4)MQ內(nèi)部提供一個名為“事務(wù)狀態(tài)服務(wù)”的服務(wù),此服務(wù)會檢查事務(wù)消息的狀態(tài),如果發(fā)現(xiàn)消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng),業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài),如果成功,則回復COMMIT_MESSAGE,否則回復ROLLBACK_MESSAGE。


說明:

上面以DB為例,其實此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態(tài),在MQ服務(wù)器內(nèi)部是一個數(shù)字。

TransactionCheckListener是在消息的commit或者rollback消息丟失的情況下才會回調(diào)(上圖中灰色部分)。這種消息丟失只存在于斷網(wǎng)或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內(nèi)數(shù)據(jù)丟失風險,異步刷盤場景下保障事務(wù)沒有意義。所以如果要核心業(yè)務(wù)用Rocketmq解決分布式事務(wù)問題,建議選擇同步刷盤模式。


2.多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))

當需要保證多方(超過2方)的分布式一致性,上面的兩方事務(wù)一致性(通過Rocketmq的事務(wù)性消息解決)已經(jīng)無法支持。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。


以上圖交易系統(tǒng)為例:

1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄),同時發(fā)送訂單創(chuàng)建消息。通過RocketMq事務(wù)性消息保證一致性

2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理,處理結(jié)果不會影響交易狀態(tài))。執(zhí)行成功更改訂單狀態(tài),同時發(fā)送MQ消息。

3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息,通過定時調(diào)度系統(tǒng)創(chuàng)建延時回滾任務(wù)(或者使用RocketMq的重試功能,設(shè)置第二次發(fā)送時間為定時任務(wù)的延遲創(chuàng)建時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執(zhí)行完,訂單狀態(tài)還未設(shè)置為完成,第二次消費時間可以指定)。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成,完成則不創(chuàng)建回滾任務(wù),否則創(chuàng)建。 PS:多個RPC可以創(chuàng)建一個回滾任務(wù),通過一個消費組接受一次消息就可以;也可以通過創(chuàng)建多個消費組,一個消息消費多次,每次消費創(chuàng)建一個RPC的回滾任務(wù)。 回滾任務(wù)失敗,通過MQ的重發(fā)來重試。


以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案。


3.案例分析

1)單機環(huán)境下的事務(wù)示意圖

如下為A給B轉(zhuǎn)賬的例子


四、順序消息

1.順序消息缺陷

發(fā)送順序消息無法利用集群Fail Over特性消費順序消息的并行度依賴于隊列數(shù)量隊列熱點問題,個別隊列由于哈希不均導致消息過多,消費速度跟不上,產(chǎn)生消息堆積問題遇到消息失敗的消息,無法跳過,當前隊列消費暫停。


2.原理

produce在發(fā)送消息的時候,把消息發(fā)到同一個隊列(queue)中,消費者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息。


注意:把消息發(fā)到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue


3.擴展

可以通過實現(xiàn)發(fā)送消息的對列選擇器方法,實現(xiàn)部分順序消息。


舉例:比如一個數(shù)據(jù)庫通過MQ來同步,只需要保證每個表的數(shù)據(jù)是同步的就可以。解析binlog,將表名作為對列選擇器的參數(shù),這樣就可以保證每個表的數(shù)據(jù)到同一個對列里面,從而保證表數(shù)據(jù)的順序消費


五、最佳實踐

1. Producer

1) Topic

一個應(yīng)用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。


2) key

每個消息在業(yè)務(wù)層面的唯一標識碼,要設(shè)置到keys字段,方便將來定位消息丟失問題。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過 topic,key來查詢這條消息內(nèi)容,以及消息被誰消費。由于是哈希索引,請務(wù)必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突。


//訂單Id

String orderId= "20034568923546";

message.setKeys(orderId);


3)日志

消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印send result和key 字段。

4) send

send消息方法,只要不拋異常,就代表發(fā)送成功。但是發(fā)送成功會有多個狀態(tài),在sendResult里定義。

SEND_OK:消息發(fā)送成功

FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但是服務(wù)器刷盤超時,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失

FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功,但是服務(wù)器同步到Slave時超時,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失

SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但是此時slave不可用,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機,消息才會丟失

2. Consumer

1)冪等

RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個消息,此時務(wù)必做好冪等。

2)日志

消費時記錄日志,以便后續(xù)定位問題。

3)批量消費

盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容