rocket mq 底層存儲(chǔ)源碼分析(1)-存儲(chǔ)總概

前言:
從本節(jié)開始,我們將分七個(gè)章節(jié)從底層源碼來(lái)分析Rocket mq 是如何實(shí)現(xiàn)自己的消息持久化的:

第(1)章節(jié)主要總從體上分析rmq消息的存儲(chǔ)格式和文件組織以及實(shí)現(xiàn)上的技術(shù)選用。
接下來(lái)的(2)、(3)、(4)、(5)、(6)、(7)章分別從源碼上深入分析消息的具體內(nèi)容、高可用、索引、存儲(chǔ)文件刪除、啟動(dòng)存儲(chǔ)狀態(tài)恢復(fù)以及業(yè)務(wù)消息查詢是如何實(shí)現(xiàn)的。

以下用4小節(jié)分析:(1)存儲(chǔ)總概。



1、消息【內(nèi)容】

說(shuō)明:
consumer客戶端消費(fèi)的就是這些【內(nèi)容】。消息會(huì)按照broker接收的順序串行寫入存儲(chǔ)文件,在寫入文件之前,先判斷當(dāng)前文件的剩余空間是否能夠?qū)懭朐摋l完整的消息,如果不足,就創(chuàng)建下一個(gè)文件,把這條消息寫入新創(chuàng)建的存儲(chǔ)文件。

文件組織:
存儲(chǔ)地址:user_root_path/store/commitlog/{fileName}
文件大?。?G = 1073741824 byte
文件起名:20位數(shù)字填充位

  • user_root_path/store/commitlog/
    -00000000000000000000
    -00000000001073741824
    -00000000002147483648

這里說(shuō)明一下文件如何起名:
我們知每個(gè)存儲(chǔ)道文件的大小為1G,文件的起名方式為,文件所在的物理存儲(chǔ)位置映射的起始地址。
第n個(gè)文件的起名方式為:
(n-1) * 1073741824 ,然后左填充0,直至20位

消息體的存儲(chǔ)格式及內(nèi)容

存儲(chǔ)序列 字段名稱 長(zhǎng)度 注析
1 TOTALSIZE 4字節(jié) 消息總長(zhǎng)度
2 MAGICCODE 4字節(jié) 消息魔數(shù)
3 BODYCRC 4字節(jié) 消息完整校驗(yàn)碼
4 QUEUEID 4字節(jié) 所在隊(duì)列
5 FLAG 4字節(jié) 一般標(biāo)識(shí)
6 QUEUEOFFSET 8字節(jié) 所在隊(duì)列邏輯位移
7 PHYSICALOFFSET 8字節(jié) 物理位移
8 SYSFLAG 4字節(jié) 系統(tǒng)標(biāo)識(shí)
9 BORNTIMESTAMP 8字節(jié) producer端的消息生成時(shí)間戳
10 BORNHOST 8字節(jié) producer端的通訊address
11 STORETIMESTAMP 8字節(jié) broker端存儲(chǔ)時(shí)間戳
12 STOREHOSTADDRESS 8字節(jié) broker端地址
13 RECONSUMETIMES 4字節(jié) 重消費(fèi)次數(shù)
14 Prepared Transaction Offset 8字節(jié) 預(yù)提交位移
15 BODY LENGTH 4字節(jié) 消息體長(zhǎng)度
16 BODY value(BODY LENGTH) 字節(jié) 具體消息體內(nèi)容
17 TOPIC LENGTH 4字節(jié) topic 長(zhǎng)度
18 TOPIC value(TOPIC LENGTH) 字節(jié) 具體topic內(nèi)容
19 PROPERTIES LENGTH 2字節(jié) 屬性長(zhǎng)度
20 PROPERTIES value(PROPERTIES LENGTH) 字節(jié) 屬性內(nèi)容


解析:
3-BODYCRC: CRC是一種數(shù)據(jù)錯(cuò)誤檢查技術(shù),它可以確保最初寫入鏡像文件的數(shù)據(jù)與從鏡像文件中使用>的數(shù)據(jù)保持一致,以確保在將該文件還原到磁盤時(shí)能夠檢測(cè)到它是否已經(jīng)損壞。

