34.生產(chǎn)案例:從 RocketMQ 全鏈路分析一下為什么用戶支付后沒收到紅包?
有用戶反饋說,按照規(guī)則應(yīng)該是在支付之后可以拿到一個現(xiàn)金紅包的,但是他在支付了一個訂單之后,卻并沒有收到這個現(xiàn)金紅包,于是就反饋給了客服。
經(jīng)過一通排查,從訂單系統(tǒng)和紅包系統(tǒng)當(dāng)天那個時間段的日志來看,居然只看到了訂單系統(tǒng)有推送消息到RocketMQ的日志,但是并沒有看到紅包系統(tǒng)從RocketMQ中接收消息以及發(fā)現(xiàn)金紅包的日志。
可能原因一:訂單系統(tǒng)推送消息到MQ的過程會丟失消息

可能原因二:Broker丟失消息:消息寫入os cache但沒有寫入磁盤

可能原因三:Broker丟失消息:消息寫入磁盤但磁盤壞了

可能原因四:紅包系統(tǒng)獲取到消息后丟失消息
默認(rèn)情況下,MQ的消費(fèi)者有可能會自動提交已經(jīng)消費(fèi)的offset,那么如果此時你還沒處理這個消息派發(fā)紅包的情況下,MQ的消費(fèi)者可能直接自動給你提交這個消息1的offset到broker去了,標(biāo)識為你已經(jīng)成功處理了這個消息。接著恰巧在這個時候,我們的紅包系統(tǒng)突然重啟了,或者是宕機(jī)了,或者是可能在派發(fā)紅包的時候更新數(shù)據(jù)庫失敗了,總之就是他突然故障了,紅包系統(tǒng)的機(jī)器重啟了一下,然后此時內(nèi)存里的消息1必然就丟失了,而且紅包也沒發(fā)出去。

35.發(fā)送消息零丟失方案:RocketMQ事務(wù)消息的實(shí)現(xiàn)流程分析
-
首先要讓訂單系統(tǒng)去發(fā)送一條half消息到MQ去,這個half消息本質(zhì)就是一個訂單支付成功的消息,只不過你可以理解為他這個消息的狀態(tài)是half狀態(tài),這個時候紅包系統(tǒng)是看不見這個half消息的。
image.png - 萬一half消息寫入失敗
這個時候訂單系統(tǒng)就應(yīng)該執(zhí)行一系列的回滾操作,比如對訂單狀態(tài)做一個更新,讓狀態(tài)變成“關(guān)閉交易”,同時通知支付系統(tǒng)自動進(jìn)行退款。 -
half消息成功之后,訂單系統(tǒng)完成自己的任務(wù)
這個時候訂單系統(tǒng)就應(yīng)該在自己本地的數(shù)據(jù)庫里執(zhí)行一些增刪改操作了,因?yàn)橐坏﹉alf消息寫成功了,就說明MQ肯定已經(jīng)收到這條消息了,MQ還活著,而且目前你是可以跟MQ正常溝通的。
image.png -
如果訂單系統(tǒng)的本地事務(wù)執(zhí)行失敗
這個時候其實(shí)也很簡單,直接就是讓訂單系統(tǒng)發(fā)送一個rollback請求給MQ就可以了。這個意思就是說,你可以把之前我發(fā)給你的half消息給刪除掉了,因?yàn)槲易约哼@里都出問題了,已經(jīng)無力跟你繼續(xù)后續(xù)的流程了。請求給MQ刪除那個half消息之后,你的訂單系統(tǒng)就必須走后續(xù)的回退流程了,就是通知支付系統(tǒng)退款。
image.png -
訂單系統(tǒng)完成了本地事務(wù)
如果訂單系統(tǒng)成功完成了本地的事務(wù)操作,比如把訂單狀態(tài)都更新為“已完成”了,此時你就可以發(fā)送一個commit請求給MQ,要求讓MQ對之前的half消息進(jìn)行commit操作,讓紅包系統(tǒng)可以看見這個訂單支付成功消息。
image.png -
如果發(fā)送half消息成功了,但是沒收到響應(yīng)呢?
這個時候我們沒收到響應(yīng),可能就會網(wǎng)絡(luò)超時報(bào)錯,也可能直接有其他的異常錯誤,這個時候訂單系統(tǒng)會誤以為是發(fā)送half消息到MQ失敗了,訂單系統(tǒng)就直接會執(zhí)行退款流程了,訂單狀態(tài)也會標(biāo)記為“已關(guān)閉”。
image.png
其實(shí)RocketMQ這里有一個補(bǔ)償流程,他會去掃描自己處于half狀態(tài)的消息,如果我們一直沒有對這個消息執(zhí)行commit/rollback操作,超過了一定的時間,他就會回調(diào)你的訂單系統(tǒng)的一個接口,系統(tǒng)就得去查一下數(shù)據(jù)庫,看看這個訂單當(dāng)前的狀態(tài),一下發(fā)現(xiàn)訂單狀態(tài)是“已關(guān)閉”,此時就知道,你必然得發(fā)送rollback請求給MQ去刪除之前那個half消息了!
image.png
image.png - 如果rollback或者commit發(fā)送失敗了呢?
這個時候其實(shí)也很簡單,因?yàn)镸Q里的消息一直是half狀態(tài),所以說他過了一定的超時時間會發(fā)現(xiàn)這個half消息有問題,他會回調(diào)你的訂單系統(tǒng)的接口,此時要判斷一下,這個訂單的狀態(tài)如果更新為了“已完成”,那就得再次執(zhí)行commit請求,反之則再次執(zhí)行rollback請求。
本質(zhì)這個MQ的回調(diào)就是一個補(bǔ)償機(jī)制,如果你的half消息響應(yīng)沒收到,或者rollback、commit請求沒發(fā)送成功,他都會來找你問問對half消息后續(xù)如何處理。 - 總結(jié)
其實(shí)很簡單,如果你的MQ有問題或者網(wǎng)絡(luò)有問題,half消息根本都發(fā)不出去,此時half消息肯定是失敗的,那么訂單系統(tǒng)就不會執(zhí)行后續(xù)流程了!
如果要是half消息發(fā)送出去了,但是half消息的響應(yīng)都沒收到,然后執(zhí)行了退款流程,那MQ會有補(bǔ)償機(jī)制來回調(diào)找你詢問要commit還是rollback,此時你選擇rollback刪除消息就可以了,不會執(zhí)行后續(xù)流程!
如果要是訂單系統(tǒng)收到half消息了,結(jié)果訂單系統(tǒng)自己更新數(shù)據(jù)庫失敗了,那么他也會進(jìn)行回滾,不會執(zhí)行后續(xù)流程了!
如果要是訂單系統(tǒng)收到half消息了,然后還更新自己數(shù)據(jù)庫成功了,訂單狀態(tài)是“已完成”了,此時就必然會發(fā)送commit請求給MQ,一旦消息commit了,那么必然保證紅包系統(tǒng)可以收到這個消息!
而且即使你commit請求發(fā)送失敗了,MQ也會有補(bǔ)償機(jī)制,回調(diào)你接口讓你判斷是否重新發(fā)送commit請求。
總之,就是你的訂單系統(tǒng)只要成功了,那么必然要保證MQ里的消息是commit了可以讓紅包系統(tǒng)看到他!
36.事務(wù)消息機(jī)制的底層實(shí)現(xiàn)原理
-
half 消息是如何對消費(fèi)者不可見的?
RocketMQ一旦發(fā)現(xiàn)你發(fā)送的是一個half消息,他不會把這個half消息的offset寫入OrderPaySuccessTopic的ConsumeQueue里去。他會把這條half消息寫入到自己內(nèi)部的“RMQ_SYS_TRANS_HALF_TOPIC”這個Topic對應(yīng)的一個ConsumeQueue里去,所以你的紅包系統(tǒng)自然無法從OrderPaySuccessTopic的ConsumeQueue中看到這條half消息了。half消息進(jìn)入到RocketMQ內(nèi)部的RMQ_SYS_TRANS_HALF_TOPIC的ConsumeQueue文件了,此時就會認(rèn)為half消息寫入成功了,然后就會返回響應(yīng)給訂單系統(tǒng)。
RMQ_SYS_TRANS_HALF_TOPIC.png -
假如因?yàn)楦鞣N問題,沒有執(zhí)行rollback或者commit會怎么樣?
其實(shí)這個時候他會在后臺有定時任務(wù),定時任務(wù)會去掃描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果你超過一定時間還是half消息,他會回調(diào)訂單系統(tǒng)的接口,讓你判斷這個half消息是要rollback還是commit。
定時任務(wù)掃描.png -
如果執(zhí)行rollback操作的話,如何標(biāo)記消息回滾?
因?yàn)镽ocketMQ都是順序把消息寫入磁盤文件的,所以在這里如果你執(zhí)行rollback,他的本質(zhì)就是用一個OP操作來標(biāo)記half消息的狀態(tài),RocketMQ內(nèi)部有一個OP_TOPIC,此時可以寫一條rollback OP記錄到這個Topic里,標(biāo)記某個half消息是rollback了。另外,假設(shè)你一直沒有執(zhí)行commit/rollback,RocketMQ會回調(diào)訂單系統(tǒng)的接口去判斷half消息的狀態(tài),但是他最多就是回調(diào)15次,如果15次之后你都沒法告知他half消息的狀態(tài),就自動把消息標(biāo)記為rollback。
如何標(biāo)記消息回滾.png -
如果執(zhí)行commit操作,如何讓消息對紅包系統(tǒng)可見?
執(zhí)行commit操作之后,RocketMQ就會在OP_TOPIC里寫入一條記錄,標(biāo)記half消息已經(jīng)是commit狀態(tài)了。接著需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息給寫入到OrderPaySuccessTopic的ConsumeQueue里去,然后我們的紅包系統(tǒng)可以就可以看到這條消息進(jìn)行消費(fèi)了。
如何標(biāo)記消息提交.png
其實(shí)本質(zhì)都是基于CommitLog、ConsumeQueue這套存儲機(jī)制來做的,只不過中間有一些Topic的變換,half消息可能就是寫入內(nèi)部Topic的。
37.同步發(fā)送消息 + 反復(fù)多次重試方案 VS RocketMQ事務(wù)消息方案
RocketMQ事務(wù)消息方案雖然能保證消息零丟失,但是機(jī)制復(fù)雜,完全有可能導(dǎo)致整體性能比較差,而且吞吐量比較低,是否有更加簡單的方法來確保消息一定可以到達(dá)MQ呢?能不能基于重試機(jī)制來確保消息到達(dá)MQ?
只要我們在代碼中發(fā)送消息到MQ之后,同步等待MQ返回響應(yīng)給我們,一直等待,如果半路中有網(wǎng)絡(luò)異?;蛘進(jìn)Q內(nèi)部異常,我們肯定會收到一個異常,比如網(wǎng)絡(luò)錯誤,或者請求超時之類的。
如果我們在收到異常之后,就認(rèn)為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應(yīng)給我們,這樣反復(fù)重試,是否可以確保消息一定會到達(dá)MQ?

