首先來看看一線大廠的 架構(gòu)圖

????如上圖所示,中間是我們的 MQ 集群架構(gòu), 在上層利用 keepalived 和 HA-Proxy, 最下面是 兩個(gè)大的 MQ 集群 做一個(gè)高可用。當(dāng)然了,在實(shí)際大廠里面 可能就不止 兩個(gè)集群了,可以是很多個(gè)集群,然后集群之間利用 federration 插件進(jìn)行數(shù)據(jù)同步 。
? ??上面的生產(chǎn)端組件:這里面可能就有很多的架構(gòu)設(shè)計(jì),比如生產(chǎn)端怎么對(duì)一個(gè)容器進(jìn)行緩存,舉個(gè)例子,我們生產(chǎn)端發(fā)送消息,都是利用 RabbitTemplate 進(jìn)行,在高并發(fā)場(chǎng)景下,你每次發(fā)消息都創(chuàng)建一個(gè) RabbitTemplate , 這是影響性能的,那你的?RabbitTemplate? 是不是要進(jìn)行一個(gè)池化的操作,也就是?RabbitTemplateContainer, 創(chuàng)建一個(gè)?RabbitTemplate? 后就進(jìn)行緩存起來,后面再發(fā)這一 topic 主題的消息時(shí)就從 緩存里面拿。還有生產(chǎn)消息確認(rèn)組件?RabbitTemplateConfirmCallback、 消息序列化解析器 RapidMQMessageConverter 封裝我們常用的 Json 序列化方法、生產(chǎn)端發(fā)送處理器 RabbitBrokerProvider、消息發(fā)送客戶端 ProducerClient 等等 ,還有 重試策略器、定時(shí)抓取器、失敗處理器 等等,都是架構(gòu)組要去封裝好的,然后打成一個(gè) jar 包提供給業(yè)務(wù)線去使用。這里面涉及到的落地技術(shù)、設(shè)計(jì)思想,都是很有必要自己去學(xué)習(xí)學(xué)習(xí)的。
? ??消費(fèi)端組件:首先就是消費(fèi)者監(jiān)聽容器 RabbitListenerContainerFactory,用于處理和緩存一些消息監(jiān)聽容器。接著就是消費(fèi)者 冪等性保障攔截器 IdempotentRabbitHandler ,用于做消息的冪等性處理。接著就引出了我們的 消息存儲(chǔ)路由選擇器 DBRoutingSelector,因?yàn)槲覀償?shù)據(jù)庫是分庫分表的,所以在做冪等性的時(shí)候具體落庫到哪就是這里要做的。然后就是 消息異步處理器 AsyncMessageInter,意思就是消費(fèi)端接收到消息了,但是我并不是馬上就去處理,而是根據(jù)優(yōu)先級(jí)別,這個(gè)消息可能稍后才會(huì)去處理。再接著就是 消息存儲(chǔ)冪等服務(wù) IdempotentMessageService ,這個(gè)就是做冪等服務(wù)的具體實(shí)現(xiàn)類。最后就是消費(fèi)異常監(jiān)聽器 ConsumerFailMessageListener, 在我們消費(fèi)處理時(shí)發(fā)生了異常,這時(shí)候該怎么去處理, 跟死信隊(duì)列也會(huì)有一些關(guān)系,也會(huì)有一些對(duì)應(yīng)的配置。這些都是一個(gè)整體打成一個(gè) jar,然后提供給到業(yè)務(wù)線去使用
通過上面的架構(gòu)圖,我們就引出了 MQ 組件必須實(shí)現(xiàn)功能點(diǎn)
? ? 1.? 支持消息高性能的序列化轉(zhuǎn)換、異步化發(fā)送消息
? ? 2. 支持消息生產(chǎn)實(shí)例 與 消費(fèi)實(shí)例的連接池化緩存化,提升性能
? ? 3. 支持可靠性投遞消息,保障消息的 100% 不丟失
? ? 4. 支持消費(fèi)端的冪等操作,避免消費(fèi)端重復(fù)消費(fèi)的問題
MQ 組件需要拓展的功能點(diǎn):
? ? 1. 支持迅速消息發(fā)送模式,在一些 日志收集、統(tǒng)計(jì)分析等需求下保證高性能,超高吞吐量
? ? 2. 支持延遲消息模式,消息可以延遲發(fā)送,指定延遲時(shí)間,用于某些延遲檢查,服務(wù)限流場(chǎng)景
? ? 3. 支持事務(wù)消息,且 100% 保障可靠性投遞,在金融行業(yè)單筆大金額操作時(shí)會(huì)有此類需求
? ? 4. 支持順序消息,保證消息送達(dá)消費(fèi)端的前后順序,例如下訂單 等復(fù)合型操作
? ? 5. 支持消息補(bǔ)償,重試,以及快速定位異常、失敗消息
? ? 6. 支持集群消息負(fù)載均衡,保障消息落到具體 SET 集群的負(fù)載均衡
? ? 7. 支持消息路由策略,指定某些消息路由到指定的 SET? 集群
消息發(fā)送模式:
????1. 迅速消息發(fā)送
? ? ? ? 迅速消息是指消息不進(jìn)行落庫存儲(chǔ),不做可靠性的保障。在一些非核心消息、日志數(shù)據(jù)、或者統(tǒng)計(jì)分析等場(chǎng)景下比較合適。
? ? ? ? 迅速消息的優(yōu)點(diǎn)就是性能最高,吞吐量最大

