情緒要么順?lè)?,要么支配你,這要看誰(shuí)說(shuō)了算。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ——吉米.羅恩
大綱


玩轉(zhuǎn)各種消息
1.普通消息
????整體流程如下
? ? >導(dǎo)入MQ客戶(hù)端依賴(lài)
? ??????<dependency>
????????????<groupId>org.apache.rocketmq</groupId>
????????????<artifactId>rocketmq-client</artifactId>
????????????<version>4.8.0</version>
????????</dependency>
? ? >消息發(fā)送者步驟
? ? ????1.創(chuàng)建消息生產(chǎn)者producer,并指定生產(chǎn)者組名
? ? ????2.指定NameServer地址
? ? ????3.啟動(dòng)producer
? ? ????4.創(chuàng)建消息對(duì)象,指定Topic、Tag和消息體
? ? ????5.發(fā)送消息
? ? ????6.關(guān)閉生產(chǎn)者producer
? ? >消息消費(fèi)者步驟
? ? ? ? 1.創(chuàng)建消費(fèi)者Consumer,指定消費(fèi)者組名
? ? ? ? 2.指定NameServer地址
? ? ? ? 3.訂閱主題Topic和Tag
? ? ? ? 4.設(shè)置回調(diào)函數(shù),處理消息
? ? ? ? 5.啟動(dòng)消費(fèi)者consumer
(1)消息發(fā)送
發(fā)送同步消息

????這種可靠性同步地發(fā)送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
????代碼演示


????同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,同步等待,直到收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)請(qǐng)求。
????>Message ID
? ? 消息的全局唯一標(biāo)識(shí)(內(nèi)部機(jī)制的ID生成是使用機(jī)器IP和消息偏移量的組成,所以有可能重復(fù),如果是冪等性還是最好考慮Key),由消息隊(duì)列MQ系統(tǒng)自動(dòng)生成,唯一標(biāo)識(shí)某條信息。
? ? >SendStatus
? ? 發(fā)送的標(biāo)識(shí)。成功,失敗等。
? ? >Queue
? ? 相當(dāng)于是Topic的分區(qū):用于并行發(fā)送和接收消息。
發(fā)送異步消息

? ? 異常消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長(zhǎng)時(shí)間地等待Broker的響應(yīng)。
? ? 代碼演示

消息發(fā)送方在發(fā)送了一條消息后,不等接收方發(fā)回響應(yīng),接著進(jìn)行第二條消息發(fā)送。發(fā)送方通過(guò)回調(diào)接口的方式接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。
單向發(fā)送

? ? 這種方式主要用在不特別關(guān)心發(fā)送結(jié)果的場(chǎng)景,例如:日志發(fā)送。
? ? 代碼演示

? ? 單向(Oneway)發(fā)送特點(diǎn)為發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā),即只發(fā)送請(qǐng)求不等待應(yīng)答。此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別。
消息發(fā)送的權(quán)衡

(2)消息消費(fèi)
集群消費(fèi)

? ? 消費(fèi)者的一種消費(fèi)模式。一個(gè)Consumer Group中的各個(gè)Consumer實(shí)例分?jǐn)側(cè)ハM(fèi)消息,即一條消息只會(huì)投遞到一個(gè)Consumer Group下面的一個(gè)實(shí)例。
? ? 實(shí)際上,每個(gè)Consumer是平均分?jǐn)侻essage Queue的去做拉取消息。例如:某個(gè)Topic有3條Q,其中一個(gè)Consumer Group有3個(gè)實(shí)例(可能是3個(gè)進(jìn)程,或者3臺(tái)機(jī)器),那么每個(gè)實(shí)例只消費(fèi)其中的1條Q。
? ? 而由Producer發(fā)送消息的時(shí)候是輪詢(xún)所有的Q,所以消息會(huì)平均散落在不同的Q上,可以認(rèn)為Q上的消息是平均的。那么實(shí)例也就平均地消費(fèi)消息了。
? ? 這種模式下,消費(fèi)進(jìn)度(Consumer Offset)的存儲(chǔ)會(huì)持久化到Broker。
? ? 代碼演示

廣播消費(fèi)

? ? 消費(fèi)者的一種消費(fèi)模式。消息將對(duì)一個(gè)Consumer Group下的各個(gè)Consumer實(shí)例都投遞一遍。即使這些Consumer屬于同一個(gè)Consumer Group,消息也會(huì)被Consumer Group中的每個(gè)Consumer都消費(fèi)一次。
? ? 實(shí)際上,是一個(gè)消費(fèi)組下的每個(gè)消費(fèi)者實(shí)例都獲取到了topic下面的每個(gè)Message Queue去拉取消費(fèi)。所以消息會(huì)投遞到每個(gè)消費(fèi)者實(shí)例。
? ? 這種模式下,消費(fèi)進(jìn)度(Consumer Offset)會(huì)存儲(chǔ)持久化到實(shí)例本地。
? ? 代碼演示

