RocketMQ介紹
RocketMQ是阿里巴巴開源的一個(gè)消息中間件,在阿里內(nèi)部歷經(jīng)了雙十一等很多高并發(fā)場(chǎng)景的考驗(yàn),能夠處理億萬級(jí)別的消息。2016年開源后捐贈(zèng)給Apache,現(xiàn)在是Apache的一個(gè)頂級(jí)項(xiàng)目。
早期阿里使用ActiveMQ,但是,當(dāng)消息開始逐漸增多后,ActiveMQ的IO性能很快達(dá)到了瓶頸。于是,阿里開始關(guān)注Kafka。但是Kafka是針對(duì)日志收集場(chǎng)景設(shè)計(jì)的,他的高級(jí)功能并不是很貼合阿里的業(yè)務(wù)場(chǎng)景。尤其當(dāng)他的Topic過多時(shí),由于Partition文件也會(huì)過多,這就會(huì)加大文件索引的耗時(shí),會(huì)嚴(yán)重影響IO性能。于是阿里才決定自研中間件,最早叫做MetaQ,后來改名成為RocketMQ。最早他所希望解決的最大問題就是多Topic下的IO性能壓力。但是產(chǎn)品在阿里內(nèi)部的不斷改進(jìn),RocketMQ開始體現(xiàn)出一些不一樣的優(yōu)勢(shì)。
主流MQ對(duì)比
| 優(yōu)點(diǎn) | 缺點(diǎn) | 適用場(chǎng)景 | |
|---|---|---|---|
| Kafka | 吞吐量非常大,性能非常好,技術(shù)生態(tài)完整 | 功能比較單一 | 分布式日志收集,大數(shù)據(jù)采集 |
| RabbitMQ | 消息可靠性高,功能全面 | 吞吐量較低。消息積壓會(huì)影響性能。 erlang語言比較小眾 | 企業(yè)內(nèi)部系統(tǒng)調(diào)用 |
| RocketMQ | 高吞吐、高性能、高可用,高級(jí)功能非常全 | 技術(shù)生態(tài)相對(duì)沒有那么完整 | 幾乎全場(chǎng)景。尤其適合金融場(chǎng)景 |
其中RocketMQ,孵化自阿里巴巴。歷經(jīng)阿里多年雙十一的嚴(yán)格考驗(yàn),RocketMQ可以說是從全世界最嚴(yán)苛的高并發(fā)場(chǎng)景中摸爬滾打出來的過硬產(chǎn)品,也是少數(shù)幾個(gè)在金融場(chǎng)景比較適用的MQ產(chǎn)品。
從橫向?qū)Ρ葋砜矗琑ocketMQ與Kafka和RabbitMQ相比。RocketMQ的消息吞吐量雖然和Kafka相比還是稍有差距,但是卻比RabbitMQ高很多。
在阿里內(nèi)部,RocketMQ集群每天處理的請(qǐng)求數(shù)超過5萬億次,支持的核心應(yīng)用超過3000個(gè)。而RocketMQ最大的優(yōu)勢(shì)就是他天生就為金融互聯(lián)網(wǎng)而生。
他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。
另外,RocketMQ的高級(jí)功能也越來越全面,廣播消費(fèi)、延遲隊(duì)列、死信隊(duì)列等等高級(jí)功能一應(yīng)俱全,甚至某些業(yè)務(wù)功能比如事務(wù)消息,已經(jīng)呈現(xiàn)出領(lǐng)先潮流的趨勢(shì)。
運(yùn)行架構(gòu)

RocketMQ的架構(gòu)主要分為三大部分,分別是NameServer、Broker和Client。
NameServer
NameServer是RocketMQ的名稱服務(wù),它主要負(fù)責(zé)集群的路由信息管理。NameServer是無狀態(tài)的,可以部署多個(gè)實(shí)例來實(shí)現(xiàn)高可用。
Broker
Broker是RocketMQ的核心組件,它主要負(fù)責(zé)消息的存儲(chǔ)和轉(zhuǎn)發(fā)。Broker可以分為主從兩種角色,主Broker負(fù)責(zé)處理客戶端的讀寫請(qǐng)求,從Broker負(fù)責(zé)同步主Broker的數(shù)據(jù),實(shí)現(xiàn)高可用。
Client
Client是RocketMQ的客戶端組件,它主要負(fù)責(zé)與Broker進(jìn)行通信,發(fā)送和接收消息。Client可以分為生產(chǎn)者和消費(fèi)者兩種角色,生產(chǎn)者負(fù)責(zé)發(fā)送消息,消費(fèi)者負(fù)責(zé)接收消息。
消息模型