2. 確認(rèn)消息發(fā)送
? ? 即可靠性投遞的一種方式,在業(yè)務(wù)落庫后,再針對(duì)消息進(jìn)行落庫,最后發(fā)送消息,最后有一個(gè)響應(yīng)給到生產(chǎn)端,確認(rèn)已收到這條消息,針對(duì)超時(shí)還未響應(yīng)的,利用分布式定時(shí)任務(wù)進(jìn)行重發(fā)消息。

3. 批量消息發(fā)送
? ? 批量消息是指 我們把消息放到一個(gè)集合里統(tǒng)一進(jìn)行提交,這種方案設(shè)計(jì)思路是 期望消息在一個(gè)會(huì)話里,比如投擲到 threadLocal 里的集合,然后擁有相同的會(huì)話 id(即消息都是有一個(gè)共同的 父級(jí) Id),并且?guī)в羞@次提交消息的 size 等相關(guān)屬性,最重要的一點(diǎn)是 要把這一批消息進(jìn)行合并。對(duì)于 Channel 而言,就是發(fā)送一次消息。
? ? 這種方式也是希望消費(fèi)端在消費(fèi)的時(shí)候,可以進(jìn)行批量化的消費(fèi),針對(duì)于某一原子業(yè)務(wù)的操作去處理,但是不保障可靠性,需要進(jìn)行補(bǔ)償機(jī)制!
如下圖所示,我們第一步肯定是要對(duì)業(yè)務(wù)進(jìn)行入庫,之后才是利用 批量發(fā)送的接口 ProducerBatchComponent ,ProducerBatchComponent? 里面包含了 會(huì)話Id SessionId ,這一批消息里面的 sessionId 都是相同的,然后就是 Threadlocal ,我們把這一批的消息放到?Threadlocal? 里面,再里面就是 MessageHoder, 它可能是一個(gè) List 集合 用于承裝這一批消息,裝滿之后就進(jìn)行消息落庫,但是并不是對(duì)這一批次里面的每一條消息都落庫,而是記錄這個(gè) sessionId ,記 1 條 記錄就可以了。然后再投遞出去,confirm 確認(rèn)啊 等等操作,后續(xù)操作都和之前可靠性性投遞過程類似。消費(fèi)端就是接收到這條消息,然后就把它拆開,根據(jù) size 去獲取有幾條消息記錄,組成一個(gè)完整的原子性操作。

