消息隊列的一些場景及源碼分析,RocketMQ使用相關(guān)問題及性能優(yōu)化

?前文目錄鏈接參考:

?消息隊列的一些場景及源碼分析,RocketMQ使用相關(guān)問題及性能優(yōu)化https://www.cnblogs.com/yizhiamumu/p/16694126.html

消息隊列的對比測試與RocketMQ使用擴展https://www.cnblogs.com/yizhiamumu/p/16677881.html

消息隊列為什么選用redis?聊聊如何做技術(shù)方案選型?https://www.cnblogs.com/yizhiamumu/p/16573472.html

分布式事務原理及解決方案案例?https://www.cnblogs.com/yizhiamumu/p/16662412.html

分布式事務實戰(zhàn)方案匯總https://www.cnblogs.com/yizhiamumu/p/16625677.html

消息隊列初見:一起聊聊引入系統(tǒng)mq 之后的問題?https://www.cnblogs.com/yizhiamumu/p/16573472.html



參考:消息隊列為什么選用redis?聊聊如何做技術(shù)方案選型?https://www.cnblogs.com/yizhiamumu/p/16573472.html


上文,我們把 Redis 當作隊列來使用時,始終面臨的 2 個問題:

Redis 本身可能會丟數(shù)據(jù)

面對消息積壓,Redis 內(nèi)存資源緊張

如果你的業(yè)務場景足夠簡單,對于數(shù)據(jù)丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當作隊列是完全可以的。

?一:消息隊列的一些場景

1.1 為什么有各種各樣的 MQ?

近幾年,確實出現(xiàn)了很多消息隊列解決方案,但其實去分析每種消息隊列,會發(fā)現(xiàn)他們誕生的背景和要針對性解決的問題是不一樣的。

RabbitMQ?誕生于標準化與開源,打破了商業(yè)化消息隊列的技術(shù)壁壘,但應用場景其實沒變,定位為異步與解耦;

Kafka?誕生的背景是大數(shù)據(jù),以批量,高吞吐等核心能力搶占了大數(shù)據(jù)管道的心智,隨后非常自然地定位到 Streaming 領(lǐng)域;

EMQ?重點聚焦的領(lǐng)域在物聯(lián)網(wǎng),物聯(lián)網(wǎng)的挑戰(zhàn)跟其他領(lǐng)域是大相徑庭的,超大規(guī)模的設(shè)備與連接數(shù),規(guī)則引擎,甚者邊緣段需要有一整套完整的解決方案;

Pulsar?作為后起之秀嘗試在多個領(lǐng)域發(fā)力,包括 Messaging、Function、Streaming 等多領(lǐng)域都有相應布局。

回到 RocketMQ,大家能從近兩年 RocketMQ 在社區(qū)的一系列動作中發(fā)現(xiàn),RocketMQ 同時在消息、事件、流三個領(lǐng)域都有發(fā)力,逐漸演進至一個超融合處理平臺。作為一個融合的數(shù)據(jù)處理平臺,RocketMQ 當前在開源的布局看起來是與業(yè)界多個 MQ 趨同,在 RocketMQ 開源的背后其實是商業(yè)上真實的需求驅(qū)動。


1.2 從性能上來講,相關(guān)基準測試數(shù)據(jù)是什么水平?

一般講性能,其實就是吞吐量和延遲兩個指標。

對于吞吐量來講,RocketMQ 在 2017 年就能優(yōu)化到單機 50W 的 TPS。如果是在批量的場景,實際上從生產(chǎn)環(huán)境的穩(wěn)定性,以及業(yè)務消息的重要性來講,各個消息隊列都能輕易地打滿網(wǎng)絡(luò)帶寬或者磁盤資源。

也就是說,性能一般情況下差異都不大,是很難作為一個產(chǎn)品的核心競爭力的,除非是架構(gòu)層面有限制。

延遲就是一個非常重要的指標了,在線業(yè)務對于是 2ms 延時和 5ms 延時基本上都能接受,但非常難以接受的是經(jīng)常性有秒級的毛刺(在延遲這個指標后面長尾延遲)。

除了上述兩點,彈性和可擴展能力也是非常重要的。


1.3 消息如何存儲

消息我們可以直接在內(nèi)存中使用數(shù)組或者隊列來存儲數(shù)據(jù)即可。性能非常高。

但是有幾方面的缺點

數(shù)據(jù)丟失,比如異常情況服務器宕機重啟后內(nèi)存的消息會被丟失掉

數(shù)據(jù)量大的時候,內(nèi)存放不下,或者需要高昂的成本。如果 面對一些業(yè)務系統(tǒng)是不能容忍消息丟失的情況,單純放內(nèi)存存儲也不太可能,所以需要一款持久化的消息系統(tǒng)。

既然要存儲數(shù)據(jù),就需要解決數(shù)據(jù)存哪里?從存儲方式來看,主要有幾個方面:

關(guān)系型數(shù)據(jù)庫,比如mysql

分布式KV存儲,比如采用rocketdb實現(xiàn)的

文件系統(tǒng),log 的方式直接追加

性能,吞吐量,本質(zhì)上就是數(shù)據(jù)結(jié)構(gòu)的設(shè)計決定的。我們看看上面數(shù)據(jù)存儲方式對應的數(shù)據(jù)結(jié)構(gòu)

存儲數(shù)據(jù)結(jié)構(gòu)寫放大讀放大

mysqlB+ tree寫一條數(shù)據(jù)需要兩次寫入1、數(shù)據(jù)寫入是按頁為單位進行寫的,假設(shè)頁的大小為B 字節(jié),那么寫放大為Θ(B)(最壞的結(jié)果)2、為了避免在寫頁的過程中出現(xiàn)故障,需要寫入redo log(WAL)既支持隨機讀取又支持范圍查找的系統(tǒng)。讀放大為O(logBN/B),數(shù)據(jù)量大的適合性能會急劇下降,常規(guī)是b+ tree 超過4層,大約2000萬記錄是臨界點

rocketdbLSM treeMemtable/SSTable實現(xiàn),寫的話也變成順序?qū)懥耍ㄟ@一點是極大的優(yōu)化點),但是后臺會出現(xiàn)多路歸并算法來合并,這個過程占用磁盤IO 會到當前消息的讀寫有擾動,寫放大Θ(klogkN/B)讀的順序是MemTable->分層的sst ,性能會比B+ tree 略差,讀放大Θ((log2N/B)/logk)

