前言:
從本節(jié)開始,我們將分七個(gè)章節(jié)從底層源碼來(lái)分析Rocket mq 是如何實(shí)現(xiàn)自己的消息持久化的:
第(1)章節(jié)主要總從體上分析rmq消息的存儲(chǔ)格式和文件組織以及實(shí)現(xiàn)上的技術(shù)選用。
接下來(lái)的(2)、(3)、(4)、(5)、(6)、(7)章分別從源碼上深入分析消息的具體內(nèi)容、高可用、索引、存儲(chǔ)文件刪除、啟動(dòng)存儲(chǔ)狀態(tài)恢復(fù)以及業(yè)務(wù)消息查詢是如何實(shí)現(xiàn)的。
以下用4小節(jié)分析:(1)存儲(chǔ)總概。
1、消息【內(nèi)容】
說(shuō)明:
consumer客戶端消費(fèi)的就是這些【內(nèi)容】。消息會(huì)按照broker接收的順序串行寫入存儲(chǔ)文件,在寫入文件之前,先判斷當(dāng)前文件的剩余空間是否能夠?qū)懭朐摋l完整的消息,如果不足,就創(chuàng)建下一個(gè)文件,把這條消息寫入新創(chuàng)建的存儲(chǔ)文件。
文件組織:
存儲(chǔ)地址:user_root_path/store/commitlog/{fileName}
文件大?。?G = 1073741824 byte
文件起名:20位數(shù)字填充位
-
user_root_path/store/commitlog/
-00000000000000000000
-00000000001073741824
-00000000002147483648
這里說(shuō)明一下文件如何起名:
我們知每個(gè)存儲(chǔ)道文件的大小為1G,文件的起名方式為,文件所在的物理存儲(chǔ)位置映射的起始地址。
第n個(gè)文件的起名方式為:
(n-1) * 1073741824 ,然后左填充0,直至20位
消息體的存儲(chǔ)格式及內(nèi)容
| 存儲(chǔ)序列 | 字段名稱 | 長(zhǎng)度 | 注析 |
|---|---|---|---|
| 1 | TOTALSIZE | 4字節(jié) | 消息總長(zhǎng)度 |
| 2 | MAGICCODE | 4字節(jié) | 消息魔數(shù) |
| 3 | BODYCRC | 4字節(jié) | 消息完整校驗(yàn)碼 |
| 4 | QUEUEID | 4字節(jié) | 所在隊(duì)列 |
| 5 | FLAG | 4字節(jié) | 一般標(biāo)識(shí) |
| 6 | QUEUEOFFSET | 8字節(jié) | 所在隊(duì)列邏輯位移 |
| 7 | PHYSICALOFFSET | 8字節(jié) | 物理位移 |
| 8 | SYSFLAG | 4字節(jié) | 系統(tǒng)標(biāo)識(shí) |
| 9 | BORNTIMESTAMP | 8字節(jié) | producer端的消息生成時(shí)間戳 |
| 10 | BORNHOST | 8字節(jié) | producer端的通訊address |
| 11 | STORETIMESTAMP | 8字節(jié) | broker端存儲(chǔ)時(shí)間戳 |
| 12 | STOREHOSTADDRESS | 8字節(jié) | broker端地址 |
| 13 | RECONSUMETIMES | 4字節(jié) | 重消費(fèi)次數(shù) |
| 14 | Prepared Transaction Offset | 8字節(jié) | 預(yù)提交位移 |
| 15 | BODY LENGTH | 4字節(jié) | 消息體長(zhǎng)度 |
| 16 | BODY | value(BODY LENGTH) 字節(jié) | 具體消息體內(nèi)容 |
| 17 | TOPIC LENGTH | 4字節(jié) | topic 長(zhǎng)度 |
| 18 | TOPIC | value(TOPIC LENGTH) 字節(jié) | 具體topic內(nèi)容 |
| 19 | PROPERTIES LENGTH | 2字節(jié) | 屬性長(zhǎng)度 |
| 20 | PROPERTIES | value(PROPERTIES LENGTH) 字節(jié) | 屬性內(nèi)容 |
解析:
3-BODYCRC: CRC是一種數(shù)據(jù)錯(cuò)誤檢查技術(shù),它可以確保最初寫入鏡像文件的數(shù)據(jù)與從鏡像文件中使用>的數(shù)據(jù)保持一致,以確保在將該文件還原到磁盤時(shí)能夠檢測(cè)到它是否已經(jīng)損壞。4-QUEUEID:rockmq中 topic-brokerName-queueId可以確定集群中的一條具體的隊(duì)列。
5-FLAG:由producer消息生產(chǎn)方指定的一個(gè)int類型的標(biāo)志,默認(rèn)為0。
6-QUEUEOFFSET:消息所在隊(duì)列中的邏輯位移,即第幾條消息。
7-PHYSICALOFFSET:消息存儲(chǔ)的物理位移,即存儲(chǔ)的開始位置。
8-SYSFLAG:系統(tǒng)標(biāo)識(shí)位;例如消息是否壓縮,消息的類型(普通類型消息或是事務(wù)類型)。
20-PROPERTIES:producer端指定的消息屬性,以字符1和2作為分割符,如下key-a1value-a2key-b1valueb-2拼接字符串,其中性客戶端生成的消息id是必有屬,PROPERTIES用于構(gòu)建消息的【key查詢索引】。
2、消息的【邏輯位移索引】存儲(chǔ)格式
說(shuō)明:
類比于List的index,conusmer端可以通過(guò)該index,查詢出具體的消息【內(nèi)容】。
例如我們consumer指定消費(fèi)哪個(gè)broker下的topic,哪條隊(duì)列(queueId)的第n條消息,broker端會(huì)根據(jù)topic-queueId 以及邏輯索引n,先查詢出【邏輯位移索引】的具體內(nèi)容,然后通過(guò)【邏輯位移索引】存儲(chǔ)對(duì)應(yīng)的消息【內(nèi)容】的物理位移,最后再找出具體的消息內(nèi)容。
文件組織:
存儲(chǔ)地址:user_root_path/store/consumequeue/{topic}/{queueId}/{fileName}
文件大小:300000 * 20 byte
文件起名:20位數(shù)字填充位
第n個(gè)文件的起名方式為:
(n-1) * 300000 * 20 ,然后左填充0,直至20位
-
user_root_path/store/consumequeue/
- topic1
- 0
-00000000000000000000
-00000000000006000000
-00000000000012000000 - 1
-00000000000000000000
-00000000000006000000 - 2
-00000000000000000000
- 0
- topic1
消息的【邏輯位移索引】存儲(chǔ)格式及內(nèi)容
| 存儲(chǔ)序列 | 字段名稱 | 長(zhǎng)度 | 注析 |
|---|---|---|---|
| 1 | offset | 8字節(jié) | 消息存儲(chǔ)的物理位移 |
| 2 | size | 4字節(jié) | 消息總長(zhǎng)度 |
| 2 | tagsCode | 8字節(jié) | producer端指定消息的tags屬性的hashcode |
3、消息的【key查詢索引】存儲(chǔ)格式
說(shuō)明:
在producer生產(chǎn)消息時(shí),我們可以為消息指定一個(gè)業(yè)務(wù)上的String 字符串類型的key。那么,當(dāng)該消息落地成功后,我們可以通過(guò)指定的key,去查詢指定的消息,broker端會(huì)返回通過(guò)key的散列值,落在相同的槽的一定數(shù)量的消息給客戶端。
文件組織:
存儲(chǔ)地址:user_root_path/store/index/{fileName}
文件大?。?0 + (5,000,000[hashSlotNum] * 4[hashSlotSize]) + ((4 * 5,000,000)[indexNum] * 20[indexSize]) byte
文件起名:yyyyMMddhhmmss(年月日時(shí)分秒) + mmm(毫秒)
-
user_root_path/store/index/
- yyyyMMddhhmmssmmm
消息的【key查詢索引】存儲(chǔ)格式及內(nèi)容
查詢索引文件是由【索引頭】 + 【索引內(nèi)容】?jī)刹糠纸M成:
【索引頭】
| 存儲(chǔ)序列 | 字段名稱 | 長(zhǎng)度 | 注析 |
|---|---|---|---|
| 1 | beginTimestampIndex | 8字節(jié) | 表示索引文件 第一個(gè)構(gòu)建的 消息 的存儲(chǔ)時(shí)間(即消息放入緩存時(shí)的時(shí)間戳) |
| 2 | endTimestampIndex | 8字節(jié) | 表示索引文件 最后一個(gè)構(gòu)建的 消息 的存儲(chǔ)時(shí)間 |
| 3 | beginPhyoffsetIndex | 8字節(jié) | 表該索引文件 第一個(gè)構(gòu)建的 消息 的物理存儲(chǔ)地址 |
| 4 | beginPhyoffsetIndex | 8字節(jié) | 表該索引文件 最后一個(gè)構(gòu)建的 消息 的物理存儲(chǔ)地址 |
| 5 | hashSlotCount | 4字節(jié) | 散列槽個(gè)數(shù),每構(gòu)建一個(gè)消息的索引,個(gè)數(shù)加一 |
| 6 | indexCount | 4字節(jié) | 索引個(gè)數(shù),每構(gòu)建一個(gè)消息的索引,個(gè)數(shù)加一 |
【索引內(nèi)容】
| 存儲(chǔ)序列 | 字段名稱 | 長(zhǎng)度 | 注析 |
|---|---|---|---|
| 1 | keyHash | 4字節(jié) | 查詢消息的 key.hashCode()的絕對(duì)值 |
| 2 | phyOffset | 8字節(jié) | 消息的物理存儲(chǔ)地址 |
| 3 | timeDiff | 4字節(jié) | 與第一個(gè)消息的存儲(chǔ)時(shí)間差 |
| 4 | slotValue | 8字節(jié) | 上一條索引消息所在的邏輯索引 |
4、rmq消息持久化技術(shù)
4.1系統(tǒng)標(biāo)準(zhǔn)IO與直接內(nèi)存

