序言
本文是RocketMQ學(xué)習(xí)及使用過(guò)程中整理的筆記,主要是個(gè)人覺(jué)得很關(guān)鍵或者是需要加深理解部分知識(shí)的紀(jì)錄,主要涉及RocketMQ基本概念、部署結(jié)構(gòu)、消息相關(guān)知識(shí)等,可以做入門資料閱讀
消息隊(duì)列應(yīng)用場(chǎng)景
- 異步處理
- 將不是必須的業(yè)務(wù)邏輯進(jìn)行異步處理
- 應(yīng)用解耦
- 應(yīng)用之間的交互采用消息隊(duì)列可以很好的減少彼此的依賴,一個(gè)應(yīng)用崩潰不會(huì)影響整個(gè)服務(wù)
- 流量削峰
- 比較常見(jiàn)于秒殺和團(tuán)購(gòu)
- 直接丟棄或者延后處理
- 消息通訊
- 應(yīng)用很單純,就是用作消息通訊,也是上邊所有的場(chǎng)景的基礎(chǔ)
基本概念
- Topic
- 主題,一級(jí)消息類型,可以配合Tag使用做細(xì)致區(qū)分,不同類型的消息設(shè)置不同Topic
- 建議:一個(gè)應(yīng)用盡可能使用一個(gè)Topic,功能以Tag區(qū)分
- Tag
- 消息標(biāo)簽,二級(jí)消息類型,用于進(jìn)一步區(qū)分某個(gè)Topic下的消息分類
- Queue
- 消息隊(duì)列
- 一個(gè)Topic下可以有多個(gè)queue
- Producer
- 生產(chǎn)者,發(fā)送消息
- Consumer
- 消費(fèi)者
- 一個(gè)消息可以對(duì)應(yīng)多個(gè)消費(fèi)者
- Group
- Consumer Group:消費(fèi)者分組,為了實(shí)現(xiàn)集群消費(fèi),不同Consumer Group之間消費(fèi)進(jìn)度彼此不受影響,一個(gè)Consumer Group下包含多個(gè)Consumer實(shí)例
- Producer Group:生產(chǎn)者分組,標(biāo)識(shí)發(fā)送同一類消息的Producer,通常發(fā)送邏輯一致,一個(gè)Producer Group可以發(fā)送多個(gè)Topic消息
- Broker
- 服務(wù)器
物理部署結(jié)構(gòu)

圖片來(lái)自網(wǎng)絡(luò)
- NameServer幾乎無(wú)狀態(tài)的節(jié)點(diǎn),節(jié)點(diǎn)之間無(wú)任何信息同步
- Broker為Master/Slave模式,一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master,對(duì)應(yīng)關(guān)系:BrokerName相同,BrokerId不同,0表示Master,非0表示Slave
- Producer與NameServer中的隨機(jī)一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定期從Name Server獲取Topic路由信息,并向提供Topic服務(wù)的Master建立長(zhǎng)連接
- Consumer跟提供Topic服務(wù)的Master、Slave建立長(zhǎng)連接。Consumer可以從Master訂閱也可以從Slave訂閱,取決于Broker的配置
- Producer與Consumer都會(huì)定時(shí)發(fā)送心跳
邏輯部署結(jié)構(gòu)