文件系統(tǒng)append only log直接在文件末尾追加,所有的的寫都是順序的,因此性能極高不支持根據(jù)內(nèi)容進行檢索,只能根據(jù)文件偏移量執(zhí)行查詢


mysql 在大數(shù)據(jù)量的情況,性能會急劇下降,并且擴展性非常不友好。

分布式KV 存儲 天然的分布式系統(tǒng),對大數(shù)據(jù)量和未來的擴展都問題不大,LSM tree 對寫性能和吞吐都比mysql 要好。查詢其實是可以通過緩存等手段去優(yōu)化,可以考慮。

但是,滿足性能和吞吐量最優(yōu)的毫無疑問是使用文件系統(tǒng),因為消息不需要修改,讀和寫都是順序讀寫,性能極高。

但是現(xiàn)實中的需求我們可能需要使用多個隊列來完成不同的業(yè)務。比如一個隊列來處理訂單相關(guān)的業(yè)務,一個隊列來處理商品相關(guān)的業(yè)務等等。那么我們該如何調(diào)整呢?

我們都知道文件 append only log 的方式是不支持根據(jù)消息的內(nèi)容來搜索的,如果所有的隊列的數(shù)據(jù)存在一個文件中,是沒辦法滿足需求的。

換個思路,一個隊列一個文件我們就可以繞開根據(jù)內(nèi)容檢索的需求,kafka 就是這么玩的。

這個時候,每個隊列一個文件,讀寫還是順序的嗎


我們現(xiàn)在面臨的問題是,作為一款面向業(yè)務的高性能消息中間件,隨著業(yè)務的復雜度變高,隊列數(shù)量是急劇變大的。

如果要保證寫入的吞吐量和性能,還需要得所有的隊列都寫在同一個文件。

但是,按照隊列消費的場景就意味著要根據(jù)消息內(nèi)容(隊列名字)來進行消費,append only log 是不支持檢索的,如何解決這個問題。


我們會增加一個索引來處理慢sql 。我們是否也可以建立一個隊列的索引,每一個隊列就是一個索引文件。

讀取數(shù)據(jù)的時候,先從索引隊列找到消息在文件的偏移量后,在到數(shù)據(jù)文件去讀取。


那么,索引的文件的數(shù)量變大的之后,那么對索引文件的讀寫不就是又變成隨機讀寫了嗎?性能又會急劇下降?

一個一個來解決:

寫索引文件的時候,我們可以改成異步寫,也就是寫完數(shù)據(jù)文件,可以直接返回給客戶端成功了,后臺再由一個線程不停的從數(shù)據(jù)文件獲取數(shù)據(jù)來構(gòu)建索引,這樣就可以解決寫的性能瓶頸了

讀的問題,我們要盡量避免直接從磁盤讀,改成從內(nèi)存讀。放在內(nèi)存就意味著索引的內(nèi)容要足夠小,不然根本放不下。所以問題就變成盡量控制索引文件的大小,放在內(nèi)存里面來避開磁盤讀從而提高性能

?(rocketmq 中數(shù)據(jù)文件稱為:commitlog,? ?topic索引文件稱為 consumeQueue)

方案優(yōu)點缺點

每一個queue 都單獨一個文件消費的時候不需要獨立建立一個索引,系統(tǒng)復雜度降低,并且性能高當queue 很多的時候,并且每個queue 的數(shù)據(jù)量都不是很大情況,就會存在很多小文件,寫和讀都講變成隨機讀,性能會受到影響

所有queue 共享一個文件所有的寫都是順序?qū)懙模阅鼙容^高,可以支撐大量queue 性能也不至于下降的厲害1、需要建立獨立的索引文件,查詢數(shù)據(jù)的鏈路變長,需要先從索引查到數(shù)據(jù)再到數(shù)據(jù)文件查詢2、索引隊列本身也是小文件,好在因為數(shù)據(jù)量少,基本可以常駐內(nèi)存3、讀變成隨時讀,不過整體還是順序讀


我們得出結(jié)論:選擇文件系統(tǒng),append only log.根據(jù)消息隊列即時消費和順序讀寫的特點,剛寫入的內(nèi)容還在page cache ,就被讀走了,甚至都不需要回到磁盤,性能會非常高。


1.5 數(shù)據(jù)量大了存儲怎么辦

本地切割,大文件變小文件

如果所有的數(shù)據(jù)都存在一個commitlog 文件的話,隨著數(shù)據(jù)量變大,文件必然會非常大。

解決方案是,我們大文件切換成小文件,每個文件固定大小1G,寫滿了就切換到一個新的文件

分布式存儲

消息隊列的第一個特點就是數(shù)據(jù)量大,一臺機器容易面臨瓶頸,因此我們需要把數(shù)據(jù)均衡的分發(fā)到各個機器上。

解決方案是,一段很長的隊列平均切成N份,把這N份分別放到不同的機器上

1.6 消息高可靠

雖然消息已經(jīng)分成切分成為多份放到不同的機器了,但是每一份都是都只有一個副本,也就意味著,任何一臺機器的硬盤壞掉的話,該機器上的消息就會丟失掉了,這是不可接受的。

行業(yè)通常的做法一份數(shù)據(jù)存多個副本,并且確保所有的副本不能全都在同一臺機器。

問題來了,那么這多份數(shù)據(jù)是同步雙寫還是異步雙寫呢?

方案優(yōu)點缺點

同步雙寫數(shù)據(jù)不會丟失性能會降低,單個RT變長

異步雙寫單個RT 更加小,性能更高,吞吐量更大數(shù)據(jù)可能會丟失


其實每個業(yè)務場景需求是不一樣的,RocketMq 是支持可配置的


1.7?Broker是怎么保存數(shù)據(jù)的呢?

RocketMQ主要的存儲文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件


RocketMQ采用的是混合型的存儲結(jié)構(gòu),即為Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲。

RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。

只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內(nèi)有新消息到達,將直接返回給消費端。

這里,RocketMQ的具體做法是,使用Broker端的后臺服務線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù)


所以,Broker是怎么保存數(shù)據(jù)的呢?Broker在收到消息之后,會把消息保存到commitlog的文件當中,而同時在分布式的存儲當中,每個broker都會保存一部分topic的數(shù)據(jù),同時,每個topic對應的messagequeue下都會生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中會保存key和offset的對應關(guān)系。


