RocketMQ實(shí)戰(zhàn)(三):分布式事務(wù)

《RocketMQ實(shí)戰(zhàn)(一)》,《RocketMQ實(shí)戰(zhàn)(二)》,本篇博客主要討論的話題是:順序消費(fèi)、RMQ在分布式事務(wù)中的應(yīng)用等。

關(guān)于多Master多Slave的說明

由于在之前的博客中已經(jīng)搭建了雙Master,其實(shí)多Master多Slave大同小異,因此這里并不會一步步的演示搭建多Master多Slave,而是從思路上,分析下重點(diǎn)應(yīng)該注意的配置項。

多Master多Slave

第一,這四臺機(jī)器,對外是一個統(tǒng)一的整體,是一個rocketmq cluster,因此需要brokerClusterName保持統(tǒng)一

第二,123機(jī)器是121的從,124機(jī)器是122的從,如何在配置中體現(xiàn)? 主和從的brokerName需要保持一致,另外brokerId標(biāo)示了誰是主,誰是從(brokerId=0的就是主,大于0的就是從)

第三,注意namesrvAddr的地址是4臺NameServer

第四,配置項中brokerRole需要指明 ASYNC_MASTER(異步復(fù)制Master) or SYNC_MASTER(同步雙寫Master) or SLAVE(從)

第五,和以前的多Master啟動方式一致,先啟動4臺Namesrv,然后用指定配置文件的方式啟動Master/Slave即可

第六,多Master多Slave的好處在于,即便集群中某個broker掛了,也可以繼續(xù)消費(fèi),保證了實(shí)時性的高可用,但是并不是說某個master掛了,slave就可以升級master,開源版本的rocketmq是不可以的。也就是說,在這種情況下,slave只能提供讀的功能,將失去消息負(fù)載的能力。


Queue in Topic

對于RocketMQ而言,Topic只是一個邏輯上的概念,真正的消息存儲其實(shí)是在Topic中的Queue中。想一想,為什么RocketMQ要這要設(shè)計呢?其實(shí)是為了消息的順序消費(fèi),后文中將為大家介紹。


queue in topic


默認(rèn)一個Topic中4個隊列


配置文件中指定


初步認(rèn)識RocketMQ的核心模塊


rocketmq模塊

rocketmq-broker:接受生產(chǎn)者發(fā)來的消息并存儲(通過調(diào)用rocketmq-store),消費(fèi)者從這里取得消息。

rocketmq-client:提供發(fā)送、接受消息的客戶端API。

rocketmq-namesrv:NameServer,類似于Zookeeper,這里保存著消息的TopicName,隊列等運(yùn)行時的元信息。(有點(diǎn)NameNode的味道)

rocketmq-common:通用的一些類,方法,數(shù)據(jù)結(jié)構(gòu)等

rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定義二進(jìn)制協(xié)議

rocketmq-store:消息、索引存儲等

rocketmq-filtersrv:消息過濾器Server,需要注意的是,要實(shí)現(xiàn)這種過濾,需要上傳代碼到MQ!【一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復(fù)雜的過濾需求,可以考慮filtersrv組件】

rocketmq-tools:命令行工具


Order Message

RocketMQ提供了3種模式的Producer:

NormalProducer(普通)、OrderProducer(順序)、TransactionProducer(事務(wù))

在前面的博客當(dāng)中,涉及的都是NormalProducer,調(diào)用傳統(tǒng)的send方法,消息是無序的。接下來,我們來看看順序消費(fèi)。模擬這樣一個場景,如果一個用戶完成一個訂單需要3條消息,比如訂單的創(chuàng)建、訂單的支付、訂單的發(fā)貨,很顯然,同一個用戶的訂單消息必須要順序消費(fèi),但是不同用戶之間的訂單可以并行消費(fèi)。

生產(chǎn)者端代碼示例:

順序消息模式

注意,一個Message除了Topic/Tag外,還有Key的概念。

上圖的send方法不同于以往,有一個MessageQueueSelector,將用于指定特定的消息發(fā)往特定的隊列當(dāng)中!


順序消費(fèi)

注意在以前普通消費(fèi)消息時設(shè)置的回調(diào)是MessageListenerConcurrently,而順序消費(fèi)的回調(diào)設(shè)置是MessageListenerOrderly。