傳統(tǒng)I/O操作中,java io文件的read和write操作都是通過(guò)調(diào)用系統(tǒng)底層的標(biāo)準(zhǔn)IO函數(shù) read()和write() 實(shí)現(xiàn)的。對(duì)于寫操作,即通過(guò)java調(diào)用write方法以后,首先將當(dāng)前的用戶態(tài)(即java進(jìn)程)轉(zhuǎn)成內(nèi)核態(tài),并把java 內(nèi)存(heap)的字節(jié)數(shù)據(jù)拷貝到內(nèi)核態(tài)的io緩存區(qū)(pagecache),最后在把內(nèi)核態(tài)的緩存區(qū)數(shù)據(jù)拷貝到磁盤文件(pagecache->disk),這樣就完成了一次寫操作;對(duì)于讀操作,也是先將用戶態(tài)轉(zhuǎn)成內(nèi)核態(tài),然后由操作系統(tǒng)的內(nèi)核代碼將磁盤文件拷貝到內(nèi)核態(tài)的緩沖區(qū)中,最后再?gòu)膬?nèi)核態(tài)拷貝到用戶態(tài)的緩存中,這樣就完成了一次讀操作。當(dāng)然,根據(jù)空間局部性原理,也就是當(dāng)我們的應(yīng)用程序需要訪問(wèn)磁盤文件上的指定的一段數(shù)據(jù)時(shí),操作系統(tǒng)會(huì)認(rèn)為我們可能也需要訪問(wèn)該段數(shù)據(jù)的下一段數(shù)據(jù),并且,由于磁盤io的操作速度比內(nèi)存操作的速度慢幾個(gè)數(shù)量級(jí),因此,會(huì)同時(shí)把指定的一段數(shù)據(jù)以及下一段數(shù)據(jù)一并從磁盤文件拷貝到內(nèi)核態(tài)的緩沖區(qū)中,即預(yù)讀更多的數(shù)據(jù)。所以,當(dāng)我們讀完指定的一段數(shù)據(jù)后,再讀下一段數(shù)據(jù)時(shí),就可以直接從內(nèi)核態(tài)拷貝到用戶態(tài)的緩存中,減少了低效率的io操作。