Broker

CommitLog文件保存于${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個文件默認1G,寫滿后自動生成一個新的文件。


log


由于同一個topic的消息并不是連續(xù)的存儲在commitlog中,消費者如果直接從commitlog獲取消息效率非常低,所以通過consumequeue保存commitlog中消息的偏移量的物理地址,這樣消費者在消費的時候先從consumequeue中根據(jù)偏移量定位到具體的commitlog物理文件,然后根據(jù)一定的規(guī)則(offset和文件大小取模)在commitlog中快速定位。


log


1.8 RocketMQ怎么對文件進行讀寫

RocketMQ對文件的讀寫巧妙地利用了操作系統(tǒng)的一些高效文件讀寫方式——PageCache、順序讀寫、零拷貝

1.8.1 PageCache、順序讀取

在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數(shù)據(jù)較少,并且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內(nèi)存,即使在有消息堆積情況下也不會影響性能。而對于CommitLog消息存儲的日志數(shù)據(jù)文件來說,讀取消息內(nèi)容時候會產(chǎn)生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統(tǒng)IO調(diào)度算法,比如設(shè)置調(diào)度算法為Deadline(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。

頁緩存(PageCache)是OS對文件的緩存,用于加速對文件的讀寫。一般來說,程序?qū)ξ募M行順序讀寫的速度幾乎接近于內(nèi)存的讀寫速度,主要原因就是由于OS使用PageCache機制對讀寫訪問操作進行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache。對于數(shù)據(jù)的寫入,OS會先寫入至Cache內(nèi),隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。對于數(shù)據(jù)的讀取,如果一次讀取文件時出現(xiàn)未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時,會順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進行預讀取

1.8.2 零拷貝

RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO,將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū),和用戶應用程序地址空間的緩沖區(qū)之間來回進行拷貝的性能開銷),將對文件的操作轉(zhuǎn)化為直接對內(nèi)存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內(nèi)存映射機制,故RocketMQ的文件存儲都使用定長結(jié)構(gòu)來存儲,方便一次將整個文件映射至內(nèi)存)。

什么是零拷貝

在操作系統(tǒng)中,使用傳統(tǒng)的方式,數(shù)據(jù)需要經(jīng)歷幾次拷貝,還要經(jīng)歷用戶態(tài)/內(nèi)核態(tài)切換

從磁盤復制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;

從內(nèi)核態(tài)內(nèi)存復制到用戶態(tài)內(nèi)存;

然后從用戶態(tài)內(nèi)存復制到網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存;

最后是從網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存復制到網(wǎng)卡中進行傳輸。



?傳統(tǒng)文件傳輸示意圖


所以,可以通過零拷貝的方式,減少用戶態(tài)與內(nèi)核態(tài)的上下文切換和內(nèi)存拷貝的次數(shù),用來提升I/O的性能。零拷貝比較常見的實現(xiàn)方式是mmap,這種機制在Java中是通過MappedByteBuffer實現(xiàn)的。


?mmap示意圖



1.9 消息刷盤怎么實現(xiàn)

RocketMQ提供了兩種刷盤策略:同步刷盤和異步刷盤

同步刷盤:在消息達到Broker的內(nèi)存之后,必須刷到commitLog日志文件中才算成功,然后返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功。

異步刷盤:異步刷盤是指消息達到Broker內(nèi)存后就返回Producer數(shù)據(jù)已經(jīng)發(fā)送成功,會喚醒一個線程去將數(shù)據(jù)持久化到CommitLog日志文件中

Broker在消息的存取時直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機器掉電時數(shù)據(jù)丟失,所以需要持久化到磁盤中

刷盤的最終實現(xiàn)都是使用NIO中的MappedByteBuffer.force()將映射區(qū)的數(shù)據(jù)寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會等待寫入完成

異步而言,只是喚醒對應的線程,不保證執(zhí)行的時機,



1.10 RocketMQ的負載均衡

RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發(fā)送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。

1.10.1 Producer的負載均衡

Producer端在發(fā)送消息的時候,會先根據(jù)Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發(fā)送消息。具這里有一個sendLatencyFaultEnable開關(guān)變量,如果開啟,在隨機遞增取模的基礎(chǔ)上,再過濾掉not available的Broker代理。


Producer負載均衡:索引遞增隨機取模public MessageQueue selectOneMessageQueue(){

? ? //索引遞增intindex =this.sendWhichQueue.incrementAndGet();

? ? //利用索引取隨機數(shù),取余數(shù)intpos = Math.abs(index) %this.messageQueueList.size();

? ? if(pos<0){

? ? ? ? pos=0;?

? ? }

? ? returnthis.messageQueueList.get(pos);

}


所謂的latencyFaultTolerance,是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關(guān)閉,采用隨機遞增取模的方式選擇一個隊列(MessageQueue)來發(fā)送消息,latencyFaultTolerance機制是實現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在。

1.10.2 Consumer的負載均衡

在RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基于拉模式來獲取消息的,而在Push模式只是對pull模式的一種封裝,其本質(zhì)實現(xiàn)為消息拉取線程在從服務器拉取到一批消息后,然后提交到消息消費線程池后,又“馬不停蹄”的繼續(xù)向服務器再次嘗試拉取消息。如果未拉取到消息,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費方式(Push/Pull)中,均需要Consumer端知道從Broker端的哪一個消息隊列中去獲取消息。因此,有必要在Consumer端來做負載均衡,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費。

Consumer端的心跳包發(fā)送

