RocketMQ底層原理之存儲(chǔ)設(shè)計(jì)

塑造你生活的不是你偶爾做的一兩件事,而是你一貫堅(jiān)持做的事。

????????????????????????????????????????????????????????????????????????????——安東尼.羅賓

大綱

圖示

分布式事務(wù)演進(jìn)及RocketMQ方案

圖示

????業(yè)務(wù)場(chǎng)景:用戶(hù) A 轉(zhuǎn)賬 100 元給用戶(hù) B,這個(gè)業(yè)務(wù)比較簡(jiǎn)單,具體的步驟:

????1、用戶(hù) A 的賬戶(hù)先扣除 100 元。

????2、再把用戶(hù) B 的賬戶(hù)加 100 元。

????如果在同一個(gè)數(shù)據(jù)庫(kù)中進(jìn)行,事務(wù)可以保證這兩步操作,要么同時(shí)成功,要么同時(shí)不成功。這樣就保證了轉(zhuǎn)賬的數(shù)據(jù)一致性。

????但是在微服務(wù)架構(gòu)中,因?yàn)楦鱾€(gè)服務(wù)都是獨(dú)立的模塊,都是遠(yuǎn)程調(diào)用,都沒(méi)法在同一個(gè)事務(wù)中,都會(huì)遇到事務(wù)問(wèn)題。

????因?yàn)楦鱾€(gè)服務(wù)都是獨(dú)立的模塊,都是遠(yuǎn)程調(diào)用,都沒(méi)法在同一個(gè)事務(wù)中,都會(huì)遇到事務(wù)問(wèn)題。

圖示

????消息中間件的方式,把扣款業(yè)務(wù)和加錢(qián)業(yè)務(wù)異步化,扣款成功后,發(fā)送“扣款成功消息”到消息中間件;加錢(qián)業(yè)務(wù)訂閱“扣款成功消息”,再對(duì)用 戶(hù) B 加錢(qián)(系統(tǒng)怎么知道給用戶(hù) B 加錢(qián)呢?是消息體里面包含了源賬戶(hù)和目標(biāo)賬戶(hù) ID,以及錢(qián)數(shù))

????場(chǎng)景一:先扣款后向 MQ 發(fā)消息

? ? 先扣款再發(fā)送消息,萬(wàn)一發(fā)送消息失敗了,那用戶(hù) B 就沒(méi)法加錢(qián)

????場(chǎng)景二:先向 MQ 發(fā)像消息,后扣款

? ? 扣款成功消息發(fā)送成功,但用戶(hù) A 扣款失敗,可加錢(qián)業(yè)務(wù)訂閱到了消息,用戶(hù) B 加了錢(qián)

????問(wèn)題所在,也就是沒(méi)法保證扣款和發(fā)送消息,同時(shí)成功,或同時(shí)失?。粚?dǎo)致數(shù)據(jù)不一致。

? ??RocketMq 消息中間件把消息分為兩個(gè)階段:半事務(wù)階段和確認(rèn)階段。

? ??半事務(wù)階段:

? ??該階段主要發(fā)一個(gè)消息到 rocketmq,但該消息只儲(chǔ)存在 commitlog 中,但 consumeQueue 中不可見(jiàn),也就是消費(fèi)端(訂閱端)無(wú)法看到此消息。

????commit/rollback 階段(確認(rèn)階段):

????該階段主要是把 prepared 消息保存到 consumeQueue 中,即讓消費(fèi)端可以看到此消息,也就是可以消費(fèi)此消息。如果是 rollback 就不保存。

圖示

? ??整個(gè)流程:

????1、A 在扣款之前,先發(fā)送半事務(wù)消息。

????2、發(fā)送預(yù)備消息成功后,執(zhí)行本地扣款事務(wù)。

????3、扣款成功后,再發(fā)送確認(rèn)消息。

????4、B 消息端(加錢(qián)業(yè)務(wù))可以看到確認(rèn)消息,消費(fèi)此消息,進(jìn)行加錢(qián)。

? ??注意:上面的確認(rèn)消息可以為 commit 消息,可以被訂閱者消費(fèi);也可以是 Rollback消息,即執(zhí)行本地扣款事務(wù)失敗后,提交 rollback 消息,即刪除那個(gè)預(yù)備消息,訂閱者無(wú)法消費(fèi)。

????異常 1:如果發(fā)送半事務(wù)消息失敗,下面的流程不會(huì)走下去;這個(gè)是正常的。

