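The CommitLog File Explained
Overview
CommitLog files are stored at $HOME/store/commitlog/${fileName}, and each file is 1 GB by default. The file name is 20 digits long: it is the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file, whose starting offset (the fileFromOffset value) is 0. When that file is full, the second file is named 00000000001073741824 with a starting offset of 1073741824; the third is 00000000002147483648 with a starting offset of 2147483648, and so on. Messages are appended to a file sequentially, and when the file is full, writing continues in the next file.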
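The naming rule can be sketched in a few lines of Java. This is an illustration only: the class and method names are hypothetical, and it uses String.format for the zero padding rather than whatever helper RocketMQ itself uses. It assumes the default 1 GB file size.

```java
import java.util.Locale;

// Hypothetical helper, not part of RocketMQ: maps a physical offset to a CommitLog file name.
final class CommitLogFileNames {
    // Assumed default file size: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Starting offset of the file that contains the given physical offset.
    static long fileFromOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    // 20-digit, zero-padded file name for a file starting at fileFromOffset.
    static String fileNameFor(long fileFromOffset) {
        return String.format(Locale.ROOT, "%020d", fileFromOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(fileFromOffset(0L)));
        System.out.println(fileNameFor(fileFromOffset(FILE_SIZE + 5)));
        System.out.println(fileNameFor(2 * FILE_SIZE));
    }
}
```

Any offset inside the second file, such as 1073741829, maps back to the same file name as offset 1073741824.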
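File structure
| No. | Field | Size (bytes) | Meaning |
|---|---|---|---|
| 1 | msgSize | 4 | Total size of this message |
| 2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
| 3 | BODY CRC | 4 | CRC of the message body; verified when the broker recovers after a restart |
| 4 | queueId | 4 | Message queue id |
| 5 | flag | 4 | |
| 6 | QUEUEOFFSET | 8 | An auto-incrementing counter, not the real byte offset in the consume queue. It can represent the number of messages in the consumeQueue or in the tranStateTable. For a non-transactional message or a committed transactional message, it locates the entry in the consumeQueue, where QUEUEOFFSET * 20 is the byte offset. For a PREPARED or Rollback message, it is used to look up data in the tranStateTable |
| 7 | PHYSICALOFFSET | 8 | Physical starting offset of the message in the commitLog |
| 8 | SYSFLAG | 4 | Message traits such as transaction state. Counting bits from the right: all bits 0 (value 0) means a non-transactional message; bit 1 set (value 1) means the body is compressed (Compressed); bit 2 set (value 2) means multiple tags (MultiTags); bit 3 set (value 4) means a prepared message; bit 4 set (value 8) means a commit message; bits 3 and 4 both set (value 12) means a rollback message; bits 3 and 4 both 0 means a non-transactional message |
| 9 | BORNTIMESTAMP | 8 | Timestamp on the producer side |
| 10 | BORNHOST | 8 | Producer address (address:port) |
| 11 | STORETIMESTAMP | 8 | Time the message was stored on the broker |
| 12 | STOREHOSTADDRESS | 8 | Address of the broker that stored the message (address:port) |
| 13 | RECONSUMETIMES | 4 | Number of times the message has been re-consumed by a subscription group (counted independently per group). Retry messages are sent to queueId=0 of the topic named %retry%groupName; a message consumed successfully on the first attempt records 0 |
| 14 | PreparedTransaction Offset | 8 | Marks a transactional message in the prepared state |
| 15 | messagebodyLength | 4 | Size of the message body |
| 16 | messagebody | bodyLength | Message body content |
| 17 | topicLength | 1 | Size of the topic name |
| 18 | topic | topicLength | Topic name |
| 19 | propertiesLength | 2 | Size of the properties |
| 20 | properties | propertiesLength | Properties data, propertiesLength bytes long |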
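As the table shows, a message in the CommitLog has no fixed length. However, the messagebodyLength field gives the size of the body, topicLength gives the size of the topic, and propertiesLength gives the size of the properties, so the total size of a message can always be computed.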
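As a rough sketch of that calculation, the method below adds up the fixed-width fields of the table and the three variable-length parts. The class name is hypothetical, and it assumes RECONSUMETIMES occupies 4 bytes, as in the RocketMQ 4.x source; other versions may lay the record out differently.

```java
// Hypothetical sketch of the record-size calculation; field widths follow the table above.
final class MessageLength {
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                  // msgSize
            + 4                   // MAGICCODE
            + 4                   // BODY CRC
            + 4                   // queueId
            + 4                   // flag
            + 8                   // QUEUEOFFSET
            + 8                   // PHYSICALOFFSET
            + 4                   // SYSFLAG
            + 8                   // BORNTIMESTAMP
            + 8                   // BORNHOST
            + 8                   // STORETIMESTAMP
            + 8                   // STOREHOSTADDRESS
            + 4                   // RECONSUMETIMES (assumed 4 bytes)
            + 8                   // PreparedTransaction Offset
            + 4 + bodyLength      // messagebodyLength + messagebody
            + 1 + topicLength     // topicLength + topic
            + 2 + propertiesLength; // propertiesLength + properties
    }

    public static void main(String[] args) {
        // A 10-byte body, a 5-byte topic and 20 bytes of properties.
        System.out.println(calMsgLength(10, 5, 20));
    }
}
```

Under these assumptions the fixed part of every record is 91 bytes, and everything beyond that comes from the body, topic and properties.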
CommitLog class analysis
Fields
```java
// Validates that a record is a legal message, similar to the magic number of a Java class file
public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124;
// Queue of mapped files
private final MappedFileQueue mappedFileQueue;
// Default message store; all CommitLog operations go through DefaultMessageStore
private final DefaultMessageStore defaultMessageStore;
// Service that flushes data to disk
private final FlushCommitLogService flushCommitLogService;
// If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
// Callback that appends a message to a mapped file
private final AppendMessageCallback appendMessageCallback;
// Batch message encoder, one per thread
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// Offset information per topic-queue
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
// Lock taken while putting a message
private final PutMessageLock putMessageLock;
```
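| Field | Purpose |
|---|---|
| MESSAGE_MAGIC_CODE | Validates that a record is a legal message, similar to the magic number of a Java class file |
| BLANK_MAGIC_CODE | Marks the padding at the end of a file when the remaining space cannot hold the message |
| mappedFileQueue | The set of MappedFile objects that back the CommitLog files |
| defaultMessageStore | The store object through which the CommitLog is operated |
| flushCommitLogService | Thread that periodically flushes data to disk |
| commitLogService | When the transient store pool is enabled, periodically commits messages to the FileChannel |
| appendMessageCallback | Callback object that serializes a message and appends it to the mapped file |
| batchEncoderThreadLocal | Encodes batch messages |
| topicQueueTable | Offset information for each topic-queue. Because all RocketMQ topics share the same CommitLog files, the offset of each topic-queue has to be tracked separately |
| putMessageLock | Lock used when messages are stored concurrently |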
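The two magic codes are declared as signed decimal ints, while the file layout table and the source comment quote them in hexadecimal. A small check (hypothetical class name) shows that both notations describe the same 32-bit patterns:

```java
// Hypothetical helper: prints the CommitLog magic codes as unsigned hexadecimal.
final class MagicCodes {
    static final int MESSAGE_MAGIC_CODE = -626843481;
    static final int BLANK_MAGIC_CODE = -875286124;

    // Integer.toHexString renders the two's-complement bits, without a sign.
    static String toHex(int code) {
        return Integer.toHexString(code);
    }

    public static void main(String[] args) {
        System.out.println(toHex(MESSAGE_MAGIC_CODE)); // daa320a7
        System.out.println(toHex(BLANK_MAGIC_CODE));   // cbd43194
    }
}
```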
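Inner classes
CommitLog has several inner classes. Some deal with flushing files to disk, such as FlushRealTimeService, and others deal with message encoding, such as MessageExtBatchEncoder. This section covers the inner classes related to committing and flushing messages. Much of the analysis builds on the previous two articles.
Message commit: CommitRealTimeService
CommitRealTimeService periodically commits the messages held in the transient store pool to the FileChannel, so that the next flush can write them to disk. The class is only useful when the transient store pool is enabled.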
```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // Configured commit interval for the commitLog, 200 ms by default
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // Configured minimum number of pages per commit, 4 by default
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // Configured maximum interval between thorough commits, 200 ms by default
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
            // Commit the data
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }
    boolean result = false;
    // Reaching here means the service is stopping; commit whatever is left, retrying at most 10 times
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}
```
Asynchronous flush: FlushRealTimeService
FlushRealTimeService is responsible for asynchronous flushing. Here is the code:
```java
class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        // Loop until the service is stopped; shutdown() sets the stop flag to true
        while (!this.isStopped()) {
            // Whether to flush on a fixed timer; parameter flushCommitLogTimed, default false
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // Interval between flushes; parameter flushIntervalCommitLog, default 500 ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // Minimum number of pages per flush; parameter flushCommitLogLeastPages, default 4
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // Maximum interval between thorough flushes; parameter flushCommitLogThoroughInterval, default 10000 ms
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
            boolean printFlushProgress = false;
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // Once the thorough interval has elapsed, flush regardless of how many pages are dirty
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
            try {
                // With timed flushing, sleep for the fixed interval
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    // Otherwise wait on a CountDownLatch, which can be woken up early
                    this.waitForRunning(interval);
                }
                // Print progress
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
                long begin = System.currentTimeMillis();
                // Flush the files to disk
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                // Store timestamp recorded by the last flush
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // Update physicMsgTimestamp in the checkpoint file: the time messages physically reached disk
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        // The service is stopping: flush the remaining messages, retrying at most 10 times
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        this.printFlushProgress();
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
```
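The logic is a loop that keeps flushing the messages in the mapped file queue to disk. Several parameters can be configured:
| Parameter | Purpose | Default |
|---|---|---|
| flushCommitLogTimed | Whether to flush on a fixed timer | false |
| flushIntervalCommitLog | Interval between flushes | 500 ms |
| flushCommitLogLeastPages | Minimum number of pages per flush | 4 |
| flushCommitLogThoroughInterval | Maximum interval between thorough flushes, which ignore the minimum page count | 10000 ms |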
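Both CommitRealTimeService and FlushRealTimeService share one pattern: a least-pages threshold normally applies, but once the thorough interval has elapsed the threshold drops to 0 so that everything pending is written. The sketch below isolates that decision; the class and method names are hypothetical and are not part of RocketMQ.

```java
// Hypothetical sketch of the "thorough interval" decision used by the commit and flush loops.
final class FlushPolicy {
    // Returns 0 (flush everything pending) once thoroughIntervalMillis has elapsed since the
    // last thorough flush; otherwise returns the configured least-pages threshold.
    static int effectiveLeastPages(long nowMillis, long lastThoroughMillis,
                                   int thoroughIntervalMillis, int configuredLeastPages) {
        boolean thoroughDue = nowMillis >= lastThoroughMillis + thoroughIntervalMillis;
        return thoroughDue ? 0 : configuredLeastPages;
    }

    public static void main(String[] args) {
        // Defaults from the table above: least pages 4, thorough interval 10000 ms.
        System.out.println(effectiveLeastPages(9_999L, 0L, 10_000, 4));
        System.out.println(effectiveLeastPages(10_000L, 0L, 10_000, 4));
    }
}
```

With the defaults, a lightly loaded broker that never accumulates 4 dirty pages still gets its data flushed at least once every 10 seconds.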
Synchronous flush: GroupCommitService
This class uses a CountDownLatch internally to schedule its work. Start with the entry method:
```java
public synchronized void putRequest(final GroupCommitRequest request) {
    // Add the request to the write list
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // Wake up the commit thread
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}
```
This method places the incoming commit request in a write list and then wakes the commit thread.
```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    // Loop while the service has not been stopped
    while (!this.isStopped()) {
        try {
            // Wait up to 10 ms; onWaitEnd is called when the wait finishes
            this.waitForRunning(10);
            // Perform the commit
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    // Under normal circumstances shutdown, wait for the arrival of the request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    synchronized (this) {
        // Swap the read and write lists; reaching here means the service is shutting down
        this.swapRequests();
    }
    // Flush the requests that were still pending
    this.doCommit();
    CommitLog.log.info(this.getServiceName() + " service end");
}
```
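While running, the thread first waits for up to 10 ms. When the wait ends, the internal onWaitEnd method is called, which in turn calls swapRequests to swap the read and write request lists.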
```java
protected void onWaitEnd() {
    this.swapRequests();
}

private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
```
Once the read and write lists have been swapped, doCommit runs. This method flushes the requested messages to disk.
```java
private void doCommit() {
    synchronized (this.requestsRead) {
        // Process the read list if it is not empty
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // If the flushed position is still below the request's next offset, the flush is not complete yet
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                // Flush finished (or gave up): wake the waiting caller thread
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Update physicMsgTimestamp in the checkpoint file: the time messages physically reached disk
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
```
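The read/write swap that GroupCommitService relies on can be studied in isolation. The following is a minimal sketch of the idea rather than RocketMQ's code: the class name is hypothetical, and for simplicity it guards both lists with a single monitor instead of synchronizing on each list separately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical double-buffer: producers append to the write list while a single
// consumer thread swaps the lists and drains what was collected.
final class SwapBuffer<T> {
    private List<T> writeList = new ArrayList<>();
    private List<T> readList = new ArrayList<>();

    // Called by producers; cheap because it only appends.
    synchronized void put(T item) {
        writeList.add(item);
    }

    // Called by the consumer: swap the two lists, then hand back and clear the read side.
    synchronized List<T> swapAndDrain() {
        List<T> tmp = writeList;
        writeList = readList;
        readList = tmp;
        List<T> drained = new ArrayList<>(readList);
        readList.clear();
        return drained;
    }

    public static void main(String[] args) {
        SwapBuffer<String> buffer = new SwapBuffer<>();
        buffer.put("request-1");
        buffer.put("request-2");
        System.out.println(buffer.swapAndDrain());
        System.out.println(buffer.swapAndDrain());
    }
}
```

The point of the design is that producers are never blocked for the duration of a disk flush: they only contend for the short moment it takes to append to the write list or to swap two references.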
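Starting and stopping the inner classes
Some of the inner classes above are instantiated when the CommitLog object is constructed. The corresponding start and shutdown methods live in CommitLog, and those methods are in turn called from DefaultMessageStore, which was introduced in the field list above.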
```java
public void start() {
    // Start the flush thread
    this.flushCommitLogService.start();
    /**
     * If messages are buffered in the transient store pool, start the thread that periodically
     * commits the pool's content to the FileChannel.
     * The flag is true only when the transient store pool is enabled && flushing is asynchronous
     * && the broker is a master node.
     */
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

public void shutdown() {
    // Stop the task that commits the transient store pool
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.shutdown();
    }
    // Stop the flush thread
    this.flushCommitLogService.shutdown();
}
```
Method analysis
Constructor
```java
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    // Create the MappedFileQueue with the configured CommitLog path and file size (1 GB by default),
    // plus the AllocateMappedFileService that pre-creates MappedFile objects
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
    this.defaultMessageStore = defaultMessageStore;
    // Synchronous flush (SYNC_FLUSH): the flush service is a GroupCommitService
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        // Asynchronous flush (ASYNC_FLUSH): the flush service is a FlushRealTimeService
        this.flushCommitLogService = new FlushRealTimeService();
    }
    // Commit service: CommitRealTimeService
    this.commitLogService = new CommitRealTimeService();
    // Callback that appends messages
    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    // Per-thread batch encoder, bounded by the configured maximum message size
    batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
        @Override
        protected MessageExtBatchEncoder initialValue() {
            return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        }
    };
    // Use a mutex (JDK ReentrantLock) or a spin lock (JDK AtomicBoolean) when storing messages; the spin lock is the default
    this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
```
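The constructor mainly reads the relevant configuration and initializes the corresponding objects. Note that synchronous and asynchronous flushing use different service types. The relevant configuration parameters are:
| Parameter | Purpose | Default |
|---|---|---|
| storePathCommitLog | Storage path of the CommitLog | ${user.home}/store/commitlog |
| mapedFileSizeCommitLog | Size of each CommitLog file | 1 GB |
| flushDiskType | Flush type of the CommitLog | Asynchronous flush |
| maxMessageSize | Maximum size of a single message | 512 KB as described here; RocketMQ 4.x sets it to 1024 * 1024 * 4 (4 MB) |
| useReentrantLockWhenPutMessage | Whether to use a mutex (JDK ReentrantLock) when storing messages | false, which selects the spin lock (JDK AtomicBoolean) |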
File loading: load
```java
public boolean load() {
    // Load the mapped file queue
    boolean result = this.mappedFileQueue.load();
    log.info("load commit log " + (result ? "OK" : "Failed"));
    return result;
}
```
Reading data: getData
This method returns the buffer that holds the data starting at the given offset.
```java
public SelectMappedBufferResult getData(final long offset) {
    return this.getData(offset, offset == 0);
}

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // Configured CommitLog file size
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    // Find the mapped file by offset; when offset is 0, returnFirstOnNotFound is true,
    // so the first mapped file is returned if no file matches
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        // Position inside the file = offset % file size
        int pos = (int) (offset % mappedFileSize);
        // Select the mapped buffer starting at that position
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}
```
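Adding messages: putMessage and putMessages
putMessage and putMessages both add messages to the CommitLog; one adds a single message and the other adds a batch. Only the single-message path is explained here, and the batch path can be read in the source. The steps are:
- Set the store timestamp of the message and compute the CRC of its body
- Read the topic and queueId of the message for later use
- Get the transaction type of the message
- If the message is non-transactional, or is in the commit phase of a transaction, and it has a delay level, back up its real topic and queueId in the properties and replace them with the schedule topic and the delay queue
- Acquire the put-message lock
- Append the message; if the file fills up, create a new file and append again; on error, return the corresponding error status
- Release the lock and the mapped file, and update the statistics
- Flush to disk
- Replicate for high availability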
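The transaction-type step above comes down to masking two bits of SYSFLAG. The sketch below uses the values listed in the file layout table (1, 2, 4, 8, 12); the class and the method visibleToConsumers are hypothetical illustrations, not RocketMQ APIs.

```java
// Hypothetical sketch: decoding the transaction type from SYSFLAG.
final class SysFlags {
    static final int COMPRESSED_FLAG = 0x1;                // value 1
    static final int MULTI_TAGS_FLAG = 0x1 << 1;           // value 2
    static final int TRANSACTION_NOT_TYPE = 0;             // value 0
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // value 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // value 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // value 12

    // Keep only the two transaction bits (mask 12).
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Mirrors the condition checked in putMessage: non-transactional or committed.
    static boolean visibleToConsumers(int sysFlag) {
        int type = transactionValue(sysFlag);
        return type == TRANSACTION_NOT_TYPE || type == TRANSACTION_COMMIT_TYPE;
    }

    public static void main(String[] args) {
        int compressedCommit = COMPRESSED_FLAG | TRANSACTION_COMMIT_TYPE;
        System.out.println(transactionValue(compressedCommit));
        System.out.println(visibleToConsumers(TRANSACTION_PREPARED_TYPE));
    }
}
```

Because the compression and multi-tag bits sit below the transaction bits, they do not affect the result of the mask.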
```java
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    // Topic of the message
    String topic = msg.getTopic();
    // Queue id of the message
    int queueId = msg.getQueueId();
    // Transaction type: non-transactional (bits 3 and 4 are 0) or commit (bit 4 is 1, bit 3 is 0), etc.
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // Non-transactional message, or the commit phase of a transactional message
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // A delay level has been set
        if (msg.getDelayTimeLevel() > 0) {
            // Clamp the delay level to the maximum supported level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Delayed messages go to the schedule topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // queueId of a delayed message = delay level - 1
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Last mapped file in the queue
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // Spin lock or mutex, depending on configuration
    putMessageLock.lock();
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        // Time at which the lock was taken
        this.beginTimeInLock = beginLockTimestamp;
        // Reset the store timestamp under the lock so that timestamps are ordered
        msg.setStoreTimestamp(beginLockTimestamp);
        if (null == mappedFile || mappedFile.isFull()) {
            // No mapped file yet, or the last one is full: create a new file
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // Append the message to the mapped file; appendMessageCallback does the serialization (not analyzed here)
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            // The mapped file is full
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file to hold the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                // Append the message again
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            // Message too large
            case MESSAGE_SIZE_EXCEEDED:
            // Message properties too large
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
        // Time spent holding the lock
        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // Release the lock
        putMessageLock.unlock();
    }
    // Warn when the lock was held for too long
    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // Unlock the mapped file that has just been filled
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics: number of single-message puts per topic
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    // Statistics: bytes written by single-message puts per topic
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Flush to disk
    handleDiskFlush(result, putMessageResult, msg);
    // Replicate to the slave
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}
```
Flushing messages: handleDiskFlush
The flush logic itself is fairly simple, because the real work is delegated to the flush-related inner classes described earlier.
```java
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // Whether the producer waits for the message to be stored
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            // countDownLatch.await(): block until the flush result arrives or the wait times out
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // The producer does not wait: just wake the flush thread (countDownLatch.countDown())
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        // Transient store pool disabled: wake the flush task directly
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // Transient store pool enabled: wake the commit task first
            commitLogService.wakeup();
        }
    }
}
```
High-availability replication: handleHA
The high-availability logic only runs when master-slave replication is configured as synchronous. It checks whether the slave's replication lag is within the configured limit; if so, the master waits until the slave has caught up to the new offset, and otherwise it reports that the slave is unavailable.
```java
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Master-slave replication is configured as synchronous
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Check that the slave's lag is below the maximum tolerated lag; if so, wait for replication
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                // countDownLatch.await(): block until replication completes or the wait times out
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}
```
Normal recovery: recoverNormally
recoverNormally is called when RocketMQ starts after a clean shutdown. It iterates over the loaded mapped files, validates each file and the magic code of the messages in it to decide which data is valid, and computes the maximum offset of the valid data. It then sets the committed and flushed positions from that offset and removes any invalid data beyond it.
```java
public void recoverNormally() {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        // With three or more files start from the third-to-last; otherwise start from the first
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Validate the message and build a dispatch request. A message is valid when its magic code is
            // correct, its CRC32 matches, and the recorded msgSize equals the total length of the message
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            // A valid message whose size is greater than 0
            if (dispatchRequest.isSuccess() && size > 0) {
                // Advance the read position mappedFileOffset by msgSize
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset
            // Valid but size 0: the end of this file has been reached
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                // All files have been read
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        // fileFromOffset of the last file read plus the last valid position mappedFileOffset inside it
        processOffset += mappedFileOffset;
        // Set the flushed offset
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the committed offset
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Delete dirty data after this offset
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
    }
}
```
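The core of this recovery loop is a scan over length-prefixed records that stops at the first record that fails validation. The sketch below reproduces that idea on a deliberately simplified record format, [size:int][magic:int][payload], which is an assumption made for illustration and not the real CommitLog layout. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: find how many bytes at the start of a buffer hold valid records.
final class RecoveryScan {
    static final int MAGIC = 0xdaa320a7;       // same bit pattern as MESSAGE_MAGIC_CODE
    static final int BLANK_MAGIC = 0xcbd43194; // same bit pattern as BLANK_MAGIC_CODE
    static final int HEADER = 8;               // size (4 bytes) + magic (4 bytes)

    // Writes one simplified record at pos and returns the position after it.
    static int writeRecord(ByteBuffer buf, int pos, int payloadLength) {
        int size = HEADER + payloadLength;
        buf.putInt(pos, size);
        buf.putInt(pos + 4, MAGIC);
        return pos + size;
    }

    // Returns the offset just past the last valid record.
    static int validBytes(ByteBuffer buf) {
        int pos = 0;
        while (buf.limit() - pos >= HEADER) {
            int size = buf.getInt(pos);
            int magic = buf.getInt(pos + 4);
            if (magic == BLANK_MAGIC) {
                break; // end-of-file padding: nothing valid follows in this file
            }
            if (magic != MAGIC || size < HEADER || pos + size > buf.limit()) {
                break; // corrupted or never-written region
            }
            pos += size;
        }
        return pos;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        int next = writeRecord(buf, 0, 4);
        writeRecord(buf, next, 8);
        System.out.println(validBytes(buf));
    }
}
```

In recoverNormally the equivalent of this return value is mappedFileOffset; adding it to the file's fileFromOffset gives the processOffset used to reset the flushed and committed positions and to truncate dirty data.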
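Abnormal recovery: recoverAbnormally
Abnormal recovery is more involved. It first checks the message store timestamp recorded in each file, walking backwards from the last file:
- With the message index enabled (the default) and the safe message index enabled (off by default): the store timestamp must be no later than the minimum store timestamp in the checkpoint, including the index timestamp
- Otherwise: the store timestamp must be no later than the smaller of the two checkpoint timestamps, the physical queue message timestamp and the logical queue message timestamp
Only a file that passes this check is used as the starting point for validation. The two parameters are:
| Parameter | Description | Default |
|---|---|---|
| messageIndexEnable | Whether the index feature is enabled; when enabled, entries are saved to the Index file | true |
| messageIndexSafe | Whether the safe message index feature is enabled | false |
The Index file mentioned here is the message index file, which a later article will analyze.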
```java
public void recoverAbnormally() {
    // recover by the minimum time stamp
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file; search backwards from the last file
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // Check whether this file qualifies as the recovery starting point
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Validate the message and return its size
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // size > 0 means a normal message
            if (size > 0) {
                mappedFileOffset += size;
                if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                    // Dispatch only if the message's physical offset in the CommitLog is below the confirm offset
                    if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                        // Dispatch the message to the consume queue and index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                } else {
                    this.defaultMessageStore.doDispatch(dispatchRequest);
                }
            }
            // Intermediate file read error
            else if (size == -1) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
            // Come the end of the file, switch to the next file
            // Since the return 0 representatives met last hole, this can
            // not be included in truncate offset
            // size == 0: end of file, nothing to process, move to the next file
            else if (size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // The current branch under normal circumstances should
                    // not happen
                    log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
        }
        processOffset += mappedFileOffset;
        // Set the flushed offset
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the committed offset
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Delete dirty data files
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // Clear ConsumeQueue redundant data
        this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
    }
    // Commitlog case files are deleted
    else {
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        // Destroy the consume queues
        this.defaultMessageStore.destroyLogics();
    }
}

private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    // Check the magic code of the file's first message
    int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
    if (magicCode != MESSAGE_MAGIC_CODE) {
        return false;
    }
    // Read the STORETIMESTAMP field of the message
    long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
    // The store timestamp must not be 0
    if (0 == storeTimestamp) {
        return false;
    }
    // Message index enabled (the default) and safe message index enabled (off by default)
    if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
        && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
        // The store timestamp must be no later than the checkpoint's minimum timestamp including the index
        if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
            log.info("find check timestamp, {} {}",
                storeTimestamp,
                UtilAll.timeMillisToHumanString(storeTimestamp));
            return true;
        }
    } else {
        // Safe message index disabled: the store timestamp must be no later than the smaller of the
        // physical queue and logical queue message timestamps in the checkpoint file
        if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
            log.info("find check timestamp, {} {}",
                storeTimestamp,
                UtilAll.timeMillisToHumanString(storeTimestamp));
            return true;
        }
    }
    return false;
}
```