- 消息存儲(chǔ):因?yàn)榉植际疥?duì)列有高可靠性的要求,所以數(shù)據(jù)要進(jìn)行持久化存儲(chǔ)。

- 消息生產(chǎn)到消費(fèi)的整個(gè)流程:
- 消息生產(chǎn)者發(fā)送消息到MQ;
- MQ收到消息,將消息持久化,在存儲(chǔ)中新增一條記錄;
- 返回ACK給生產(chǎn)者;
- MQ push 消息給對(duì)應(yīng)的消費(fèi)者,然后等待消費(fèi)者返回ACK;
- 若消息消費(fèi)者在指定時(shí)間內(nèi)成功返回ack,則MQ認(rèn)為消息消費(fèi)成功,在存儲(chǔ)中刪除消息,即執(zhí)行第6步;否則認(rèn)為消息消費(fèi)失敗,會(huì)嘗試重新push消息,重復(fù)執(zhí)行第4、5、6步驟。
- 消息持久化的存儲(chǔ)介質(zhì):
-
關(guān)系型數(shù)據(jù)庫DB:Apache下開源的另外一款MQ——ActiveMQ(默認(rèn)采用的KahaDB做消息存儲(chǔ))可選用JDBC的方式來做消息持久化,通過簡單的xml配置信息即可實(shí)現(xiàn)JDBC消息存儲(chǔ)。由于普通關(guān)系型數(shù)據(jù)庫(如:Mysql)在單表數(shù)據(jù)量達(dá)到千萬級(jí)別的情況下,其IO讀寫性能往往會(huì)出現(xiàn)瓶頸。在可靠性方面,該種方案非常依賴DB,若一旦DB出現(xiàn)故障,則MQ的消息無法落盤存儲(chǔ)將導(dǎo)致線上發(fā)生故障。 -
文件系統(tǒng):目前業(yè)界較為常用的幾款MQ產(chǎn)品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤至所部署虛擬機(jī)/物理機(jī)的文件系統(tǒng)來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲(chǔ)提供了一種高效率、高可靠性和高性能的數(shù)據(jù)持久化方式。除非部署MQ的機(jī)器本身或本地磁盤掛了,否則一般是不會(huì)出現(xiàn)無法持久化的故障問題。 - 性能對(duì)比:文件系統(tǒng)>關(guān)系型數(shù)據(jù)庫DB。
-
- 消息存儲(chǔ):若磁盤使用得當(dāng),則其速度完全可以匹配上網(wǎng)絡(luò)數(shù)據(jù)的傳輸速度。目前高性能磁盤的順序?qū)懰俣瓤梢赃_(dá)到600MB/s, 超過了一般網(wǎng)卡的傳輸速度。但磁盤隨機(jī)寫的速度大概只有100KB/s,和順序?qū)懙男阅芟嗖?000倍!RocketMQ的消息使用
順序?qū)?/code>,保證了消息的存儲(chǔ)速度。 - 消息發(fā)送:Linux 操作系統(tǒng)分為用戶態(tài)和內(nèi)核態(tài),文件操作、網(wǎng)絡(luò)操作需要涉及這兩種形態(tài)的切換,避免不了進(jìn)行數(shù)據(jù)復(fù)制。一臺(tái)服務(wù)器把本機(jī)磁盤文件的內(nèi)容發(fā)送到客戶端,一般分為兩個(gè)步驟:①read:讀取本地文件內(nèi)容;②write:將讀取的內(nèi)容通過網(wǎng)絡(luò)發(fā)送出去。這兩個(gè)看似簡單的操作,實(shí)際上進(jìn)行了4 次數(shù)據(jù)復(fù)制,分別是:
- 從磁盤復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;
- 從內(nèi)核態(tài)內(nèi)存復(fù)制數(shù)據(jù)到用戶態(tài)內(nèi)存;
- 從用戶態(tài)內(nèi)存復(fù)制數(shù)據(jù)到網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存;
- 從網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存復(fù)制到網(wǎng)卡中進(jìn)行網(wǎng)絡(luò)傳輸。