4-QUEUEID:rockmq中 topic-brokerName-queueId可以確定集群中的一條具體的隊(duì)列。

5-FLAG:由producer消息生產(chǎn)方指定的一個(gè)int類型的標(biāo)志,默認(rèn)為0。

6-QUEUEOFFSET:消息所在隊(duì)列中的邏輯位移,即第幾條消息。

7-PHYSICALOFFSET:消息存儲(chǔ)的物理位移,即存儲(chǔ)的開始位置。

8-SYSFLAG:系統(tǒng)標(biāo)識(shí)位;例如消息是否壓縮,消息的類型(普通類型消息或是事務(wù)類型)。

20-PROPERTIES:producer端指定的消息屬性,以字符1和2作為分割符,如下key-a1value-a2key-b1valueb-2拼接字符串,其中性客戶端生成的消息id是必有屬,PROPERTIES用于構(gòu)建消息的【key查詢索引】。



2、消息的【邏輯位移索引】存儲(chǔ)格式

說(shuō)明:
類比于List的index,conusmer端可以通過(guò)該index,查詢出具體的消息【內(nèi)容】。
例如我們consumer指定消費(fèi)哪個(gè)broker下的topic,哪條隊(duì)列(queueId)的第n條消息,broker端會(huì)根據(jù)topic-queueId 以及邏輯索引n,先查詢出【邏輯位移索引】的具體內(nèi)容,然后通過(guò)【邏輯位移索引】存儲(chǔ)對(duì)應(yīng)的消息【內(nèi)容】的物理位移,最后再找出具體的消息內(nèi)容。

文件組織:
存儲(chǔ)地址:user_root_path/store/consumequeue/{topic}/{queueId}/{fileName}
文件大小:300000 * 20 byte
文件起名:20位數(shù)字填充位

第n個(gè)文件的起名方式為:
(n-1) * 300000 * 20 ,然后左填充0,直至20位

  • user_root_path/store/consumequeue/
    • topic1
      • 0
        -00000000000000000000
        -00000000000006000000
        -00000000000012000000
      • 1
        -00000000000000000000
        -00000000000006000000
      • 2
        -00000000000000000000

消息的【邏輯位移索引】存儲(chǔ)格式及內(nèi)容

存儲(chǔ)序列 字段名稱 長(zhǎng)度 注析
1 offset 8字節(jié) 消息存儲(chǔ)的物理位移
2 size 4字節(jié) 消息總長(zhǎng)度
2 tagsCode 8字節(jié) producer端指定消息的tags屬性的hashcode


3、消息的【key查詢索引】存儲(chǔ)格式

說(shuō)明:
在producer生產(chǎn)消息時(shí),我們可以為消息指定一個(gè)業(yè)務(wù)上的String 字符串類型的key。那么,當(dāng)該消息落地成功后,我們可以通過(guò)指定的key,去查詢指定的消息,broker端會(huì)返回通過(guò)key的散列值,落在相同的槽的一定數(shù)量的消息給客戶端。

文件組織:
存儲(chǔ)地址:user_root_path/store/index/{fileName}
文件大?。?0 + (5,000,000[hashSlotNum] * 4[hashSlotSize]) + ((4 * 5,000,000)[indexNum] * 20[indexSize]) byte
文件起名:yyyyMMddhhmmss(年月日時(shí)分秒) + mmm(毫秒)

  • user_root_path/store/index/
    • yyyyMMddhhmmssmmm

消息的【key查詢索引】存儲(chǔ)格式及內(nèi)容

查詢索引文件是由【索引頭】 + 【索引內(nèi)容】?jī)刹糠纸M成:

【索引頭】