在Consumer啟動后,它就會通過定時任務不斷地向RocketMQ集群中的所有Broker實例發(fā)送心跳包(其中包含了,消息消費分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后,會將它維護在ConsumerManager的本地緩存變量—consumerTable,同時并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負載均衡提供可以依據(jù)的元數(shù)據(jù)信息。

Consumer端實現(xiàn)負載均衡的核心類—RebalanceImpl

在Consumer實例的啟動流程中的啟動MQClientInstance實例部分,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。

通過查看源碼可以發(fā)現(xiàn),RebalanceService線程的run()方法最終調(diào)用的是RebalanceImpl類的rebalanceByTopic()方法,這個方法是實現(xiàn)Consumer端負載均衡的核心。

rebalanceByTopic()方法會根據(jù)消費者通信類型為廣播模式還是集群模式做不同的邏輯處理


1.11 RocketMQ消息長輪詢

所謂的長輪詢,就是Consumer拉取消息,如果對應的Queue如果沒有數(shù)據(jù),Broker不會立即返回,而是把?PullReuqest?hold起來,等待?queue消息后,或者長輪詢阻塞時間到了,再重新處理該?queue?上的所有?PullRequest

//如果沒有拉到數(shù)據(jù)case ResponseCode.PULL_NOT_FOUND:// broker 和 consumer 都允許 suspend,默認開啟if(brokerAllowSuspend && hasSuspendFlag) {

? ? longpollingTimeMills = suspendTimeoutMillisLong;

? ? if(!this.brokerController.getBrokerConfig().isLongPollingEnable()) {

? ? ? ? ? pollingTimeMills =this.brokerController.getBrokerConfig().getShortPollingTimeMills();

? ? }

? ? String topic = requestHeader.getTopic();

? ? longoffset = requestHeader.getQueueOffset();

? ? intqueueId = requestHeader.getQueueId();

? ? //封裝一個PullRequestPullRequest pullRequest =new PullRequest(request, channel, pollingTimeMills,

? ? this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);

? //把PullRequest掛起來this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);

? ? response =null;

? ? break;

}



掛起的請求,有一個服務線程會不停地檢查,看queue中是否有數(shù)據(jù),或者超時。

PullRequestHoldService#run()

@Overridepublicvoid run() {

? log.info("{} service started",this.getServiceName());

? while(!this.isStopped()) {

? ? ? try {

? ? ? ? ? if(this.brokerController.getBrokerConfig().isLongPollingEnable()) {

? ? ? ? ? ? ? ? this.waitForRunning(5 * 1000);

? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());

? ? ? ? ? ? ? ? }

? ? ? ? longbeginLockTimestamp =this.systemClock.now();

? ? ? ? //檢查hold住的請求this.checkHoldRequest();

? ? ? ? longcostTime =this.systemClock.now() - beginLockTimestamp;

? ? ? ? if(costTime > 5 * 1000) {

? ? ? ? ? ? ? log.info("[NOTIFYME] check hold request cost {} ms.", costTime);

? ? ? ? }

? ? ? ? } catch (Throwable e) {

? ? ? ? ? ? log.warn(this.getServiceName() + " service has exception. ", e);

? ? ? ? ? ? }

? ? ? ? }

? ? log.info("{} service end",this.getServiceName());

? ? }


1.12 RocketMQ為什么速度快?

是因為使用了順序存儲、Page Cache和異步刷盤。

1、我們在寫入commitlog的時候是順序?qū)懭氲模@樣比隨機寫入的性能就會提高很多。

2、寫入commitlog的時候并不是直接寫入磁盤,而是先寫入操作系統(tǒng)的PageCache。

3、最后由操作系統(tǒng)異步將緩存中的數(shù)據(jù)刷到磁盤。


二:RocketMQ的基本架構(gòu)

RocketMQ一共有四個部分組成:NameServer,Broker,Producer 生產(chǎn)者,Consumer 消費者,它們對應了:發(fā)現(xiàn)、發(fā)、存、收,為了保證高可用,一般每一部分都是集群部署的

1.1 NameServer

NameServer?是一個無狀態(tài)的服務器,角色類似于?Kafka使用的?Zookeeper,但比?Zookeeper?更輕量。

特點:

每個?NameServer?結(jié)點之間是相互獨立,彼此沒有任何信息交互。

Nameserver?被設(shè)計成幾乎是無狀態(tài)的,通過部署多個結(jié)點來標識自己是一個偽集群,Producer?在發(fā)送消息前從?NameServer中獲取?Topic?的路由信息也就是發(fā)往哪個?Broker,Consumer?也會定時從?NameServer獲取?Topic的路由信息,Broker?在啟動時會向?NameServer?注冊,并定時進行心跳連接,且定時同步維護的?Topic?到?NameServer

功能主要有兩個:

和Broker?結(jié)點保持長連接。

維護?Topic?的路由信息。

1.2 Broker

消息存儲和中轉(zhuǎn)角色,負責存儲和轉(zhuǎn)發(fā)消息

Broker?內(nèi)部維護著一個個?Consumer Queue,用來存儲消息的索引,真正存儲消息的地方是?CommitLog(日志文件)

單個?Broker?與所有的?Nameserver?保持著長連接和心跳,并會定時將?Topic?信息同步到?NameServer,和?NameServer?的通信底層是通過?Netty?實現(xiàn)的。

1.3 Producer

消息生產(chǎn)者,業(yè)務端負責發(fā)送消息,由用戶自行實現(xiàn)和分布式部署。

Producer由用戶進行分布式部署,消息由Producer通過多種負載均衡模式發(fā)送到Broker集群,發(fā)送低延時,支持快速失敗。

RocketMQ?提供了三種方式發(fā)送消息:同步、異步和單向

同步發(fā)送:同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會在收到接收方發(fā)回響應之后才發(fā)下一個數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營銷短信。

異步發(fā)送:異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應,接著發(fā)送下個數(shù)據(jù)包,一般用于可能鏈路耗時較長而對響應時間敏感的業(yè)務場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務。

單向發(fā)送:單向發(fā)送是指只負責發(fā)送消息而不等待服務器回應且沒有回調(diào)函數(shù)觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集

1.4 Consumer

消息消費者,負責消費消息,一般是后臺系統(tǒng)負責異步消費。

Consumer也由用戶部署,支持PUSH和PULL兩種消費模式,支持集群消費和廣播消費,提供實時的消息訂閱機制。

Pull:拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以?Pull?稱為主動消費型

Push:推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內(nèi)部維護工作,將消息到達時執(zhí)行的回調(diào)接口留給用戶應用程序來實現(xiàn)。所以?Push?稱為被動消費類型,但其實從實現(xiàn)上看還是從消息服務器中拉取消息,不同于?Pull?的是?Push?首先要注冊消費監(jiān)聽器,當監(jiān)聽器處觸發(fā)后才開始消費消息

2?RocketMQ?原理

2.1 RocketMQ整體工作流程

RocketMQ是一個分布式消息隊列,也就是消息隊列+分布式系統(tǒng)

作為消息隊列,它是發(fā)-存-收的一個模型,對應的就是Producer、Broker、Cosumer;作為分布式系統(tǒng),它要有服務端、客戶端、注冊中心,對應的就是Broker、Producer/Consumer、NameServer

