一、為什么使用 MQ?
1.1 解耦
1.1.1 解耦1
例如電商系統(tǒng)核心是交易服務(wù),交易服務(wù)要調(diào)用另外三個(gè)服務(wù),訂單服務(wù)、庫(kù)存服務(wù)、倉(cāng)儲(chǔ)服務(wù)。

這三個(gè)服務(wù)如果有一個(gè)服務(wù)不可用,交易服務(wù)就無法正常運(yùn)行,所以交易服務(wù)是強(qiáng)耦合另外三個(gè)服務(wù)。
引入MQ之后,交易服務(wù)只跟MQ交互,把消息發(fā)到MQ里面就行了,無需關(guān)心另外三個(gè)服務(wù)是否可用。這時(shí)候交易服務(wù)跟另外三個(gè)服務(wù)就是弱耦合的關(guān)系,耦合性被降低了。
哪怕是另外三個(gè)服務(wù)暫時(shí)不可用,也不影響交易服務(wù)的運(yùn)行,只要其他服務(wù)運(yùn)行起來后,把MQ里面的消息消費(fèi)了就行。(降級(jí)接口)

1.1.2 解耦2
假設(shè) A 系統(tǒng)在用戶發(fā)生某個(gè)操作的時(shí)候,需要把用戶提交的數(shù)據(jù)同時(shí)推送到 B、C 兩個(gè)系統(tǒng)的時(shí)候。這個(gè)時(shí)候負(fù)責(zé) A 系統(tǒng)的哥們想:沒事啊,B、C 兩個(gè)系統(tǒng)給我提供一個(gè) HTTP 接口或者 RPC 接口,我把數(shù)據(jù)推送過去不就完事了嘛,負(fù)責(zé) A 系統(tǒng)的哥們美滋滋。
一切看起來很美好,但是隨著業(yè)務(wù)快速迭代,這個(gè)時(shí)候系統(tǒng) D 也想要這個(gè)數(shù)據(jù)。那既然這樣,A 系統(tǒng)的開發(fā)同學(xué)就改咯,在發(fā)送數(shù)據(jù)給 B、C 的同時(shí)加上一個(gè) D。但是,越到后面越發(fā)現(xiàn),麻煩來了。整個(gè)系統(tǒng)好像不止這個(gè)數(shù)據(jù)要發(fā)送給 B、C、D、還有第二、第三個(gè)數(shù)據(jù)要發(fā)送給 B、C、D。甚至有時(shí)候又加入了 E、F 等系統(tǒng),他們也要這個(gè)數(shù)據(jù)。并且有時(shí)候可能 B 系統(tǒng)突然又不要這個(gè)數(shù)據(jù)了,A 系統(tǒng)改來改去,A 系統(tǒng)的開發(fā)哥們頭皮發(fā)麻。更復(fù)雜的場(chǎng)景是,數(shù)據(jù)通過接口傳給其他系統(tǒng)有時(shí)候還要考慮重試、超時(shí)等一些異常情況。
這個(gè)時(shí)候,就該我們的 MQ 粉墨登場(chǎng)了,這種情況下使用 MQ 來解耦是再合適不過了,因?yàn)樨?fù)責(zé) A 系統(tǒng)的哥們只需要把消息扔到 MQ 就行了,其他系統(tǒng)按需來訂閱消息就好了。就算某個(gè)系統(tǒng)不需要這個(gè)數(shù)據(jù)了,也不會(huì)需要 A 系統(tǒng)改動(dòng)代碼。
1.2 異步
沒有引入MQ的時(shí)候,交易服務(wù)需要同步調(diào)用三個(gè)服務(wù),如果調(diào)用一個(gè)服務(wù)需要耗時(shí)1秒,那么同步調(diào)用三個(gè)服務(wù)需要耗時(shí)3秒。在引入MQ之后,全都改成了異步調(diào)用,整個(gè)耗時(shí)不到1秒,大大提高了接口的性能。
1.3 削峰
如果一秒內(nèi)同時(shí)來了5000筆交易,而訂單服務(wù)每秒只能處理100筆交易,那么后面的4900筆交易失敗。在引入MQ之后,交易服務(wù)可以把交易數(shù)據(jù)先發(fā)送到MQ中,而訂單服務(wù)再慢慢從MQ拉取交易信息處理。從而避免突發(fā)流量壓垮服務(wù)器。
1.3.1 削峰填谷
舉個(gè)例子,比如我們的訂單系統(tǒng),在下單的時(shí)候就會(huì)往數(shù)據(jù)庫(kù)寫數(shù)據(jù)。但是數(shù)據(jù)庫(kù)只能支撐每秒 1000 左右的并發(fā)寫入,并發(fā)量再高就容易宕機(jī)。低峰期的時(shí)候并發(fā)也就 100 多個(gè),但是在高峰期時(shí)候,并發(fā)量會(huì)突然激增到 5000 以上,這個(gè)時(shí)候數(shù)據(jù)庫(kù)肯定死了。
但是使用了 MQ 之后,情況就變了,消息被 MQ 保存起來了,然后系統(tǒng)就可以按照自己的消費(fèi)能力來消費(fèi),比如每秒 1000 個(gè)數(shù)據(jù),這樣慢慢寫入數(shù)據(jù)庫(kù),這樣就不會(huì)打死數(shù)據(jù)庫(kù)了。
至于為什么叫做削峰填谷呢?如果沒有用 MQ 的情況下,并發(fā)量高峰期的時(shí)候是有一個(gè)“頂峰”的,然后高峰期過后又是一個(gè)低并發(fā)的“谷”。但是使用了 MQ 之后,限制消費(fèi)消息的速度為 1000QPS,但是這樣一來,高峰期產(chǎn)生的數(shù)據(jù)勢(shì)必會(huì)被積壓在 MQ 中,高峰就被“削”掉了。但是因?yàn)橄⒎e壓,在高峰期過后的一段時(shí)間內(nèi),消費(fèi)消息的速度還是會(huì)維持在 1000QPS,直到消費(fèi)完積壓的消息,這就叫做“填谷”。
二、引入MQ之后的問題
2.1 系統(tǒng)可用性降低
本來整個(gè)系統(tǒng)有四個(gè)服務(wù),我們只需要保證這四個(gè)服務(wù)可用就行了?,F(xiàn)在又多引入了一個(gè)MQ,我們還要保證MQ的可用,所以整個(gè)系統(tǒng)的可用性降低。
2.2 系統(tǒng)復(fù)雜性提高
本來交易服務(wù)是同步調(diào)用另外三個(gè)服務(wù),如果另外三個(gè)服務(wù)不可用,交易服務(wù)能立即感知到。引入MQ之后,整個(gè)系統(tǒng)的穩(wěn)定性就要靠MQ保證了。
這時(shí)候,我們就要考慮到發(fā)到MQ里面的消息怎么避免丟失的問題?順序性消費(fèi)的問題,就是同一筆交易的下單消息應(yīng)該比撤單消息先處理。重復(fù)性消費(fèi)的問題,就是同一筆下單交易的消息可能被多次處理。
當(dāng)然,每種問題都有具體的解決方案,避免消息丟失可以使用MQ集群,順序性消費(fèi)可以把消息發(fā)到同一個(gè)分區(qū),重復(fù)性消費(fèi)可以在消費(fèi)端做冪等性處理。
2.3 重復(fù)消費(fèi)問題
2.3.1 問題場(chǎng)景
重復(fù)消費(fèi)問題可以說是 MQ 中普遍存在的問題, 不管你用哪種 MQ 都無法避免。有哪些場(chǎng)景會(huì)出現(xiàn)重復(fù)的消息呢?
- 消息生產(chǎn)者產(chǎn)生了重復(fù)的消息;
- Kafka 和 RocketMQ 的 offset 被回調(diào)了;
- 消息消費(fèi)者確認(rèn)失??;
- 消息消費(fèi)者確認(rèn)時(shí)超時(shí);
- 業(yè)務(wù)系統(tǒng)主動(dòng)發(fā)起重試。
如果重復(fù)消息不做正確的處理,會(huì)對(duì)業(yè)務(wù)造成很大的影響,產(chǎn)生重復(fù)數(shù)據(jù)或者導(dǎo)致數(shù)據(jù)異常,比如會(huì)員系統(tǒng)多開通了一個(gè)月的會(huì)員等。
2.3.2 解決方案
不管是由于生產(chǎn)者產(chǎn)生的重復(fù)消息,還是由于消費(fèi)者導(dǎo)致的重復(fù)消息,我們都可以在消費(fèi)者中解決這個(gè)問題。
這就要求消費(fèi)者在做業(yè)務(wù)處理時(shí),要做冪等設(shè)計(jì)。在這里我推薦增加一張消費(fèi)消息表,來解決 MQ的這類問題。
消費(fèi)消息表中,使用 messageId 做唯一索引。在處理業(yè)務(wù)邏輯之前,先根據(jù) messageId 查詢一下該消息有沒有處理過。如果已經(jīng)處理過了則直接返回成功,如果沒有處理過,則繼續(xù)做業(yè)務(wù)處理。