內(nèi)存映射和標(biāo)準(zhǔn)的io操作一樣需要從磁盤文件中獲取數(shù)據(jù),但它并不需要從內(nèi)核態(tài)的緩沖區(qū)拷貝到用戶態(tài)的緩沖區(qū),而是直接將進(jìn)程中一部分私有地址空間區(qū)域與磁盤文件建立起映射關(guān)系,通過(guò)缺頁(yè),把磁盤文件的內(nèi)容直接拷貝到進(jìn)程的緩沖區(qū)。相對(duì)標(biāo)準(zhǔn)io操作,這里有兩個(gè)最主要的優(yōu)勢(shì),第一,性能上,由于直接內(nèi)存映射少一次內(nèi)存拷貝,因此會(huì)比標(biāo)準(zhǔn)io操作要快,文件越大,優(yōu)勢(shì)越明顯;第二,直接內(nèi)存映射可以加載普通方式無(wú)法訪問(wèn)的大文件,這里的大文件指,例如1G的文件,如果是標(biāo)準(zhǔn)io操作,我們需要將1G的文件直接加載進(jìn)內(nèi)存,才可以訪問(wèn),明顯這是不可能的,但通過(guò)虛擬內(nèi)存的映射方式以及操作系統(tǒng)層面上發(fā)起的頁(yè)面請(qǐng)求,便可將所需數(shù)據(jù)加載到程序內(nèi)存中。
下面快速總結(jié)一下Java內(nèi)存映射文件和IO
1)Java語(yǔ)言通過(guò)java.nio包支持內(nèi)存映射文件和IO。
2)內(nèi)存映射文件用于對(duì)性能要求高的系統(tǒng)中,如繁忙的電子交易系統(tǒng)
3)使用內(nèi)存映射IO你可以將文件的一部分加載到內(nèi)存中
4)如果被請(qǐng)求的緩存頁(yè)面不在內(nèi)存中,內(nèi)存映射文件會(huì)導(dǎo)致頁(yè)面錯(cuò)誤
5)將一個(gè)文件區(qū)間映射到內(nèi)存中的能力取決于內(nèi)存的可尋址范圍。在32位機(jī)器中,不能超過(guò)4GB,即2^32比特。
6)Java中的內(nèi)存映射文件比流IO要快(譯注:對(duì)于大文件而言是對(duì)的,小文件則未必)
7)用于加載文件的內(nèi)存在Java的堆內(nèi)存之外,存在于共享內(nèi)存中,允許兩個(gè)不同進(jìn)程訪問(wèn)文件。順便說(shuō)一下,這依賴于你用的是direct還是non-direct字節(jié)緩存。
8)讀寫內(nèi)存映射文件是操作系統(tǒng)來(lái)負(fù)責(zé)的,因此,即使你的Java程序在寫入內(nèi)存后就掛掉了,只要操作系統(tǒng)工作正常,數(shù)據(jù)就會(huì)寫入磁盤。
9)Direct字節(jié)緩存比non-direct字節(jié)緩存性能要好
10)不要經(jīng)常調(diào)用MappedByteBuffer.force()方法,這個(gè)方法強(qiáng)制操作系統(tǒng)將內(nèi)存中的內(nèi)容寫入硬盤,所以如果你在每次寫內(nèi)存映射文件后都調(diào)用force()方法,你就不能真正從內(nèi)存映射文件中獲益,而是跟disk IO差不多。
11)如果電源故障或者主機(jī)癱瘓,有可能內(nèi)存映射文件還沒(méi)有寫入磁盤,意味著可能會(huì)丟失一些關(guān)鍵數(shù)據(jù)。
12)MappedByteBuffer和文件映射在緩存被GC之前都是有效的。sun.misc.Cleaner可能是清除內(nèi)存映射文件的唯一選擇。
這里在說(shuō)一下MappedByteBuffer,可能會(huì)導(dǎo)致JVM crash ,因?yàn)镸appedByteBuffer可以通過(guò)特殊的方法釋放,實(shí)際上調(diào)用了unmap的方法。此時(shí),之前映射到j(luò)vm的地址空間就是非法地址,如果此后仍然對(duì)MappedByteBuffer進(jìn)行讀寫,系統(tǒng)就會(huì)向jvm發(fā)送sigbus信號(hào)來(lái)通知進(jìn)程非法操作,這個(gè)問(wèn)題一般是由于程序沒(méi)有處理好并發(fā)問(wèn)題導(dǎo)致的。
因此rmq通過(guò)引用計(jì)數(shù)法,即只要引用計(jì)數(shù)不為0,MappedByteBuffer對(duì)象就不會(huì)釋放來(lái)解決這個(gè)問(wèn)題。具體的抽象實(shí)現(xiàn)為ReferenceResource,使用AtomicLong原子變量來(lái)保證并發(fā),性能上會(huì)比較好。
當(dāng)然,這種方式也會(huì)存在弊端,就是程序不能正確操作引用計(jì)數(shù),可能會(huì)導(dǎo)致文件無(wú)法刪除,因此,rmq增加了一個(gè)補(bǔ)救措施,就是一旦文件被關(guān)閉了狀態(tài)位available會(huì)設(shè)置為false,并且開始計(jì)時(shí),如果超過(guò)2分鐘,引用計(jì)數(shù)還沒(méi)有變?yōu)?,就強(qiáng)行釋放。上文提到,MappedByteBuffer會(huì)在jvm發(fā)生gc時(shí),可能被回收,但不是一定,rmq通過(guò)反射的方式調(diào)用Cleaner.clean,手動(dòng)清除。
DirectByteBuffer本身是一個(gè)java heap內(nèi)的對(duì)象,自身所占用的內(nèi)存并不會(huì)很大,只是其實(shí)例所映射的堆外內(nèi)存可能會(huì)比較大,當(dāng)jvm發(fā)起young gc時(shí),如果DirectByteBuffer實(shí)例是非可達(dá)性對(duì)象,那么,jvm就會(huì)將DirectByteBuffer實(shí)例回收,在回收前,會(huì)通過(guò)Cleaner.clean方法,委托Deallocator釋放堆外內(nèi)存;但DirectByteBuffer經(jīng)過(guò)多次ygc后,會(huì)晉升到老年代,此時(shí),如果不通過(guò)full gc 或old gc,就無(wú)法釋放堆外內(nèi)存;
因此我們可以通過(guò)程序手動(dòng)釋放。
Cleaner是 PhantomReference虛引用的子類,并通過(guò)自身的next和prev字段維護(hù)的一個(gè)雙向鏈表。PhantomReference的作用在于跟蹤垃圾回收過(guò)程,并不會(huì)對(duì)對(duì)象的垃圾回收過(guò)程造成任何的影響。
所以cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); 用于對(duì)當(dāng)前構(gòu)造的DirectByteBuffer對(duì)象的垃圾回收過(guò)程進(jìn)行跟蹤。
當(dāng)DirectByteBuffer對(duì)象從pending狀態(tài) ——> enqueue狀態(tài)時(shí),會(huì)觸發(fā)Cleaner的clean(),而Cleaner的clean()的方法會(huì)實(shí)現(xiàn)通過(guò)unsafe對(duì)堆外內(nèi)存的釋放。
4.2消息在java堆,物理內(nèi)存,虛擬內(nèi)存以及磁盤間的流動(dòng)