圖片來(lái)自網(wǎng)絡(luò)
消費(fèi)類型
-
集群消費(fèi):
- 一個(gè)Group里的Consumer平均消費(fèi)Topic下的Queue
- Consumer Group里的Consumer數(shù)目最好和Topic的queue數(shù)目一致或者成倍數(shù)關(guān)系
-
廣播消費(fèi):
- 忽略Consumer Group,消費(fèi)者只要訂閱了Topic,那么就會(huì)收到該Topic下的所有queue
- 可以在實(shí)例化Consumer的時(shí)候指定消費(fèi)類型
offset
紀(jì)錄消費(fèi)位置
順序消息
-
相關(guān)概念
- 順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。包括順序發(fā)布和順序消費(fèi)
-
注意事項(xiàng)
- 順序消息暫不支持廣播模式
事務(wù)消息
-
相關(guān)概念
- 事務(wù)消息:通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致
- 半消息:暫時(shí)還不能投遞的消息,發(fā)送方成功將消息發(fā)送到了RocketMQ的服務(wù)器端,但是服務(wù)端沒(méi)有收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)消息被標(biāo)記為“暫不能投遞”狀態(tài)
- 消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,MQ 服務(wù)端通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問(wèn)該消息的最終狀態(tài)(Commit 或是 Rollback),該過(guò)程即消息回查。
-
事務(wù)消息圖解
圖片來(lái)自網(wǎng)絡(luò)1). 可以分為三個(gè)階段:第一階段發(fā)送Prepare消息并且能拿到消息的地址,第二階段執(zhí)行本地事務(wù),第三階段通過(guò)第一階段拿到的地址去訪問(wèn)消息并修改消息的狀態(tài)
2). 事務(wù)消息主要在于消息投遞給消費(fèi)者之前,消息投遞給消費(fèi)者之后,如果消費(fèi)者消費(fèi)失敗,則不斷重試,如果最終還是失敗,則只能人工介入。
3). 回查是在二次確認(rèn)未到達(dá)MQ Server的情況下啟用
-
注意事項(xiàng)
- 事務(wù)消息的 Producer ID 不能與其他類型消息的 Producer ID 共用。
- 通過(guò) ONSFactory.createTransactionProducer 創(chuàng)建事務(wù)消息的 Producer 時(shí)必須指定 LocalTransactionChecker 的實(shí)現(xiàn)類,處理異常情況下事務(wù)消息的回查。
- 事務(wù)消息發(fā)送完成本地事務(wù)后,可在 execute 方法中返回如下三種狀態(tài):
- TransactionStatus.CommitTransaction 提交事務(wù),允許訂閱方消費(fèi)該消息。
- TransactionStatus.RollbackTransaction 回滾事務(wù),消息將被丟棄不允許消費(fèi)。
- TransactionStatus.Unknow 暫時(shí)無(wú)法判斷狀態(tài),期待固定時(shí)間以后 MQ Server 向發(fā)送方進(jìn)行消息回查。
消息查詢手段
-
按Message Id查詢
- Message Id消息唯一標(biāo)識(shí),系統(tǒng)自動(dòng)生成
-
按Message Key查詢
- 用戶指定,最好是能確保其唯一性
消息過(guò)濾
-
broker端消息過(guò)濾
- 減少無(wú)用消息傳輸,減少了網(wǎng)絡(luò)開(kāi)銷
- 增加了broker的負(fù)擔(dān),實(shí)現(xiàn)起來(lái)相對(duì)復(fù)雜
-
Consumer端消息過(guò)濾
- 過(guò)濾邏輯完全自定義實(shí)現(xiàn)
- 缺點(diǎn)是有無(wú)用消息傳輸,增大了網(wǎng)絡(luò)開(kāi)銷
開(kāi)發(fā)--零散知識(shí)點(diǎn)
引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0-incubating</version>
</dependency>
用法
- producer.setRetryTimesWhenSendFailed(5); //消息發(fā)送失敗重試次數(shù)
-
consumer.setConsumeFromWhere(ConsumeFromWhere.x)- CONSUME_FROM_FIRST_OFFSET:第一次啟動(dòng)從隊(duì)列的最前位置開(kāi)始消費(fèi),后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開(kāi)始消費(fèi)
- CONSUME_FROM_LAST_OFFSET:第一次啟動(dòng)從隊(duì)列的最后位置開(kāi)始消費(fèi),后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開(kāi)始消費(fèi)
- CONSUME_FROM_TIMESTAMP:第一次啟動(dòng)從指定時(shí)間點(diǎn)開(kāi)始消費(fèi),后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開(kāi)始消費(fèi),