????異常 2:如果發(fā)送半事務(wù)消息成功,但執(zhí)行本地事務(wù)失??;這個(gè)也沒(méi)有問(wèn)題,因?yàn)榇祟A(yù)備消息不會(huì)被消費(fèi)端訂閱到,消費(fèi)端不會(huì)執(zhí)行業(yè)務(wù)。

????異常 3:如果發(fā)送半事務(wù)消息成功,執(zhí)行本地事務(wù)成功,但發(fā)送確認(rèn)消息失敗;這個(gè)就有問(wèn)題了,因?yàn)橛脩?hù) A 扣款成功了,但加錢(qián)業(yè)務(wù)沒(méi)有訂閱到確認(rèn)消息,無(wú)法加錢(qián)。這里出現(xiàn)了數(shù)據(jù)不一致。

? ??RocketMQ如何解決上面的問(wèn)題,核心思路就是【事務(wù)回查】,也就是 RocketMQ會(huì)定時(shí)遍歷 commitlog 中的半事務(wù)消息。

????異常 3,發(fā)送半事務(wù)消息成功,本地扣款事務(wù)成功,但發(fā)送確認(rèn)消息失??;因?yàn)?RocketMQ會(huì)進(jìn)行回查半事務(wù)消息,在回查后發(fā)現(xiàn)業(yè)務(wù)已經(jīng)扣款成功了,就補(bǔ)發(fā)“發(fā)送 commit 確認(rèn)消息”;這樣加錢(qián)業(yè)務(wù)就可以訂閱此消息了。

????這個(gè)思路其實(shí)把異常 2 也解決了,如果本地事務(wù)沒(méi)有執(zhí)行成功,RocketMQ回查業(yè)務(wù),發(fā)現(xiàn)沒(méi)有執(zhí)行成功,就會(huì)發(fā)送 Rollback確認(rèn)消息,把消息進(jìn)行刪除。

? ??同時(shí)還要注意的點(diǎn)是,RocketMQ不能保障消息的重復(fù),所以在消費(fèi)端一定要做冪等性處理。

????除此之外,如果消費(fèi)端發(fā)生消費(fèi)失敗,同時(shí)也需要做重試,如果重試多次,消息會(huì)進(jìn)入死信隊(duì)列,這個(gè)時(shí)候也需要進(jìn)行特殊的處理。(一般就是把 A已經(jīng)處理完的業(yè)務(wù)進(jìn)行回退)

圖示

????如果本地事務(wù)執(zhí)行了很多張表,那是不是我們要把那些表都要進(jìn)行判斷是否執(zhí)行成功呢?這樣是不是太麻煩了,而且和業(yè)務(wù)很耦合。

????好的方案是設(shè)計(jì)一張Transaction表,將業(yè)務(wù)表和Transaction綁定在同一個(gè)本地事務(wù)中,如果扣款本地事務(wù)成功時(shí),Transaction中應(yīng)當(dāng)已經(jīng)記錄該TransactionId的狀態(tài)為「已完成」。當(dāng)RocketMq事務(wù)回查時(shí),只需要檢查對(duì)應(yīng)的 TransactionId的狀態(tài)是否是「已完成」就好,而不用關(guān)心具體的業(yè)務(wù)數(shù)據(jù)。

????如果是銀行業(yè)務(wù),對(duì)數(shù)據(jù)要求性極高,一般 A 與 B 需要進(jìn)行手動(dòng)對(duì)賬,手動(dòng)補(bǔ)償。

RocketMQ的存儲(chǔ)設(shè)計(jì)

1.Domain Model

? ? 領(lǐng)域模型(Domain Model)是對(duì)領(lǐng)域內(nèi)的概念類(lèi)或現(xiàn)實(shí)世界中對(duì)象的可視化標(biāo)識(shí)。又稱(chēng)概念模型、領(lǐng)域?qū)ο竽P?、分析?duì)象模型。它專(zhuān)注于分析問(wèn)題領(lǐng)域本身,發(fā)掘重要的業(yè)務(wù)領(lǐng)域概念,并建立業(yè)務(wù)領(lǐng)域概念之間的關(guān)系。

圖示

(1)Message

????Message是RocketMQ消息引擎中的主體。messageId是全局唯一的。MessageKey是業(yè)務(wù)系統(tǒng)(生產(chǎn)者)生成的,所以如果要結(jié)合業(yè)務(wù),可以使用MessageKey作為業(yè)務(wù)系統(tǒng)的唯一索引。

