消息存儲(chǔ)的結(jié)構(gòu)
1、RocketMQ消息的存儲(chǔ)結(jié)構(gòu)

2、存儲(chǔ)特點(diǎn)
如上圖所示:
- 1、消息主體以及元數(shù)據(jù)都存儲(chǔ)在CommitLog文件當(dāng)中,完全順序?qū)?,隨機(jī)讀
- 2、Consume Queue相當(dāng)于kafka中的partition,是一個(gè)邏輯隊(duì)列,存儲(chǔ)了這個(gè)Queue在CommiLog中的起始o(jì)ffset,log大小和MessageTag的hashCode。
- 3、每次讀取消息隊(duì)列先讀取consumerQueue,然后再通過consumerQueue去commitLog中拿到消息主體。
3、為什么要這樣設(shè)計(jì)?
rocketMQ的設(shè)計(jì)理念很大程度借鑒了kafka,所以有必要介紹下kafka的存儲(chǔ)結(jié)構(gòu)設(shè)計(jì):

存儲(chǔ)特點(diǎn):
和RocketMQ類似,每個(gè)Topic有多個(gè)partition(queue),kafka的每個(gè)partition都是一個(gè)獨(dú)立的物理文件,消息直接從里面讀寫。
RocketMQ這樣做的優(yōu)點(diǎn):
- 對(duì)最終用戶展現(xiàn)的隊(duì)列實(shí)際只儲(chǔ)存消息在Commit Log 的位置信息,并且串行方式刷盤
- 隊(duì)列輕量化,單個(gè)隊(duì)列數(shù)據(jù)量非常少
- 對(duì)磁盤的訪問串行化,避免磁盤競(jìng)爭(zhēng),不會(huì)因?yàn)殛?duì)列增加導(dǎo)致IOWait增高
每個(gè)方案都有優(yōu)缺點(diǎn),他的缺點(diǎn)是:
- 寫雖然是順序?qū)懀亲x卻變成了隨機(jī)讀
- 讀一條消息,會(huì)先讀Consume Queue,再讀Commit Log,增加了開銷
- 要保證Commit Log 與 Consume Queue完全的一致,增加了編程的復(fù)雜度
以上缺點(diǎn)如何克服:
- 隨機(jī)讀,盡可能讓讀命中pagecache,減少IO操作,所以內(nèi)存越大越好。如果系統(tǒng)中堆積的消息過多,讀數(shù)據(jù)要訪問硬盤會(huì)不會(huì)由于隨機(jī)讀導(dǎo)致系統(tǒng)性能急劇下降,答案是否定的。
- 訪問pagecache時(shí),即使只訪問1K的消息,系統(tǒng)也會(huì)提前預(yù)讀出更多的數(shù)據(jù),在下次讀時(shí)就可能命中pagecache
- 隨機(jī)訪問Commit Log 磁盤數(shù)據(jù),系統(tǒng)IO調(diào)度算法設(shè)置為NOOP方式,會(huì)在一定程度上將完全的隨機(jī)讀變成順序跳躍方式,而順序跳躍方式讀較完全的隨機(jī)讀性能高5倍
- 由于Consume Queue存儲(chǔ)數(shù)量極少,而且順序讀,在pagecache的與讀取情況下,Consume Queue的讀性能與內(nèi)存幾乎一直,即使堆積情況下。所以可以認(rèn)為Consume Queue完全不會(huì)阻礙讀性能
- Commit Log中存儲(chǔ)了所有的元信息,包含消息體,類似于MySQl、Oracle的redolog,所以只要有Commit Log存在, Consume Queue即使丟失數(shù)據(jù),仍可以恢復(fù)出來(lái)
詳細(xì)的消息存儲(chǔ):RocketMQ源碼學(xué)習(xí)--消息存儲(chǔ)篇
同步刷盤和異步刷盤
RocketMQ消息存儲(chǔ):內(nèi)存+磁盤存儲(chǔ),兩種刷盤方式
RocketMQ和Redis等其他存儲(chǔ)系統(tǒng)類似,提供了同步和異步兩種刷盤方式,同步刷盤方式能夠保證數(shù)據(jù)被寫入硬盤,做到真正的持久化,但是也會(huì)讓系統(tǒng)的寫入速度受制于磁盤的IO速度;而異步刷盤方式在將數(shù)據(jù)寫入緩沖之后就返回,提供了系統(tǒng)的IO速度,卻存在系統(tǒng)發(fā)生故障時(shí)未來(lái)得及寫入硬盤的數(shù)據(jù)丟失的風(fēng)險(xiǎn)。
同步刷盤、異步刷盤
RocketMQ的消息是存儲(chǔ)到磁盤上的,這樣既能保證斷電后恢復(fù),又可以讓存儲(chǔ)的消息量超出內(nèi)存的限制。
RocketMQ為了提高性能,會(huì)盡可能地保證磁盤的順序?qū)?。消息在通過Producer寫入RocketMQ的時(shí)候,有兩種
- 異步刷盤方式:在返回寫成功狀態(tài)時(shí),消息可能只是被寫入了內(nèi)存的PAGECACHE,寫操作的返回快,吞吐量大;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤操作,快速寫入
- 同步刷盤方式:在返回寫成功狀態(tài)時(shí),消息已經(jīng)被寫入磁盤。具體流程是,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。
同步刷盤還是異步刷盤,是通過Broker配置文件里的flushDiskType參數(shù)設(shè)置的,這個(gè)參數(shù)被設(shè)置成SYNC_FLUSH、ASYNC_FLUSH中的一個(gè)
詳細(xì)的刷盤解讀:rocketmq刷盤過程
同步復(fù)制和異步復(fù)制
前提:
同一組broker中有Master和Slave,消息需要從Master復(fù)制到Slave上,那么有同步和異步兩種復(fù)制方式。
同步復(fù)制:是等Master和Slave均寫成功后才反饋給客戶端寫成功狀態(tài)
異步復(fù)制:是只要Master寫成功即可反饋給客戶端寫成功狀態(tài)
兩種復(fù)制方式對(duì)比:
- 異步復(fù)制方式下,系統(tǒng)擁有較低的延遲和較高的吞吐量,但是如果Master出了故障,有些數(shù)據(jù)因?yàn)闆]有被寫入Slave,有可能會(huì)丟失;
- 同步復(fù)制方式下,如果Master出故障,Slave上有全部的備份數(shù)據(jù),容易恢復(fù),但是同步復(fù)制會(huì)增大數(shù)據(jù)寫入延遲,降低系統(tǒng)吞吐量。
配置方式:
同步復(fù)制和異步復(fù)制是通過Broker配置文件里的brokerRole參數(shù)進(jìn)行設(shè)置的,這個(gè)參數(shù)可以被設(shè)置成ASYNC_MASTER、SYNC_MASTER、SLAVE三個(gè)值中的一個(gè)。
實(shí)際應(yīng)用中要結(jié)合業(yè)務(wù)場(chǎng)景,合理設(shè)置刷盤方式和主從復(fù)制方式,尤其是SYNC_FLUSH方式,由于頻繁的觸發(fā)寫磁盤動(dòng)作,會(huì)明顯降低性能。
通常情況下,應(yīng)該把Master和Slave設(shè)置成ASYNC_FLUSH的刷盤方式,主從之間配置成SYNC_MASTER的復(fù)制方式,這樣即使有一臺(tái)機(jī)器出故障,仍然可以保證數(shù)據(jù)不丟。
高可用機(jī)制
- 當(dāng)Master節(jié)點(diǎn)繁忙,可自動(dòng)切換到Slave節(jié)點(diǎn)讀取信息
- 當(dāng)Master節(jié)點(diǎn)down機(jī)或不可用時(shí),rocketmq基于raft 協(xié)議支持主從切換,引入了多副本機(jī)制,即DLedger,支持主從切換,即當(dāng)一個(gè)復(fù)制組內(nèi)的主節(jié)點(diǎn)宕機(jī)后,會(huì)在該復(fù)制組內(nèi)觸發(fā)重新選主,選主完成后即可繼續(xù)提供消息寫功能。
RocketMQ高可用機(jī)制詳細(xì)解讀:Apache RocketMQ Producer解析文章中的2、RocketMQ主從同步機(jī)制解析
NameServer協(xié)調(diào)者
Namesrv功能介紹
Namesrv的功能,就相當(dāng)于RPC或微服務(wù)中的注冊(cè)中心。對(duì)于MQ而言,broker啟動(dòng),將自身創(chuàng)建的topic等信息注冊(cè)到Namesrv上。consumer和producer需要配置namesrv的地址,啟動(dòng)后,首先和namesrv建立長(zhǎng)連接,并獲取相應(yīng)的topic信息(比如,哪些broker有topic路由信息),然后再和broker建立長(zhǎng)連接。Namesrv本身無(wú)狀態(tài),可集群橫向擴(kuò)展部署。所有的注冊(cè)信息,都保存在namesrv的類似map內(nèi)存數(shù)據(jù)結(jié)構(gòu)中。
Namesrv啟動(dòng)流程:

Namesrv的數(shù)據(jù)都保存在RouteInfoManager類中:
public class RouteInfoManager {
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
參考:
https://www.cnblogs.com/toUpdating/p/10021372.html
https://www.iteye.com/blog/technoboy-2368379
https://www.cnblogs.com/shoshana-kong/p/10914353.html
rocketmq 同步刷盤、異步刷盤和同步復(fù)制、異步復(fù)制