RocketMQ源碼解析——存儲部分(3)CommitLog物理日志相關(guān)的`CommitLog`類

CommitLog文件講解

概述

?commitlog文件的存儲地址:$HOME\store\commitlog${fileName},每個文件的大小默認1G,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0也就是fileFromOffset值,當(dāng)這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648。消息存儲的時候會順序?qū)懭胛募?,?dāng)文件滿了,寫入下一個文件。

文件結(jié)構(gòu)

順序編號 |字段簡稱 |字段大小(字節(jié))| 字段含義
---|---|---|---|---
1| msgSize |4| 代表這個消息的大小
2 |MAGICCODE| 4 |MAGICCODE = daa320a7
3 |BODY CRC |4 |消息體BODY CRC 當(dāng)broker重啟recover時會校驗
4 |queueId |4 |消息隊列id
5 |flag| 4 |
6 |QUEUEOFFSET| 8 |這個值是個自增值不是真正的consume queue的偏移量,可以代表這個consumeQueue隊列或者tranStateTable隊列中消息的個數(shù),若是非事務(wù)消息或者commit事務(wù)消息,可以通過這個值查找到consumeQueue中數(shù)據(jù),QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事務(wù),則可以通過該值從tranStateTable中查找數(shù)據(jù)
7| PHYSICALOFFSET |8 |代表消息在commitLog中的物理起始地址偏移量
8 |SYSFLAG| 4 |指明消息是事物事物狀態(tài)等消息特征,二進制為四個字節(jié)從右往左數(shù):當(dāng)4個字節(jié)均為0(值為0)時表示非事務(wù)消息;當(dāng)?shù)?個字節(jié)為1(值為1)時表示表示消息是壓縮的(Compressed);當(dāng)?shù)?個字節(jié)為1(值為2)表示多消息(MultiTags);當(dāng)?shù)?個字節(jié)為1(值為4)時表示prepared消息;當(dāng)?shù)?個字節(jié)為1(值為8)時表示commit消息;當(dāng)?shù)?/4個字節(jié)均為1時(值為12)時表示rollback消息;當(dāng)?shù)?/4個字節(jié)均為0時表示非事務(wù)消息;
9 |BORNTIMESTAMP| 8 |消息產(chǎn)生端(producer)的時間戳
10| BORNHOST |8| 消息產(chǎn)生端(producer)地址(address:port)
11| STORETIMESTAMP |8 |消息在broker存儲時間
12| STOREHOSTADDRESS |8 |消息存儲到broker的地址(address:port)
13| RECONSUMETIMES| 8 |消息被某個訂閱組重新消費了幾次(訂閱組之間獨立計數(shù)),因為重試消息發(fā)送到了topic名字為%retry%groupName的隊列queueId=0的隊列中去了,成功消費一次記錄為0;
14| PreparedTransaction Offset| 8 |表示是prepared狀態(tài)的事物消息
15| messagebodyLength |4| 消息體大小值
16| messagebody| bodyLength| 消息體內(nèi)容
17| topicLength| 1 |topic名稱內(nèi)容大小
18| topic |topicLength |topic的內(nèi)容值
19| propertiesLength| 2 |屬性值大小
20| properties |propertiesLength |propertiesLength大小的屬性數(shù)據(jù)

?可以看到CommitLog文件的一個消息體的長度是不確定的,但是有字段messagebodyLength來表示的是消息體大小和propertiesLength表示屬性值的大小。所以可以計算出這個消息數(shù)據(jù)的大小。

CommitLog類分析

字段屬性分析

    //用來驗證消息的合法性,類似于java的魔數(shù)的作用
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    private final static int BLANK_MAGIC_CODE = -875286124;
    //映射文件集合
    private final MappedFileQueue mappedFileQueue;
    //默認消息存儲類,CommitLog的所有操作都是通過DefaultMessageStore來進行的
    private final DefaultMessageStore defaultMessageStore;
    //刷盤的任務(wù)類
    private final FlushCommitLogService flushCommitLogService;
    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    //在啟用了臨時存儲池的時候,定期把消息提交到FileChannel的任務(wù)類
    private final FlushCommitLogService commitLogService;
    //消息拼接的類
    private final AppendMessageCallback appendMessageCallback;
    //消息的編碼器,線程私有
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    //消息topic的偏移信息
    private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    private volatile long confirmOffset = -1L;
    private volatile long beginTimeInLock = 0;
    //消息鎖
    private final PutMessageLock putMessageLock;
