一、為什么使用消息隊(duì)列?
為什么使用?其實(shí)就是在實(shí)際業(yè)務(wù)中,有個(gè)具體的場景,如果不使用MQ,可能會有很多麻煩,用了MQ之后帶給我們很多好處。場景其實(shí)有很多,常見的有三個(gè):1.解耦、2.異步、3.削峰
1.解耦
A系統(tǒng)要發(fā)送一條數(shù)據(jù)到BCD三個(gè)系統(tǒng),接口調(diào)用發(fā)送,如果新增個(gè)E系統(tǒng),也需要這條數(shù)據(jù)呢?如果C系統(tǒng)現(xiàn)在不需要這條數(shù)據(jù)了呢?如果A系統(tǒng)又要發(fā)送第二種數(shù)據(jù)了呢?而且A系統(tǒng)要時(shí)刻關(guān)注BCD系統(tǒng)的狀態(tài),BCD掛了怎么辦?要不要重發(fā)?要不要把數(shù)據(jù)存起來?
如果系統(tǒng)中存在類似情況,你可以考慮這個(gè)調(diào)用是不是必須要同步調(diào)用?如果用MQ來解耦,會省去很多麻煩。
2.異步
A系統(tǒng)接收一個(gè)請求,需要在ABCD四個(gè)數(shù)據(jù)庫進(jìn)行寫庫操作,A本地寫庫需要30ms,B庫需要100ms,C庫需要200ms,C庫需要200ms,則一共需要30+100+200+200ms。如果用MQ來進(jìn)行異步操作,則只需要30ms后即可返回,BCD異步寫入即可,A不用考慮。
3.削峰
每天0~11點(diǎn),A系統(tǒng)風(fēng)平浪靜,每秒并發(fā)100,12點(diǎn)時(shí),并發(fā)暴增到10000,系統(tǒng)每秒只能處理1000個(gè)請求,怎么辦?這時(shí)可以用MQ進(jìn)行流量削峰。