代碼示例
SendResult源碼圖示

????另外Message中的equals方法和hashCode主要是為了完成消息只處理一次(Exactly-Once)。

????Exactly-Once 是指發(fā)送到消息系統(tǒng)的消息只能被消費(fèi)端處理且僅處理一次,即使生產(chǎn)端重試消息發(fā)送導(dǎo)致某消息重復(fù)投遞,該消息在消費(fèi)端也只被消費(fèi)一次。

(2)Topic

? ? Tags是在同一Topic中對(duì)消息進(jìn)行分類(lèi)。

? ? subTopics==Message Queue,其實(shí)在內(nèi)存邏輯中,subTopics是對(duì)Topics的一個(gè)拓展,尤其是在MQTT這種協(xié)議下,在Topic底下會(huì)有很多subTopics。

(3)Queue

? ? Queue是消息物理管理單位,比如在RocketMQ的控制臺(tái)中,就可以看到每一個(gè)queue中的情況(比如消息的堆積情況、消息的TPS、QPS)。

(4)Offset

? ? 對(duì)于每一個(gè)Queue來(lái)說(shuō)都有Offset,這個(gè)是消費(fèi)位點(diǎn)。

(5)Group

? ? 業(yè)務(wù)場(chǎng)景中,如果有一堆發(fā)送者,一堆消費(fèi)者,所以這里使用Group的概念進(jìn)行管理。

2.對(duì)應(yīng)關(guān)系

? ? Message與Topic是多對(duì)一的關(guān)系,一個(gè)Topic可以有多個(gè)Message。

? ? Topic到Queue是一對(duì)多的關(guān)系,這個(gè)也是方便橫向拓展,也就是消費(fèi)的時(shí)候,這里可以有很多很多的Queue。

? ? 一個(gè)Queue只有一個(gè)消費(fèi)位點(diǎn)(Offset),所以Topic和Offset也是一對(duì)多的關(guān)系。

? ? Topic和Group也是多對(duì)多的關(guān)系。

消費(fèi)并發(fā)度、熱點(diǎn)問(wèn)題

1.消費(fèi)并發(fā)度

????從上面模型可以看出,要解決消費(fèi)并發(fā),就是要利用Queue,一個(gè)Topic可以分出更多的queue,每一個(gè)queue可以存放在不同的硬件上來(lái)提高并發(fā)。

2.熱點(diǎn)問(wèn)題(順序、重復(fù))

? ??前面講過(guò)要確保消息的順序,生產(chǎn)者、隊(duì)列、消費(fèi)者最好都是一對(duì)一的關(guān)系。但是這樣設(shè)計(jì),并發(fā)度就會(huì)成為消息系統(tǒng)的瓶頸(并發(fā)度不夠)RocketMQ 不解決這個(gè)矛盾的問(wèn)題。理由如下:

????1、 亂序的應(yīng)用實(shí)際大量存在。

????2、 隊(duì)列無(wú)序并不意味著消息無(wú)序。

????另外還有消息重復(fù),造成消息重復(fù)的根本原因是:網(wǎng)絡(luò)不可達(dá)(網(wǎng)絡(luò)波動(dòng))。所以如果消費(fèi)者收到兩條一樣的消息,應(yīng)該是怎么處理?

????RocketMQ 不保證消息不重復(fù),如果你的業(yè)務(wù)要嚴(yán)格確保消息不重復(fù),需要在自己的業(yè)務(wù)端進(jìn)行去重。

????1、 消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性。

????2、 確保每一條消息都有唯一的編號(hào)且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)。

消息存儲(chǔ)結(jié)構(gòu)及運(yùn)轉(zhuǎn)機(jī)制

1.消息存儲(chǔ)結(jié)構(gòu)

????RocketMQ 因?yàn)橛懈呖煽啃缘囊?宕機(jī)不丟失數(shù)據(jù)),所以數(shù)據(jù)要進(jìn)行持久化存儲(chǔ)。所以 RocketMQ 采用文件進(jìn)行存儲(chǔ)。

(1)存儲(chǔ)文件

圖示

>commitLog:消息存儲(chǔ)目錄

>config:運(yùn)行期間一些配置信息

>consumerqueue:消息消費(fèi)隊(duì)列存儲(chǔ)目錄

>index:消息索引文件存儲(chǔ)目錄

>abort:如果存在該文件則Broker非正常關(guān)閉