字段 作用
MESSAGE_MAGIC_CODE 用來驗證消息的合法性,類似于java的魔數(shù)的作用
BLANK_MAGIC_CODE 消息不夠存儲的時候用這個來表示
mappedFileQueue MappedFile集合,表示的是CommitLog映射文件的集合
defaultMessageStore 用于操作CommitLog類的對象
flushCommitLogService 定時刷盤的任務(wù)線程對象
commitLogService 在啟用了臨時存儲池的時候,定期把消息提交到FileChannel的任務(wù)類
appendMessageCallback 異步拼接消息體的回調(diào)對象
batchEncoderThreadLocal 用于對消息進行編碼
topicQueueTable 每個消息topic的偏移信息,因為RocketMQ的Topic都存在一個CommitLog文件中,所以需要記錄每個Topic的消費進度信息
putMessageLock 并發(fā)存儲消息時候的鎖

內(nèi)部類分析

?在CommitLog中有幾個內(nèi)部類,跟文件的刷盤有關(guān)比如FlushRealTimeService和別的類,以及跟消息編碼有關(guān)的MessageExtBatchEncoder,這里主要介紹跟消息提交和刷盤有關(guān)的幾個內(nèi)部類。后面的分析很多都是基于前面的兩篇文章的基礎(chǔ)上來進行分析的。

消息提交CommitRealTimeService

?CommitRealTimeService主要就是定時的把臨時存儲池中的消息commit到FileChannel中,便于下次flush刷盤操作。而這個類只有在開啟臨時存儲池的時候才會有用。

public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                //todo  獲取配置的 刷新commitLog頻次   默認200ms
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
                //todo  獲取配置的 提交數(shù)據(jù)頁數(shù)  默認4
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
                //todo  獲取配置的 提交commitLog最大頻次  默認200ms
                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
                long begin = System.currentTimeMillis();
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }
                try {
                    //對數(shù)據(jù)進行提交
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            //進入這里表面服務(wù)準(zhǔn)備停止,此時把還沒提交的進行提交,最多重試10次
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
消息異步刷盤FlushRealTimeService

?FlushRealTimeService的主要作用就是刷盤相關(guān)的,直接上代碼

class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0;
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            //如果任務(wù)沒有停止,停止的時候會調(diào)用對應(yīng)的shutdown方法,把對應(yīng)的stop字段修改為true
            while (!this.isStopped()) {
                //todo 獲取是否定時刷新日志的設(shè)定 參數(shù)為 flushCommitLogTimed  默認為false
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
                //todo 獲取刷新到磁盤的時間間隔 參數(shù)為 flushIntervalCommitLog 默認為500毫秒
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                //todo 獲取一次刷新到磁盤的最少頁數(shù),參數(shù)為flushCommitLogLeastPages 默認為4
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                //todo 獲取刷新CommitLog的頻率 參數(shù)為flushCommitLogThoroughInterval 默認為10000毫秒
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                //計算日志刷新進度信息
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    //如果定時刷新日志,則把線程sleep對應(yīng)的規(guī)定時間
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        //使用的是CountDownLatch等待對應(yīng)時間
                        this.waitForRunning(interval);
                    }
                    //打印進度
                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    //進行文件的刷盤
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    //獲取文件的最后修改時間也就是最后的刷新時間
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    //todo 設(shè)置CheckPoint文件的physicMsgTimestamp  消息物理落盤時間
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            //todo 如果服務(wù)停止,那么把剩余的沒有刷新到磁盤的消息刷盤,重復(fù)次數(shù)為10次
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
        }
}

?邏輯就是一直循環(huán)不斷把映射文件隊列中的消息進行刷盤。其中有幾個參數(shù)可以人為的配置。

參數(shù) 作用 默認值
flushCommitLogTimed 是否定時刷新日志 默認為false
flushIntervalCommitLog 刷新到磁盤的時間間隔 默認為500毫秒
flushCommitLogLeastPages 一次刷新到磁盤的最少頁數(shù) 默認為4
flushCommitLogThoroughInterval 刷新CommitLog的頻率 默認為10000毫秒
消息同步刷盤GroupCommitService

?這個類內(nèi)部使用了CountDownLatch來進行一個任務(wù)調(diào)度。先看看入口方法