二、消息隊(duì)列的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)已經(jīng)說了,解耦,異步,削峰,接下來說消息隊(duì)列的缺點(diǎn):
1.系統(tǒng)可用性降低
本來只需要考慮系統(tǒng)本身可用性,現(xiàn)在引入了MQ,如果MQ掛了怎么辦?
2.系統(tǒng)復(fù)雜性提高
MQ加進(jìn)來了,消息丟失怎么辦?重復(fù)投遞怎么辦?重復(fù)消費(fèi)怎么辦?消息的順序性怎么保證?
3.一致性問題
A處理完返回成功了,調(diào)用者以為請求成功了,可是BC成功,D失敗了怎么辦,數(shù)據(jù)就不一致了。
所以,消息隊(duì)列其實(shí)結(jié)構(gòu)非常復(fù)雜,引入它會帶來很多好處,但是同時(shí)需要規(guī)避很多問題。
三、應(yīng)對MQ缺點(diǎn)的辦法
1.消息丟失怎么辦?(消息的可靠性傳輸)
消息的丟失可能會出現(xiàn)在三個(gè)地方:
(1)生產(chǎn)者弄丟數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到RabbitMQ的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)啥的問題,都有可能。怎么解決?
①事務(wù):生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟RabbitMQ事務(wù)(channel.txSelect),然后發(fā)送消息,如果消息沒有成功被RabbitMQ接收到,那么生產(chǎn)者會收到異常報(bào)錯(cuò),此時(shí)就可以回滾事務(wù)(channel.txRollback),然后重試發(fā)送消息;如果收到了消息,可以提交事務(wù)(channel.txCommit)。但是問題是,RabbitMQ事務(wù)機(jī)制一搞,基本上吞吐量會下來,因?yàn)樘男阅堋?/p>
②confirm模式:在生產(chǎn)者那里設(shè)置開啟confirm模式之后,你每次寫的消息都會分配一個(gè)唯一的id,然后如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個(gè)ack消息,告訴你說這個(gè)消息ok了。如果RabbitMQ沒能處理這個(gè)消息,會回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息接收失敗,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息id的狀態(tài),如果超過一定時(shí)間還沒接收到這個(gè)消息的回調(diào),那么你可以重發(fā)。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用confirm機(jī)制的。
(2)Mq弄丟數(shù)據(jù)
就是RabbitMQ自己弄丟了數(shù)據(jù),這個(gè)你必須開啟RabbitMQ的持久化,就是消息寫入之后會持久化到磁盤,哪怕是RabbitMQ自己掛了,恢復(fù)之后會自動讀取之前存儲的數(shù)據(jù),一般數(shù)據(jù)不會丟。
設(shè)置持久化有兩個(gè)步驟:
①第一個(gè)是創(chuàng)建queue和交換器的時(shí)候?qū)⑵湓O(shè)置為持久化,這樣就可以保證RabbitMQ持久化相關(guān)的元數(shù)據(jù),但是不會持久化queue里的數(shù)據(jù);
②第二個(gè)是發(fā)送消息的時(shí)候?qū)⑾⒌膁eliveryMode設(shè)置為2,就是將消息設(shè)置為持久化的,此時(shí)RabbitMQ就會將消息持久化到磁盤上去
必須要同時(shí)設(shè)置這兩個(gè)持久化才行
持久化可以和生產(chǎn)者的confirm結(jié)合,當(dāng)持久化成功后,再ack生產(chǎn)者。如果持久化之前RabbitMQ掛了,生產(chǎn)者沒收到ack,會重發(fā)。
(3)消費(fèi)者弄丟數(shù)據(jù)
RabbitMQ如果丟失了數(shù)據(jù),主要是因?yàn)槟阆M(fèi)的時(shí)候,剛消費(fèi)到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ認(rèn)為你都消費(fèi)了,這數(shù)據(jù)就丟了。
這個(gè)時(shí)候得用RabbitMQ提供的ack機(jī)制,簡單來說,就是你關(guān)閉RabbitMQ自動ack,可以通過一個(gè)api來調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再程序里ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認(rèn)為你還沒處理完,這個(gè)時(shí)候RabbitMQ會把這個(gè)消費(fèi)分配給別的consumer去處理,消息是不會丟的。
2.消費(fèi)者順序消費(fèi)
從根本上說,異步消息是不應(yīng)該有順序依賴的。在MQ上估計(jì)是沒法解決。要實(shí)現(xiàn)嚴(yán)格的順序消息,簡單且可行的辦法就是:保證生產(chǎn)者- MQServer -消費(fèi)者是一對一對一的關(guān)系。
如果有順序依賴的消息,要保證消息有一個(gè)hashKey,類似于數(shù)據(jù)庫表分區(qū)的的分區(qū)key列。保證對同一個(gè)key的消息發(fā)送到相同的隊(duì)列。A用戶產(chǎn)生的消息(包括創(chuàng)建消息和刪除消息)都按A的hashKey分發(fā)到同一個(gè)隊(duì)列。只需要把強(qiáng)相關(guān)的兩條消息基于相同的路由就行了,也就是說經(jīng)過m1和m2的在路由表里的路由是一樣的,那自然m1會優(yōu)先于m2去投遞。而且一個(gè)queue只對應(yīng)一個(gè)consumer
3.消息的重復(fù)
分為兩大類情況:1、生產(chǎn)者消息重復(fù)發(fā)送; 2.MQ向消費(fèi)者投遞時(shí)重復(fù)投遞
終極解決辦法:冪等性? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
1. MVCC:
多版本并發(fā)控制,樂觀鎖的一種實(shí)現(xiàn),在生產(chǎn)者發(fā)送消息時(shí)進(jìn)行數(shù)據(jù)更新時(shí)需要帶上數(shù)據(jù)的版本號,消費(fèi)者去更新時(shí)需要去比較持有數(shù)據(jù)的版本號,版本號不一致的操作無法成功。例如博客點(diǎn)贊次數(shù)自動+1的接口:? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? public boolean addCount(Long id, Long version);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一個(gè)version只有一次執(zhí)行成功的機(jī)會,一旦失敗了生產(chǎn)者必須重新獲取數(shù)據(jù)的最新版本號再次發(fā)起更新。
2. 去重表:
利用數(shù)據(jù)庫表單的特性來實(shí)現(xiàn)冪等,常用的一個(gè)思路是在表上構(gòu)建唯一性索引,保證某一類數(shù)據(jù)一旦執(zhí)行完畢,后續(xù)同樣的請求不再重復(fù)處理了(利用一張日志表來記錄已經(jīng)處理成功的消息的ID,如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。)
以電商平臺為例子,電商平臺上的訂單id就是最適合的token。當(dāng)用戶下單時(shí),會經(jīng)歷多個(gè)環(huán)節(jié),比如生成訂單,減庫存,減優(yōu)惠券等等。每一個(gè)環(huán)節(jié)執(zhí)行時(shí)都先檢測一下該訂單id是否已經(jīng)執(zhí)行過這一步驟,對未執(zhí)行的請求,執(zhí)行操作并緩存結(jié)果,而對已經(jīng)執(zhí)行過的id,則直接返回之前的執(zhí)行結(jié)果,不做任何操作。這樣可以在最大程度上避免操作的重復(fù)執(zhí)行問題,緩存起來的執(zhí)行結(jié)果也能用于事務(wù)的控制等。