RocketMQ的消息模型主要包括主題(Topic)、隊(duì)列(Queue)和消息(Message)。
生產(chǎn)者和消費(fèi)者都可以指定一個(gè)Topic發(fā)送消息或者拉取消息。而Topic是一個(gè)邏輯概念。
Topic中的消息會(huì)分布在后面多個(gè)MessageQueue當(dāng)中。這些MessageQueue會(huì)分布到一個(gè)或者多個(gè)broker中。
MessageQueue是物理概念,是消息存儲(chǔ)的基本單元。每個(gè)MessageQueue其實(shí)就是一個(gè)FIFO隊(duì)列數(shù)據(jù)結(jié)構(gòu),生產(chǎn)者發(fā)送的消息會(huì)被追加到MessageQueue的末尾,消費(fèi)者則從MessageQueue的頭部開始消費(fèi)消息。
MessageQueue類似kafka的Partition概念,一個(gè)Topic可以有多個(gè)MessageQueue,這樣就可以實(shí)現(xiàn)消息的并行處理,提高系統(tǒng)的吞吐量。
同一時(shí)間,只能有一個(gè)消費(fèi)者實(shí)例消費(fèi)同一個(gè)MessageQueue中的消息。
消息確認(rèn)機(jī)制
RocketMQ要支持互聯(lián)網(wǎng)金融場(chǎng)景,那么消息安全是必須優(yōu)先保障的。而消息安全有兩方面的要求,一方面是生產(chǎn)者要能確保將消息發(fā)送到Broker上。另一方面是消費(fèi)者要能確保從Broker上爭取獲取到消息。
1、生產(chǎn)端確認(rèn)消息正常發(fā)送到RocketMQ
消息生產(chǎn)者發(fā)送消息分為三種方式
- 同步發(fā)送,生產(chǎn)者發(fā)送消息后會(huì)阻塞等待Broker的響應(yīng),直到收到Broker的確認(rèn)消息或者超時(shí)
- 異步發(fā)送,生產(chǎn)者發(fā)送消息后不會(huì)阻塞等待Broker的響應(yīng),而是通過回調(diào)函數(shù)來處理Broker的確認(rèn)消息
- 單向發(fā)送,生產(chǎn)者發(fā)送消息后不會(huì)等待Broker的響應(yīng),也不會(huì)進(jìn)行任何確認(rèn)
采用消息確認(rèn)加多次重試的機(jī)制保證消息能夠可靠發(fā)送到Broker上。但并不保證消息一定能被消費(fèi)者消費(fèi)到,因?yàn)橄M(fèi)者可能會(huì)因?yàn)楦鞣N原因消費(fèi)失敗。
2、消費(fèi)端確認(rèn)消息正常消費(fèi)
我們之前分析生產(chǎn)者的可靠性問題,核心的解決思路就是通過確認(rèn)Broker端的狀態(tài)來保證生產(chǎn)者發(fā)送消息的可靠性。對(duì)于RocketMQ的消費(fèi)者來說,保證消息處理可靠性的思路也是類似的。只不過這次換成了Broker等待消費(fèi)者返回消息處理狀態(tài)。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
這個(gè)返回值是一個(gè)枚舉值,有兩個(gè)選項(xiàng) CONSUME_SUCCESS和RECONSUME_LATER。如果消費(fèi)者返回CONSUME_SUCCESS,那么消息自然就處理結(jié)束了。
但是如果消費(fèi)者沒有處理成功,返回的是RECONSUME_LATER,Broker就會(huì)過一段時(shí)間再發(fā)起消息重試。
如果消費(fèi)者一直處理失敗,那么消息就會(huì)一直被重試,直到達(dá)到最大重試次數(shù),默認(rèn)是16次,就會(huì)進(jìn)入死信隊(duì)列。
廣播消息
廣播模式和集群模式是RocketMQ的消費(fèi)者端處理消息最基本的兩種模式。
集群模式下,一個(gè)消息,只會(huì)被一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者實(shí)例 共同 處理一次。廣播模式下,一個(gè)消息,則會(huì)推送給所有消費(fèi)者實(shí)例處理,不再關(guān)心消費(fèi)者組。
默認(rèn)模式(也就是集群模式)下,Broker端會(huì)給每個(gè)ConsumerGroup維護(hù)一個(gè)統(tǒng)一的Offset,這個(gè)Offset可以保證一個(gè)消息,在同一個(gè)ConsumerGroup內(nèi)只會(huì)被消費(fèi)一次。
而廣播模式的實(shí)現(xiàn)方式,是將Offset轉(zhuǎn)移到消費(fèi)者端自行保管,這樣Broker端只管向所有消費(fèi)者推送消息,而不用負(fù)責(zé)維護(hù)消費(fèi)進(jìn)度。
順序消息機(jī)制
Rocketmq由于存在多個(gè)MessageQueue的設(shè)計(jì),所以天然是不保證消息順序的。因?yàn)橥粋€(gè)Topic下的消息會(huì)被分布到多個(gè)MessageQueue中,而每個(gè)MessageQueue是獨(dú)立的FIFO隊(duì)列,消費(fèi)者是并行消費(fèi)多個(gè)MessageQueue的消息,所以無法保證消息的全局順序。
要解決這個(gè)問題思路也很簡單,那就是將需要順序處理的消息全部發(fā)送到同一個(gè)MessageQueue中。這樣消費(fèi)者在消費(fèi)這個(gè)MessageQueue的消息時(shí),就能保證消息的順序性。
過濾消息
同一個(gè)Topic下有多種不同的消息,消費(fèi)者只希望關(guān)注某一類消息。
例如,某系統(tǒng)中給倉儲(chǔ)系統(tǒng)分配一個(gè)Topic,在Topic下,會(huì)傳遞過來入庫、出庫等不同的消息,倉儲(chǔ)系統(tǒng)的不同業(yè)務(wù)消費(fèi)者就需要過濾出自己感興趣的消息,進(jìn)行不同的業(yè)務(wù)操作。