(1)對(duì)于producer消息生產(chǎn)來(lái)說(shuō),消息通過(guò)socket轉(zhuǎn)入java heap,然后通過(guò)直接內(nèi)存映射寫到pagecache(內(nèi)核態(tài)共享緩存,內(nèi)存的一種),最后在通過(guò)刷盤線程異步寫到flush到磁盤文件。如果broker端設(shè)置為sync,則同步等待刷盤結(jié)果。
(2)消費(fèi)者(consumer1)正常拉取消息,消息直接從pagecache轉(zhuǎn)入socket,不經(jīng)過(guò)java heap,這張場(chǎng)景最多,例如向上96G物理內(nèi)存,按照1K消息算,可在物理內(nèi)存緩存1億條消息
(3)消費(fèi)者(consumer2)異常拉取消息,由于socket所訪問(wèn)的消息不在pagecache中,因此需要通過(guò)虛擬內(nèi)存,發(fā)生缺頁(yè)中斷,產(chǎn)生磁盤IO,從磁盤把消息加載到pagecache,最后在直接從socket發(fā)出去。
參考:
1:http://javarevisited.blogspot.hk/2012/01/memorymapped-file-and-io-in-java.html
2:RocketMQ STORE Q&A
3:http://www.itdecent.cn/p/007052ee3773