4. 延遲消息發(fā)送
? ? 延遲消息相對(duì)簡(jiǎn)單,就是我們?cè)?Message 封裝的時(shí)候添加 delayTime 屬性即可,使得我們的消息可以進(jìn)行延遲發(fā)送,在實(shí)際具體的業(yè)務(wù)場(chǎng)景里面很實(shí)用。
? ? 場(chǎng)景舉例:
? ? ? ? 比如在電商平臺(tái)買到的商品簽收后,不點(diǎn)擊確認(rèn)支付,那么系統(tǒng)自動(dòng)會(huì)在 7 天(一定時(shí)間)后去進(jìn)行支付操作。
? ? ? ? 超時(shí)自動(dòng)作廢的場(chǎng)景,優(yōu)惠券、紅包 等有使用時(shí)間限制的場(chǎng)景也是可以利用延遲消息機(jī)制。
5. 順序消息
? ? ? ?順序消息比較類似于 批量消息的實(shí)現(xiàn)機(jī)制,但是也有些不同。
? ? ? ? 我們要保障以下幾點(diǎn):
? ? ? ? ? ? 1. 發(fā)送的順序消息,必須保障消息投遞到同一個(gè)隊(duì)列里面,且 消費(fèi)者只能有一個(gè) (獨(dú)占模式)
? ? ? ? ? ? 2. 然后需要統(tǒng)一提交(可能是合并成一個(gè)大消息(不建議這樣),也可能是拆分為多個(gè)消息),并且所有消息的會(huì)話 id 一致。這里要和之前的批量消息做區(qū)別,之前的批量消息是不需要保證順序消費(fèi)的,消費(fèi)端接收到批量消息后拆開,然后可以多線程去執(zhí)行,這樣就能提升性能。但是我們現(xiàn)在是順序消息,如果你把它合成一個(gè)整體,消費(fèi)端需要把它拆開,然后一個(gè)個(gè)按順序去執(zhí)行,這樣是非常耗時(shí)的,建議拆成一條條的小消息,消費(fèi)端獲取到消息,可能并不是馬上去執(zhí)行的,而是說做一些等待策略。
? ? ? ? ? ? 3. 添加消息屬性: 既然要按順序消費(fèi),那就必須 順序標(biāo)記的序號(hào) 和 本次順序消息的 size 屬性,接收到消息后馬上進(jìn)行落庫操作,并不是收到消息后馬上就去執(zhí)行業(yè)務(wù)
? ? ? ? ? ? 4. 因?yàn)槭盏较⒑蟛⒉皇邱R上就去執(zhí)行業(yè)務(wù)邏輯,而是延遲后進(jìn)行處理,所以要 并行進(jìn)行發(fā)送給 自身的延遲消息 (注意帶上關(guān)鍵屬性:會(huì)話 id、size)進(jìn)行后續(xù)業(yè)務(wù)處理
? ? ? ? ? ? 5. 當(dāng)收到延遲消息后,根據(jù)會(huì)話 id、size 抽取數(shù)據(jù)庫數(shù)據(jù)進(jìn)行處理即可
? ? ? ? ? ? 6. 對(duì)應(yīng)異常情況,用定時(shí)輪詢補(bǔ)償機(jī)制,比如 生產(chǎn)端消息沒有完全投遞成功、消費(fèi)端落庫異常 導(dǎo)致 消費(fèi)端落庫后 缺少消息條目的情況。
????如下流程圖,我們?cè)谧蛲順I(yè)務(wù)入庫后,利用 ProducerOrderlyComponent 去組成一批帶有順序的消息,消息里面帶上 相同的 sessionId,放到 threadlocal 里面, 然后就是 MessageHoder 這個(gè) List,里面就是一個(gè)個(gè)的小消息了 ,里面帶有消息的 size, 然后需要可靠性投遞的就可以進(jìn)行消息的入庫,注意這是對(duì)每條小消息進(jìn)行入庫,接著收到 confirm 確認(rèn)就更新消息狀態(tài),可靠性投遞的其他步驟和之前的一樣。
需要注意我們的消費(fèi)端,我們是對(duì)這個(gè)小消息直接入庫,不是馬上去執(zhí)行業(yè)務(wù)邏輯。當(dāng)我收到這一批次的第一條消息,那么我就同步的發(fā)送一個(gè)延遲消息給自己,告訴我過去多久,這一批次的消息都入庫成功了,我就要把這一批次的消息取出來,然后按順序去執(zhí)行業(yè)務(wù)處理。最后就是處理業(yè)務(wù)的時(shí)候,如果同一批次 最后一個(gè)操作失敗了,那么該怎么處理,要根據(jù)你實(shí)際的業(yè)務(wù)規(guī)則來設(shè)計(jì)的,所以就是需要這個(gè)補(bǔ)償機(jī)制。