主要的工作流程:RocketMQ由NameServer注冊中心集群、Producer生產(chǎn)者集群、Consumer消費者集群和若干Broker(RocketMQ進程)組成:

Broker在啟動的時候去向所有的NameServer注冊,并保持長連接,每30s發(fā)送一次心跳

Producer在發(fā)送消息的時候從NameServer獲取Broker服務器地址,根據(jù)負載均衡算法選擇一臺服務器來發(fā)送消息

Conusmer消費消息的時候同樣從NameServer獲取Broker地址,然后主動拉取消息來消費

2.2 如何保證RocketMQ的高可用?

NameServer因為是無狀態(tài),且不相互通信的,所以只要集群部署就可以保證高可用。

RocketMQ的高可用主要是在體現(xiàn)在Broker的讀和寫的高可用,Broker的高可用是通過集群和主從實現(xiàn)的。

Broker可以配置兩種角色:Master和Slave,Master角色的Broker支持讀和寫,Slave角色的Broker只支持讀,Master會向Slave同步消息。

也就是說Producer只能向Master角色的Broker寫入消息,Cosumer可以從Master和Slave角色的Broker讀取消息。

Consumer 的配置文件中,并不需要設(shè)置是從 Master 讀還是從 Slave讀,當 Master 不可用或者繁忙的時候, Consumer 的讀請求會被自動切換到從 Slave。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現(xiàn)故障后,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 讀取消息,這就實現(xiàn)了讀的高可用

如何達到發(fā)送端寫的高可用性呢?

在創(chuàng)建 Topic 的時候,把 Topic 的多個Message Queue 創(chuàng)建在多個 Broker 組上(相同 Broker 名稱,不同 brokerId機器組成 Broker 組),這樣當 Broker 組的 Master 不可用后,其他組Master 仍然可用, Producer 仍然可以發(fā)送消息 RocketMQ 目前還不支持把Slave自動轉(zhuǎn)成 Master ,如果機器資源不足,需要把 Slave 轉(zhuǎn)成 Master ,則要手動停止 Slave 色的 Broker ,更改配置文件,用新的配置文件啟動 Broker。


2.3 Master和Slave之間是怎么同步數(shù)據(jù)的呢?

而消息在master和slave之間的同步是根據(jù)raft協(xié)議來進行的:

1、在broker收到消息后,會被標記為uncommitted狀態(tài)

2、然后會把消息發(fā)送給所有的slave

3、slave在收到消息之后返回ack響應給master

4、master在收到超過半數(shù)的ack之后,把消息標記為committed

5、發(fā)送committed消息給所有slave,slave也修改狀態(tài)為committed


2.4 為什么RocketMQ不使用Zookeeper作為注冊中心呢?

Kafka采用Zookeeper作為注冊中心(也開始逐漸去Zookeeper),

RocketMQ不使用Zookeeper其實主要可能從這幾方面來考慮:

基于可用性的考慮,根據(jù)CAP理論【Consistency(一致性)、Availability(可用性)、Partition Tolerance(分區(qū)容錯性),不能同時成立】,Zookeeper滿足的是CP,并不能保證服務的可用性,Zookeeper在進行選舉的時候,整個選舉的時間太長,期間整個集群都處于不可用的狀態(tài),而這對于一個注冊中心來說肯定是不能接受的,作為服務發(fā)現(xiàn)來說就應該是為可用性而設(shè)計。

基于性能的考慮,NameServer本身的實現(xiàn)非常輕量,而且可以通過增加機器的方式水平擴展,增加集群的抗壓能力,而Zookeeper的寫是不可擴展的,Zookeeper要解決這個問題只能通過劃分領(lǐng)域,劃分多個Zookeeper集群來解決,首先操作起來太復雜,其次這樣還是又違反了CAP中的A的設(shè)計,導致服務之間是不連通的。

持久化的機制來帶的問題,ZooKeeper 的 ZAB 協(xié)議對每一個寫請求,會在每個 ZooKeeper ?節(jié)點上保持寫一個事務日志,同時再加上定期的將內(nèi)存數(shù)據(jù)鏡像(Snapshot)到磁盤來保證數(shù)據(jù)的一致性和持久性,而對于一個簡單的服務發(fā)現(xiàn)的場景來說,這其實沒有太大的必要,這個實現(xiàn)方案太重了。而且本身存儲的數(shù)據(jù)應該是高度定制化的。

消息發(fā)送應該弱依賴注冊中心,這也是RocketMQ的設(shè)計理念,生產(chǎn)者在第一次發(fā)送消息的時候從NameServer獲取到Broker地址后緩存到本地,如果NameServer整個集群不可用,短時間內(nèi)對于生產(chǎn)者和消費者并不會產(chǎn)生太大影響。


?三:RocketMQ使用相關(guān)問題

1. 如何保證消息的可用性/可靠性/不丟失呢?

消息的一個處理方式是異步發(fā)送,那消息可靠性怎么保證?

消息丟失可能發(fā)生在生產(chǎn)者發(fā)送消息、MQ本身丟失消息、消費者丟失消息3個方面。



1.1 生產(chǎn)者丟失

生產(chǎn)者丟失消息的可能點在于程序發(fā)送失敗拋異常了沒有重試處理,或者發(fā)送的過程成功但是過程中網(wǎng)絡(luò)閃斷MQ沒收到,消息就丟失了。

由于同步發(fā)送的一般不會出現(xiàn)這樣使用方式。

異步發(fā)送的場景下,一般分為兩個方式:異步有回調(diào)和異步無回調(diào),無回調(diào)的方式,生產(chǎn)者發(fā)送完后不管結(jié)果可能就會造成消息丟失,而通過異步發(fā)送+回調(diào)通知+本地消息表的形式我們就可以做出一個解決方案。


所以在生產(chǎn)階段,主要通過請求確認機制,來保證消息的可靠傳遞。

1、同步發(fā)送的時候,要注意處理響應結(jié)果和異常。如果返回響應OK,表示消息成功發(fā)送到了Broker,如果響應失敗,或者發(fā)生其它異常,都應該重試。

2、異步發(fā)送的時候,應該在回調(diào)方法里檢查,如果發(fā)送失敗或者異常,都應該進行重試。

3、如果發(fā)生超時的情況,也可以通過查詢?nèi)罩镜腁PI,來檢查是否在Broker存儲成功