補(bǔ)充:RocketMQ消費(fèi)過程冪等
以下內(nèi)容來自RocketMQ官網(wǎng):
RocketMQ無法避免消息重復(fù)(Exactly-Once),所以如果業(yè)務(wù)對(duì)消費(fèi)重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)層面進(jìn)行去重處理。可以借助關(guān)系數(shù)據(jù)庫(kù)進(jìn)行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標(biāo)識(shí)字段,例如訂單Id等。在消費(fèi)之前判斷唯一鍵是否在關(guān)系數(shù)據(jù)庫(kù)中存在。如果不存在則插入,并消費(fèi),否則跳過。(實(shí)際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報(bào)主鍵沖突,則插入失敗,直接跳過)
msgId一定是全局唯一標(biāo)識(shí)符,但是實(shí)際使用中,可能會(huì)存在相同的消息有兩個(gè)不同msgId的情況(消費(fèi)者主動(dòng)重發(fā)、因客戶端重投機(jī)制導(dǎo)致的重復(fù)等),這種情況就需要使業(yè)務(wù)字段進(jìn)行重復(fù)消費(fèi)。
2.4 數(shù)據(jù)一致性問題(異步分布式事務(wù)問題)
2.4.1 問題場(chǎng)景
當(dāng)服務(wù)間是同步調(diào)用的時(shí)候,我們還可以使用本地事務(wù)來控制數(shù)據(jù)的一致性。但是引入MQ之后,服務(wù)間的調(diào)用都是異步了,就沒辦法使用本地事務(wù),也就無法做到數(shù)據(jù)的強(qiáng)一致性了。
例如,調(diào)用訂單服務(wù)下單成功了,但是調(diào)用庫(kù)存服務(wù)扣減庫(kù)存失敗,就會(huì)導(dǎo)致超賣,是嚴(yán)重的線上事故。
這時(shí)候怎么辦?
方案一:需要事務(wù)強(qiáng)一致的,不用消息異步,如下單、減庫(kù)存要放在一個(gè)事務(wù)里控制,加積分這種非核心的業(yè)務(wù)才用消息異步處理。