- 通過使用共享內(nèi)存mmap的方式,省去了向用戶態(tài)的內(nèi)存復(fù)制操作,提高了拷貝速度。這種機(jī)制在Java中是通過
MappedByteBuffer來實(shí)現(xiàn)的。RocketMQ充分利用了上述特性,也就是所謂的零拷貝技術(shù),提高了消息存盤和網(wǎng)絡(luò)發(fā)送的速度。注意:采用MappedByteBuffer這種內(nèi)存映射的方式有幾個(gè)限制,其中之一是一次只能映射1.5~2G的文件至用戶態(tài)的虛擬內(nèi)存,這也是為何RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了。

- 消息的存儲(chǔ)結(jié)構(gòu):由
ConsumeQueue和CommitLog配合完成 的,消息真正的物理存儲(chǔ)文件是CommitLog,ConsumeQueue是消息的邏輯隊(duì)列,類似數(shù)據(jù)庫的索引文件,存儲(chǔ)的是指向物理存儲(chǔ)的地址。每個(gè)Topic下的每個(gè)Message Queue都有一個(gè)對(duì)應(yīng)的ConsumeQueue文件。 -
CommitLog:存儲(chǔ)消息的元數(shù)據(jù)。 -
ConsumerQueue:存儲(chǔ)消息在CommitLog的索引。 -
IndexFile:通過key或時(shí)間區(qū)間來查詢消息,且該過程不影響發(fā)送與消費(fèi)消息的主流程。

- 刷盤機(jī)制:分布式
同步刷盤和異步刷盤。-
同步刷盤:在返回寫成功狀態(tài)時(shí),消息已被寫入磁盤。具體流程是:消息寫入內(nèi)存的PAGECACHE頁緩存后,立刻通知刷盤線程刷盤,待刷盤完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。 -
異步刷盤:在返回寫成功狀態(tài)時(shí),消息可能只是被寫入了內(nèi)存的PAGECACHE頁緩存,寫操作返回快,吞吐量大;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤動(dòng)作,快速寫入。 - 采用同步刷盤還是異步刷盤?將Broker配置文件里的
flushDiskType參數(shù)設(shè)置為SYNC_FLUSH,ASYNC_FLUSH中的一個(gè)即可。
-

- RocketMQ分布式集群是通過Master和Slave的配合達(dá)到高可用性的。
- Master和Slave的區(qū)別:在Broker的配置文件中,參數(shù) brokerId的值為0表明這個(gè)Broker是Master,大于0表明這個(gè)Broker是 Slave,同時(shí)brokerRole參數(shù)也會(huì)說明這個(gè)Broker是Master還是Slave。
- Master角色的Broker支持讀和寫,Slave角色的Broker僅支持讀,也就是 Producer只能和Master角色的Broker連接寫入消息;Consumer可以連接 Master角色的Broker和Slave角色的Broker來讀取消息。
- 消息發(fā)送高可用:在創(chuàng)建Topic時(shí),把Topic的多個(gè)Message Queue創(chuàng)建在多個(gè)Broker組上(相同Broker名稱,不同 brokerId的機(jī)器組成一個(gè)Broker組),這樣當(dāng)一個(gè)Broker組的Master不可用后,其它組的Master仍然可用,Producer仍然可以發(fā)送消息。RocketMQ目前還不支持把Slave自動(dòng)轉(zhuǎn)成Master,若機(jī)器資源不足,且需要把Slave轉(zhuǎn)成Master,則要手動(dòng)停止Slave角色的Broker,更改配置文件,用新的配置文件啟動(dòng)Broker。
- 消息消費(fèi)高可用:在Consumer的配置文件中,并不需要設(shè)置是從Master讀還是從Slave 讀,當(dāng)Master不可用或繁忙時(shí),Consumer會(huì)被自動(dòng)切換到從Slave 讀。有了自動(dòng)切換Consumer這種機(jī)制,當(dāng)一個(gè)Master角色的機(jī)器出現(xiàn)故障后,Consumer仍然可以從Slave讀取消息,不影響Consumer程序。這就達(dá)到了消費(fèi)端的高可用性。