>checkpoint:文件檢查點(diǎn),存儲(chǔ)CommitLog文件最后一次刷盤(pán)時(shí)間戳、consumerqueue最后一次刷盤(pán)時(shí)間,index索引文件最后一次刷盤(pán)時(shí)間戳。

(2)消息存儲(chǔ)結(jié)構(gòu)

圖示

????RocketMQ消息的存儲(chǔ)是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存儲(chǔ)文件是 CommitLog,ConsumeQueue是消息的邏輯隊(duì)列,類(lèi)似數(shù)據(jù)庫(kù)的索引文件,存儲(chǔ)的是指向物理存儲(chǔ)的地址。每個(gè)Topic下的每個(gè)Message Queue都有一個(gè)對(duì)應(yīng)的ConsumeQueue文件。

? ? >CommitLog:存儲(chǔ)消息的元數(shù)據(jù)。

? ? >ConsumerQueue:存儲(chǔ)消息在CommitLog的索引。

? ? >IndexFile:為了消息查詢(xún)提供了一種通過(guò)key或時(shí)間區(qū)間來(lái)查詢(xún)消息的方法,這種通過(guò)IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程。

圖示

CommitLog

? ??CommitLog以物理文件的方式存放,每臺(tái)Broker 上的CommitLog被本機(jī)器所有ConsumeQueue共享,文件地址:$ {user.home}\store\${ commitlog}\${ fileName}。在CommitLog中,一個(gè)消息的存儲(chǔ)長(zhǎng)度是不固定的, RocketMQ采取一些機(jī)制,盡量向CommitLog中順序?qū)?,但是隨機(jī)讀。commitlog文件默認(rèn)大小為1G ,可通過(guò)在 broker置文件中設(shè)置 mappedFileSizeCommitLog屬性來(lái)改變默認(rèn)大小。

圖示

????Commitlog文件存儲(chǔ)的邏輯視圖如下,每條消息的前面4個(gè)字節(jié)存儲(chǔ)該條消息的總長(zhǎng)度。但是一個(gè)消息的存儲(chǔ)長(zhǎng)度是不固定的。

圖示

? ??每個(gè)CommitLog文件的大小為 1G,一般情況下第一個(gè) CommitLog 的起始偏移量為 0,第二個(gè) CommitLog 的起始偏移量為

1073741824(1G =1073741824byte)。

圖示

????每臺(tái) Rocket 只會(huì)往一個(gè) commitlog 文件中寫(xiě),寫(xiě)完一個(gè)接著寫(xiě)下一個(gè)。

? ??indexFile和ComsumerQueue中都有消息對(duì)應(yīng)的物理偏移量,通過(guò)物理偏移量就可以計(jì)算出該消息位于哪個(gè)CommitLog文件上。

ConsumeQueue

? ??ConsumeQueue是消息的邏輯隊(duì)列,類(lèi)似數(shù)據(jù)庫(kù)的索引文件,存儲(chǔ)的是指向物理存儲(chǔ)的地址。每個(gè)Topic下的每個(gè)Message Queue都有一個(gè)對(duì)應(yīng)的ConsumeQueue文件, 文件地址在${$storeRoot}\consumequeue\${topicName}\${queueld}\$ {fileName}。

圖示
圖示

? ??ConsumeQueue中存儲(chǔ)的是消息條目,為了加速ConsumeQueue消息條目的檢索速度與節(jié)省磁盤(pán)空間,每一個(gè) Consumequeue條目不會(huì)存儲(chǔ)消息的全量信息,消息條目如下:

圖示

? ??ConsumeQueue即為Commitlog文件的索引文件, 其構(gòu)建機(jī)制是 當(dāng)消息到達(dá)Commitlog文件后由專(zhuān)門(mén)的線(xiàn)程產(chǎn)生消息轉(zhuǎn)發(fā)任務(wù),從而構(gòu)建消息消費(fèi)隊(duì)列文件(ConsumeQueue )與下文提到的索引文件。

????存儲(chǔ)機(jī)制這樣設(shè)計(jì)有以下幾個(gè)好處:

????1) CommitLog順序?qū)?,可以大大提高寫(xiě)入效率。

