MetaQ原理簡(jiǎn)介(一)

MetaQ是阿里巴巴中間件團(tuán)隊(duì)開發(fā)的一款消息隊(duì)列中間件,說起MetaQ的命名呢,也是有點(diǎn)意思。MetaQ最早是基于Kafka的設(shè)計(jì)并使用Java進(jìn)行了完全重寫,而Kafka(卡夫卡)作家最著名的作品,大家都清楚,叫做《變形記》,英文名叫Metamorphosis。

MetaQ集群架構(gòu)


MetaQ集群架構(gòu)
  • NameServer集群:MetaQ基于NameServer,也是基于阿里內(nèi)部中間件Config Server??梢园阉斫鉃轭愃苲ookeeper的角色
  • Broker:消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息
  • Consumer:消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。MetaQ提供兩種消費(fèi)模型
    • Push Consumer :向Consumer對(duì)象注冊(cè)一個(gè)Listener接口,收到消息后回調(diào)Listener接口方法,采用長(zhǎng)輪詢實(shí)現(xiàn)push
    • Pull Consumer:主動(dòng)由Consumer主動(dòng)拉取信息,同kafka
  • Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息

消息結(jié)構(gòu)模型

  • Message:?jiǎn)挝幌?/li>
  • Topic:消息主題,軟分區(qū),對(duì)應(yīng)相同的topic時(shí),生產(chǎn)者對(duì)應(yīng)消費(fèi)者的分區(qū)標(biāo)識(shí)
  • Tag:消息在topic基礎(chǔ)上的二級(jí)分類
  • Message Queue:硬分區(qū),物理上區(qū)分topic,一個(gè)topic對(duì)應(yīng)多個(gè)message queue。在 MetaQ 中,所有消息隊(duì)列都是持久化,長(zhǎng)度無限的數(shù)據(jù)結(jié)構(gòu),所謂長(zhǎng)度無限是指隊(duì)列中的每個(gè)存儲(chǔ)單元都是定長(zhǎng),訪問其中的存儲(chǔ)單元使用 Offset 來訪問,offset 為 java long 類型,64 位,理論上在 100 年內(nèi)不會(huì)溢出,所以認(rèn)為是長(zhǎng)度無限,另外隊(duì)列中只保存最近幾天的數(shù)據(jù),之前的數(shù)據(jù)會(huì)按照過期時(shí)間來 刪除。
  • Group:Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)一類消息,且消費(fèi)邏輯一致;Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發(fā)送一類消息,且發(fā)送邏輯一致。
  • Offset:絕對(duì)偏移值,message queue中有兩類offset(commitOffset和offset),前者存儲(chǔ)在OffsetStore中表示消費(fèi)到的位置,后者是在PullRequest中為拉取消息位置。

Broker


Broker以組為單位向Consumer提供消息服務(wù),group中分為masterslave兩種角色。然后通過NameServer暴露給Consumer具體通信地址,采用message queue消息隊(duì)列結(jié)構(gòu)來提供消費(fèi)接口。針對(duì)某一topic情況下,message queue會(huì)根據(jù)queue id分布在不同的broker上,Consumer的消息消費(fèi)壓力則會(huì)分?jǐn)傇诓煌腂roker上的message queue,從而達(dá)到負(fù)載均衡的作用。
雖然每個(gè)topic下面有很多message queue,但是message queue本身并不存儲(chǔ)消息。真正的消息存儲(chǔ)會(huì)寫在CommitLog的文件,message queue只是存儲(chǔ)CommitLog中對(duì)應(yīng)的位置信息,方便通過message queue找到對(duì)應(yīng)存儲(chǔ)在CommitLog的消息。不同的topic,message queue都是寫到相同的CommitLog 文件,也就是說CommitLog完全的順序?qū)?,而順序讀寫是metaq高吞吐量的基礎(chǔ)。

Broker存儲(chǔ)結(jié)構(gòu)

Broker存儲(chǔ)結(jié)構(gòu)
  • 重試隊(duì)列:%RETRY%+consumergroup,push consumer默認(rèn)訂閱用于消費(fèi)失敗后的重試消費(fèi)
  • 死信隊(duì)列:多次(默認(rèn)16次)消費(fèi)失敗后進(jìn)入DLQ隊(duì)列,需要人工處理
  • 定時(shí)隊(duì)列:用于定時(shí)和延時(shí)消息
  • ConsumeQueue: 即message queue,根據(jù)topic和queueId區(qū)分的消息隊(duì)列,對(duì)MappedFileQueue進(jìn)行封裝
  • CommitLog: Broker中順序存儲(chǔ)的消息結(jié)構(gòu),管理消息commit和flush,對(duì)MappedFileQueue進(jìn)行封裝
  • MappedFileQueue: 對(duì)~/store/commitlog/中MappedFile封裝成文件隊(duì)列,進(jìn)行文件大小格式檢查,對(duì)mappedFile進(jìn)行管理。
  • MappedFile: 實(shí)際broker數(shù)據(jù)文件映射成的類,即~/store/commitlog/中00000000000000000000、00000000001073741824等文件,每個(gè)文件默認(rèn)大小上限為1G。

消息寫入

CommitLog負(fù)責(zé)將Producer的消息寫入文件中


消息寫入

核心代碼如下

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

putMessageLock這里提供了兩種上鎖方式,一種是默認(rèn)的自旋鎖,使用compareAndSet實(shí)現(xiàn)(用于low-race condition);一種是可重入鎖,使用ReentrantLock實(shí)現(xiàn)

參考


  • RocketMQ_原理簡(jiǎn)介
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容