存儲(chǔ)序列 字段名稱 長(zhǎng)度 注析
1 beginTimestampIndex 8字節(jié) 表示索引文件 第一個(gè)構(gòu)建的 消息 的存儲(chǔ)時(shí)間(即消息放入緩存時(shí)的時(shí)間戳)
2 endTimestampIndex 8字節(jié) 表示索引文件 最后一個(gè)構(gòu)建的 消息 的存儲(chǔ)時(shí)間
3 beginPhyoffsetIndex 8字節(jié) 表該索引文件 第一個(gè)構(gòu)建的 消息 的物理存儲(chǔ)地址
4 beginPhyoffsetIndex 8字節(jié) 表該索引文件 最后一個(gè)構(gòu)建的 消息 的物理存儲(chǔ)地址
5 hashSlotCount 4字節(jié) 散列槽個(gè)數(shù),每構(gòu)建一個(gè)消息的索引,個(gè)數(shù)加一
6 indexCount 4字節(jié) 索引個(gè)數(shù),每構(gòu)建一個(gè)消息的索引,個(gè)數(shù)加一

【索引內(nèi)容】

存儲(chǔ)序列 字段名稱 長(zhǎng)度 注析
1 keyHash 4字節(jié) 查詢消息的 key.hashCode()的絕對(duì)值
2 phyOffset 8字節(jié) 消息的物理存儲(chǔ)地址
3 timeDiff 4字節(jié) 與第一個(gè)消息的存儲(chǔ)時(shí)間差
4 slotValue 8字節(jié) 上一條索引消息所在的邏輯索引


4、rmq消息持久化技術(shù)

4.1系統(tǒng)標(biāo)準(zhǔn)IO與直接內(nèi)存
4-1(傳統(tǒng)I/O的read/write過(guò)程)

傳統(tǒng)I/O操作中,java io文件的read和write操作都是通過(guò)調(diào)用系統(tǒng)底層的標(biāo)準(zhǔn)IO函數(shù) read()和write() 實(shí)現(xiàn)的。對(duì)于寫操作,即通過(guò)java調(diào)用write方法以后,首先將當(dāng)前的用戶態(tài)(即java進(jìn)程)轉(zhuǎn)成內(nèi)核態(tài),并把java 內(nèi)存(heap)的字節(jié)數(shù)據(jù)拷貝到內(nèi)核態(tài)的io緩存區(qū)(pagecache),最后在把內(nèi)核態(tài)的緩存區(qū)數(shù)據(jù)拷貝到磁盤文件(pagecache->disk),這樣就完成了一次寫操作;對(duì)于讀操作,也是先將用戶態(tài)轉(zhuǎn)成內(nèi)核態(tài),然后由操作系統(tǒng)的內(nèi)核代碼將磁盤文件拷貝到內(nèi)核態(tài)的緩沖區(qū)中,最后再?gòu)膬?nèi)核態(tài)拷貝到用戶態(tài)的緩存中,這樣就完成了一次讀操作。當(dāng)然,根據(jù)空間局部性原理,也就是當(dāng)我們的應(yīng)用程序需要訪問(wèn)磁盤文件上的指定的一段數(shù)據(jù)時(shí),操作系統(tǒng)會(huì)認(rèn)為我們可能也需要訪問(wèn)該段數(shù)據(jù)的下一段數(shù)據(jù),并且,由于磁盤io的操作速度比內(nèi)存操作的速度慢幾個(gè)數(shù)量級(jí),因此,會(huì)同時(shí)把指定的一段數(shù)據(jù)以及下一段數(shù)據(jù)一并從磁盤文件拷貝到內(nèi)核態(tài)的緩沖區(qū)中,即預(yù)讀更多的數(shù)據(jù)。所以,當(dāng)我們讀完指定的一段數(shù)據(jù)后,再讀下一段數(shù)據(jù)時(shí),就可以直接從內(nèi)核態(tài)拷貝到用戶態(tài)的緩存中,減少了低效率的io操作。

4-2(內(nèi)存映射的read/write過(guò)程)