以下單的場景舉例。

1、下單后先保存本地數(shù)據(jù)和MQ消息表,這時候消息的狀態(tài)是發(fā)送中,如果本地事務失敗,那么下單失敗,事務回滾(訂單數(shù)據(jù)、MQ消息記錄都不會保存)。

2、下單成功,直接返回客戶端成功,異步發(fā)送MQ消息。

3、MQ回調(diào)通知消息發(fā)送結(jié)果,對應更新數(shù)據(jù)庫MQ發(fā)送狀態(tài)。

4、JOB輪詢超過一定時間(時間根據(jù)業(yè)務配置)還未發(fā)送成功的消息去重試

在監(jiān)控平臺配置或者JOB程序處理超過一定次數(shù)一直發(fā)送不成功的消息,告警,人工介入。


MQ

異步回調(diào)的形式是適合大部分場景下的一種解決方案。

1.2 MQ?存儲丟失

如果生產(chǎn)者保證消息發(fā)送到MQ,而MQ收到消息后還在內(nèi)存中,這時候宕機了又沒來得及同步給從節(jié)點,就有可能導致消息丟失。

比如RocketMQ:

RocketMQ分為同步刷盤和異步刷盤兩種方式,默認的是異步刷盤,就有可能導致消息還未刷到硬盤上就丟失了,可以通過設(shè)置為同步刷盤的方式來保證消息可靠性,這樣即使MQ掛了,恢復的時候也可以從磁盤中去恢復消息。

比如Kafka也可以通過配置做到:


acks=all 只有參與復制的所有節(jié)點全部收到消息,才返回生產(chǎn)者成功。這樣的話除非所有的節(jié)點都掛了,消息才會丟失。

replication.factor=N,設(shè)置大于1的數(shù),這會要求每個partion至少有2個副本

min.insync.replicas=N,設(shè)置大于1的數(shù),這會要求leader至少感知到一個follower還保持著連接

retries=N,設(shè)置一個非常大的值,讓生產(chǎn)者發(fā)送失敗一直重試

雖然我們可以通過配置的方式來達到MQ本身高可用的目的,但是都對性能有損耗,怎樣配置需要根據(jù)業(yè)務做出權(quán)衡。


所以存儲階段,可以通過配置可靠性優(yōu)先的 Broker 參數(shù)來避免因為宕機丟消息,簡單說就是可靠性優(yōu)先的場景都應該使用同步。

1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕機,未消費的消息也能重新恢復再消費。

2、Broker的刷盤機制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲在pagecache中(內(nèi)存中),但是同步刷盤更可靠,它是Producer發(fā)送消息后等數(shù)據(jù)持久化到磁盤之后再返回響應給Producer。

3、Broker通過主從模式來保證高可用,Broker支持Master和Slave同步復制、Master和Slave異步復制模式,生產(chǎn)者的消息都是發(fā)送給Master,但是消費既可以從Master消費,也可以從Slave消費。同步復制模式可以保證即使Master宕機,消息肯定在Slave中有備份,保證了消息不會丟失。


?圖:同步刷盤和異步刷盤


1.3 消費者丟失

消費者丟失消息的場景1:消費者剛收到消息,此時服務器宕機,MQ認為消費者已經(jīng)消費,不會重復發(fā)送消息,消息丟失。

RocketMQ默認是需要消費者回復ack確認,而kafka需要手動開啟配置關(guān)閉自動offset。

消費方不返回ack確認,重發(fā)的機制根據(jù)MQ類型的不同發(fā)送時間間隔、次數(shù)都不盡相同,如果重試超過次數(shù)之后會進入死信隊列,需要手工來處理了。(Kafka沒有這些)


MQ

消費者丟失消息的場景2:消費者收到消息,但消費業(yè)務邏輯出錯,消費失敗。

解決:利用前面提到的MQ本地表,消費者收到消息且業(yè)務邏輯執(zhí)行完畢后再更新MQ消息的狀態(tài)(更新為已消費)


所以從Consumer角度分析,如何保證消息被成功消費?

Consumer保證消息成功消費的關(guān)鍵在于確認的時機,不要在收到消息后就立即發(fā)送消費確認,而是應該在執(zhí)行完所有消費業(yè)務邏輯之后,再發(fā)送消費確認。

因為消息隊列維護了消費的位置,邏輯執(zhí)行失敗了,沒有確認,再去隊列拉取消息,就還是之前的一條。


2 如何處理消息重復的問題呢?

對分布式消息隊列來說,同時做到確保一定投遞和不重復投遞是很難的,就是所謂的“有且僅有一次” 。RocketMQ擇了確保一定投遞,保證消息不丟失,但有可能造成消息重復。

處理消息重復問題,主要有業(yè)務端自己保證,主要的方式有兩種:業(yè)務冪等消息去重



業(yè)務冪等:第一種是保證消費邏輯的冪等性,也就是多次調(diào)用和一次調(diào)用的效果是一樣的。這樣一來,不管消息消費多少次,對業(yè)務都沒有影響。

消息去重:第二種是業(yè)務端,對重復的消息就不再消費了。這種方法,需要保證每條消息都有一個惟一的編號,通常是業(yè)務相關(guān)的,比如訂單號,消費的記錄需要落庫,而且需要保證和消息確認這一步的原子性。

具體做法是可以建立一個消費記錄表,拿到這個消息做數(shù)據(jù)庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現(xiàn)重復消費的情況,就會導致主鍵沖突,那么就不再處理這條消息。



3?怎么處理消息積壓?

3.1?消息積壓

發(fā)生了消息積壓,這時候就得想辦法趕緊把積壓的消息消費完,就得考慮提高消費能力,一般有兩種辦法:

消費者擴容:如果當前Topic的Message Queue的數(shù)量大于消費者數(shù)量,就可以對消費者進行擴容,增加消費者,來提高消費能力,盡快把積壓的消息消費玩。

消息遷移Queue擴容:如果當前Topic的Message Queue的數(shù)量小于或者等于消費者數(shù)量,這種情況,再擴容消費者就沒什么用,就得考慮擴容Message Queue??梢孕陆ㄒ粋€臨時的Topic,臨時的Topic多設(shè)置一些Message Queue,然后先用一些消費者把消費的數(shù)據(jù)丟到臨時的Topic,因為不用業(yè)務處理,只是轉(zhuǎn)發(fā)一下消息,還是很快的。接下來用擴容的消費者去消費新的Topic里的數(shù)據(jù),消費完了之后,恢復原狀。