先執(zhí)行訂單本地事務(wù),還是先發(fā)消息到MQ?
如果我們先執(zhí)行訂單本地事務(wù),接著再發(fā)送消息到MQ的話,偽代碼是這樣的:

假設(shè)你剛執(zhí)行完成了訂單本地事務(wù)了,結(jié)果還沒等到你發(fā)送消息到MQ,結(jié)果你的訂單系統(tǒng)突然崩潰了!這就導(dǎo)致你的訂單狀態(tài)可能已經(jīng)修改為了“已完成”,但是消息卻沒發(fā)送到MQ去!這就是這個方案最大的隱患。

把訂單本地事務(wù)和重試發(fā)送MQ消息放到一個事務(wù)代碼中
偽代碼改成這樣:

上面這個代碼看起來似乎解決了我們的問題,就是在這個方法上加入事務(wù),在這個事務(wù)方法中,我們哪怕執(zhí)行了orderService.finishOrderPay(),但是其實(shí)也僅僅執(zhí)行了一些增刪改SQL語句,還沒提交訂單本地事務(wù)。
如果發(fā)送MQ消息失敗了,而且多次重試還不奏效,則我們拋出異常會自動回滾訂單本地事務(wù);如果你剛執(zhí)行了orderService.finishOrderPay(),結(jié)果訂單系統(tǒng)直接崩潰了,此時訂單本地事務(wù)會回滾,因?yàn)楦緵]提交過。
但是對于這個方案,還是非常的不理想,原因就出在那個MQ多次重試的地方。
假設(shè)用戶支付成功了,然后支付系統(tǒng)回調(diào)通知你的訂單系統(tǒng)說,有一筆訂單已經(jīng)支付成功了,這個時候你的訂單系統(tǒng)卡在多次重試MQ的代碼那里,可能耗時了好幾秒種,此時回調(diào)通知你的系統(tǒng)早就等不及可能都超時異常了。而且你把重試MQ的代碼放在這個邏輯里,可能會導(dǎo)致訂單系統(tǒng)的這個接口性能很差。

一定可以依靠本地事務(wù)回滾嗎?
看下面的代碼:

雖然在方法上加了事務(wù)注解,但是代碼里還有更新Redis緩存和Elasticsearch數(shù)據(jù)的代碼邏輯,如果你要是已
經(jīng)完成了訂單數(shù)據(jù)庫更新、Redis緩存更新、ES數(shù)據(jù)更新了,結(jié)果沒法送MQ呢訂單系統(tǒng)崩潰了。雖然訂單數(shù)據(jù)庫的操作會回滾,但是Redis、Elasticsearch中的數(shù)據(jù)更新會自動回滾嗎?不會的,因?yàn)樗麄兏緵]法自動回滾,此時數(shù)據(jù)還是會不一致的。所以說,完全寄希望于本地事務(wù)自動回滾是不現(xiàn)實(shí)的。
所以分析完了這個同步發(fā)送消息 + 反復(fù)多次重試的方案之后,我們會發(fā)現(xiàn)他實(shí)際落地的時候是可以的,但是里面存在一些問題。最后保證業(yè)務(wù)系統(tǒng)一致性的最佳方案還是:基于RocketMQ的事務(wù)消息機(jī)制。
38.分析RocketMQ事物消息的代碼實(shí)現(xiàn)細(xì)節(jié)
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
// 這個東西就是用來接受RocketMQ回調(diào)的一個監(jiān)聽器接口
// 這里會實(shí)現(xiàn)執(zhí)行訂單本地事務(wù),commit、rollback,回調(diào)查詢等邏輯
TransactionListener transactionListener = new TransactionListenerImpl();
// 創(chuàng)建一個支持事務(wù)消息的Producer
TransactionMQProducer producer = new TransactionMQProducer("TestProducerGroup");
// 這個線程池是用來處理RocketMQ回調(diào)你的請求的
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
(r) -> {
Thread thread = new Thread(r);
thread.setName("TestThread");
return thread;
});
// 給事務(wù)生產(chǎn)者設(shè)置對應(yīng)的線程池,負(fù)責(zé)執(zhí)行RocketMQ回調(diào)請求
producer.setExecutorService(threadPool);
// 給事務(wù)生產(chǎn)者設(shè)置對應(yīng)的回調(diào)函數(shù)
producer.setTransactionListener(transactionListener);
// 啟動這個事務(wù)消息生產(chǎn)者
producer.start();
// 構(gòu)建一條訂單支付成功的消息,指定Topic
Message message = new Message("PayOrderSuccessTopic", "TestTag", "TestKey",
"訂單消息".getBytes(RemotingHelper.DEFAULT_CHARSET));
try {
SendResult sendResult = producer.sendMessageInTransaction(message, null);
} catch (MQClientException e) {
// half消息發(fā)送失敗
// 訂單系統(tǒng)執(zhí)行回滾邏輯,比如說觸發(fā)支付退款,更新訂單狀態(tài)為“已關(guān)閉”
}
}
}
public class TransactionListenerImpl implements TransactionListener {
// 如果half消息發(fā)送成功了,就會毀掉你的這個函數(shù),你就可以執(zhí)行本地事務(wù)了
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)
// 根據(jù)本地事務(wù)一連串執(zhí)行結(jié)果,去選擇commit or rollback
try {
// 如果本地事務(wù)都成功了,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事務(wù)都成功了,回滾一切執(zhí)行過的操作
// 如果本地事務(wù)執(zhí)行失敗了,返回rollback,標(biāo)記half消息無效
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 如果因?yàn)楦鞣N原因,沒有返回commit或者rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查詢本地事務(wù),是否執(zhí)行成功了
Integer status = localTrans.get(msg.getTransactionId());
// 根據(jù)本地事務(wù)的情況取選擇commit or rollback
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
39.Broker消息零丟失方案:同步刷盤 + Raft協(xié)議主從同步
Broker消息丟失的可能原因:
- 消息被寫入到os cache,但沒有被寫入到磁盤,此時Broke宕機(jī);
- 消息已經(jīng)被寫入到磁盤,但是磁盤損壞了,導(dǎo)致磁盤中消息丟失,并且此時消費(fèi)者還沒有來得及消費(fèi)。
解決方案:
對于1,將異步刷盤調(diào)整為同步刷盤,這樣就保證了只有消息被刷入到磁盤中,該消息才被認(rèn)為寫入成功,返回響應(yīng)給生產(chǎn)者。比如我們發(fā)送half消息的時候,只要MQ返回響應(yīng)是half消息發(fā)送成功了,那么就說明消息已經(jīng)進(jìn)入磁盤文件了,不會停留在os cache里。具體做法:調(diào)整broker的配置文件,將其中的flushDiskType配置設(shè)置為:SYNC_FLUSH,默認(rèn)他的值是ASYNC_FLUSH,即默認(rèn)是異步刷盤的。
對于2,通過主從架構(gòu)模式避免磁盤故障導(dǎo)致的數(shù)據(jù)丟失,這樣一來,你一條消息但凡寫入成功了,此時主從兩個Broker上都有這條數(shù)據(jù)了,此時如果你的Master Broker的磁盤壞了,但是Slave Broker上至少還是有數(shù)據(jù)的,數(shù)據(jù)是不會因?yàn)榇疟P故障而丟失的。
40.Consumer消息零丟失方案:手動提交offset + 自動故障轉(zhuǎn)移
Consumer消息丟失原因:紅包系統(tǒng)已經(jīng)拿到了這條消息,但是消息目前還在他的內(nèi)存里,還沒執(zhí)行派發(fā)紅包的邏輯,此時他就直接提交了這條消息的offset到broker去說自己已經(jīng)處理過了,接著紅包系統(tǒng)在上圖這個狀態(tài)的時候就直接崩潰了,內(nèi)存里的消息就沒了,紅包也沒派發(fā)出去,結(jié)果Broker已經(jīng)收到他提交的消息offset了,還以為他已經(jīng)處理完這條消息了。等紅包系統(tǒng)重啟的時候,就不會再次消費(fèi)這條消息了。
解決方案:

對于RocketMQ而言,其實(shí)只要你的紅包系統(tǒng)是在這個監(jiān)聽器的函數(shù)中先處理一批消息,基于這批消息都派發(fā)完了紅包,然后返回了那個消費(fèi)成功的狀態(tài),接著才會去提交這批消息的offset到broker去。所以在這個情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著紅包系統(tǒng)崩潰了,此時是不會丟失消息的。

那么如果是紅包系統(tǒng)獲取到一批消息之后,還沒處理完,也就沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài)呢,自然沒提交這批消息的offset給broker呢,此時紅包系統(tǒng)突然掛了,會怎么樣?
其實(shí)在這種情況下,你對一批消息都沒提交他的offset給broker的話,broker不會認(rèn)為你已經(jīng)處理完了這批消息,此時你突然紅包系統(tǒng)的一臺機(jī)器宕機(jī)了,他其實(shí)會感知到你的紅包系統(tǒng)的一臺機(jī)器作為一個Consumer掛了。接著他會把你沒處理完的那批消息交給紅包系統(tǒng)的其他機(jī)器去進(jìn)行處理,所以在這種情況下,消息也絕對是不會丟失的。
需要警惕的地方:不能異步消費(fèi)消息
不能在代碼中對消息進(jìn)行異步的處理,如下錯誤的示范,我們開啟了一個子線程去處理這批消息,然后啟動線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。

如果要是用這種方式來處理消息的話,那可能就會出現(xiàn)你開啟的子線程還沒處理完消息呢,你已經(jīng)返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認(rèn)為已經(jīng)處理結(jié)束了。然后此時你紅包系統(tǒng)突然宕機(jī),必然會導(dǎo)致你的消息丟失了!
41.基于 RocketMQ 設(shè)計(jì)的全鏈路消息零丟失方案總結(jié)
發(fā)送消息到MQ的零丟失:
方案一(同步發(fā)送消息 + 反復(fù)多次重試)
方案二(事務(wù)消息機(jī)制),兩者都有保證消息發(fā)送零丟失的效果,但是經(jīng)過分析,事務(wù)消息方案整體會更好一些
MQ收到消息之后的零丟失:開啟同步刷盤策略 + 主從架構(gòu)同步機(jī)制,只要讓一個Broker收到消息之后同步寫入磁盤,同時同步復(fù)制給其他Broker,然后再返回響應(yīng)給生產(chǎn)者說寫入成功,此時就可以保證MQ自己不會弄丟消息
消費(fèi)消息的零丟失:采用RocketMQ的消費(fèi)者天然就可以保證你處理完消息之后,才會提交消息的offset到broker去,只要記住別采用多線程異步處理消息的方式即可
消息零丟失方案的優(yōu)勢與劣勢
優(yōu)勢:消息零丟失
劣勢:整個從頭到尾的消息流轉(zhuǎn)鏈路的性能大幅度下降,MQ的吞吐量大幅度的下降
消息零丟失方案到底適用場景
一般我們建議,對于跟金錢、交易以及核心數(shù)據(jù)相關(guān)的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。
比如支付系統(tǒng),他是絕對不能丟失任何一條消息的,你的性能可以低一些,但是不能有任何一筆支付記錄丟失。
比如訂單系統(tǒng),公司一般是不能輕易丟失一個訂單的,畢竟一個訂單就對應(yīng)一筆交易,如果訂單丟失,用戶還支付成功了,你輕則要給用戶賠付損失,重則弄不好要經(jīng)受官司,特別是一些B2B領(lǐng)域的電商,一筆線上交易可能多大幾萬幾十萬。
所以對這種非常非常核心的場景和少數(shù)幾條核心鏈路,才會建議大家上這套復(fù)雜的消息0丟失方案。對于非核心的鏈路,非金錢交易的鏈路,大家可以適當(dāng)簡化這套方案,用一些方法避免數(shù)據(jù)輕易丟失,但是同時性能整體很高,即使有極個別的數(shù)據(jù)丟失,對非核心的場景,也不會有太大的影響。
42.生產(chǎn)案例:從 RocketMQ 底層原理分析為什么會重復(fù)發(fā)優(yōu)惠券?