????(實(shí)際上,磁盤(pán)有時(shí)候會(huì)比你想象的快很多,有時(shí)候也比你想象的慢很多,關(guān)鍵在如何使用,使用得當(dāng),磁盤(pán)的速度完全可以匹配上網(wǎng)絡(luò)的數(shù)據(jù)傳輸速度。目前的高性能磁盤(pán),順序?qū)懰俣瓤梢赃_(dá)到 600MB/s ,超過(guò)了一般網(wǎng)卡的傳輸速度,這是磁盤(pán)比想象的快的地方 但是磁盤(pán)隨機(jī)寫(xiě)的速度只有大概100KB/s,和順序?qū)懙男阅芟嗖?000倍?。?/p>

????2)雖然是隨機(jī)讀,但是利用操作系統(tǒng)的 pagecache 機(jī)制,可以批量地從磁盤(pán)讀取,作為 cache 存到內(nèi)存中,加速后續(xù)的讀取速度。

????3)為了保證完全的順序?qū)?,需要ConsumeQueue這個(gè)中間結(jié)構(gòu) ,因?yàn)镃onsumeQueue里只存偏移量信息,所以尺寸是有限的,在實(shí)際情況中,大部分的ConsumeQueue能夠被全部讀入內(nèi)存,所以這個(gè)中間結(jié)構(gòu)的操作速度很快,可以認(rèn)為是內(nèi)存讀取的速度。此外為了保證CommitLog和ConsumeQueue的一致性, CommitLog 里存儲(chǔ)了ConsumeQueues 、Message Key、 Tag 等所有信息,即使ConsumeQueue丟失,也可以通過(guò) commitLog 完全恢復(fù)出來(lái)。

IndexFile

? ??RocketMQ 還支持通過(guò) MessageID 或者 MessageKey 來(lái)查詢(xún)消息;使用ID查詢(xún)時(shí),因?yàn)镮D 就是用 broker+offset 生成的(這里 msgId 指的是服務(wù)端的),所以很容易就找到對(duì)應(yīng)的commitLog文件來(lái)讀取消息。但是對(duì)于用MessageKey來(lái)查詢(xún)消息,RocketMQ 則通過(guò)構(gòu)建一個(gè) index 來(lái)提高讀取速度。

????index 存的是索引文件,這個(gè)文件用來(lái)加快消息查詢(xún)的速度。消息消費(fèi)隊(duì)列RocketMQ專(zhuān)門(mén)為消息訂閱構(gòu)建的索引文件 ,提高根據(jù)主題與消息檢索消息的速度 ,使用Hash索引機(jī)制,具體是Hash槽與Hash沖突的鏈表結(jié)構(gòu)。

圖示

Config

????config 文件夾中存儲(chǔ)著 Topic 和 Consumer 等相關(guān)信息。主題和消費(fèi)者群組相關(guān)的信息就存在在此。

????topics.json : topic配置屬性。

????subscriptionGroup.json :消息消費(fèi)組配置信息。

????delayOffset.json :延時(shí)消息隊(duì)列拉取進(jìn)度。

????consumerOffset.json :集群消費(fèi)模式消息消進(jìn)度。

????consumerFilter.json :主題消息過(guò)濾信息。

圖示

其他

????abort :如果存在 abort 文件說(shuō)明 Broker 非正常閉,該文件默認(rèn)啟動(dòng)時(shí)創(chuàng)建,正常退出之前刪除

????checkpoint :文件檢測(cè)點(diǎn),存儲(chǔ)commitlog文件最后一次刷盤(pán)時(shí)間戳、 consumequeue最后一次刷盤(pán)時(shí)間、 index 索引文件最后一次刷盤(pán)時(shí)間戳。

2.過(guò)期文件刪除

? ??由于 RocketMQ 操作 CommitLog,ConsumeQueue 文件是基于內(nèi)存映射機(jī)制并在啟動(dòng)的時(shí)候會(huì)加載 commitlog,ConsumeQueue 目錄下的所有文件,為 了避免內(nèi)存與磁盤(pán)的浪費(fèi),不可能將消息永久存儲(chǔ)在消息服務(wù)器上,所以需要引入一種機(jī)制來(lái)刪除己過(guò)期的文件。

????刪除過(guò)程分別執(zhí)行清理消息存儲(chǔ)文件( Commitlog )與消息消費(fèi) 隊(duì)列文件( ConsumeQueue 文件), 消息消費(fèi)隊(duì)列文件與消息存儲(chǔ)文件( Commitlog)共用一套過(guò)期文件機(jī)制。