消息消費(fèi)時(shí)的權(quán)衡
? ? 集群模式:適用場(chǎng)景&注意事項(xiàng)
? ? 消費(fèi)端集群化部署,每條消息 只需要被處理一次。
? ? 由于消費(fèi)進(jìn)度在服務(wù)端維護(hù),可靠性更高。
? ? 集群消費(fèi)模式下,每一條消息都只會(huì)被分發(fā)到一臺(tái)機(jī)器上處理。如果需要被集群下的每一臺(tái)機(jī)器都處理,請(qǐng)使用廣播模式。
? ? 集群消費(fèi)模式下,不保證每一次失敗重投的消息路由到同一臺(tái)機(jī)器上,因此處理消息時(shí)不應(yīng)該做任何確定性假設(shè)。
? ? 廣播模式:適用場(chǎng)景&注意事項(xiàng)
? ? 廣播消費(fèi)模式下不支持順序消息。
? ? 廣播消費(fèi)模式下不支持重置消費(fèi)位點(diǎn)。
? ? 每條消息都需要被相同邏輯的多臺(tái)機(jī)器處理。
? ? 消費(fèi)進(jìn)度在客戶(hù)端維護(hù),出現(xiàn)重復(fù)的概率稍大于集群模式。
? ? 廣播模式下,消息隊(duì)列RocketMQ保證每條消息至少被每臺(tái)客戶(hù)端消費(fèi)一次,但是并不會(huì)對(duì)消費(fèi)失敗的消息進(jìn)行失敗重投,因此業(yè)務(wù)方需要關(guān)注消費(fèi)失敗的情況。
? ? 廣播模式下,客戶(hù)端每一次重啟都會(huì)從最新消息消費(fèi)??蛻?hù)端在被停止期間發(fā)送至服務(wù)端的消息將會(huì)被自動(dòng)跳過(guò),請(qǐng)謹(jǐn)慎選擇。
? ? 廣播模式下,每條消息都會(huì)被大量的客戶(hù)端重復(fù)處理,因此推薦盡可能使用集群模式。
? ? 目前僅Java客戶(hù)端支持廣播模式。
? ? 廣播模式下服務(wù)端不維護(hù)消費(fèi)進(jìn)度,所以消息隊(duì)列RocketMQ控制臺(tái)不支持消息堆積查詢(xún)、消息堆積報(bào)警和訂閱關(guān)系查詢(xún)功能。
2.順序消息
? ? 消息有序指的是可以按照消息的發(fā)送順序來(lái)消費(fèi)(FIFO)。RocketMQ可以嚴(yán)格的保證消息有序,可以分為分區(qū)有序或者全局有序。
? ? 順序消費(fèi)的原理解析,在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢(xún)方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。
? ? >全局順序消息

? ? >部分順序消息

(1)順序消息生產(chǎn)
? ? 一個(gè)訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,下面是訂單進(jìn)行分區(qū)有序的示例代碼。



使用順序消息:首先要保證消息是有序進(jìn)入MQ的,消息放入MQ之前,對(duì)id等關(guān)鍵字進(jìn)行取模,放入指定messageQueue,consume消費(fèi)消息失敗時(shí),不能返回reconsume--later,這樣會(huì)導(dǎo)致亂序。
應(yīng)該返回suspend_current_queue_a_moment,意思是先等一會(huì),一會(huì)兒再處理這批消息,而不是放到重試隊(duì)列里。
(2)順序消息消費(fèi)
? ? 消費(fèi)時(shí),同一個(gè)OrderId獲取到的肯定是同一個(gè)隊(duì)列。從而確保一個(gè)訂單中處理的順序。

3.消息發(fā)送時(shí)的重要方法/屬性
(1)屬性
org.apache.rocketmq.example.details. ProducerDetails 類(lèi)中