方案二:可以使用MQ事務(wù)消息(只有RocketMQ才有事務(wù)消息功能,RocketMQ收發(fā)事務(wù)消息)。
事務(wù)狀態(tài)有以下三種:
- TransactionStatus.CommitTransaction:提交事務(wù),允許訂閱方消費(fèi)該消息。
- TransactionStatus.RollbackTransaction:回滾事務(wù),消息將被丟棄不允許消費(fèi)。
- TransactionStatus.Unknow:無法判斷狀態(tài),期待消息隊(duì)列RocketMQ版的Broker向發(fā)送方再次詢問該消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)。
步驟一: A 服務(wù)向消息中間件發(fā)布消息
在服務(wù)A處理任務(wù)A前,首先向消息中間件發(fā)送一條半信息。
消息中間件收到后將該消息持久化,但不進(jìn)行投遞。持久化成功后,向A服務(wù)返回確認(rèn)應(yīng)答。
服務(wù)A收到確認(rèn)應(yīng)答后,便可以開始處理任務(wù)A。
任務(wù)A處理完成后,服務(wù)A便會(huì)向消息中間件發(fā)送Commit 或者 Rollback 請(qǐng)求,該請(qǐng)求發(fā)送完成后,服務(wù)A的工作任務(wù)就結(jié)束了,該事務(wù)的處理過程也就結(jié)束了。
在消息中間件收到 Commit 后,便會(huì)向 B 服務(wù)投遞消息,如果收到 Rollback 便會(huì)直接丟棄消息。
如果消息中間件在最后的過程中,長(zhǎng)時(shí)間沒有收到服務(wù)A 發(fā)送的 Commit 或 Rollback 指令,這個(gè)時(shí)候就需要依靠 超時(shí)詢問機(jī)制。
步驟二: 消息中間件向B服務(wù)投遞消息
消息中間件收到A服務(wù)的提交 Commit指令后便會(huì)將該消息投遞給B服務(wù),然后將自己的狀態(tài)置為阻塞等待狀態(tài)。B服務(wù)收到消息中間件發(fā)送的消息后便開始處理任務(wù)B,處理完成后便會(huì)向消息中間件發(fā)出回應(yīng)。但是在消息中間件阻塞等待的時(shí)候同樣會(huì)出現(xiàn)問題。
- 正常情況:消息中間件投遞完消息后,進(jìn)入阻塞等待狀態(tài),在收到確認(rèn)應(yīng)答后便認(rèn)為事務(wù)處理完成,該流程結(jié)束。
- 等待超時(shí)情況:在等待確認(rèn)應(yīng)答超時(shí)之后就會(huì)重新進(jìn)行投遞,直到B服務(wù)器返回消費(fèi)成功響應(yīng)為止。而消息重試的次數(shù)和時(shí)間間隔都可以設(shè)置,如果最終還是不能成功進(jìn)行投遞,則需要人工干預(yù)。
2.4.2 解決方案
我們都知道數(shù)據(jù)一致性分為:強(qiáng)一致性、弱一致性、最終一致性。
而 MQ 為了性能考慮使用的是最終一致性,那么必定會(huì)出現(xiàn)數(shù)據(jù)不一致的問題。這類問題大概率是因?yàn)橄M(fèi)者讀取消息后,業(yè)務(wù)邏輯處理失敗導(dǎo)致的。這時(shí)候可以增加重試機(jī)制。重試分為同步重試和異步重試。
有些消息量比較小的業(yè)務(wù)場(chǎng)景,可以采用同步重試。在消費(fèi)消息時(shí)如果處理失敗,立刻重試 3-5 次,如果還是失敗則寫入到記錄表中。但如果消息量比較大,則不建議使用這種方式。因?yàn)槿绻霈F(xiàn)網(wǎng)絡(luò)異常,可能會(huì)導(dǎo)致大量的消息不斷重試,影響消息讀取速度造成消息堆積。