????RocketMQ清除過(guò)期文件的方法是 :如果非當(dāng)前寫(xiě)文件在一定時(shí)間間隔內(nèi)沒(méi)有再次被更新,則認(rèn)為是過(guò)期文件,可以被刪除,RocketMQ 不會(huì)關(guān)注這個(gè)文件上的消息是否全部被消費(fèi)。默認(rèn)每個(gè)文件的過(guò)期時(shí)間為 42 小時(shí)(不同版本的默認(rèn)值不同,這里以 4.4.0 為例) ,通過(guò)在Broker 配置文件中設(shè)置 fileReservedTime 來(lái)改變過(guò)期時(shí)間,單位為小時(shí)。

????觸發(fā)文件清除操作的是一個(gè)定時(shí)任務(wù),而且只有定時(shí)任務(wù),文件過(guò)期刪除定時(shí)任務(wù)的周期由該刪除決定,默認(rèn)每 10s 執(zhí)行一次。

(1)過(guò)期判斷

? ??文件刪除主要是由這個(gè)配置屬性:fileReservedTime:文件保留時(shí)間。也就是從最后一次更新時(shí)間到現(xiàn)在,如果超過(guò)了該時(shí)間,則認(rèn)為是過(guò)期文件, 可以刪除。

另外還有其他兩個(gè)配置參數(shù):

????deletePhysicFilesInterval:刪除物理文件的時(shí)間間隔(默認(rèn)是 100MS),在一次定時(shí)任務(wù)觸發(fā)時(shí),可能會(huì)有多個(gè)物理文件超過(guò)過(guò)期時(shí)間可被刪除,因此刪除一個(gè)文件后需要間隔 deletePhysicFilesInterval 這個(gè)時(shí)間再刪除另外一個(gè)文件,由于刪除文件是一個(gè)非常耗費(fèi) IO 的操作,會(huì)引起消息插入消費(fèi)的延遲(相比于正常情況下),所以不建議直接刪除所有過(guò)期文件。

????destroyMapedFileIntervalForcibly:在刪除文件時(shí),如果該文件還被線(xiàn)程引用,此時(shí)會(huì)阻止此次刪除操作,同時(shí)將該文件標(biāo)記不可用并且紀(jì)錄當(dāng)前時(shí)間戳 destroyMapedFileIntervalForcibly 這個(gè)表示文件在第一次刪除拒絕后,文件保存的最大時(shí)間,在此時(shí)間內(nèi)一直會(huì)被拒絕刪除,當(dāng)超過(guò)這個(gè)時(shí)間時(shí),會(huì)將引用每次減少 1000,直到引用 小于等于 0 為止,即可刪除該文件。

(2)刪除條件

? ??1)指定刪除文件的時(shí)間點(diǎn), RocketMQ 通過(guò) deleteWhen 設(shè)置一天的固定時(shí)間執(zhí)行一次。刪除過(guò)期文件操作, 默認(rèn)為凌晨 4 點(diǎn)。

????2)磁盤(pán)空間是否充足,如果磁盤(pán)空間不充足(DiskSpaceCleanForciblyRatio。磁盤(pán)空間強(qiáng)制刪除文件水位。默認(rèn)是 85),會(huì)觸發(fā)過(guò)期文件刪除操作。

另外還有 RocketMQ 的磁盤(pán)配置參數(shù):

????1:物理使用率大于 diskSpaceWarningLevelRatio(默認(rèn) 90%可通過(guò)參數(shù)設(shè)置),則會(huì)阻止新消息的插入。

????2:物理磁盤(pán)使用率小于 diskMaxUsedSpaceRatio(默認(rèn) 75%) 表示磁盤(pán)使用正常。

3.零拷貝與 MMAP

(1)什么是零拷貝?

? ??零拷貝(英語(yǔ): Zero-copy) 技術(shù)是指計(jì)算機(jī)執(zhí)行操作時(shí),CPU 不需要先將數(shù)據(jù)從某處內(nèi)存復(fù)制到另一個(gè)特定區(qū)域。這種技術(shù)通常用于通過(guò)網(wǎng)絡(luò)傳輸文件時(shí)節(jié)省 CPU 周期和內(nèi)存帶寬。

?????零拷貝技術(shù)可以減少數(shù)據(jù)拷貝和共享總線(xiàn)操作的次數(shù),消除傳輸數(shù)據(jù)在存儲(chǔ)器之間不必要的中間拷貝次數(shù),從而有效地提高數(shù)據(jù)傳輸效率

?????零拷貝技術(shù)減少了用戶(hù)進(jìn)程地址空間和內(nèi)核地址空間之間因?yàn)樯?下文切換而帶來(lái)的開(kāi)銷(xiāo)