包含兩種過濾方式,標(biāo)簽過濾和SQL過濾。
事務(wù)消息
事務(wù)消息是RocketMQ非常有特色的一個(gè)高級(jí)功能。他的基礎(chǔ)訴求是通過RocketMQ的事務(wù)機(jī)制,來保證上下游的數(shù)據(jù)一致性。
以電商為例,用戶支付訂單這一核心操作的同時(shí)會(huì)涉及到下游物流發(fā)貨、積分變更、購物車狀態(tài)清空等多個(gè)子系統(tǒng)的變更。這種場(chǎng)景,非常適合使用RocketMQ的解耦功能來進(jìn)行串聯(lián)。

考慮到事務(wù)的安全性,即要保證相關(guān)聯(lián)的這幾個(gè)業(yè)務(wù)一定是同時(shí)成功或者同時(shí)失敗的。如果要將四個(gè)服務(wù)一起作為一個(gè)分布式事務(wù)來控制,可以做到,但是會(huì)非常麻煩。
而使用RocketMQ在中間串聯(lián)了之后,事情可以得到一定程度的簡化。由于RocketMQ與消費(fèi)者端有失敗重試機(jī)制,所以,只要消息成功發(fā)送到RocketMQ了,那么可以認(rèn)為Branch2.1,Branch2.2,Branch2.3這幾個(gè)分支步驟,是可以保證最終的數(shù)據(jù)一致性的。
這樣,一個(gè)復(fù)雜的分布式事務(wù)問題,就變成了MinBranch1和Branch2兩個(gè)步驟的分布式事務(wù)問題。
然后,在此基礎(chǔ)上,RocketMQ提出了事務(wù)消息機(jī)制,采用兩階段提交的思路,保證Main Branch1和Branch2之間的事務(wù)一致性。

具體的實(shí)現(xiàn)思路是這樣的:

流程如下:
- 生產(chǎn)者將消息發(fā)送至Apache RocketMQ服務(wù)端。
- Apache RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。
- 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
- 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。
- 二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。
- 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時(shí)間后,服務(wù)端將對(duì)消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。
- 生產(chǎn)者收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理。