消息量比較大的業(yè)務(wù)場(chǎng)景,建議采用異步重試。在消費(fèi)者處理失敗之后,立刻寫入重試表,有個(gè) job(如采用xxljob) 專門定時(shí)重試。
還有一種做法:如果消費(fèi)失敗,自己給同一個(gè) topic 發(fā)一條消息。在后面的某個(gè)時(shí)間點(diǎn),自己又會(huì)消費(fèi)到那條消息,起到了重試的效果。如果對(duì)消息順序要求不高的場(chǎng)景,可以使用這種方式。
2.5 消息丟失問題
2.5.1 問題場(chǎng)景
同樣消息丟失問題,也是 MQ 中普遍存在的問題,不管你用哪種 MQ 都 無法避免。有哪些 場(chǎng)景會(huì)出現(xiàn)消息丟失問題呢?
- 生產(chǎn)者產(chǎn)生消息時(shí),由于網(wǎng)絡(luò)原因發(fā)送到 MQ 失敗了;
- MQ 服務(wù)器持久化,存儲(chǔ)磁盤時(shí)出現(xiàn)異常;
- Kafka和RocketMQ 的 offset 被回調(diào)時(shí),略過了很多消息;
- 消費(fèi)者剛讀取消息,已經(jīng) ACK 確認(rèn),但業(yè)務(wù)還沒處理完,服務(wù)就被重啟了。
導(dǎo)致消息丟失問題的原因挺多的, 生產(chǎn)者、 MQ 服務(wù)器、 消費(fèi)者都有可能產(chǎn)生問題。我在這里就不一一列舉了。最終的結(jié)果會(huì)導(dǎo)致消費(fèi)者無法正確的處理消息,而導(dǎo)致數(shù)據(jù)不一致的情況。
2.5.2 解決方案
不管你是否承認(rèn),有時(shí)候消息真的會(huì)丟。即使這種概率非常小,也會(huì)對(duì)業(yè)務(wù)有影響。生產(chǎn)者、MQ 服務(wù)器、消費(fèi)者都有可能會(huì)導(dǎo)致消息丟失的問題。為了解決這個(gè)問題,我們可以增加一張消息發(fā)送表。
當(dāng)生產(chǎn)者發(fā)完消息之后,會(huì)往該表中寫入一條數(shù)據(jù),狀態(tài) status 標(biāo)記為待確認(rèn);
如果消費(fèi)者讀取消息之后,調(diào)用生產(chǎn)者的 API 更新該消息的status為已確認(rèn);
有個(gè)job(xxljob) 每隔一段時(shí)間檢查一次消息發(fā)送表,如果5分鐘(這個(gè)時(shí)間可以根據(jù)實(shí)際情況來定)后還有狀態(tài)是待確認(rèn)的消息,則認(rèn)為該消息已經(jīng)丟失了,重新發(fā)條消息。