????可以看出沒(méi)有說(shuō)不需要拷貝,只是說(shuō)減少冗余[不必要]的拷貝。

????下面這些組件、框架中均使用了零拷貝技術(shù):Kafka、Netty、Rocketmq、Nginx、Apache。

(2)傳統(tǒng)數(shù)據(jù)傳送機(jī)制

????比如:讀取文件,再用 socket 發(fā)送出去,實(shí)際經(jīng)過(guò)四次 copy。

????偽碼實(shí)現(xiàn)如下:

????buffer = File.read();

????Socket.send(buffer);

????1、第一次:將磁盤(pán)文件,讀取到操作系統(tǒng)內(nèi)核緩沖區(qū);

????2、第二次:將內(nèi)核緩沖區(qū)的數(shù)據(jù),copy 到應(yīng)用程序的 buffer;

????3、第三步:將 application 應(yīng)用程序 buffer 中的數(shù)據(jù),copy 到 socket 網(wǎng)絡(luò)發(fā)送緩沖區(qū)(屬于操作系統(tǒng)內(nèi)核的緩沖區(qū));

????4、第四次:將 socket buffer 的數(shù)據(jù),copy 到網(wǎng)卡,由網(wǎng)卡進(jìn)行網(wǎng)絡(luò)傳輸。

圖示

? ??分析上述的過(guò)程,雖然引入 DMA 來(lái)接管 CPU 的中斷請(qǐng)求,但四次 copy 是存在“不必要的拷貝”的。實(shí)際上并不需要第二個(gè)和第三個(gè)數(shù)據(jù)副本。應(yīng) 用程序除了緩存數(shù)據(jù)并將其傳輸回套接字緩沖區(qū)之外什么都不做。相反,數(shù)據(jù)可以直接從讀緩沖區(qū)傳輸?shù)教捉幼志彌_區(qū)。

????顯然,第二次和第三次數(shù)據(jù) copy 其實(shí)在這種場(chǎng)景下沒(méi)有什么幫助反而帶來(lái)開(kāi)銷(xiāo)(DMA 拷貝速度一般比 CPU 拷貝速度快一個(gè)數(shù)量級(jí)),這也正是零拷貝出現(xiàn)的背景和意義。

????打個(gè)比喻:200M 的數(shù)據(jù),讀取文件,再用 socket 發(fā)送出去,實(shí)際經(jīng)過(guò)四次 copy(2 次 cpu 拷貝每次 100ms ,2 次 DMS 拷貝每次 10ms)。

????傳統(tǒng)網(wǎng)絡(luò)傳輸?shù)脑?huà):合計(jì)耗時(shí)將有 220ms。

? ??同時(shí),read 和 send 都屬于系統(tǒng)調(diào)用,每次調(diào)用都牽涉到兩次上下文切換:

圖示

? ??總結(jié)下,傳統(tǒng)的數(shù)據(jù)傳送所消耗的成本:4 次拷貝,4 次上下文切換。

????4 次拷貝,其中兩次是 DMA copy,兩次是 CPU copy。

(3)mmap 內(nèi)存映射

? ??硬盤(pán)上文件的位置和應(yīng)用程序緩沖區(qū)(application buffers)進(jìn)行映射(建立一種一一對(duì)應(yīng)關(guān)系),由于 mmap()將文件直接映射到用戶(hù)空間,所以實(shí)際文 件讀取時(shí)根據(jù)這個(gè)映射關(guān)系,直接將文件從硬盤(pán)拷貝到用戶(hù)空間,只進(jìn)行了一次數(shù)據(jù)拷貝,不再有文件內(nèi)容從硬盤(pán)拷貝到內(nèi)核空間的一個(gè)緩沖區(qū)。

????mmap 內(nèi)存映射將會(huì)經(jīng)歷:3 次拷貝: 1 次 cpu copy,2 次 DMA copy;

????打個(gè)比喻:200M 的數(shù)據(jù),讀取文件,再用 socket 發(fā)送出去,如果是使用 MMAP 實(shí)際經(jīng)過(guò)三次 copy(1 次 cpu 拷貝每次 100ms ,2 次 DMS 拷貝每次10ms)合計(jì)只需要 120ms。

? ??從數(shù)據(jù)拷貝的角度上來(lái)看,就比傳統(tǒng)的網(wǎng)絡(luò)傳輸,性能提升了近一倍。

????以及 4 次上下文切換

圖示