當(dāng)我們啟動2個Consumer進(jìn)行消費(fèi)時,可以觀察到:

多個消費(fèi)者消費(fèi)的結(jié)果

可以觀察得到,雖然從全局上來看,消息的消費(fèi)不是有序的,但是每一個訂單下的3條消息是順序消費(fèi)的!

其實(shí),如果需要保證消息的順序消費(fèi),那么很簡單,首先需要做到一組需要有序消費(fèi)的消息發(fā)往同一個broker的同一個隊列上!其次消費(fèi)者端采用有序Listener即可。

這里,RocketMQ底層是如何做到消息順序消費(fèi)的,看一看源碼你就能大概了解到,至少來說,在多線程消費(fèi)場景下,一個線程只去消費(fèi)一個隊列上的消息,那么自然就保證了消息消費(fèi)的順序性,同時也保證了多個線程之間的并發(fā)性。也就是說其實(shí)broker并不能完全保證消息的順序消費(fèi),它僅僅能保證的消息的順序發(fā)送而已!

關(guān)于多線程消費(fèi)這塊,RocketMQ早就替我們想好了,這樣設(shè)置即可:

消費(fèi)多線程設(shè)置

想一想,在ActiveMQ中,我們?nèi)绻雽?shí)現(xiàn)并發(fā)消費(fèi)的話,恐怕還得搞個線程池提交任務(wù)吧,RocketMQ讓我們的工作變得簡單!


Transaction Message

在說事務(wù)消息之前,我們先來說說分布式事務(wù)的那些事!

什么是分布式事務(wù),我的理解是一半事務(wù)。怎么說,比如有2個異構(gòu)系統(tǒng),A異構(gòu)系統(tǒng)要做T1,B異構(gòu)系統(tǒng)要做T2,要么都成功,要么都失敗。

要知道異構(gòu)系統(tǒng),很顯然,不在一個數(shù)據(jù)庫實(shí)例上,它們往往分布在不同物理節(jié)點(diǎn)上,本地事務(wù)已經(jīng)失效。

2階段提交

2階段提交協(xié)議,Two-Phase Commit,是處理分布式事務(wù)的一種常見手段。2PC,存在2個重要角色:事務(wù)協(xié)調(diào)器(TC),事務(wù)執(zhí)行者。

2PC,可以看到節(jié)點(diǎn)之間的通信次數(shù)太多了,時間很長!時間變長了,從而導(dǎo)致,事務(wù)鎖定的資源時間也變長了,造成資源等待時間變長!在高并發(fā)場景下,存在嚴(yán)重的性能問題!

下面,我們來看看MQ在高并發(fā)場景下,是如何解決分布式事務(wù)的。

考慮生活中的場景:

我們?nèi)ケ本c豐包子鋪吃炒肝,先去營業(yè)員那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排隊拿炒肝(Action2)。思考2個問題:第一,為什么不在付款的同時,給顧客炒肝?如果這樣的話,會增加處理時間,使得后面的顧客等待時間變長,相當(dāng)于降低了接待顧客的能力(降低了系統(tǒng)的QPS)。第二,付了款,拿到的是Ticket,顧客為什么會接受?從心理上說,顧客相信Ticket會兌現(xiàn)炒肝。事實(shí)上也是如此,就算在最后炒肝沒了,或者斷電斷水(系統(tǒng)出現(xiàn)異常),顧客依然可以通過Ticket進(jìn)行退款操作,這樣都不會有什么損失?。m然這么說,但是實(shí)際上包子鋪?zhàn)畲蠡怂睦?,如果炒肝真的沒了,浪費(fèi)了顧客的時間,不過顧客頂多發(fā)發(fā)牢騷,最后接受)

生活已經(jīng)告訴我們處理分布式事務(wù),保證數(shù)據(jù)最終一致性的思路!這個Ticket(憑證)其實(shí)就是消息!

業(yè)務(wù)和消息生成耦合在一起