內(nèi)存映射和標(biāo)準(zhǔn)的io操作一樣需要從磁盤文件中獲取數(shù)據(jù),但它并不需要從內(nèi)核態(tài)的緩沖區(qū)拷貝到用戶態(tài)的緩沖區(qū),而是直接將進(jìn)程中一部分私有地址空間區(qū)域與磁盤文件建立起映射關(guān)系,通過(guò)缺頁(yè),把磁盤文件的內(nèi)容直接拷貝到進(jìn)程的緩沖區(qū)。相對(duì)標(biāo)準(zhǔn)io操作,這里有兩個(gè)最主要的優(yōu)勢(shì),第一,性能上,由于直接內(nèi)存映射少一次內(nèi)存拷貝,因此會(huì)比標(biāo)準(zhǔn)io操作要快,文件越大,優(yōu)勢(shì)越明顯;第二,直接內(nèi)存映射可以加載普通方式無(wú)法訪問(wèn)的大文件,這里的大文件指,例如1G的文件,如果是標(biāo)準(zhǔn)io操作,我們需要將1G的文件直接加載進(jìn)內(nèi)存,才可以訪問(wèn),明顯這是不可能的,但通過(guò)虛擬內(nèi)存的映射方式以及操作系統(tǒng)層面上發(fā)起的頁(yè)面請(qǐng)求,便可將所需數(shù)據(jù)加載到程序內(nèi)存中。

下面快速總結(jié)一下Java內(nèi)存映射文件和IO

1)Java語(yǔ)言通過(guò)java.nio包支持內(nèi)存映射文件和IO。

2)內(nèi)存映射文件用于對(duì)性能要求高的系統(tǒng)中,如繁忙的電子交易系統(tǒng)

3)使用內(nèi)存映射IO你可以將文件的一部分加載到內(nèi)存中

4)如果被請(qǐng)求的緩存頁(yè)面不在內(nèi)存中,內(nèi)存映射文件會(huì)導(dǎo)致頁(yè)面錯(cuò)誤

5)將一個(gè)文件區(qū)間映射到內(nèi)存中的能力取決于內(nèi)存的可尋址范圍。在32位機(jī)器中,不能超過(guò)4GB,即2^32比特。

6)Java中的內(nèi)存映射文件比流IO要快(譯注:對(duì)于大文件而言是對(duì)的,小文件則未必)

7)用于加載文件的內(nèi)存在Java的堆內(nèi)存之外,存在于共享內(nèi)存中,允許兩個(gè)不同進(jìn)程訪問(wèn)文件。順便說(shuō)一下,這依賴于你用的是direct還是non-direct字節(jié)緩存。

8)讀寫內(nèi)存映射文件是操作系統(tǒng)來(lái)負(fù)責(zé)的,因此,即使你的Java程序在寫入內(nèi)存后就掛掉了,只要操作系統(tǒng)工作正常,數(shù)據(jù)就會(huì)寫入磁盤。

9)Direct字節(jié)緩存比non-direct字節(jié)緩存性能要好

10)不要經(jīng)常調(diào)用MappedByteBuffer.force()方法,這個(gè)方法強(qiáng)制操作系統(tǒng)將內(nèi)存中的內(nèi)容寫入硬盤,所以如果你在每次寫內(nèi)存映射文件后都調(diào)用force()方法,你就不能真正從內(nèi)存映射文件中獲益,而是跟disk IO差不多。

11)如果電源故障或者主機(jī)癱瘓,有可能內(nèi)存映射文件還沒(méi)有寫入磁盤,意味著可能會(huì)丟失一些關(guān)鍵數(shù)據(jù)。

12)MappedByteBuffer和文件映射在緩存被GC之前都是有效的。sun.misc.Cleaner可能是清除內(nèi)存映射文件的唯一選擇。

這里在說(shuō)一下MappedByteBuffer,可能會(huì)導(dǎo)致JVM crash ,因?yàn)镸appedByteBuffer可以通過(guò)特殊的方法釋放,實(shí)際上調(diào)用了unmap的方法。此時(shí),之前映射到j(luò)vm的地址空間就是非法地址,如果此后仍然對(duì)MappedByteBuffer進(jìn)行讀寫,系統(tǒng)就會(huì)向jvm發(fā)送sigbus信號(hào)來(lái)通知進(jìn)程非法操作,這個(gè)問(wèn)題一般是由于程序沒(méi)有處理好并發(fā)問(wèn)題導(dǎo)致的。