public synchronized void putRequest(final GroupCommitRequest request) {
            //添加寫請求到集合中
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            //啟動提交線程
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

?可以看到這個方法是吧傳入的提交消息的請求,放到一個寫的隊列中。

public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            //服務(wù)沒有停止則循環(huán)進行
            while (!this.isStopped()) {
                try {
                    //等待10毫秒后執(zhí)行,這個里面會調(diào)用onWaitEnd 方法
                    this.waitForRunning(10);
                    //執(zhí)行提交
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                //交換讀寫任務(wù),能進入這里說明應(yīng)用已經(jīng)準(zhǔn)備停止了,
                this.swapRequests();
            }
            //
            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

?在運行的時候會先等待10毫秒,而這10毫秒期間會調(diào)用,內(nèi)部的onWaitEnd方法進而調(diào)用swapRequests方法,吧讀寫請求進行交換。

        protected void onWaitEnd() {
            this.swapRequests();
        }
        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

?在讀寫請求交換完了之后就是doCommit方法了,這個方法就是吧請求的消息進行落盤

      private void doCommit() {
            synchronized (this.requestsRead) {
                //如果讀任務(wù)不為空則迭代處理
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of  two times the flush
                        // todo 可能存在一條消息存在下一個文件中,因此最多可能存在兩次刷盤
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            //如果文件刷盤的偏移量<請求的下一個偏移量,則說明還沒有刷新完,還需要繼續(xù)刷新
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }
                        //刷新完畢 喚醒用戶線程
                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        //todo 刷新CheckPoint文件的physicMsgTimestamp  消息物理落盤時間
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }
內(nèi)部類的啟動個關(guān)閉

?上面的這些內(nèi)部類,有的是根據(jù)構(gòu)造CommitLog類的時候進行初始化的。而對應(yīng)的啟動和停止在CommitLog中,而這些方法的調(diào)用又是在前面字段屬性介紹的DefaultMessageStore中進行調(diào)用的。

   public void start() {
        // 開啟刷盤線程
        this.flushCommitLogService.start();
        /**
         *   如果使用的是臨時存儲池來保存消息,則啟動定期提交消息的線程,把存儲池的信息提交到fileChannel中
         *   只有在開啟了使用臨時存儲池 && 刷盤為異步刷盤 && 是master節(jié)點  的情況才會為true
         */
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }

    public void shutdown() {
        //關(guān)閉提交臨時存儲池的任務(wù)
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.shutdown();
        }
        //關(guān)閉刷盤線程
        this.flushCommitLogService.shutdown();
    }

方法分析

構(gòu)造方法
    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        //創(chuàng)建MappedFileQueue對象,傳入的路徑是配置的CommitLog的文件路徑,和默認的文件大小1G,同時傳入提前創(chuàng)建MappedFile對象的AllocateMappedFileService
        this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
        this.defaultMessageStore = defaultMessageStore;
        //刷盤的模式如果是 同步刷盤SYNC_FLUSH   則對應(yīng)的刷盤線程對象為GroupCommitService
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            //刷盤模式為 異步刷盤ASYNC_FLUSH 則對應(yīng)的刷盤線程對象為FlushRealTimeService
            this.flushCommitLogService = new FlushRealTimeService();
        }
        //提交日志的線程任務(wù)對象 CommitRealTimeService
        this.commitLogService = new CommitRealTimeService();
        //拼接消息的回調(diào)對象
        this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        //定于對應(yīng)的消息編碼器,會設(shè)定消息的最大大小,默認是512k
        batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
            @Override
            protected MessageExtBatchEncoder initialValue() {
                return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            }
        };
        //存儲消息的時候用自旋鎖還是互斥鎖(用的是JDK的ReentrantLock),默認的是自旋鎖(用的是JDK的原子類的AtomicBoolean)
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    }

?構(gòu)造函數(shù)主要是讀取對應(yīng)的配置信息,然后初始化對應(yīng)的類。其中需要注意的是同步刷盤和異步刷盤使用的對象類型是不一樣的。對應(yīng)的配置參數(shù)有這些

參數(shù) 作用 默認值
storePathCommitLog 指定CommitLog的存儲路徑 ${user.home}/store/commitlog
mapedFileSizeCommitLog 指定CommitLog的文件大小 默認1G
flushDiskType 指定CommitLog的刷盤類型 默認是異步刷盤
maxMessageSize 單個消息的最大大小 默認512k
useReentrantLockWhenPutMessage 存儲消息是否使用互斥鎖(jdk的ReentrantLock 默認是false ,使用自旋鎖 (JDK的原子類的AtomicBoolean
文件加載load
    public boolean load() {
        //加載映射文件集合
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }
獲取消息getData

?這個方法會返回傳入的偏移量所在的消息的buffer

public SelectMappedBufferResult getData(final long offset) {
        return this.getData(offset, offset == 0);
    }

    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
        //獲取配置的CommitLog 的文件大小
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
       //按offset查詢映射文件,如果在偏移量為0的時候,會返回新創(chuàng)建的CommitLog文件映射對象,因為這是第一次插入
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            //位置=偏移量%文件大小
            int pos = (int) (offset % mappedFileSize);
            //獲取消息所在映射buffer
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }
        return null;
    }