3.2 如果消費者一直消費失敗導致消息積壓怎么處理?

我們可以從以下幾個角度來考慮:

1、消費者出錯,肯定是程序或者其他問題導致的,如果容易修復,先把問題修復,讓consumer恢復正常消費。

2、如果時間來不及處理很麻煩,做轉(zhuǎn)發(fā)處理,寫一個臨時的consumer消費方案,先把消息消費,然后再轉(zhuǎn)發(fā)到一個新的topic和MQ資源,這個新的topic的機器資源單獨申請,要能承載住當前積壓的消息。

3、處理完積壓數(shù)據(jù)后,修復consumer,去消費新的MQ和現(xiàn)有的MQ數(shù)據(jù),新MQ消費完成后恢復原狀。


MQ

3.3 那如果消息積壓達到磁盤上限,消息被刪除了怎么辦?

最初,我們發(fā)送的消息記錄是落庫保存了的,而轉(zhuǎn)發(fā)發(fā)送的數(shù)據(jù)也保存了,那么我們就可以通過這部分數(shù)據(jù)來找到丟失的那部分數(shù)據(jù),再單獨跑個腳本重發(fā)就可以了。

如果轉(zhuǎn)發(fā)的程序沒有落庫,那就和消費方的記錄去做對比,只是過程會更艱難一點。


?4?順序消息如何實現(xiàn)?

順序消息是指消息的消費順序和產(chǎn)生順序相同,在有些業(yè)務邏輯下,必須保證順序,比如訂單的生成、付款、發(fā)貨,這個消息必須按順序處理才行。

順序消息分為全局順序消息和部分順序消息,全局順序消息指某個 Topic 下的所有消息都要保證順序;

部分順序消息只要保證每一組消息被順序消費即可,比如訂單消息,只要保證同一個訂單 ID 個消息能按順序消費即可。

部分順序消息

部分順序消息相對比較好實現(xiàn),生產(chǎn)端需要做到把同 ID 的消息發(fā)送到同一個 Message Queue ;在消費過程中,要做到從同一個Message Queue讀取的消息順序處理——消費端不能并發(fā)處理順序消息,這樣才能達到部分有序。

?發(fā)送端使用 MessageQueueSelector 類來控制 把消息發(fā)往哪個 Message Queue 。

?消費端通過使用 MessageListenerOrderly 來解決單 Message Queue 的消息被并發(fā)處理的問題。


全局順序消息

RocketMQ 默認情況下不保證順序,比如創(chuàng)建一個 Topic ,默認八個寫隊列,八個讀隊列,這時候一條消息可能被寫入任意一個隊列里;在數(shù)據(jù)的讀取過程中,可能有多個 Consumer ,每個 Consumer 也可能啟動多個線程并行處理,所以消息被哪個 Consumer 消費,被消費的順序和寫人的順序是否一致是不確定的。

要保證全局順序消息, 需要先把 Topic 的讀寫隊列數(shù)設(shè)置為 一,然后Producer Consumer 的并發(fā)設(shè)置,也要是一。簡單來說,為了保證整個 Topic全局消息有序,只能消除所有的并發(fā)處理,各部分都設(shè)置成單線程處理 ,這時候就完全犧牲RocketMQ的高并發(fā)、高吞吐的特性了。


5 如何實現(xiàn)消息過濾?

有兩種方案:

一種是在 Broker 端按照 Consumer 的去重邏輯進行過濾,這樣做的好處是避免了無用的消息傳輸?shù)?Consumer 端,缺點是加重了 Broker 的負擔,實現(xiàn)起來相對復雜。

另一種是在 Consumer 端過濾,比如按照消息設(shè)置的 tag 去重,這樣的好處是實現(xiàn)起來簡單,缺點是有大量無用的消息到達了 Consumer 端只能丟棄不處理。

一般采用Cosumer端過濾,如果希望提高吞吐量,可以采用Broker過濾。

對消息的過濾有三種方式:

根據(jù)Tag過濾:這是最常見的一種,用起來高效簡單

DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("CID_EXAMPLE");

consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");


SQL 表達式過濾:SQL表達式過濾更加靈活

DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("please_rename_unique_group_name_4");// 只有訂閱的消息有這個屬性a, a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {

? @Override

? publicConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {

? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? }

});

consumer.start();


Filter Server 方式:最靈活,也是最復雜的一種方式,允許用戶自定義函數(shù)進行過濾

6?RocketMQ怎么實現(xiàn)延時消息的?

電商的訂單超時自動取消,就是一個典型的利用延時消息的例子,用戶提交了一個訂單,就可以發(fā)送一個延時消息,1h后去檢查這個訂單的狀態(tài),如果還是未付款就取消訂單釋放庫存。

RocketMQ是支持延時消息的,只需要在生產(chǎn)消息的時候設(shè)置消息的延時級別:

// 實例化一個生產(chǎn)者來產(chǎn)生延時消息DefaultMQProducer producer =newDefaultMQProducer("ExampleProducerGroup");

? ? ? // 啟動生產(chǎn)者? ? ? producer.start();

? ? ? inttotalMessagesToSend = 100;

? ? ? for(inti = 0; i < totalMessagesToSend; i++) {

? ? ? ? ? Message message =newMessage("TestTopic", ("Hello scheduled message " + i).getBytes());

? ? ? ? ? // 設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel)message.setDelayTimeLevel(3);

? ? ? ? ? // 發(fā)送消息? ? ? ? ? producer.send(message);

? ? ? }


但是目前RocketMQ支持的延時級別是有限的:

privateString messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


RocketMQ怎么實現(xiàn)延時消息的:臨時存儲+定時任務。

Broker收到延時消息了,會先發(fā)送到主題(SCHEDULE_TOPIC_XXXX)的相應時間段的Message Queue中,然后通過一個定時任務輪詢這些隊列,到期后,把消息投遞到目標Topic的隊列中,然后消費者就可以正常消費這些消息。

7 什么是事務消息、半事務消息?怎么實現(xiàn)的?

事務消息就是MQ提供的類似XA的分布式事務能力,通過事務消息可以達到分布式事務的最終一致性。