????producerGroup:生產(chǎn)者所屬組
????defaultTopicQueueNums:默認(rèn)主題在每一個(gè)Broker隊(duì)列數(shù)量
????sendMsgTimeout:發(fā)送消息默認(rèn)默認(rèn)超時(shí)時(shí)間,默認(rèn)3s
????compressMsgBodyOverHowmuch:消息體超過(guò)該值則啟用壓縮,默認(rèn)4k
? ? retryTimesWhenSendFailed:同步方式發(fā)送消息重復(fù)次數(shù),默認(rèn)為2,總共執(zhí)行3次
? ? retryTimesWhenSendAsyncFailed:異步方法發(fā)送消息重試次數(shù),默認(rèn)為2
? ? retryAnotherBrokerWhenNotStoreOK:消息重試時(shí)選擇另外一個(gè)Broker時(shí),是否不等待存儲(chǔ)結(jié)果就返回,默認(rèn)為false
? ? maxMessageSize:允許發(fā)送的最大消息長(zhǎng)度,默認(rèn)為4M
(2)方法
? ??org.apache.rocketmq.example.details. ProducerDetails 類(lèi)中



單向發(fā)送


同步發(fā)送


異步發(fā)送



4.消息消費(fèi)時(shí)的重要方法/屬性
????org.apache.rocketmq.example.details. ComuserDetails 類(lèi)中
(1)屬性


(2)方法
? ??void subscribe(final String topic, final MessageSelector selector) :訂閱消息,并指定隊(duì)列選擇器
????void unsubscribe(final String topic):取消消息訂閱
????Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :獲取消費(fèi)者對(duì)主題分配了那些消息隊(duì)列
????void registerMessageListener(final MessageListenerConcurrently messageListener):注冊(cè)并發(fā)事件監(jiān)聽(tīng)器

void registerMessageListener(final MessageListenerOrderly messageListener):注冊(cè)順序消息事件監(jiān)聽(tīng)器

(3)消費(fèi)確認(rèn)(ACK)
????業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)且僅當(dāng)此回調(diào)函數(shù)返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才會(huì)認(rèn)為這批消息(默認(rèn)是 1 條) 是消費(fèi)完成的中途斷電,拋出異常等都不會(huì)認(rèn)為成功——即都會(huì)重新投遞。
? ??返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就會(huì)認(rèn)為這批消息消費(fèi)失敗了。
? ??如果業(yè)務(wù)的回調(diào)沒(méi)有處理好而拋出異常,會(huì)認(rèn)為是消費(fèi)失敗 ConsumeConcurrentlyStatus.RECONSUME_LATER 處理。
? ??為了保證消息是肯定被至少消費(fèi)成功一次,RocketMQ 會(huì)把這批消息重發(fā)回 Broker(topic 不是原 topic 而是這個(gè)消費(fèi)組的 RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是 10 秒,業(yè)務(wù)可設(shè)置)后,再次投遞到這個(gè) ConsumerGroup。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn) 16 次),就會(huì)投遞到 DLQ 死信隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來(lái)做人工干預(yù)。
? ??另外如果使用順序消費(fèi)的回調(diào) MessageListenerOrderly 時(shí),由于順序消費(fèi)是要前者消費(fèi)成功才能繼續(xù)消費(fèi),所以沒(méi)有 RECONSUME_LATER 的這個(gè)狀態(tài), 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 來(lái)暫停隊(duì)列的其余消費(fèi),直到原消息不斷重試成功為止才能繼續(xù)消費(fèi)。
5.延時(shí)消息
(1)概念介紹
? ??延時(shí)消息:Producer 將消息發(fā)送到消息隊(duì)列 RocketMQ 服務(wù)端,但并不期望這條消息立馬投遞,而是延遲一定時(shí)間后才投遞到 Consumer 進(jìn)行消費(fèi), 該消息即延時(shí)消息。
(2)適用場(chǎng)景
? ??消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求:比如在電商交易中超時(shí)未支付關(guān)閉訂單的場(chǎng)景,在訂單創(chuàng)建時(shí)會(huì)發(fā)送一條延時(shí)消息。這條消息將會(huì)在 30 分鐘以后投遞給消費(fèi)者,消費(fèi)者收到此消息后需要判斷對(duì)應(yīng)的訂單是否已完成支付。 如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。
(3)使用方式
? ??Apache RocketMQ 目前只支持固定精度的定時(shí)消息,因?yàn)槿绻С秩我獾臅r(shí)間精度,在 Broker 層面,必須要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的產(chǎn)生巨大性能開(kāi)銷(xiāo)。(阿里云 RocketMQ 提供了任意時(shí)刻的定時(shí)消息功能,Apache 的 RocketMQ 并沒(méi)有,阿里并沒(méi)有開(kāi)源)。
????發(fā)送延時(shí)消息時(shí)需要設(shè)定一個(gè)延時(shí)時(shí)間長(zhǎng)度,消息將從當(dāng)前發(fā)送時(shí)間點(diǎn)開(kāi)始延遲固定時(shí)間之后才開(kāi)始投遞。
????延遲消息是根據(jù)延遲隊(duì)列的 level 來(lái)的,延遲隊(duì)列默認(rèn)是
? ??msg.setDelayTimeLevel(3)代表延遲 10 秒
? ??"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