因此rmq通過(guò)引用計(jì)數(shù)法,即只要引用計(jì)數(shù)不為0,MappedByteBuffer對(duì)象就不會(huì)釋放來(lái)解決這個(gè)問(wèn)題。具體的抽象實(shí)現(xiàn)為ReferenceResource,使用AtomicLong原子變量來(lái)保證并發(fā),性能上會(huì)比較好。

當(dāng)然,這種方式也會(huì)存在弊端,就是程序不能正確操作引用計(jì)數(shù),可能會(huì)導(dǎo)致文件無(wú)法刪除,因此,rmq增加了一個(gè)補(bǔ)救措施,就是一旦文件被關(guān)閉了狀態(tài)位available會(huì)設(shè)置為false,并且開始計(jì)時(shí),如果超過(guò)2分鐘,引用計(jì)數(shù)還沒(méi)有變?yōu)?,就強(qiáng)行釋放。上文提到,MappedByteBuffer會(huì)在jvm發(fā)生gc時(shí),可能被回收,但不是一定,rmq通過(guò)反射的方式調(diào)用Cleaner.clean,手動(dòng)清除。

DirectByteBuffer本身是一個(gè)java heap內(nèi)的對(duì)象,自身所占用的內(nèi)存并不會(huì)很大,只是其實(shí)例所映射的堆外內(nèi)存可能會(huì)比較大,當(dāng)jvm發(fā)起young gc時(shí),如果DirectByteBuffer實(shí)例是非可達(dá)性對(duì)象,那么,jvm就會(huì)將DirectByteBuffer實(shí)例回收,在回收前,會(huì)通過(guò)Cleaner.clean方法,委托Deallocator釋放堆外內(nèi)存;但DirectByteBuffer經(jīng)過(guò)多次ygc后,會(huì)晉升到老年代,此時(shí),如果不通過(guò)full gc 或old gc,就無(wú)法釋放堆外內(nèi)存;
因此我們可以通過(guò)程序手動(dòng)釋放。

Cleaner是 PhantomReference虛引用的子類,并通過(guò)自身的next和prev字段維護(hù)的一個(gè)雙向鏈表。PhantomReference的作用在于跟蹤垃圾回收過(guò)程,并不會(huì)對(duì)對(duì)象的垃圾回收過(guò)程造成任何的影響。
所以cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); 用于對(duì)當(dāng)前構(gòu)造的DirectByteBuffer對(duì)象的垃圾回收過(guò)程進(jìn)行跟蹤。
當(dāng)DirectByteBuffer對(duì)象從pending狀態(tài) ——> enqueue狀態(tài)時(shí),會(huì)觸發(fā)Cleaner的clean(),而Cleaner的clean()的方法會(huì)實(shí)現(xiàn)通過(guò)unsafe對(duì)堆外內(nèi)存的釋放。

4.2消息在java堆,物理內(nèi)存,虛擬內(nèi)存以及磁盤間的流動(dòng)
4-3

(1)對(duì)于producer消息生產(chǎn)來(lái)說(shuō),消息通過(guò)socket轉(zhuǎn)入java heap,然后通過(guò)直接內(nèi)存映射寫到pagecache(內(nèi)核態(tài)共享緩存,內(nèi)存的一種),最后在通過(guò)刷盤線程異步寫到flush到磁盤文件。如果broker端設(shè)置為sync,則同步等待刷盤結(jié)果。

(2)消費(fèi)者(consumer1)正常拉取消息,消息直接從pagecache轉(zhuǎn)入socket,不經(jīng)過(guò)java heap,這張場(chǎng)景最多,例如向上96G物理內(nèi)存,按照1K消息算,可在物理內(nèi)存緩存1億條消息

(3)消費(fèi)者(consumer2)異常拉取消息,由于socket所訪問(wèn)的消息不在pagecache中,因此需要通過(guò)虛擬內(nèi)存,發(fā)生缺頁(yè)中斷,產(chǎn)生磁盤IO,從磁盤把消息加載到pagecache,最后在直接從socket發(fā)出去。


參考:
1:http://javarevisited.blogspot.hk/2012/01/memorymapped-file-and-io-in-java.html
2:RocketMQ STORE Q&A
3:http://www.itdecent.cn/p/007052ee3773

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容