-
用于發(fā)送消息到MQ的訂單系統(tǒng),如果出現(xiàn)了接口超時等問題,可能會導(dǎo)致上游的支付系統(tǒng)重試調(diào)用訂單系統(tǒng)的接口,進(jìn)而導(dǎo)致訂單系統(tǒng)對一個消息重復(fù)發(fā)送兩條到MQ里去!
可能原因1.png -
發(fā)送MQ的重試機(jī)制可能因?yàn)榫W(wǎng)絡(luò)原因出現(xiàn)超時異常,從而重復(fù)發(fā)送MQ。
重試代碼.png
網(wǎng)絡(luò)原因?qū)е轮匕l(fā)MQ.png -
優(yōu)惠券系統(tǒng)剛剛發(fā)完優(yōu)惠券,還沒來得及提交消息offset到broker,就宕機(jī)了(或者重啟),這時因?yàn)槟銢]提交這條消息的offset給broker,broker并不知道你已經(jīng)處理完了這條消息,然后優(yōu)惠券系統(tǒng)重啟之后,broker就會再次把這條消息交給你,讓你再一次進(jìn)行處理,然后你會再一次發(fā)送一張優(yōu)惠券,導(dǎo)致重復(fù)發(fā)送了兩次優(yōu)惠券!
第一次發(fā)送優(yōu)惠券.png
第二次發(fā)送優(yōu)惠券.png
43.對訂單系統(tǒng)核心流程引入 冪等性機(jī)制,保證數(shù)據(jù)不會重復(fù)
什么是冪等性機(jī)制?
這個冪等性機(jī)制,其實(shí)就是用來避免對同一個請求或者同一條消息進(jìn)行重復(fù)處理的機(jī)制,所謂的冪等,他的意思就是,比如你有一個接口,然后如果別人對一次請求重試了多次,來調(diào)用你的接口,你必須保證自己系統(tǒng)的數(shù)據(jù)是正常的,不能多出來一些重復(fù)的數(shù)據(jù),這就是冪等性的意思。
發(fā)送消息到MQ的時候如何保證冪等性?
1. 業(yè)務(wù)判斷法:當(dāng)你的訂單系統(tǒng)的接口被重試調(diào)用的時候,你這個接口上來就應(yīng)該發(fā)送請求到MQ里去查詢一下,比如對訂單id=1100這個訂單的支付成功消息,在你MQ那里有沒有?如果有的話,我就不再重復(fù)發(fā)送消息了!

弊端:在這個環(huán)節(jié)你直接從MQ查詢消息是沒這個必要的,他的性能也不是太好,會影響你的接口的性能。
2. 狀態(tài)判斷法-基于Redis緩存的冪等性機(jī)制:這個方法的核心在于,你需要引入一個Redis緩存來存儲你是否發(fā)送過消息的狀態(tài),如果你成功發(fā)送了一個消息到MQ里去,你得在Redis緩存里寫一條數(shù)據(jù),標(biāo)記這個消息已經(jīng)發(fā)送過,那么當(dāng)你的訂單接口被重復(fù)調(diào)用的時候,你只要根據(jù)訂單id去Redis緩存里查詢一下,這個訂單的支付消息是否已經(jīng)發(fā)送給MQ了,如果發(fā)送過了,你就別再次發(fā)送了!

弊端:這種方案一般情況下是可以做到冪等性的,但是如果有時候你剛發(fā)送了消息到MQ,還沒來得及寫Redis,系統(tǒng)就掛了,之后你的接口被重試調(diào)用的時候,你查Redis還以為消息沒發(fā)過,就會發(fā)送重復(fù)的消息到MQ去。
優(yōu)惠券系統(tǒng)如何保證消息處理的冪等性?
其實(shí)這里就比較簡單了,直接基于業(yè)務(wù)判斷法就可以了,因?yàn)閮?yōu)惠券系統(tǒng)每次拿到一條消息后給用戶發(fā)一張優(yōu)惠券,實(shí)際上核心就是在數(shù)據(jù)庫里給用戶插入一條優(yōu)惠券記錄。那么如果優(yōu)惠券系統(tǒng)從MQ那里拿到一個訂單的兩條重復(fù)的支付成功消息,這個時候其實(shí)很簡單,他只要先去優(yōu)惠券數(shù)據(jù)庫中查詢一下,比如對訂單id=1100的訂單,是否已經(jīng)發(fā)放過優(yōu)惠券了,是否有優(yōu)惠券記錄,如果有的話,就不要重復(fù)發(fā)券了!

總結(jié)
一般來說,對于MQ的重復(fù)消息而言,往MQ里重復(fù)發(fā)送一樣的消息還是可以接受的,因?yàn)镸Q里有多條重復(fù)消息,它不會對系統(tǒng)的核心數(shù)據(jù)造成影響,但是關(guān)鍵要保證的是,從MQ里獲取消息進(jìn)行處理的時候,必須要保證消息不能重復(fù)處理。
44.如果優(yōu)惠券系統(tǒng)的數(shù)據(jù)庫宕機(jī),如何用死信隊(duì)列解決這種異常場景?
假設(shè)了一個場景,就是訂單支付成功之后會推送消息到MQ,然后優(yōu)惠券系統(tǒng)、紅包系統(tǒng)會從MQ里獲取消息去執(zhí)行后續(xù)的處理,比如發(fā)紅包或者發(fā)優(yōu)惠券。那么如果這個時候,優(yōu)惠券系統(tǒng)的數(shù)據(jù)庫宕機(jī)了,針對這樣的一個坑爹的異常場景我們應(yīng)該怎么處理?
數(shù)據(jù)庫宕機(jī)的時候,返回RECONSUME_LATER
實(shí)際上如果我們因?yàn)閿?shù)據(jù)庫宕機(jī)等問題,對這批消息的處理是異常的,此時沒法處理這批消息,我們就應(yīng)該返回一個RECONSUME_LATER狀態(tài),他的意思是,我現(xiàn)在沒法完成這批消息的處理,麻煩你稍后過段時間再次給我這批消息讓我重新試一下!