6. 事務(wù)消息發(fā)送
? ? 事務(wù)消息,相對(duì)使用比較少。
????在互聯(lián)網(wǎng)金融行業(yè),面對(duì)單筆大額的現(xiàn)金流交易時(shí),比如單筆轉(zhuǎn)賬超過一個(gè)上限的時(shí)候,我們就希望這個(gè)消息優(yōu)先級(jí)最高,并且可靠性要求達(dá)到 100% 。當(dāng)然 我們的系統(tǒng) 和 銀行端系統(tǒng)都需要兼顧才行,在我們自己的系統(tǒng)里面可能一分鐘不到就處理完了,但是銀行的系統(tǒng)可能會(huì)遲遲不給響應(yīng),所以也會(huì)有一些補(bǔ)償機(jī)制,主動(dòng)發(fā)起銀行端查詢指令機(jī)制等,如果超過某個(gè)時(shí)間就需要運(yùn)維的人工介入,人工的幫助去找銀行查詢轉(zhuǎn)賬結(jié)果。
????為了保障性能的同時(shí),也支持事務(wù)。我們并沒有選擇傳統(tǒng)的 Rabbitmq 事務(wù) 和Spring 集成的機(jī)制,因?yàn)樵谛阅軠y(cè)試的過程中,效果并不理想,非常消耗系統(tǒng)資源,且會(huì)出現(xiàn)堵塞 等情況,在高峰期也是一定程度上影響 MQ 集群的性能。
? ? 解決方案:
? ? ? ? 采用類似可靠性投遞的機(jī)制,也就是補(bǔ)償機(jī)制。但是 數(shù)據(jù)源必須是同一個(gè),也就是業(yè)務(wù)操作 DB1 數(shù)據(jù)庫 和 消息記錄 DB2 數(shù)據(jù)庫使用同一個(gè)數(shù)據(jù)源。然后利用重寫 Spring DataSourceTransactionManager,在本地事務(wù)提交的時(shí)候進(jìn)行發(fā)送消息,但是也有可能事務(wù)提交成功,但是消息發(fā)送失敗,這個(gè)時(shí)候就需要進(jìn)行補(bǔ)償了。
這個(gè)時(shí)候只能出現(xiàn)一種意外情況,數(shù)據(jù)庫層面的都提交成功了,但是消息發(fā)送失敗,這時(shí)就需要重試機(jī)制。核心代碼里面就是 doCommit 的時(shí)候 調(diào)用 super.doCommit ,真正的把數(shù)據(jù)庫的事務(wù)給提交了,保障同源的那兩個(gè)數(shù)據(jù)庫操作是成功的,或者同時(shí)失敗,再把對(duì)應(yīng)的消息發(fā)出去。發(fā)消息是保障不了的,需要重試機(jī)制,但是我兩個(gè)入庫操作是成功的,核心就在這里。


消息冪等性的必要性? ??
? ? 保障消息的冪等性,這也是我們?cè)谑褂?MQ 中至關(guān)重要的環(huán)節(jié)?。。?/p>
? ? ? ? 可能導(dǎo)致消息出現(xiàn)非冪等性的原因
? ? ? ? ? ? 1.? 可靠性消息投遞機(jī)制,比如 生產(chǎn)端發(fā)送消息到 broker 了,broker 也給響應(yīng)了,但是 confirm 的時(shí)候出現(xiàn)網(wǎng)絡(luò)閃斷,生產(chǎn)端就沒收到這個(gè) ACk 了,這個(gè)時(shí)候定時(shí)任務(wù)肯定就會(huì)再把這個(gè)消息再發(fā)送出去
? ? ? ? ? ? 2. MQ Broker 服務(wù) 與消費(fèi)端傳輸消息的過程中發(fā)生了網(wǎng)絡(luò)抖動(dòng)影響到了
? ? ? ? ? ? 3. 消費(fèi)端故障或異常
所以,冪等性 很重要?。?/p>
????接著來看看 冪等性的設(shè)計(jì)圖,我們需要一個(gè) 統(tǒng)一Id 生成服務(wù),因?yàn)槲覀円U线@個(gè) id 是全局唯一的,對(duì)于我生產(chǎn)端來說,這個(gè)全局 Id 是一個(gè)統(tǒng)一的外部服務(wù),有可能生產(chǎn)端獲取半天都獲取失敗,這時(shí)候就需要有 本地 Id 生成服務(wù)(兜底的策略,或者降級(jí)解決方案)。broker 發(fā)給到 下游的消費(fèi)服務(wù),這中間就需要有冪等性,冪等性就涉及到 Id 規(guī)則路由,通過拿到的統(tǒng)一的 Id, 然后通過算法路由,然后進(jìn)行落庫,利用數(shù)據(jù)庫主鍵進(jìn)行冪等的操作,數(shù)據(jù)庫的主鍵是很好的一種方案。redis 也可以做到,只是復(fù)雜一些,可以看我之前的文章。