這樣不管是由于生產(chǎn)者、 MQ服務(wù)器、還是消費(fèi)者導(dǎo)致的消息丟失問題,job 都會(huì)重新發(fā)消息。
2.6 消息順序問題
2.6.1 問題場(chǎng)景
有些業(yè)務(wù)數(shù)據(jù)是有狀態(tài)的,比如訂單有下單、支付、完成、退貨等狀態(tài)。 如果訂單數(shù)據(jù)作為消息體,就會(huì)涉及順序問題了。
例如消費(fèi)者收到同一個(gè)訂單的兩條消息。第一條消息的狀態(tài)是下單,第二條消息的狀態(tài)是支付,這是沒問題的。但如果第一條消息的狀態(tài)是支付,第二條消息的狀態(tài)是下單就會(huì)有問題了。沒有下單就先支付了?

消息順序問題是一個(gè)非常棘手的問題,比如:
Kafka 同一個(gè) partition 中能保證順序,但是不同的 partition 無法保證順序;
RabbitMQ的同一個(gè)queue能夠保證順序,但是如果多個(gè)消費(fèi)者同一個(gè)queue 也會(huì)有順序問題。
如果消費(fèi)者使用多線程消費(fèi)消息,也無法保證順序。
如果消費(fèi)消息時(shí)同一個(gè)訂單的多條消息中,中間的一條消息出現(xiàn)異常情況,順序?qū)?huì)被打亂。
還有如果生產(chǎn)者發(fā)送到 MQ中的路由規(guī)則,跟消費(fèi)者不一樣,也無法保證順序。
2.6.2 解決方案
消息順序問題是一種常見問題。我們以 Kafka 消費(fèi)訂單消息為例,訂單有下單、 支付、 完成、 退貨等狀態(tài)。這些狀態(tài)是有先后順序的,如果順序錯(cuò)了會(huì)導(dǎo)致業(yè)務(wù)異常。
解決這類問題之前,我們需要先確認(rèn):消費(fèi)者是否真的需要知道中間狀態(tài),只知道最終狀態(tài)行不行?

其實(shí)很多時(shí)候,我真的需要知道的是最終狀態(tài)。這時(shí)可以把流程優(yōu)化一下:

這種方式可以解決大部分的消息順序問題。
但如果真的有需要保證消息順序的需求,那么可以將訂單號(hào)路由到不同的 partition。同一個(gè)訂單號(hào)的消息,每次到發(fā)到同一個(gè)partition。
2.7 消息堆積
2.7.1 問題場(chǎng)景
如果消息消費(fèi)者讀取消息的速度,能夠跟上消息生產(chǎn)者的節(jié)奏,那么整套 MQ 機(jī)制就能發(fā)揮最大作用。
但是很多時(shí)候,由于某些批處理或者其他原因,導(dǎo)致消費(fèi)速度小于生產(chǎn)速度。這樣會(huì)直接導(dǎo)致消息堆積問題,從而影響業(yè)務(wù)功能。
這里以下單 開通會(huì)員為例,如果消息出現(xiàn)堆積會(huì)導(dǎo)致用戶下單之后,很久之后才能變成會(huì)員。這種情況肯定會(huì)引起大量用戶投訴。
2.7.2 解決方案
那么消息堆積問題該如何解決呢?這個(gè)要看消息是否需要保證順序。如果不需要保證順序,可以讀取消息之后用多線程處理業(yè)務(wù)邏輯。

這樣就能增加業(yè)務(wù)邏輯處理速度,解決消息堆積問題。但是線程池的核心線程數(shù)和最大線程數(shù)需要合理配置,不然可能會(huì)浪費(fèi)系統(tǒng)資源。
如果需要保證順序,可以讀取消息之后將消息按照一定的規(guī)則分發(fā)到多個(gè)隊(duì)列中,然后在隊(duì)列中用單線程處理。

資料來源:
面試官竟然問我為啥要用MQ,幸虧我看了參考答案