RocketMQ是如何讓你進(jìn)行消費(fèi)重試的?
簡單來說,RocketMQ會有一個針對你這個ConsumerGroup的重試隊(duì)列。如果你返回了RECONSUME_LATER狀態(tài),他會把你這批消息放到你這個消費(fèi)組的重試隊(duì)列中去比如你的消費(fèi)組的名稱是“VoucherConsumerGroup”,意思是優(yōu)惠券系統(tǒng)的消費(fèi)組,那么他會有一個
“%RETRY%VoucherConsumerGroup”這個名字的重試隊(duì)列,然后過一段時間之后,重試隊(duì)列中的消息會再次給我們,讓我們進(jìn)行處理。如果再次失敗,又返回了RECONSUME_LATER,那么會再過一段時間讓我們來進(jìn)行處理,默認(rèn)最多是重試16次!每次重試之間的間隔時間是不一樣的,這個間隔時間可以如下進(jìn)行配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

如果連續(xù)重試16次還是無法處理消息,然后怎么辦?
其實(shí)就是一批消息交給你處理,你重試了16次還一直沒處理成功,就不要繼續(xù)重試這批消息了,你就認(rèn)為他們死掉了就可以了。然后這批消息會自動進(jìn)入死信隊(duì)列。死信隊(duì)列的名字是“%DLQ%VoucherConsumerGroup”
那么對死信隊(duì)列中的消息我們怎么處理?
其實(shí)這個就看你的使用場景了,比如我們可以專門開一個后臺線程,就是訂閱“%DLQ%VoucherConsumerGroup”這個死信隊(duì)列,對死信隊(duì)列中的消息,還是一直不停的重試。

消費(fèi)者底層的一些依賴可能有故障了,比如數(shù)據(jù)庫宕機(jī),緩存宕機(jī)之類的,此時你就沒辦法完成消息的處理了,那么可以通過一些返回狀態(tài)去讓消息進(jìn)入RocketMQ自帶的重試隊(duì)列,同時如果反復(fù)重試還是不行,可以讓消息進(jìn)入RocketMQ自帶的死信隊(duì)列,后續(xù)針對死信隊(duì)列中的消息進(jìn)行單獨(dú)的處理就可以了。
45.生產(chǎn)案例:為什么基于 RocketMQ 進(jìn)行訂單庫數(shù)據(jù)同步時會消息亂序?
場景再現(xiàn):大數(shù)據(jù)系統(tǒng)在基于Mysql binlog同步訂單數(shù)據(jù)時,binlog里有兩條日志,依次時insert、update操作,但是大數(shù)據(jù)系統(tǒng)在處理消息的時候哦,先處理了upodate消息,后處理insert消息,導(dǎo)致消息亂序,數(shù)據(jù)出現(xiàn)問題。

原因分析:原本有順序的消息,完全可能會分發(fā)到不同的MessageQueue中去,然后大數(shù)據(jù)系統(tǒng)的不同機(jī)器上部署的Consumer可能會用混亂的順序從不同的MessageQueue里獲取消息然后處理。

46.如何解決訂單數(shù)據(jù)庫同步的消息亂序問題?
- 采用取模的方式讓屬于同一個訂單的binlog進(jìn)入一個MessageQueue
- 獲取binlog的時候也得有序
解決訂單數(shù)據(jù)庫同步的消息亂序問題的方案.png
萬一消息處理失敗了不可以走重試隊(duì)列
因?yàn)槿绻愕腸onsumer獲取到訂單的一個insert binlog,結(jié)果處理失敗了,此時返回了RECONSUME_LATER,那么這條消息會進(jìn)入重試隊(duì)列,過一會兒才會交給你重試。但是此時broker會直接把下一條消息,也就是這個訂單的update binlog交給你來處理,此時萬一你執(zhí)行成功了,就根本沒有數(shù)據(jù)可以更新!又會出現(xiàn)消息亂序的問題。
所以對于有序消息的方案中,如果你遇到消息處理失敗的場景,就必須返回SUSPEND_CURRENT_QUEUE_A_MOMENT這個狀態(tài),意思是先等一會兒,一會兒再繼續(xù)處理這批消息,而不能把這批消息放入重試隊(duì)列去,然后直接處理下一批消息。
RocketMQ的順序消息機(jī)制的代碼實(shí)現(xiàn)
-
讓一個訂單的binlog進(jìn)入一個MessageQueue
讓一個訂單的binlog進(jìn)入一個MessageQueue.png - 消費(fèi)者按照順序來獲取一個MessageQueue中的消息
消費(fèi)者按照順序來獲取一個MessageQueue中的消息.png
使用的是MessageListenerOrderly這個東西,他里面有Orderly這個名稱,也就是說,Consumer會對每一個ConsumeQueue,都僅僅用一個線程來處理其中的消息。比如對ConsumeQueue01中的訂單id=1100的多個binlog,會交給一個線程來按照binlog順序來依次處理。否則如果ConsumeQueue01中的訂單id=1100的多個binlog交給Consumer中的多個線程來處理的話,那還是會有消息亂序的問題。
47.基于RocketMQ的數(shù)據(jù)過濾機(jī)制,提升訂單數(shù)據(jù)庫同步的處理效率
一個數(shù)據(jù)庫中可能會包含很多表的數(shù)據(jù),比如訂單數(shù)據(jù)庫,他里面除了訂單信息表以外,可能還包含很多其他的表。所以我們在進(jìn)行數(shù)據(jù)庫binlog同步的時候,很可能是把一個數(shù)據(jù)庫里所有表的binlog都推送到MQ里去的!
假設(shè)我們的大數(shù)據(jù)系統(tǒng)僅僅關(guān)注訂單數(shù)據(jù)庫中的表A的binlog,并不關(guān)注其他表的binlog,那么大數(shù)據(jù)系統(tǒng)可能需要在獲取到所有表的binlog之后,對每條binlog判斷一下,是否是表A的binlog?
如果不是表A的binlog,那么就直接丟棄不要處理;如果是表A的binlog,才會去進(jìn)行處理!但是這樣的話,必然會導(dǎo)致大數(shù)據(jù)系統(tǒng)處理很多不關(guān)注的表的binlog,也會很浪費(fèi)時間,降低消息的效率.
解決方案:在發(fā)送消息的時候,給消息設(shè)置tag和屬性
針對這個問題,我們可以采用RocketMQ支持的數(shù)據(jù)過濾機(jī)制,來讓大數(shù)據(jù)系統(tǒng)僅僅關(guān)注他想要的表的binlog數(shù)據(jù)即可。
發(fā)送消息的時候,可以給消息設(shè)置tag和屬性:

在消費(fèi)數(shù)據(jù)的時候根據(jù)tag和屬性進(jìn)行過濾:


RocketMQ還是支持比較豐富的數(shù)據(jù)過濾語法的,如下所示:
(1)數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比較,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)邏輯符號 AND,OR,NOT;
(5)數(shù)值,比如:123,3.1415;
(6)字符,比如:'abc',必須用單引號包裹起來;
(7)NULL,特殊的常量
(8)布爾值,TRUE 或 FALSE
48.生產(chǎn)案例:基于延遲消息機(jī)制優(yōu)化大量訂單的定時退款掃描問題!
場景:在實(shí)際情況中,其實(shí)APP的大量用戶每天會下很多訂單,但是不少訂單可能是一直沒有進(jìn)行支付的,可能他下單之后猶豫了,可能是他忘了支付了!所以一般訂單系統(tǒng)都必須設(shè)置一個規(guī)則,當(dāng)一個訂單下單之后,超過比如30分鐘沒有支付,那么就必須訂單系統(tǒng)自動關(guān)閉這個訂單,后續(xù)你如果要購買這個訂單里的商品,就得重新下訂單了。
問題:那么訂單系統(tǒng)就需要有一個后臺線程,不停的掃描訂單數(shù)據(jù)庫里所有的未支付狀態(tài)的訂單,看他如果超過30分鐘了還沒支付,那么就必須自動把訂單狀態(tài) 更新為“已關(guān)閉”。