? ??mmap()是在 <sys/mman.h> 中定義的一個(gè)函數(shù),此函數(shù)的作用是創(chuàng)建一個(gè)新的 虛擬內(nèi)存 區(qū)域,并將指定的對(duì)象映射到此區(qū)域。mmap其實(shí)就是通過(guò)內(nèi)存映射 的機(jī)制來(lái)進(jìn)行文件操作。

????Windows 操作系統(tǒng)上也有虛擬機(jī)內(nèi)存,如下圖:

圖示

(4)代碼

代碼示例

4.RocketMQ 中 MMAP 運(yùn)用

? ??如果按照傳統(tǒng)的方式進(jìn)行數(shù)據(jù)傳送,那肯定性能上不去,作為 MQ 也是這樣,尤其是 RocketMQ,要滿(mǎn)足一個(gè)高并發(fā)的消息中間件,一定要進(jìn)行優(yōu)化。 所以 RocketMQ 使用的是 MMAP。

????RocketMQ 一個(gè)映射文件大概是,commitlog 文件默認(rèn)大小為 1G。

????這里需要注意的是,采用 MappedByteBuffer 這種內(nèi)存映射的方式有幾個(gè)限制,其中之一是一次只能映射 1.5~2G 的文件至用戶(hù)態(tài)的虛擬內(nèi)存,這也是為何 RocketMQ 默認(rèn)設(shè)置單個(gè) CommitLog 日志數(shù)據(jù)文件為 1G 的原因了。

(1)MMAP 文件對(duì)應(yīng)

圖示
圖示
圖示

(2)RocketMQ 源碼中的 MMAP 運(yùn)用

? ??RocketMQ 源碼中,使用 MappedFile 這個(gè)類(lèi)進(jìn)行 MMAP 的映射。

源碼圖示
源碼圖示

RocketMQ 存儲(chǔ)整體設(shè)計(jì)總結(jié)

1.消息生產(chǎn)與消息消費(fèi)相互分離

? ??Producer 端發(fā)送消息最終寫(xiě)入的是 CommitLog(消息存儲(chǔ)的日志數(shù)據(jù)文件),Consumer 端先從 ConsumeQueue(消息邏輯隊(duì)列)讀取持久化消息的 起始物理位置偏移量 offset、大小 size 和消息Tag的HashCode值,隨后再?gòu)腃ommitLog中進(jìn)行讀取待拉取消費(fèi)消息的真正實(shí)體內(nèi)容部分;

2.RocketMQ 的 CommitLog 文件采用混合型存儲(chǔ)

? ??所有的Topic下的消息隊(duì)列共用同一個(gè)CommitLog的日志數(shù)據(jù)文件,并通過(guò)建立類(lèi)似索引文件—ConsumeQueue 的方式來(lái)區(qū)分不同Topic下面的不同MessageQueue的消息,同時(shí)為消費(fèi)消息起到一定的緩沖作用(異步服務(wù)線(xiàn)生成了 ConsumeQueue 隊(duì)列的信息后,Consumer端才能進(jìn)行消費(fèi))。這樣,只要消息寫(xiě)入并刷盤(pán)至CommitLog文件后,消息就不會(huì)丟失,即使 ConsumeQueue中的數(shù)據(jù)丟失,也可以通過(guò)CommitLog來(lái)恢復(fù)。

3.RocketMQ 每次讀寫(xiě)文件的時(shí)候真的是完全順序讀寫(xiě)嗎?

? ??發(fā)送消息時(shí),生產(chǎn)者端的消息確實(shí)是順序?qū)懭?CommitLog;訂閱消息時(shí),消費(fèi)者端也是順序讀取 ConsumeQueue,然而根據(jù)其中的起始物理位置偏移 量 offset 讀取消息真實(shí)內(nèi)容卻是隨機(jī)讀取 CommitLog。 所以在 RocketMQ 集群整體的吞吐量、并發(fā)量非常高的情況下,隨機(jī)讀取文件帶來(lái)的性能開(kāi)銷(xiāo)影響還是比較大的,至于RocketMQ怎么優(yōu)化的,源碼解讀部分講解。


我是嬈疆_蚩夢(mèng),讓堅(jiān)持成為一種習(xí)慣,感謝各位大佬的:點(diǎn)贊收藏評(píng)論,我們下期見(jiàn)!


上一篇:RocketMQ基礎(chǔ)篇(下)

下一篇:RocketMQ底層原理之高可用機(jī)制

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容