業(yè)務(wù)操作和消息的生成耦合在一起,保證了只要A銀行的賬戶發(fā)生扣款,那么一定會生成一條轉(zhuǎn)賬消息。只要A銀行系統(tǒng)的事務(wù)成功提交,我們可以通過實(shí)時消息服務(wù),將轉(zhuǎn)賬消息通知B銀行系統(tǒng),如果B銀行系統(tǒng)回復(fù)成功,那么A銀行系統(tǒng)可以在table中設(shè)置這條轉(zhuǎn)賬消息的狀態(tài)。

這樣耦合的方式,從架構(gòu)上來看,就有點(diǎn)不太優(yōu)雅,而且存在一些問題。比如說,消息的存儲實(shí)質(zhì)上是在A銀行系統(tǒng)中的,如果A銀行系統(tǒng)出了問題,將導(dǎo)致無法轉(zhuǎn)賬。如果解耦,將消息獨(dú)立出來呢?

業(yè)務(wù)和消息解耦

如上圖所示,消息數(shù)據(jù)獨(dú)立存儲,業(yè)務(wù)和消息解耦,實(shí)質(zhì)上消息的發(fā)送有2次,一條是轉(zhuǎn)賬消息,另一條是確認(rèn)消息。

到這里,我們先來看看基于RocketMQ的代碼:

生產(chǎn)者示例代碼

生產(chǎn)者這里用到是:TransactionMQProducer。

這里涉及到2個角色:本地事務(wù)執(zhí)行器(代碼中的TransactionExecuterImpl)、服務(wù)器回查客戶端Listener(代碼中的TransactionCheckListener)。

如果事務(wù)消息發(fā)送到MQ上后,會回調(diào) ?本地事務(wù)執(zhí)行器;但是此時事務(wù)消息是prepare狀態(tài),對消費(fèi)者還不可見,需要 ?本地事務(wù)執(zhí)行器 ?返回RMQ一個確認(rèn)消息。


本地事務(wù)執(zhí)行器

事務(wù)消息是否對消費(fèi)者可見,完全由事務(wù)返回給RMQ的狀態(tài)碼決定(狀態(tài)碼的本質(zhì)也是一條消息)。


回查Listener


運(yùn)行結(jié)果

生產(chǎn)者發(fā)送了2條消息給RMQ,有一條本地事務(wù)執(zhí)行成功,有一條本地事務(wù)執(zhí)行失敗。

2條業(yè)務(wù)消息 + 2條確認(rèn)消息 ?因此是4條;

注意到消費(fèi)者只消費(fèi)了一條數(shù)據(jù),就是只有告訴RMQ本地事務(wù)執(zhí)行成功的那條消息才會被消費(fèi)!因此是1條!

但是,注意到本地事務(wù)執(zhí)行失敗的消息,RMQ并沒有check listener?這是為什么呢?因為RMQ在3.0.8的時候還是支持check listener回查機(jī)制的,但是到了3.2.6的時候?qū)⑹聞?wù)回查機(jī)制“閹割”了!

那么3.0.8的時候,RMQ是怎么做事務(wù)回查的呢?看一看源碼,你會知道,其實(shí)事務(wù)消息開始是prepare狀態(tài),然后RMQ會將其持久化到MySQL當(dāng)中,然后如果收到確認(rèn)消息,就刪除掉這條prepare消息,如果遲遲收不到確認(rèn)消息,那么RMQ會定時的掃描prepare消息,發(fā)送給produce group進(jìn)行回查確認(rèn)!

到這里,問題來了,要知道3.2.6版本,沒有回查機(jī)制了,會存在問題么?

當(dāng)然會存在問題!假設(shè),我們發(fā)送一條轉(zhuǎn)賬事務(wù)消息給RMQ,成功后回調(diào)本地事務(wù),DB減操作成功,剛準(zhǔn)備給RMQ一個確認(rèn)消息,此時突然斷電,或者網(wǎng)絡(luò)抖動,使得這條確認(rèn)消息沒有發(fā)送出去。此時RMQ中的那條轉(zhuǎn)賬事務(wù)消息,始終處于prepare狀態(tài),消費(fèi)者讀取不到,但是卻已經(jīng)完成一方的賬戶資金變動?。?!

既然,RMQ3.2.6版本不為我們進(jìn)行回查,那么只能由我們自己完成了。具體怎么做呢,咱們下期再來分析~?

see u , good night~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容