但是這里就引入了一個問題,就是訂單系統(tǒng)的后臺線程必須要不停的掃描各種未支付的訂單,這種實(shí)現(xiàn)方式實(shí)際上并不是很好。
- 一個原因是未支付狀態(tài)的訂單可能是比較多的,然后你需要不停的掃描他們,可能每個未支付狀態(tài)的訂單要被掃描N多遍,才會發(fā)現(xiàn)他已經(jīng)超過30分鐘沒支付了。
- 另外一個是很難去分布式并行掃描你的訂單。因?yàn)榧僭O(shè)你的訂單數(shù)據(jù)量特別的多,然后你要是打算用多臺機(jī)器部署訂單掃描服務(wù),但是每臺機(jī)器掃描哪些訂單?怎么掃描?什么時候掃描?這都是一系列的麻煩問題。
方案:針對類似這種場景,MQ里的延遲消息可以派上用場了。所謂延遲消息,意思就是說,我們訂單系統(tǒng)在創(chuàng)建了一個訂單之后,可以發(fā)送一條消息到MQ里去,我們指定這條消息是延遲消息,比如要等待30分鐘之后,才能被訂單掃描服務(wù)給消費(fèi)到,這樣當(dāng)訂單掃描服務(wù)在30分鐘后消費(fèi)到了一條消息之后,就可以針對這條消息的信息,去訂單數(shù)據(jù)庫里查詢這個訂單,看看他在創(chuàng)建過后都過了30分鐘了,此時他是否還是未支付狀態(tài)?如果此時訂單還是未支付狀態(tài),那么就可以關(guān)閉他,否則訂單如果已經(jīng)支付了,就什么都不用做了。
延遲消息用法.png
這種方式就比你用后臺線程掃描訂單的方式要好的多了,一個是對每個訂單你只會在他創(chuàng)建30分鐘后查詢他一次而已,不會反復(fù)掃描訂單多次。
另外就是如果你的訂單數(shù)量很多,你完全可以讓訂單掃描服務(wù)多部署幾臺機(jī)器,然后對于MQ中的Topic可以多指定一個MessageQueue,這樣每個訂單掃描服務(wù)的機(jī)器作為一個Consumer都會處理一部分訂單的查詢?nèi)蝿?wù)。
延遲消息代碼實(shí)現(xiàn):
延遲消息生產(chǎn)者:
生產(chǎn)者代碼.png
發(fā)送延遲消息的核心,就是設(shè)置消息的delayTimeLevel,也就是延遲級別
RocketMQ默認(rèn)支持一些延遲級別如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。所以上面代碼中設(shè)置延遲級別為3,意思就是延遲10s,你發(fā)送出去的消息,會過10s被消費(fèi)者獲取到。那么如果是訂單延遲掃描場景,可以設(shè)置延遲級別為16,也就是對應(yīng)上面的30分鐘。
延遲消息消費(fèi)者:
消費(fèi)者代碼.png
49.在RocketMQ的生產(chǎn)實(shí)踐中積累的各種一手經(jīng)驗(yàn)總結(jié)
1. 靈活的運(yùn)用 tags來過濾數(shù)據(jù)
在真正的生產(chǎn)項(xiàng)目中,建議大家合理的規(guī)劃Topic和里面的tags,一個Topic代表了一類業(yè)務(wù)消息數(shù)據(jù),然后對于這類業(yè)務(wù)消息數(shù)據(jù),如果你希望繼續(xù)劃分一些類別的話,可以在發(fā)送消息的時候設(shè)置tags。
舉個例子,比如我們都知道現(xiàn)在常見的外賣平臺有美團(tuán)外賣、餓了么外賣還有別的一些外賣,那么假設(shè)你現(xiàn)在一個系統(tǒng)要發(fā)送外賣訂單數(shù)據(jù)到MQ里去,就可以針對性的設(shè)置tags,比如不同的外賣數(shù)據(jù)都到一個“WaimaiOrderTopic”里去。但是不同類型的外賣可以有不同的tags:“meituan_waimai”,“eleme_waimai”,“other_waimai”,等等。然后對你消費(fèi)“WaimaiOrderTopic”的系統(tǒng),可以根據(jù)tags來篩選,可能你就需要某一種類別的外賣數(shù)據(jù)罷了。
2. 基于消息key來定位消息是否丟失
之前我們給大家講過,在消息0丟失方案中,可能要解決的是消息是否丟失的問題,那么如果消息真的丟失了,我們是不是要排查?此時是不是要從MQ里查一下,這個消息是否丟失了?
那么怎么從MQ里查消息是否丟失呢?可以基于消息key來實(shí)現(xiàn),比如通過下面的方式設(shè)置一個消息的key為訂單id:message.setKeys(orderId),這樣這個消息就具備一個key了。接著這個消息到broker上,會基于key構(gòu)建hash索引,這個hash索引就存放在IndexFile索引文件里。然后后續(xù)我們可以通過MQ提供的命令去根據(jù)key查詢這個消息,類似下面這樣:mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId
3. 消息零丟失方案的補(bǔ)充
之前我們給大家分析過消息零丟失方案,其實(shí)在消息零丟失方案中還有一個問題,那就是MQ集群徹底故障了,此時就是不可用了,那么怎么辦呢?
其實(shí)對于一些金融級的系統(tǒng),或者跟錢相關(guān)的支付系統(tǒng),或者是廣告系統(tǒng),類似這樣的系統(tǒng),都必須有超高級別的高可用保障機(jī)制。
一般假設(shè)MQ集群徹底崩潰了,你生產(chǎn)者就應(yīng)該把消息寫入到本地磁盤文件里去進(jìn)行持久化,或者是寫入數(shù)據(jù)庫里去暫存起來,等待MQ恢復(fù)之后,然后再把持久化的消息繼續(xù)投遞到MQ里去。
4. 提高消費(fèi)者的吞吐量
如果消費(fèi)的時候發(fā)現(xiàn)消費(fèi)的比較慢,那么可以提高消費(fèi)者的并行度,常見的就是部署更多的consumer機(jī)器
但是這里要注意,你的Topic的MessageQueue得是有對應(yīng)的增加,因?yàn)槿绻愕腸onsumer機(jī)器有5臺,然后MessageQueue只有4個,那么意味著有一個consumer機(jī)器是獲取不到消息的。
然后就是可以增加consumer的線程數(shù)量,可以設(shè)置consumer端的參數(shù):consumeThreadMin、consumeThreadMax,這樣一臺consumer機(jī)器上的消費(fèi)線程越多,消費(fèi)的速度就越快。此外,還可以開啟消費(fèi)者的批量消費(fèi)功能,就是設(shè)置consumeMessageBatchMaxSize參數(shù),他默認(rèn)是1,但是你可以設(shè)置的多一些,那么一次就會交給你的回調(diào)函數(shù)一批消息給你來處理了,此時你可以通過SQL語句一次性批量處理一些數(shù)據(jù),比如:update xxx setxxx where id in (xx,xx,xx)。通過批量處理消息的方式,也可以大幅度提升消息消費(fèi)的速度。
5. 要不要消費(fèi)歷史消息
其實(shí)consumer是支持設(shè)置從哪里開始消費(fèi)消息的,常見的有兩種:一個是從Topic的第一條數(shù)據(jù)開始消費(fèi),一個是從最后一次消費(fèi)過的消息之后開始消費(fèi)。對應(yīng)的是:CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_FIRST_OFFSET。一般來說,我們都會選擇CONSUME_FROM_FIRST_OFFSET,這樣你剛開始就從Topic的第一條消息開始消費(fèi),但是以后每次重啟,你都是從上一次消費(fèi)到的位置繼續(xù)往后進(jìn)行消費(fèi)的。
50.企業(yè)級的RocketMQ集群如何進(jìn)行權(quán)限機(jī)制的控制?
在RocketMQ中實(shí)現(xiàn)權(quán)限控制也不難,首先我們需要在broker端放一個額外的ACK權(quán)限控制配置文件,里面需要規(guī)定好權(quán)限,包括什么用戶對哪些Topic有什么操作權(quán)限,這樣的話,各個Broker才知道你每個用戶的權(quán)限。
首先在每個Broker的配置文件里需要設(shè)置aclEnable=true這個配置,開啟權(quán)限控制
其次,在每個Broker部署機(jī)器的${ROCKETMQ_HOME}/store/config目錄下,可以放一個plain_acl.yml的配置文件,這個里面就可以進(jìn)行權(quán)限配置,類似下面這樣子:
# 這個參數(shù)就是全局性的白名單
# 這里定義的ip地址,都是可以訪問Topic的
globalWhiteRemoteAddresses:
- 13.21.33.*
- 192.168.0.*
# 這個accounts就是說,你在這里可以定義很多賬號
# 每個賬號都可以在這里配置對哪些Topic具有一些操作權(quán)限
accounts:
# 這個accessKey其實(shí)就是用戶名的意思,比如我們這里叫做“訂單技術(shù)團(tuán)隊(duì)”
- accessKey: OrderTeam
# 這個secretKey其實(shí)就是這個用戶名的密碼
secretKey: 123456
# 下面這個是當(dāng)前這個用戶名下哪些機(jī)器要加入白名單的
whiteRemoteAddress:
# admin指的是這個賬號是不是管理員賬號
admin: false
# 這個指的是默認(rèn)情況下這個賬號的Topic權(quán)限和ConsumerGroup權(quán)限
defaultTopicPerm: DENY
defaultGroupPerm: SUB
# 這個就是這個賬號具體的堆一些賬號的權(quán)限
# 下面就是說當(dāng)前這個賬號對兩個Topic,都具備PUB|SUB權(quán)限,就是發(fā)布和訂閱的權(quán)限
# PUB就是發(fā)布消息的權(quán)限,SUB就是訂閱消息的權(quán)限
# DENY就是拒絕你這個賬號訪問這個Topic
topicPerms:
- CreateOrderInformTopic=PUB|SUB
- PaySuccessInformTopic=PUB|SUB
# 下面就是對ConsumerGroup的權(quán)限,也是同理的
groupPerms:
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
# 下面就是另外一個賬號了,比如是商品技術(shù)團(tuán)隊(duì)的賬號
- accessKey: ProductTeam
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# 如果admin設(shè)置為true,就是具備一切權(quán)限
admin: true
如果你一個賬號沒有對某個Topic顯式的指定權(quán)限,那么就是會采用默認(rèn)Topic權(quán)限。
接著我們看看在你的生產(chǎn)者和消費(fèi)者里,如何指定你的團(tuán)隊(duì)分配到的RocketMQ的賬號,當(dāng)你使用一個賬號的時候,就只能訪問你有權(quán)限的Topic。

