概覽
RocketMQ的消息存儲主要是在${ROCKETMQ_HOME}/store文件夾下,message消息主要存儲在commitlog文件夾下,RocketMQ消息存儲和索引是分開隔離的,已Topic為主題的消息索引存儲在consumequeue文件夾下,通過MessageQueue映射為ConsumeQueue的文件就存儲在這個文件夾下,然后index主要是以消息key和offset的對應關系,以類似HashMap的方式存儲,方便消息查詢。

本片文章主要介紹消息存儲組織結構、Message是如何快速存儲都MappedFile文件中的。MappedFile文件就是一個個以首條消息的offset為名稱的存儲文件,如上圖commitlog文件夾下展示的00000000000000000000、00000000001073741824等,每一個mappedFile文件的大小約為102410241024=1G。

DefaultMessageStore
DefaultMessageStore是消息相關操作的主要服務,包括消息的存儲、查詢、定時清除等等。這里主要介紹其中消息存儲相關的事物,包括是否開啟TransientStorePool臨時消息存儲池,一次創(chuàng)建2個MappedFile文件的AllocateMappedFileService消息存儲預創(chuàng)建服務,還有歷史存儲文件mappedFile加載加載到直接內(nèi)存MappedByteBuffer和對應的mmap文件映射等。
# org.apache.rocketmq.store.DefaultMessageStore
// MappedFile 分配服務
private final AllocateMappedFileService allocateMappedFileService;
// 是否開啟
// 消息臨時存儲
private final TransientStorePool transientStorePool;
this.transientStorePool = new TransientStorePool(messageStoreConfig);
// 根據(jù)是否開啟 transientStorePoolEnable,存在兩種初始化情況。
// transientStorePoolEnable 為 true 表示內(nèi)容先存儲在堆外內(nèi)存(直接內(nèi)存),然后通過 Commit 線程將數(shù)據(jù)提交到FileChannel中,再通過 Flush 線程將內(nèi)存映射 Buffer 中的數(shù)據(jù)持久化到磁盤中。
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//加載歷史mappedFile文件,進行便于文件查詢和消費
// load Commit Log
result = result && this.commitLog.load();
TransientStorePool
TransientStorePool是短暫的消息存儲池。這里先進行簡單介紹,具體作用到應用的時候詳細介紹。這里直接開辟默認5個1G的直接內(nèi)存ByteBuffer,用來臨時存儲消息。它還引入了內(nèi)存鎖的機制,避免直接內(nèi)存的數(shù)據(jù)被替換到系統(tǒng)中的Swap分區(qū)中,提高系統(tǒng)存儲性能,使RocketMQ消息低延遲、高吞吐量。
public class TransientStorePool {
// availableBuffers 個數(shù),可通過在broker中配置文件中設置 transientStorePool,默認值為 5
private final int poolSize;
// 每個 ByteBuffer 大小,默認為 mappedFileSizeCommitLog,表明 TransientStorePool 為 commitlog 文件服務
private final int fileSize;
// 直接內(nèi)存,ByteBuffer 容器,雙端隊列
private final Deque<ByteBuffer> availableBuffers;
/**
* 創(chuàng)建默認的堆外內(nèi)存
* It's a heavy init method.
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
// 利用 NIO 直接直接分配,堆外內(nèi)存(直接內(nèi)存),在系統(tǒng)中的內(nèi)存,非 JVM 內(nèi)存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
// 內(nèi)存地址
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
// 內(nèi)存鎖定
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
}
CommitLog
CommitLog主要有消息的刷盤的存儲服務、消息的刷盤服務,存儲消息的回調(diào)等等,這里主要介紹MappedFileQueue,它是對${ROCKET_HOME}/store/commitlog目錄的封裝,主要用來管理多個MappedFile。
public class CommitLog {
// 映射文件隊列,ROCKETMQ_HOME/commitlog 文件夾下的文件對應
protected final MappedFileQueue mappedFileQueue;
// 默認消息存儲服務
protected final DefaultMessageStore defaultMessageStore;
// commitLog 刷盤操作
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
// 定時將 transientStorePoll 中的直接內(nèi)存 ByteBuffer,提交條內(nèi)存映射 MappedByteBuffer 中
private final FlushCommitLogService commitLogService;
// 存儲消息到 mappedFile 的回調(diào)映射
private final AppendMessageCallback appendMessageCallback;
// 消息解碼服務線程
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// topic-queue-id,offset;消息的key,和在 commitlog 中的 offset,方便消息存儲時的索引
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 在這里組織 commitlog 的對應的 MappedFile 文件,然后進行相應的文件操作,文件映射,刷線到磁盤文件
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
// 異步、同步刷盤服務初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盤服務為 GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 異步刷盤服務為 FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// 定時將 transientStorePoll 中的直接內(nèi)存 ByteBuffer,提交條內(nèi)存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
// 存儲消息到 mappedFile 的回調(diào)映射
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
// putMessage 到 mappedFile 時是否使用可重入鎖,默認使用自旋鎖
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
}
MappedFileQueue
MappedFile 的存儲集合和管理器,是對 ${ROCKET_HOME}/store/commitlog 文件夾的封裝。主要用來管理MappedFile文件,包括消息的查詢、提交、落盤的刷新,歷史MappedFile文件的預熱加載和直接內(nèi)存映射mmap操作,過期文件的刪除、追加消息的最后一個MappedFile文件的獲取和創(chuàng)建等。
public class MappedFileQueue {
// 存儲路徑${ROCKET_HOME}/store/commitlog,該目錄下會存在多個內(nèi)存映射文件
private final String storePath;
// 單個文件的存儲大小
private final int mappedFileSize;
// mappedFile 文件集合
// 一個線程安全的 ArrayList 的變種,通過可 reentrantLock 可重入鎖實現(xiàn)數(shù)組的新建和數(shù)組舊有內(nèi)容的 copy 到新建的數(shù)組,然后返回新建的數(shù)組
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 創(chuàng)建 MappedFile 服務類
private final AllocateMappedFileService allocateMappedFileService;
// 當前刷盤指針,表示該指針之前的所有數(shù)據(jù)全部持久化到磁盤
// MappedFile 中的 MappedByteBuffer 中數(shù)據(jù)寫入磁盤的指針,該指針之前的所有數(shù)據(jù)全部持久化到磁盤
private long flushedWhere = 0;
// Java 應用程序態(tài)數(shù)據(jù)要寫入nio內(nèi)存映射的ByteBuffer的提交了位置的指針
// commitWhere 只有開啟 transientStorePool 的前提下才有作用;
// commitWhere 代表著 transientStorePool 中直接內(nèi)存 ByteBuffer 需要提交數(shù)據(jù)到 MappedByteBuffer 直接內(nèi)存的,位置為已經(jīng)提交了數(shù)據(jù)的位置。下次要提交的開始位置,上次提交的結尾位置。
private long committedWhere = 0;
/**
* 項目啟動,加載 commitlog 文件夾下對應的文件
* @return
*/
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
// 根據(jù)文件名(offset)排序
Arrays.sort(files);
for (File file : files) {
// 如果物理文件大小 != mappedFileSize,說明文件被破壞了,返回false
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
// 更新 mappedFile 文件指針
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
// 加入映射文件集合
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
/**
* 獲取最后存儲消息的映射mappedFile
*
* @param startOffset mappedFile 開始寫文件的offset
* @param needCreate 是否需要創(chuàng)建新的 mappedFile 文件
* @return
*/
//
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 創(chuàng)建映射文件的起始偏移量,也就是即將的mappedfile文件名稱
long createOffset = -1;
// 獲取最后一個映射文件,如果為null或者寫滿則會執(zhí)行創(chuàng)建邏輯
MappedFile mappedFileLast = getLastMappedFile();
// mappedFileLast == null,表示需要創(chuàng)建新的 mappedFile 文件,創(chuàng)建新文件的offset值;
if (mappedFileLast == null) {
// 計算將要創(chuàng)建的映射文件的起始偏移量
// 如果startOffset<=mappedFileSize則起始偏移量為0
// 如果startOffset>mappedFileSize則起始偏移量為是mappedFileSize的倍數(shù)
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 映射文件滿了,創(chuàng)建新的映射文件
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 創(chuàng)建的映射文件的偏移量等于最后一個映射文件的起始偏移量 + 映射文件的大?。╟ommitlog文件大?。? createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 創(chuàng)建新的映射文件
if (createOffset != -1 && needCreate) {
// 構造commitlog 文件名稱
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// nextNextFilePath,預先創(chuàng)建下一個 mappedFile 文件,通過 allocateMappedFileService 服務,一起創(chuàng)建兩個文件,預先創(chuàng)建下一個文件
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 優(yōu)先通過allocateMappedFileService中方式構建映射文件,預分配方式,性能高
// 如果上述方式失敗則通過new創(chuàng)建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
// 是否是 MappedFileQueue 隊列中第一個文件
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
}
MappedFile
CommitLog、MappedFileQueue、MappedFile的關系如圖:

MappedFile是RocketMQ消息存儲的終極Boss,重中之重。涉及MapedFile的預創(chuàng)建和映射、歷史數(shù)據(jù)MappedFile的磁盤文件預熱。MappedByteBuffer是通過NIO方式創(chuàng)建的內(nèi)存映射對象。ByteBuffer writeBuffer是直接內(nèi)存從TransientStorePool中借來的,他們兩個是在內(nèi)存中用來存放消息的,其中區(qū)別下面詳細介紹。這里先從CommitLog文件存放消息說起。
public class MappedFile extends ReferenceResource {
// 當前JVM實例中 MappedFile 虛擬內(nèi)存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 當前JVM實例中MappedFile對象個數(shù)
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 即將寫入消息的mappedFile 的位置
// 當前 MappedFile 文件的寫指針,從 0 開始(內(nèi)存映射文件的寫指針)
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 當前文件的提交到 MappedBuffer的指針,如果開啟 transientStorePoolEnable,則數(shù)據(jù)會存儲在 TransientStorePool 中,然后提交到內(nèi)存映射 ByteBuffer 中,再刷寫到磁盤
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 刷寫到磁盤指針,該指針之前的數(shù)據(jù)持久化到磁盤中
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 文件通道
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 堆外內(nèi)存 ByteBuffer,如果不為空,數(shù)據(jù)首先將存儲在該 Buffer 中,然后提交到 MappedFile 對應的內(nèi)存映射文件 Buffer。
// transientStorePoolEnable 為true時不為空。
protected ByteBuffer writeBuffer = null;
// 堆內(nèi)存池,transientStorePoolEnable 為true 時啟用
protected TransientStorePool transientStorePool = null;
// 文件名稱
private String fileName;
// mappedFile 文件的開始偏移量地址
private long fileFromOffset;
// 物理文件
private File file;
// NIO 物理文件對應的內(nèi)存映射Buffer
private MappedByteBuffer mappedByteBuffer;
// 文件最后一次內(nèi)容寫入的時間
private volatile long storeTimestamp = 0;
// 是否是 MappedFileQueue 隊列中第一個文件
private boolean firstCreateInQueue = false;
}
CommitLog#putMessage方法是來存放消息的,存放消息到系統(tǒng)內(nèi)存映射中,并沒有落入磁盤中,等待同步刷盤、或者異步刷盤,然后是消息存儲的高可用。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
// 消息存儲時間
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
// 循環(huán)冗余校驗碼,檢測數(shù)據(jù)在網(wǎng)絡中傳輸是否發(fā)生變化
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
// 存儲服務統(tǒng)計功能服務
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// topic
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 事務回滾消息標志
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果是非事務消息,或者事務消息的 commit 操作;進而判斷是不是延遲消息,存儲到特殊的延遲消息隊列;然后事務消息存儲也進行了同樣的消息 topic 的轉換,從而實現(xiàn)了消息的事務;事務消息非提交階段,先進行另一個 topic 的儲存,如果事務提交了,才進行,存儲到消息的真正的topic 中去。
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果是延遲級別消息
if (msg.getDelayTimeLevel() > 0) {
// 設置消息延遲級別
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 延遲消息topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延遲消息消息隊列Id
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 將真實的 topic 放入 message 屬性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 替換為延遲消息topic
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 消息誕生地址 ipv6 設置
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
// 消息存儲地址 ipv6 設置
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 最后一個 消息 存儲 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 自旋鎖 或 可重入鎖,上鎖;消息寫入 commitlog 的映射文件是串行的
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 開始上鎖時間
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
// 消息存儲時間,確定消息全局有序
msg.setStoreTimestamp(beginLockTimestamp);
//mappedFile==null標識CommitLog文件還未創(chuàng)建,第一次存消息則創(chuàng)建CommitLog文件
//mappedFile.isFull()表示mappedFile文件已滿,需要重新創(chuàng)建CommitLog文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise;新文件,造成臟數(shù)據(jù)
}
// mappedFile==null說明創(chuàng)建CommitLog文件失敗拋出異常,創(chuàng)建失敗可能是磁盤空間不足或者權限不夠
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 追加消息到 mappedFile 文件中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
// 針對一條消息足夠長,然后 mappedFile 文件不夠存儲,需要創(chuàng)建新的 mappedFile 進行消息存放。
case END_OF_FILE:
// 上一個 mappedFile 暫存文件,需要解鎖這個 mappedFile
unlockMappedFile = mappedFile;
// broker 重新開辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 開辟成功,再將消息寫入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
// 存儲消息花費時間
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 最后釋放存儲消息的鎖
putMessageLock.unlock();
}
// 存儲消息花費時間 > 500
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
// 上一個有空閑空間,但不夠存儲消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解鎖 mappedFile 的內(nèi)存鎖定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
// topic 下存放消息次數(shù)
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
// topic 下存放消息字節(jié)數(shù)
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// handle 硬盤刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
// 返回存儲消息的結果
return putMessageResult;
}
PutMessage重要步驟
- 獲取上次最后一個寫入消息的存儲文件MappedFile,MappedFile文件的獲取在后面會詳細接受。
// 最后一個 消息 存儲 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- 向MappedFile文件追加消息,如果返回END_OF_FILE代表這個整備追加消息的MappedFile文件寫滿了,不夠存儲本條消息,然后再去獲取這最后下一個MappedFile文件。
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 針對一條消息足夠長,然后 mappedFile 文件不夠存儲,需要創(chuàng)建新的 mappedFile 進行消息存放。
case END_OF_FILE:
// 上一個 mappedFile 暫存文件,需要解鎖這個 mappedFile
unlockMappedFile = mappedFile;
// broker 重新開辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 開辟成功,再將消息寫入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- 寫MappedFile文件是會被mlock內(nèi)存鎖定,防止被交換到Swap分區(qū)中,寫滿的MappedFile文件進行鎖定解除。
// 上一個有空閑空間,但不夠存儲消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解鎖 mappedFile 的內(nèi)存鎖定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
- 寫入內(nèi)存的消息進行刷盤,然后是HA消息存儲的高可用,Broker存儲消息的復制,這兩部分內(nèi)容也很重要,下次在介紹,在本章不是重點內(nèi)容。
// handle 硬盤刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
最后一個MappedFile的獲取
這是MappedFile設計的經(jīng)典,現(xiàn)在重點介紹。創(chuàng)建MappedFile對象有兩種方式。
第一種:通過構造方法,new MappedFile()一個對象。然后進行MapepdFile對象MappedByteBuffer的內(nèi)存映射。
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
/**
* MappedFile 初始化,并做好 mappedFile 和 mappedByteBuffer 的NIO 直接內(nèi)存映射關系
*
* @param fileName 物理文件路徑
* @param fileSize mappedFileSize 文件大小
* @throws IOException
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
// 物理文件名稱
this.file = new File(fileName);
// 文件開始位置
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
// 確保文件路徑存在,不存在,進行路徑文件創(chuàng)建
ensureDirOK(this.file.getParent());
try {
// 通過 RandomAccessFile 創(chuàng)建讀寫文件通道,并將文件內(nèi)容使用NIO 的內(nèi)存映射 Buffer 將文件映射到內(nèi)存中
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 物理文件對應的內(nèi)存映射Buffer
// 通過 NIO 文件通道和mappedFileSize 大小,創(chuàng)建內(nèi)存映射文件 mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// 當前JVM實例中 MappedFile 虛擬內(nèi)存
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
// 當前JVM實例中MappedFile對象個數(shù)
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
第二種:預創(chuàng)建MappedFile,通過allocateMappedFileService服務一次創(chuàng)建兩個MappedFile對象。
// 優(yōu)先通過allocateMappedFileService中方式構建映射文件,預分配方式,性能高
// 如果上述方式失敗則通過new創(chuàng)建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
AllocateMappedFileService
AllocateMappedFileService是預創(chuàng)建MappedFile文件的服務,通過一次構造兩個創(chuàng)建MappedFile的AllocateRequest然后放入隊列requestQueue中,通過CountDownLatch線程同步協(xié)調(diào)器等待mmapOperation()方法,創(chuàng)建MappedFile對象,并返回。RocketMQ中預分配MappedFile的設計非常巧妙,下次獲取時候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時間延遲。
CountDownLatch協(xié)調(diào)兩個線程之間的通信

/**
* 預先創(chuàng)建 MappedFile 文件,只是先創(chuàng)建2個創(chuàng)建mappedFile 文件的請求,放入隊列中,具體 mappedFile 文件的創(chuàng)建和文件內(nèi)存直接映射由 mmapOperation() 方法來實現(xiàn)。
* @param nextFilePath 創(chuàng)建 mappedFile 文件的全路徑名稱
* @param nextNextFilePath 創(chuàng)建下一個 mappedFile 文件的全路徑名稱
* @param fileSize 文件大小
* @return
*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默認提交兩個請求
int canSubmitRequests = 2;
// 是否啟用 transientStorePool
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// SLAVE 節(jié)點中 transientByteBuffer 即使沒有足夠的 ByteBuffer,也不支持快速失敗
// 啟動快速失敗策略時,計算TransientStorePool中剩余的buffer數(shù)量減去requestQueue中待分配的數(shù)量后,剩余的buffer數(shù)量
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// 可用的 ByteBuffer - requestQueue,還剩余可用的 ByteBuffer 數(shù)量
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// 創(chuàng)建分配請求
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 判斷requestTable中是否存在該路徑的分配請求,如果存在則說明該請求已經(jīng)在排隊中
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//該路徑?jīng)]有在排隊
if (nextPutOK) {
// byteBuffer 數(shù)量不夠,則快速失敗
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// 數(shù)量充足的話,將指定的元素插入到此優(yōu)先級隊列中
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
// 請求數(shù)量 -1
canSubmitRequests--;
}
// 下下個請求的處理
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
// 報錯,日志
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
// 下一個分配請求,獲取當前請求,然后通過線程協(xié)調(diào)器CountDownLatch,協(xié)調(diào)另一個線程進行完mmpOperation操作后,返回創(chuàng)建好的MappedFile文件
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 默認等待5s,等待 mmapOperation 操作創(chuàng)建 mappedFile
// 調(diào)用此方法的線程會被阻塞,直到 CountDownLatch 的 count 為 0;等到 mmapOperation() finally countDownLatch 為 0
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
// 成功從 requestTable 中移除請求,并返回 mappedFile 文件
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
具體創(chuàng)建MappedFile對象
AllocateMappedFileService服務開啟了一個線程,不停地從創(chuàng)建MappedFile對象的請求隊列requestQueue中獲取AllocateRequest,并實時創(chuàng)建MappedFile對象,并通過CountDownLatch通知putRequestAndReturnMappedFile() 方法已經(jīng)創(chuàng)建了MappedFile對象,從而獲取返回。
/**
* 開始 mappedFile 文件分配服務,從 requestQueue 中獲取創(chuàng)建 mappedFile 的文件請求
*/
public void run() {
log.info(this.getServiceName() + " service started");
// 除非停止,否則一直在進行 mmap 映射操作
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
MmapOperation具體創(chuàng)建MappedFile對象
在這里創(chuàng)建MappedFile對象,也有兩種情況,區(qū)別在于是否啟用TransientStorePool消息暫存池,它里面有默認5個1G的直接內(nèi)存,可以通過直接內(nèi)存賦值給MappedFile的writerBuffer對象,省去了開辟內(nèi)存的時間;還有一種是通過MappedFile的NIO創(chuàng)建的MappedByteBuffer直接內(nèi)存映射來存儲消息,需要進行文件的map映射操作,開辟內(nèi)存空間。這兩種方式的對比會在下面介紹。
第一種:不啟用transientStorePool對象,通過構造方法創(chuàng)建。
第二種:通過ServerLoader.load的方式創(chuàng)建,如果失敗了,再去嘗試構造方法的方式。
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
// 為每一個 mappedFile 文件,進行init中的mmap 映射操作
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
// spi 加載失敗,使用構造方法創(chuàng)建 mappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 不開啟 transientStorePool,直接內(nèi)存映射
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
MappedFile文件預熱
通過 mmap 建立內(nèi)存映射僅是將文件磁盤地址和虛擬地址通過映射對應起來,此時物理內(nèi)存并沒有填充磁盤文件內(nèi)容。
當實際發(fā)生文件讀寫時,產(chǎn)生缺頁中斷并陷入內(nèi)核,然后才會將磁盤文件內(nèi)容讀取至物理內(nèi)存。針對上述場景,RocketMQ 設計了 MappedFile 預熱機制。
當 RocketMQ 開啟 MappedFile 內(nèi)存預熱(warmMapedFileEnable),且 MappedFile 文件映射空間大小大于等于 mapedFileSizeCommitLog(1 GB) 時,調(diào)用 warmMappedFile 方法對 MappedFile 進行預熱。
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 對 mappedFile 進行預熱
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
MappedFile創(chuàng)建后,需要對 MappedFile 文件進行預熱,將內(nèi)存和磁盤映射起來,然后每頁寫入占位數(shù)據(jù)0,然后將這些0數(shù)據(jù),刷新到磁盤,進行磁盤預熱。
當調(diào)用Mmap進行內(nèi)存映射后,OS只是建立了虛擬內(nèi)存地址至物理地址的映射表,而實際并沒有加載任何文件至內(nèi)存中。
程序要訪問數(shù)據(jù)時,OS會檢查該部分的分頁是否已經(jīng)在內(nèi)存中,如果不在,則發(fā)出一次 缺頁中斷。X86的Linux中一個標準頁面大小是4KB,
那么1G的CommitLog需要發(fā)生 1024KB/4KB=256次 缺頁中斷,才能使得對應的數(shù)據(jù)完全加載至物理內(nèi)存中。
為什么每個頁都需要寫入數(shù)據(jù)呢?
RocketMQ在創(chuàng)建并分配MappedFile的過程中預先寫入了一些隨機值到Mmap映射出的內(nèi)存空間里。原因在于:
僅分配內(nèi)存并進行mlock系統(tǒng)調(diào)用后并不會為程序完全鎖定這些分配的內(nèi)存,原因在于其中的分頁可能是寫時復制的。因此,就有必要對每個內(nèi)存頁面中寫入一個假的值。
鎖定的內(nèi)存可能是寫時復制的,這個時候,這個內(nèi)存空間可能會改變。這個時候,寫入假的臨時值,這樣就可以針對每一個內(nèi)存分頁的寫入操做會強制 Linux 為當前進程分配一個獨立、私有的內(nèi)存頁。
寫時復制
寫時復制:子進程依賴使用父進程開創(chuàng)的物理空間。
內(nèi)核只為新生成的子進程創(chuàng)建虛擬空間結構,它們來復制于父進程的虛擬究竟結構,但是不為這些段分配物理內(nèi)存,它們共享父進程的物理空間,當父子進程中有更改相應段的行為發(fā)生時,再為子進程相應的段分配物理空間。
https:www.cnblogs.com/biyeymyhjob/archive/2012/07/20/2601655.html
為了避免OS檢查分頁是否在內(nèi)存中的過程出現(xiàn)大量缺頁中斷,RocketMQ在做Mmap內(nèi)存映射的同時進行了madvise系統(tǒng)調(diào)用,
目的是使OS做一次內(nèi)存映射后,使對應的文件數(shù)據(jù)盡可能多的預加載至內(nèi)存中,降低缺頁中斷次數(shù),從而達到內(nèi)存預熱的效果。
RocketMQ通過map+madvise映射后預熱機制,將磁盤中的數(shù)據(jù)盡可能多的加載到PageCache中,保證后續(xù)對ConsumeQueue和CommitLog的讀取過程中,能夠盡可能從內(nèi)存中讀取數(shù)據(jù),提升讀寫性能。
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
// 創(chuàng)建一個新的字節(jié)緩沖區(qū),其內(nèi)容是此緩沖區(qū)內(nèi)容的共享子序列
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
// warmMappedFile 每間隔 OS_PAGE_SIZE 向 mappedByteBuffer 寫入一個 0,此時對應頁恰好產(chǎn)生一個缺頁中斷,操作系統(tǒng)為對應頁分配物理內(nèi)存
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 刷盤方式是同步策略時,進行刷盤操作
// 每修改 pages 個分頁刷一次盤,相當于 4096 * 4k = 16M,每 16 M刷一次盤,1G 文件 1024M/16M = 64 次
// force flush when flush disk type is sync
// 如果刷盤策略為同步刷盤,需要對每個頁進行刷盤
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
// 防止垃圾回收
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
// 前面對每個頁,寫入了數(shù)據(jù)(0 占位用,防止被內(nèi)存交互),進行了刷盤,然后這個操作是對所有的內(nèi)存進行刷盤。
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
// 刷盤,強制將此緩沖區(qū)內(nèi)容的任何更改寫入包含映射文件的存儲設備
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
//通過 JNA 調(diào)用 mlock 方法鎖定 mappedByteBuffer 對應的物理內(nèi)存,阻止操作系統(tǒng)將相關的內(nèi)存頁調(diào)度到交換空間(swap space),以此提升后續(xù)在訪問 MappedFile 時的讀寫性能。
this.mlock();
}
通過 JNA 調(diào)用 mlock 方法鎖定 mappedByteBuffer 對應的物理內(nèi)存,阻止操作系統(tǒng)將相關的內(nèi)存頁調(diào)度到交換空間(swap space),以此提升后續(xù)在訪問 MappedFile 時的讀寫性能。
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
// RocketMQ的做法是,在做Mmap內(nèi)存映射的同時進行madvise系統(tǒng)調(diào)用,目的是使OS做一次內(nèi)存映射后對應的文件數(shù)據(jù)盡可能多的預加載至內(nèi)存中,從而達到內(nèi)存預熱的效果。
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
MappedByteBuffer VS WriterBuffer
MappedByteBuffer和WriterBuffer都是MappedFile對象的成員屬性,都是用來存放消息的,只有開啟了TransientStorePool,才會向writerBuffer直接內(nèi)存寫入消息,然后commit消息到FileChannle中,然后再flush到磁盤。否則就是存儲在NIO創(chuàng)建的MappedByteBuffer直接內(nèi)存中,然后刷新到磁盤。

刷盤是否開啟TransientStorePool的區(qū)別
不開啟TransientStorePool:
MappedByteBuffer是直接內(nèi)存,它暫時存儲了message消息,MappedFile.mapp()方法做好MappedByteBuffer對象直接內(nèi)存和落盤文件的映射關系,然后flush()方法執(zhí)行MappedByteBuffer.force():強制將ByteBuffer中的任何內(nèi)容的改變寫入到磁盤文件,讀寫都經(jīng)過Page Cache。
開啟TransientStorePool:
TransientStorePool會通過ByteBuffer.allocateDirect調(diào)用直接申請對外內(nèi)存writerBuffer,消息數(shù)據(jù)在寫入內(nèi)存的時候是寫入預申請的內(nèi)存中。MappedFile的writerBuffer為直接開辟的內(nèi)存,然后MappedFile的初始化操作,做好FileChannel和磁盤文件的映射,commit()方法實質是執(zhí)行fileChannel.write(writerBuffer),將writerBuffer的數(shù)據(jù)寫入到FileChannel映射的磁盤文件,flush操作執(zhí)行FileChannel.force():將映射文件中的數(shù)據(jù)強制刷新到磁盤。寫入的時候不經(jīng)過PageCache,因此在消息寫入操作上會更快,因此能更少的占用CommitLog.putMessageLock鎖,從而能夠提升消息處理量。使用TransientStorePool方案的缺陷主要在于在異常崩潰的情況下回丟失更多的消息。
TransientStorePool的作用
TransientStorePool 相當于在內(nèi)存層面做了讀寫分離,寫走內(nèi)存磁盤,讀走pagecache,同時最大程度消除了page cache的鎖競爭,降低了毛刺。它還使用了鎖機制,避免直接內(nèi)存被交換到swap分區(qū)。
日常FileChannel的寫操作會經(jīng)過Page Cache,但是TransientStorePool開辟了直接內(nèi)存WriterBuffer,WriterBuffer只負責寫入,也是通過FileChannel寫入磁盤,讀操作由單獨的MappedByteBuffer負責,這樣實現(xiàn)了讀寫分離。
參考:https://github.com/apache/rocketmq/issues/2466
FileChannel.force VS MappedByteBuffer.force區(qū)別
This method is only guaranteed to force changes that were made to this channel's file via the methods defined in this class. It may or may not force changes that were made by modifying the content of a{@link MappedByteBuffer <i>mapped byte buffer</i>} obtained by invoking the {@link #map map} method. Invoking the {@link MappedByteBuffer#force force} method of the mapped byte buffer will force changes made to the buffer's content to be written.

FileChannel和MappedByteBuffer都是NIO模塊的類,ByteBuffer直接內(nèi)存映射到磁盤文件通過FileChannel。
FileChannel.force()只會將FileChannel類中方法使FileChannel發(fā)生改變的內(nèi)容強制刷新到存儲設備文件中。
MappedByteBuffer.force()會將Map類中方法使ByteBuffer發(fā)生改變的內(nèi)容強制刷新到存儲設備文件中。
Mmap的寫入操作是:Mmap的MappedByteBuffer映射直接內(nèi)存,直接內(nèi)存映射文件,然后文件會對應Page Cache,也就是 MmapedByteBuffer的直接內(nèi)存可能是Page Cache的東西,然后通過寫Page Cache,然后再寫入磁盤。
FileChannle:是寫直接內(nèi)存,這個效率比較高,然后直接內(nèi)存滿了,在落盤的時候,再去經(jīng)過Page Cache,落入磁盤。WriterBuffer的寫入方式實際也就是FileChannel的寫入方式,Mmap在寫入4k一下的文件比較快,然后FileChannel寫入文件大于4k時,比Mmap方式的要快,可能是因為PageCache 是4k,然后寫著就可能去落盤了。而FileChannel 是寫滿了直接內(nèi)存,才去經(jīng)過PageCache,這樣寫入直接內(nèi)存的效率更高,然后再經(jīng)過Page Cache,當大于4k的時候,大于Page Cache的內(nèi)存的時候,就是FileChannel快了。大概因為FileChannel是基于Block(塊)的。
Mmap VS FileChannle參考https://juejin.cn/post/6844903842472001550