半事務消息:是指暫時還不能被 Consumer 消費的消息,Producer 成功發(fā)送到 Broker 端的消息,但是此消息被標記為 “暫不可投遞” 狀態(tài),只有等 Producer 端執(zhí)行完本地事務后經(jīng)過二次確認了之后,Consumer 才能消費此條消息。就是MQ收到了生產(chǎn)者的消息,但是沒有收到二次確認,不能投遞的消息。

實現(xiàn)原理如下:


事務


依賴半消息,可以實現(xiàn)分布式消息事務,其中的關(guān)鍵在于二次確認以及消息回查。

1、Producer 向 broker 發(fā)送半消息

2、Producer 端收到響應,消息發(fā)送成功,此時消息是半消息,標記為 “不可投遞” 狀態(tài),Consumer 消費不了。

3、Producer 端執(zhí)行本地事務。

4、如果本地事務執(zhí)行完成,Producer 向 Broker 發(fā)送 Commit/Rollback,如果是 Commit,Broker 端將半消息標記為正常消息,Consumer 可以消費,如果是 Rollback,Broker 丟棄此消息。

5、如果發(fā)生異常情況,Broker 端遲遲等不到二次確認。在一定時間后,MQ對生產(chǎn)者發(fā)起消息回查,到 Producer 端查詢半消息的執(zhí)行情況。

6、Producer 端查詢本地事務的狀態(tài)

7、根據(jù)事務的狀態(tài)提交 commit/rollback 到 broker 端,再次提交二次確認。(5,6,7 是消息回查)

8、最終,消費者消費到消息,二次確認commit,就可以把消息投遞給消費者,執(zhí)行本地事務。反之如果是rollback,消息會保存下來并且在3天后被刪除


8?什么是死信隊列?

死信隊列用于處理無法被正常消費的消息,即死信消息。

當一條消息初次消費失敗,消息隊列 RocketMQ 會自動進行消息重試;

達到最大重試次數(shù)后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列 RocketMQ 不會立刻將消息丟棄,而是將其發(fā)送到該消費者對應的特殊隊列中,該特殊隊列稱為死信隊列。

死信消息的特點

不會再被消費者正常消費。

有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,需要在死信消息產(chǎn)生后的 3 天內(nèi)及時處理。

死信隊列的特點

一個死信隊列對應一個 Group ID, 而不是對應單個消費者實例。

如果一個 Group ID 未產(chǎn)生死信消息,消息隊列 RocketMQ 不會為其創(chuàng)建相應的死信隊列。

一個死信隊列包含了對應 Group ID 產(chǎn)生的所有死信消息,不論該消息屬于哪個 Topic。

RocketMQ 控制臺提供對死信消息的查詢、導出和重發(fā)的功能。


四:RocketMQ性能優(yōu)化

1.JVM層面

(1)STW

監(jiān)控暫停

?rocketmq-console 這個是官方提供了一個 WEB 項目,可以查看 rocketmq數(shù)據(jù)和執(zhí)行一些操作。但是這個監(jiān)控界面又沒有權(quán)限控制,并且還有一些消 耗性能的查詢操作,如果要提高性能,建議這個可以暫停

消除偏向鎖

?-XX:-UseBiasedLocking: 禁用偏向鎖

(2)垃圾回收

RocketMQ 推薦使用 G1 垃圾回收器

????-Xms8g -Xmx8g -Xmn4g:這個就是很關(guān)鍵的一塊參數(shù)了,也是重點需要調(diào)整的,就是默認的堆大小是 8g 內(nèi)存,新生代是 4g 內(nèi)存。

? ??如果是內(nèi)存比較大,比如有 48g 的內(nèi)存,所以這里完全可以給他們翻幾倍,比如給堆內(nèi)存 20g,其中新生代給 10g,甚至可以更多些,當然要留一些內(nèi)存給操作系統(tǒng)來用

? ?-XX:+UseG1GC -XX:G1HeapRegionSize=16m:這幾個參數(shù)也是至關(guān)重要的,這是選用了G1垃圾回收器來做分代回收,對新生代和老年代都是用G1來回收。這里把G1的region大小設(shè)置為了16m,這個因為機器內(nèi)存比較多,所以region 大小可以調(diào)大一些給到16m,不然用2m的region, 會導致region數(shù)量過多。

? ??-XX:G1ReservePercent=25:這個參數(shù)是說,在 G1 管理的老年代里預留 25%的空閑內(nèi)存,保證新生代對象晉升到老年代的時候有足夠空間,避免老年代內(nèi)存都滿了,新生代有對象要進入老年代沒有充足內(nèi)存了。默認值是 10%,略微偏少,這里 RocketMQ 給調(diào)大了一些。

? ?-XX:initiatingHeapOccupancyPercent= :30:這個參數(shù)是說,當堆內(nèi)存的使用率達到 30%之后就會自動啟動 G1 的并發(fā)垃圾回收,開始嘗試回收一些垃圾對象。默認值是 45%,這里調(diào)低了一些,也就是提高了 GC 的頻率,但是避免了垃圾對象過多,一次垃圾回收耗時過長的問題。

???-XX:-OmitStackTraceInFastThrow:這個參數(shù)是說,有時候 JVM 會拋棄-些異常堆棧信息,因此這個參數(shù)設(shè)置之后,就是禁用這個特性,要把完整的異常堆棧信息打印出來。

? ?-XX:+AIwaysPreTouch:這個參數(shù)的意思是我們剛開始指定 JVM 用多少內(nèi)存,不會真正分配給他,會在實際需要使用的時候再分配給他。所以使用這個參數(shù)之后,就是強制讓 JVM 啟動的時候直接分配我們指定的內(nèi)存,不要等到使用內(nèi)存的時候再分配。

? ?-XX:-UseLargePages:這個參數(shù)的意思是禁用大內(nèi)存頁,某些情況下會導致內(nèi)存浪費或?qū)嵗裏o法啟動。默認啟動。

2.操作系統(tǒng)層面

(1)基本參數(shù)

# vim /etc/sysctl.conf


(2)網(wǎng)絡(luò)接口控制器 NIC - network interface controller

一個請求到 RocketMQ 的應用,一般會經(jīng)過網(wǎng)卡、內(nèi)核空間、用戶空間



(3)Kernel



在操作系統(tǒng)級別,是可以做軟中斷聚合的優(yōu)化。

網(wǎng)卡隊列 CPU 綁定

緩沖區(qū)調(diào)整

隊列大小調(diào)整等



文:?一只阿木木

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容