上面的代碼中就是在創(chuàng)建Producer的時候后,傳入進(jìn)去一個AclClientRPCHook,里面就可以設(shè)置你這個Producer的賬號密碼,對于創(chuàng)建Consumer也是同理的。通過這樣的方式,就可以在Broker端設(shè)置好每個賬號對Topic的訪問權(quán)限,然后你不同的技術(shù)團(tuán)隊(duì)就用不同的賬號就可以了。
51.如何對線上生產(chǎn)環(huán)境的RocketMQ集群進(jìn)行消息軌跡的追蹤?
首先需要在broker的配置文件里開啟traceTopicEnable=true這個選項(xiàng),此時就會開啟消息軌跡追蹤的功能。
接著當(dāng)我們開啟了上述的選項(xiàng)之后,我們啟動這個Broker的時候會自動創(chuàng)建出來一個內(nèi)部的Topic,就是RMQ_SYS_TRACE_TOPIC,這個Topic就是用來存儲所有的消息軌跡追蹤的數(shù)據(jù)的。
此時創(chuàng)建Producer的時候要用如下的方式,下面構(gòu)造函數(shù)中的第二個參數(shù),就是enableMsgTrace參數(shù),他設(shè)置為true,就是說可以對消息開啟軌跡追蹤,在訂閱消息的時候,對于Consumer也是同理的,在構(gòu)造函數(shù)的第二個參數(shù)設(shè)置為true,就是開啟了消費(fèi)時候的軌跡追蹤。

