MetaQ是阿里巴巴中間件團(tuán)隊(duì)開發(fā)的一款消息隊(duì)列中間件,說起MetaQ的命名呢,也是有點(diǎn)意思。MetaQ最早是基于Kafka的設(shè)計(jì)并使用Java進(jìn)行了完全重寫,而Kafka(卡夫卡)作家最著名的作品,大家都清楚,叫做《變形記》,英文名叫Metamorphosis。
MetaQ集群架構(gòu)

- NameServer集群:MetaQ基于NameServer,也是基于阿里內(nèi)部中間件Config Server??梢园阉斫鉃轭愃苲ookeeper的角色
- Broker:消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息
-
Consumer:消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。MetaQ提供兩種消費(fèi)模型
- Push Consumer :向Consumer對(duì)象注冊(cè)一個(gè)Listener接口,收到消息后回調(diào)Listener接口方法,采用長(zhǎng)輪詢實(shí)現(xiàn)push
- Pull Consumer:主動(dòng)由Consumer主動(dòng)拉取信息,同kafka
- Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息
消息結(jié)構(gòu)模型
- Message:?jiǎn)挝幌?/li>
- Topic:消息主題,軟分區(qū),對(duì)應(yīng)相同的topic時(shí),生產(chǎn)者對(duì)應(yīng)消費(fèi)者的分區(qū)標(biāo)識(shí)
- Tag:消息在topic基礎(chǔ)上的二級(jí)分類
- Message Queue:硬分區(qū),物理上區(qū)分topic,一個(gè)topic對(duì)應(yīng)多個(gè)message queue。在 MetaQ 中,所有消息隊(duì)列都是持久化,長(zhǎng)度無限的數(shù)據(jù)結(jié)構(gòu),所謂長(zhǎng)度無限是指隊(duì)列中的每個(gè)存儲(chǔ)單元都是定長(zhǎng),訪問其中的存儲(chǔ)單元使用 Offset 來訪問,offset 為 java long 類型,64 位,理論上在 100 年內(nèi)不會(huì)溢出,所以認(rèn)為是長(zhǎng)度無限,另外隊(duì)列中只保存最近幾天的數(shù)據(jù),之前的數(shù)據(jù)會(huì)按照過期時(shí)間來 刪除。
- Group:Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)一類消息,且消費(fèi)邏輯一致;Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發(fā)送一類消息,且發(fā)送邏輯一致。
- Offset:絕對(duì)偏移值,message queue中有兩類offset(commitOffset和offset),前者存儲(chǔ)在OffsetStore中表示消費(fèi)到的位置,后者是在PullRequest中為拉取消息位置。
Broker
Broker以組為單位向Consumer提供消息服務(wù),group中分為master和slave兩種角色。然后通過NameServer暴露給Consumer具體通信地址,采用message queue消息隊(duì)列結(jié)構(gòu)來提供消費(fèi)接口。針對(duì)某一topic情況下,message queue會(huì)根據(jù)queue id分布在不同的broker上,Consumer的消息消費(fèi)壓力則會(huì)分?jǐn)傇诓煌腂roker上的message queue,從而達(dá)到負(fù)載均衡的作用。
雖然每個(gè)topic下面有很多message queue,但是message queue本身并不存儲(chǔ)消息。真正的消息存儲(chǔ)會(huì)寫在CommitLog的文件,message queue只是存儲(chǔ)CommitLog中對(duì)應(yīng)的位置信息,方便通過message queue找到對(duì)應(yīng)存儲(chǔ)在CommitLog的消息。不同的topic,message queue都是寫到相同的CommitLog 文件,也就是說CommitLog完全的順序?qū)?,而順序讀寫是metaq高吞吐量的基礎(chǔ)。
Broker存儲(chǔ)結(jié)構(gòu)

- 重試隊(duì)列:%RETRY%+consumergroup,push consumer默認(rèn)訂閱用于消費(fèi)失敗后的重試消費(fèi)
- 死信隊(duì)列:多次(默認(rèn)16次)消費(fèi)失敗后進(jìn)入DLQ隊(duì)列,需要人工處理
- 定時(shí)隊(duì)列:用于定時(shí)和延時(shí)消息
- ConsumeQueue: 即message queue,根據(jù)topic和queueId區(qū)分的消息隊(duì)列,對(duì)MappedFileQueue進(jìn)行封裝
- CommitLog: Broker中順序存儲(chǔ)的消息結(jié)構(gòu),管理消息commit和flush,對(duì)MappedFileQueue進(jìn)行封裝
- MappedFileQueue: 對(duì)~/store/commitlog/中MappedFile封裝成文件隊(duì)列,進(jìn)行文件大小格式檢查,對(duì)mappedFile進(jìn)行管理。
- MappedFile: 實(shí)際broker數(shù)據(jù)文件映射成的類,即~/store/commitlog/中00000000000000000000、00000000001073741824等文件,每個(gè)文件默認(rèn)大小上限為1G。
消息寫入
CommitLog負(fù)責(zé)將Producer的消息寫入文件中

核心代碼如下
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
putMessageLock這里提供了兩種上鎖方式,一種是默認(rèn)的自旋鎖,使用compareAndSet實(shí)現(xiàn)(用于low-race condition);一種是可重入鎖,使用ReentrantLock實(shí)現(xiàn)
參考
- RocketMQ_原理簡(jiǎn)介