添加消息putMessageputMessages

?putMessageputMessages都是添加消息到CommitLog的方法,只不過一個是添加單個消息,一個是添加多個消息的。這里只講解添加單個消息的,添加多個消息的大家可以自行查看源碼。邏輯步驟如下:

  1. 設(shè)置消息對應(yīng)的存儲時間并對消息體編碼
  2. 獲取消息topic和queueId為后面使用
  3. 獲取消息的事務(wù)類型
  4. 如果不是事務(wù)消息,或者是事務(wù)消息的提交階段,則還原消息原來的topic和queueId
  5. 獲取存儲鎖
  6. 進行消息的存儲,如果期間文件滿了則再次存儲,出錯則拋錯
  7. 釋放鎖和映射文件,增加對應(yīng)的記錄信息
  8. 進行刷盤
  9. 進行高可用刷盤
   public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        //獲取當(dāng)前系統(tǒng)時間作為消息寫入時間
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting on the client)
        //設(shè)置編碼后的消息體
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        //從消息中獲取topic
        String topic = msg.getTopic();
        //從消息中獲取queueId
        int queueId = msg.getQueueId();

        //獲取事務(wù)類型(非事務(wù)性(第3/4字節(jié)為0),提交事務(wù)(commit,第4字節(jié)為1,第3字節(jié)為0)消息)
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //如果不是事務(wù)消息 或者 是事務(wù)消息的提交階段
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 如果設(shè)置了延遲時間
            if (msg.getDelayTimeLevel() > 0) {
                //延遲級別不能超過最大的延遲級別,超過也設(shè)置為最大的延遲級別
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                //設(shè)置延遲消息的topic
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //延遲消息的queueId= 延遲級別-1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId 備份真正的topic和queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        //獲取映射文件隊列的最后一個映射文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //自旋鎖或者互斥鎖
        putMessageLock.lock(); 
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            //開始鎖定時間
            this.beginTimeInLock = beginLockTimestamp;

            //設(shè)置消息的存儲時間
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                //映射文件不存在或者映射文件滿了則創(chuàng)建一個文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            //映射文件中添加消息,這里的appendMessageCallback是消息拼接對象,拼接過程不分析
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                //映射文件滿了
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    //創(chuàng)建一個文件來進行存儲
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    //重新添加消息=》
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                   // 消息過大
                case MESSAGE_SIZE_EXCEEDED:
                    //消息屬性過大
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }
            //釋放鎖
            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
        //消息存儲的多定時間過長
        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            //解鎖映射文件
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics 單次存儲消息topic次數(shù)
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        //單次存儲消息topic大小
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        //磁盤刷新=》
        handleDiskFlush(result, putMessageResult, msg);
       // 主從刷新=》
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }
消息刷盤handleDiskFlush

?刷盤的邏輯稍微簡單,主要任務(wù)交給了前面講的兩個刷盤相關(guān)的內(nèi)部類了。

   public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush 同步刷新
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            //是否等待存儲
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                //countdownLatch.await() 同步等待刷新結(jié)果,除非超時
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                //如果異步直接解除阻塞 countdownLatch.countDown()
                service.wakeup();
            }
        }
        // Asynchronous flush 異步刷新
        else {
            //沒有啟用臨時存儲池,則直接喚醒刷盤的任務(wù)
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                //如果使用臨時存儲池,需要先喚醒提交消息的任務(wù)
                commitLogService.wakeup();
            }
        }
    }
消息高可用刷盤handleHA

?高可用的消息刷盤,只有在設(shè)置了主從同步方式為同步方式的時候,才會有后續(xù)的邏輯。邏輯就是判斷主從之間的消息差偏移量是否在設(shè)置的范圍內(nèi),如果是的就可以對主庫進行刷盤。

   public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        //如果設(shè)置的主從之間是同步更新
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // 檢查slave同步的位置是否小于 最大容忍的同步落后偏移量,如果是的則進行刷盤
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    //countDownLatch.await 同步等待刷新,除非等待超時
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    //設(shè)置從服務(wù)不可用的狀態(tài)
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }
服務(wù)正常恢復(fù)recoverNormally