接著如果我們想要查詢消息軌跡,也很簡單,在RocketMQ控制臺里,在導(dǎo)航欄里就有一個消息軌跡,在里面可以創(chuàng)建查詢?nèi)蝿?wù),你可以根據(jù)messageId、message key或者Topic來查詢,查詢?nèi)蝿?wù)執(zhí)行完畢之后,就可以看到消息軌跡的界面了。
52.由于消費(fèi)系統(tǒng)故障導(dǎo)致的RocketMQ百萬消息積壓問題,應(yīng)該如何處理?
1. MessageQueue數(shù)量大于消費(fèi)者系統(tǒng)數(shù)量->增加機(jī)器
假如你的Topic有20個MessageQueue,然后你只有4個消費(fèi)者系統(tǒng)在消費(fèi),那么每個消費(fèi)者系統(tǒng)會從5個MessageQueue里獲取消息,所以此時如果你僅僅依靠4個消費(fèi)者系統(tǒng)是肯定不夠的,畢竟MQ里積壓了百萬消息了。
所以此時你可以臨時申請16臺機(jī)器多部署16個消費(fèi)者系統(tǒng)的實(shí)例,然后20個消費(fèi)者系統(tǒng)同時消費(fèi),每個人消費(fèi)一個MessageQueue的消息,此時你會發(fā)現(xiàn)你消費(fèi)的速度提高了5倍,很快積壓的百萬消息都會被處理完畢。
當(dāng)你處理完百萬積壓的消息之后,就可以下線多余的16臺機(jī)器了。
2. MessageQueue數(shù)量等于消費(fèi)者系統(tǒng)數(shù)量->寫入臨時隊(duì)列
那么如果你的Topic總共就只有4個MessageQueue,然后你就只有4個消費(fèi)者系統(tǒng)呢?
這個時候就沒辦法擴(kuò)容消費(fèi)者系統(tǒng)了,因?yàn)槟慵釉俣嗟南M(fèi)者系統(tǒng),還是只有4個MessageQueue,沒法并行消費(fèi)。
所以此時往往是臨時修改那4個消費(fèi)者系統(tǒng)的代碼,讓他們獲取到消息然后不寫入NoSQL,而是直接把消息寫入一個新的Topic,這個速度是很快的,因?yàn)閮H僅是讀寫MQ而已。
然后新的Topic有20個MessageQueue,然后再部署20臺臨時增加的消費(fèi)者系統(tǒng),去消費(fèi)新的Topic后寫入數(shù)據(jù)到NoSQL里去,這樣子也可以迅速的增加消費(fèi)者系統(tǒng)的并行處理能力,使用一個新的Topic來允許更多的消費(fèi)者系統(tǒng)并行處理。
53.金融級的系統(tǒng)如何針對RocketMQ集群崩潰設(shè)計(jì)高可用方案?
跟金錢相關(guān)的一些系統(tǒng),他可能需要依賴MQ去傳遞消息,如果你MQ突然崩潰了,可能導(dǎo)致很多跟錢相關(guān)的東西就會出問題。
針對這種場景,我們通常都會在你發(fā)送消息到MQ的那個系統(tǒng)中設(shè)計(jì)高可用的降級方案,這個降級方案通常的思路是,你需要在你發(fā)送消息到MQ代碼里去try catch捕獲異常,如果你發(fā)現(xiàn)發(fā)送消息到MQ有異常,此時你需要進(jìn)行重試。
如果你發(fā)現(xiàn)連續(xù)重試了比如超過3次還是失敗,說明此時可能就是你的MQ集群徹底崩潰了,此時你必須把這條重要的消息寫入到本地存儲中去,可以是寫入數(shù)據(jù)庫里,也可以是寫入到機(jī)器的本地磁盤文件里去,或者是NoSQL存儲中去。
之后你要不停的嘗試發(fā)送消息到MQ去,一旦發(fā)現(xiàn)MQ集群恢復(fù)了,你必須有一個后臺線程可以把之前持久化存儲的消息都查詢出來,然后依次按照順序發(fā)送到MQ集群里去,這樣才能保證你的消息不會因?yàn)镸Q徹底崩潰會丟失。
這里要有一個很關(guān)鍵的注意點(diǎn),就是你把消息寫入存儲中暫存時,一定要保證他的順序,比如按照順序一條一條的寫入本地磁盤文件去暫存消息。而且一旦MQ集群故障了,你后續(xù)的所有寫消息的代碼必須嚴(yán)格的按照順序把消息寫入到本地磁盤文件里去暫存,這個順序性是要嚴(yán)格保證的。
54.為什么要給RocketMQ增加消息限流功能保證其高可用性?
其實(shí)本質(zhì)上來說,限流功能就是對系統(tǒng)的一個保護(hù)功能。
在接收消息這塊,必須引入一個限流機(jī)制,也就是說要限制好,你這臺機(jī)器每秒鐘最多就只能處理比如3萬條消息,根據(jù)你的MQ集群的壓測結(jié)果來,你可以通過壓測看看你的MQ最多可以抗多少Q(mào)PS,然后就做好限流。
一般來說,限流算法可以采取令牌桶算法,也就是說你每秒鐘就發(fā)放多少個令牌,然后只能允許多少個請求通過。關(guān)于限流算法的實(shí)現(xiàn),不在我們的討論范圍內(nèi),大家可以自己查閱一下資料,也并不是很難。
我們這里主要是給大家講一下,很多互聯(lián)網(wǎng)大廠其實(shí)都會改造開源MQ的內(nèi)核源碼,引入限流機(jī)制,然后只能允許指定范圍內(nèi)的消息被在一秒內(nèi)被處理,避免因?yàn)橐恍┊惓5那闆r,導(dǎo)致MQ集群掛掉。
55.設(shè)計(jì)一套Kafka到RocketMQ的雙寫+雙讀技術(shù)方案,實(shí)現(xiàn)無縫遷移!
假設(shè)你們公司本來線上的MQ用的主要是Kafka,現(xiàn)在要從Kafka遷移到RocketMQ去,那么這個遷移的過程應(yīng)
該怎么做呢?應(yīng)該采用什么樣的技術(shù)方案來做遷移呢?
MQ集群遷移過程中的雙寫+雙讀技術(shù)方案
- 一般來說,首先你要做到雙寫,也就是說,在你所有的Producer系統(tǒng)中,要引入一個雙寫的代碼,讓他同時往Kafka和RocketMQ中去寫入消息,然后多寫幾天,起碼雙寫要持續(xù)個1周左右,因?yàn)镸Q一般都是實(shí)時數(shù)據(jù),里面數(shù)據(jù)也就最多保留一周。
- 當(dāng)你的雙寫持續(xù)一周過后,你會發(fā)現(xiàn)你的Kafka和RocketMQ里的數(shù)據(jù)看起來是幾乎一模一樣了,因?yàn)镸Q反正也就保留最近幾天的數(shù)據(jù),當(dāng)你雙寫持續(xù)超過一周過后,你會發(fā)現(xiàn)Kafka和RocketMQ里的數(shù)據(jù)幾乎一模一樣了。
- 但是光是雙寫還是不夠的,還需要同時進(jìn)行雙讀,也就是說在你雙寫的同時,你所有的Consumer系統(tǒng)都需要同時從Kafka和RocketMQ里獲取消息,分別都用一模一樣的邏輯處理一遍。只不過從Kafka里獲取到的消息還是走核心邏輯去處理,然后可以落入數(shù)據(jù)庫或者是別的存儲什么的,但是對于RocketMQ里獲取到的消息,你可以用一樣的邏輯處理,但是不能把處理結(jié)果具體的落入數(shù)據(jù)庫之類的地方。
- 你的Consumer系統(tǒng)在同時從Kafka和RocketMQ進(jìn)行消息讀取的時候,你需要統(tǒng)計(jì)每個MQ當(dāng)日讀取和處理的消息的數(shù)量,這點(diǎn)非常的重要,同時對于RocketMQ讀取到的消息處理之后的結(jié)果,可以寫入一個臨時的存儲中。
- 同時你要觀察一段時間,當(dāng)你發(fā)現(xiàn)持續(xù)雙寫和雙讀一段時間之后,如果所有的Consumer系統(tǒng)通過對比發(fā)現(xiàn),從Kafka和RocketMQ讀取和處理的消息數(shù)量一致,同時處理之后得到的結(jié)果也都是一致的,此時就可以判斷說當(dāng)前Kafka和RocketMQ里的消息是一致的,而且計(jì)算出來的結(jié)果也都是一致的。
6.這個時候就可以實(shí)施正式的切換了,你可以停機(jī)Producer系統(tǒng),再重新修改后上線,全部修改為僅僅寫RocketMQ,這個時候他數(shù)據(jù)不會丟,因?yàn)橹耙呀?jīng)雙寫了一段時間了,然后所有的Consumer系統(tǒng)可以全部下線后修改代碼再上線,全部基于RocketMQ來獲取消息,計(jì)算和處理,結(jié)果寫入存儲中?;旧蠈τ陬愃频囊恍┲匾虚g件的遷移,往往都會采取雙寫的方法,雙寫一段時間,然后觀察兩個方案的結(jié)果都一致了,你再正式下線舊的一套東西。





