是這 18 個(gè)等級(jí)(秒(s)、分(m)、小時(shí)(h)),level 為 1,表示延遲 1 秒后消費(fèi),level 為 5 表示延遲 1 分鐘后消費(fèi),level 為 18 表示延遲 2 個(gè)小時(shí)消費(fèi)。生產(chǎn)消息跟普通的生產(chǎn)消息類(lèi)似,只需要在消息上設(shè)置延遲隊(duì)列的 level 即可。消費(fèi)消息跟普通的消費(fèi)消息一致。
(4)代碼演示
? ??org.apache.rocketmq.example. scheduled 包中
生產(chǎn)者

消費(fèi)者

6.批量消息
? ??批量發(fā)送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應(yīng)該有相同的 topic,相同的 waitStoreMsgOK,而且不能是延時(shí)消息。此外,這一批消息的總大小不應(yīng)超過(guò) 4MB。
(1)代碼演示
? ??org.apache.rocketmq.example. batch 包中
生產(chǎn)者

消費(fèi)者

(2)批量切分
? ??如果消息的總長(zhǎng)度可能大于 4MB 時(shí),這時(shí)候最好把消息進(jìn)行分割。
代碼演示
????我們需要發(fā)送 10 萬(wàn)元素的數(shù)組,這個(gè)量很大,怎么快速發(fā)送完。同時(shí)每一次批量發(fā)送的消息大小不能超過(guò) 4M 。
????具體見(jiàn)代碼

7.過(guò)濾消息
? ??org.apache.rocketmq.example. filter 包中
(1)Tag過(guò)濾
????在大多數(shù)情況下,TAG 是一個(gè)簡(jiǎn)單而有用的設(shè)計(jì),其可以來(lái)選擇您想要的消息。


? ??消費(fèi)者將接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一個(gè)消息只能有一個(gè)標(biāo)簽,這對(duì)于復(fù)雜的場(chǎng)景可能不起作用。在這種情況下,可以使用 SQL 表達(dá)式篩選消息。SQL 特性可以通過(guò)發(fā)送消息時(shí)的屬性來(lái)進(jìn)行計(jì)算。
(2)Sql過(guò)濾
SQL基本語(yǔ)法
RocketMQ 定義了一些基本語(yǔ)法來(lái)支持這個(gè)特性。你也可以很容易地?cái)U(kuò)展它。
只有使用 push 模式的消費(fèi)者才能用使用 SQL92 標(biāo)準(zhǔn)的 sql 語(yǔ)句,常用的語(yǔ)句如下:
? ??數(shù)值比較:比如:>,>=,<,<=,BETWEEN,=;
? ??字符比較:比如:=,<>,IN;
????IS NULL 或者 IS NOT NULL;
? ??邏輯符號(hào):AND,OR,NOT;
? ??常量支持類(lèi)型為:
????數(shù)值,比如:123,3.1415;
????字符,比如:'abc',必須用單引號(hào)包裹起來(lái);
? ??NULL,特殊的常量;
????布爾值,TRUE 或 FALSE;
消息生產(chǎn)者(加入消息屬性)
? ??發(fā)送消息時(shí),你能通過(guò) putUserProperty 來(lái)設(shè)置消息的屬性。

消息消費(fèi)者(使用SQL篩選)
? ??用 MessageSelector.bySql 來(lái)使用 sql 篩選消息。

如果這個(gè)地方拋出錯(cuò)誤:說(shuō)明 Sql92 功能沒(méi)有開(kāi)啟。

需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重啟 Broker 服務(wù)。
8.事務(wù)消息