?recoverNormally方法在RocketMQ正常關(guān)閉然后啟動的時候會調(diào)用,這個方法就是把加載的映射文件列表進行遍歷,對文件進行校驗,和文件中的消息的魔數(shù)進行校驗,來判斷哪些數(shù)據(jù)是正常的,并計算出正常的數(shù)據(jù)的最大偏移量。然后,根據(jù)偏移量設(shè)置對應(yīng)的提交和刷新的位置以及不正常數(shù)據(jù)的刪除。

   public void recoverNormally() {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Began to recover from the last third file
            //如果文件列表大于3就從倒數(shù)第3個開始,否則從第一個開始
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
               //校驗消息,然后返回轉(zhuǎn)發(fā)請求,根據(jù)Magic_code正確,并且crc32正確,并且消息的msgSize記錄大小和消息整體大小相等。則表示是合格的消息
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                // Normal data
                //是一個合格的消息并且消息體大于0
                if (dispatchRequest.isSuccess() && size > 0) {
                    //則讀取的偏移量mapedFileOffset累加msgSize
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the  return 0 representatives met last hole,  this can not be included in truncate offset
                //是合格的消息,但是消息體為0,表示讀取到了文件的最后一塊信息
                else if (dispatchRequest.isSuccess() && size == 0) {
                    index++;
                    //文件讀完了
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }
            //最后讀取的MapedFile對象的fileFromOffset加上最后讀取的位置mapedFileOffset值
            processOffset += mappedFileOffset;
            //設(shè)置文件刷新到的offset
            this.mappedFileQueue.setFlushedWhere(processOffset);
            //設(shè)置文件提交到的offset
            this.mappedFileQueue.setCommittedWhere(processOffset);
            //刪除offset之后的臟數(shù)據(jù)文件
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
        }
    }
服務(wù)異?;謴?fù)recoverAbnormally

?異?;謴?fù)的邏輯比較復(fù)雜,會先檢查對應(yīng)的文件的最后的消息落盤時間。

  • 開啟消息索引功能(默認開啟)并且使用安全的消息索引功能(默認不開啟)的情況下:日志的落盤時間要小于checkpoint的最小落盤時間
  • 沒有開啟的時候:落盤時間需要小于checkpoint文件中物理隊列消息時間戳、邏輯隊列消息時間戳這兩個時間戳中最小值

如果檢查符合要求之后才能進行的校驗。這兩個參數(shù)分別是

參數(shù) 描述 默認值
messageIndexEnable 是否開啟的索引功能,開啟后會保存到Index文件中 true
messageIndexSafe 是否開啟安全的消息索引功能 false

?這里說明Index文件是對應(yīng)的索引文件,后面會有文章分析的。

    public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file 從最后一個文件開始恢復(fù)
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                //檢查文件是否符合恢復(fù)的條件
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }

            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                //校驗消息并返回消息的大小
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                // 如果size大于0表示是正常的消息,
                if (size > 0) {
                    mappedFileOffset += size;
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        //如果消息在CommitLog中的物理起始偏移量 < 
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                           // 消息存儲轉(zhuǎn)發(fā)=》
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
//                        =》
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // Intermediate file read error
                else if (size == -1) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
                // Come the end of the file, switch to the next file Since the return 0 representatives met last hole, this can not be included in truncate offset
                //如果為0 表示文件的尾部不用處理,進入下一個文件
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            }

            processOffset += mappedFileOffset;
//            設(shè)置刷新offset位置
            this.mappedFileQueue.setFlushedWhere(processOffset);
//            設(shè)置commitOffset
            this.mappedFileQueue.setCommittedWhere(processOffset);
//            刪除臟數(shù)據(jù)文件=》
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data 清除消息隊列冗余數(shù)據(jù)=》
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
        // Commitlog case files are deleted
        else {
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
//            銷毀消息隊列=》
            this.defaultMessageStore.destroyLogics();
        }
    }
//
    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        //校驗文件的magic_code
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
        if (magicCode != MESSAGE_MAGIC_CODE) {
            return false;
        }

        //獲取消息存儲時間字段STORETIMESTAMP
        long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
        //落盤時間需要大于0
        if (0 == storeTimestamp) {
            return false;
        }
        //開啟消息索引功能(默認開啟)并且使用安全的消息索引功能(默認不開啟)
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            //日志的落盤時間要小于checkpoint的最小落盤時間
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        } else {
            //沒有開啟安全的消息索引功能,則落盤時間需要小于checkpoint文件中物理隊列消息時間戳、邏輯隊列消息時間戳這兩個時間戳中最小值
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        }

        return false;
    }

下一篇存儲部分(4)ConsumeQueue文件存儲加載刷新的ConsumeQueue

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容