????其中分為兩個(gè)流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。
(1)正常事務(wù)流程
????(1)發(fā)送消息(half 消息):圖中步驟 1。
????(2)服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果:圖中步驟 2。
????(3)根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫(xiě)入失敗,此時(shí) half 消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行):圖中步驟 3。
????(4)根據(jù)本地事務(wù)狀態(tài)執(zhí)行 Commit 或者 Rollback(Commit 操作生成消息索引,消息對(duì)消費(fèi)者可見(jiàn)):圖中步驟 4
(2)事務(wù)補(bǔ)償流程
????(1)對(duì)沒(méi)有 Commit/Rollback 的事務(wù)消息(pending 狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”:圖中步驟 5。
????(2)Producer 收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài):圖中步驟 6。
????(3)根據(jù)本地事務(wù)狀態(tài),重新 Commit 或者 Rollback::圖中步驟 7。
????其中,補(bǔ)償階段用于解決消息 Commit 或者 Rollback 發(fā)生超時(shí)或者失敗的情況。
(3)事務(wù)消息狀態(tài)
????事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
? ?> TransactionStatus.CommitTransaction: 提交狀態(tài),它允許消費(fèi)者消費(fèi)此消息(完成圖中了 1,2,3,4 步,第 4 步是Commit)。?
? ? >TransactionStatus.RollbackTransaction: 回滾狀態(tài),它代表該消息將被刪除,不允許被消費(fèi)(完成圖中了 1,2,3,4 步, 第 4 步是 Rollback)。
? ? >TransactionStatus.Unknown: 中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)(完成圖中了 1,2,3 步, 但是沒(méi)有 4 或者沒(méi)有 7,無(wú)法 Commit 或 Rollback)。
(4)代碼演示
? ??org.apache.rocketmq.example. transaction 包中
創(chuàng)建事務(wù)性生產(chǎn)者
? ??使用 TransactionMQProducer 類(lèi)創(chuàng)建生產(chǎn)者,并指定唯一的 ProducerGroup,就可以設(shè)置自定義線(xiàn)程池來(lái)處理這些檢查請(qǐng)求。執(zhí)行本地事務(wù)后、需要根據(jù)執(zhí)行結(jié)果對(duì)消息隊(duì)列進(jìn)行回復(fù)。

實(shí)現(xiàn)事務(wù)的監(jiān)聽(tīng)接口
????當(dāng)發(fā)送半消息成功時(shí),我們使用 executeLocalTransaction 方法來(lái)執(zhí)行本地事務(wù)(步驟 3)。它返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。 checkLocalTranscation 方法用于檢查本地事務(wù)狀態(tài)(步驟 5),并回應(yīng)消息隊(duì)列的檢查請(qǐng)求。它也是返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。

(5)使用場(chǎng)景
? ??用戶(hù)提交訂單后,扣減庫(kù)存成功、扣減優(yōu)惠券成功、使用余額成功,但是在確認(rèn)訂單操作失敗,需要對(duì)庫(kù)存、余額進(jìn)行回退。如何保證數(shù)據(jù)的完整性?
????可以使用 RocketMQ 的分布式事務(wù)保證在下單失敗后系統(tǒng)數(shù)據(jù)的完整性。
(6)使用限制
? ??1. 事務(wù)消息不支持延時(shí)消息和批量消息。
????2.事務(wù)回查的間隔時(shí)間:BrokerConfig. transactionCheckInterval 通過(guò) Broker 的配置文件設(shè)置好。
? ??3. 為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,但是用戶(hù)可以通過(guò) Broker 配置文件的 transactionCheckMax 參數(shù)來(lái)修改此限制。如果已經(jīng)檢查某條消息超過(guò) N 次的話(huà)( N = transactionCheckMax ) 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志。用戶(hù)可以通過(guò)重寫(xiě) AbstractTransactionCheckListener 類(lèi)來(lái)修改這個(gè)行為。
????4.事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時(shí)間長(zhǎng)度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶(hù)還可以通過(guò)設(shè)置用戶(hù)屬性CHECK_IMMUNITY_TIME_IN_SECONDS 來(lái)改變這個(gè)限制,該參數(shù)優(yōu)先于transactionMsgTimeout 參數(shù)。
????5.事務(wù)性消息可能不止一次被檢查或消費(fèi)。
????6.事務(wù)性消息中用到了生產(chǎn)者群組,這種就是一種高可用機(jī)制,用來(lái)確保事務(wù)消息的可靠性。
????7.提交給用戶(hù)的目標(biāo)主題消息可能會(huì)失敗,目前這依日志的記錄而定。它的高可用性通過(guò) RocketMQ 本身的高可用性機(jī)制來(lái)保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫(xiě)入機(jī)制。
????8. 事務(wù)消息的生產(chǎn)者 ID 不能與其他類(lèi)型消息的生產(chǎn)者 ID 共享。與其他類(lèi)型的消息不同,事務(wù)消息允許反向查詢(xún)、MQ 服務(wù)器能通過(guò)它們的生產(chǎn)者ID 查詢(xún)到消費(fèi)者。
我是嬈疆_蚩夢(mèng),讓堅(jiān)持成為一種習(xí)慣,感謝各位大佬的:點(diǎn)贊、收藏和評(píng)論,我們下期見(jiàn)!