1.mq原則
數(shù)據(jù)不能多,也不能少,不能多是說消息不能重復(fù)消費(fèi),這個(gè)我們上一節(jié)已解決;不能少,就是說不能丟失數(shù)據(jù)。如果mq傳遞的是非常核心的消息,支撐核心的業(yè)務(wù),那么這種場(chǎng)景是一定不能丟失數(shù)據(jù)的。
2.丟失數(shù)據(jù)場(chǎng)景
丟數(shù)據(jù)一般分為兩種,一種是mq把消息丟了,一種就是消費(fèi)時(shí)將消息丟了。下面從rabbitmq和kafka分別說一下,丟失數(shù)據(jù)的場(chǎng)景,
(1)rabbitmq
A:生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到rabbitmq的時(shí)候,可能在傳輸過程中因?yàn)榫W(wǎng)絡(luò)等問題而將數(shù)據(jù)弄丟了。
B:rabbitmq自己丟了數(shù)據(jù)
如果沒有開啟rabbitmq的持久化,那么rabbitmq一旦重啟,那么數(shù)據(jù)就丟了。所依必須開啟持久化將消息持久化到磁盤,這樣就算rabbitmq掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟失。除非極其罕見的情況,rabbitmq還沒來得及持久化自己就掛了,這樣可能導(dǎo)致一部分?jǐn)?shù)據(jù)丟失。
C:消費(fèi)端弄丟了數(shù)據(jù)
主要是因?yàn)橄M(fèi)者消費(fèi)時(shí),剛消費(fèi)到,還沒有處理,結(jié)果消費(fèi)者就掛了,這樣你重啟之后,rabbitmq就認(rèn)為你已經(jīng)消費(fèi)過了,然后就丟了數(shù)據(jù)。

(2)kafka
A:生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者沒有設(shè)置相應(yīng)的策略,發(fā)送過程中丟失數(shù)據(jù)。
B:kafka弄丟了數(shù)據(jù)
比較常見的一個(gè)場(chǎng)景,就是kafka的某個(gè)broker宕機(jī)了,然后重新選舉partition的leader時(shí)。如果此時(shí)follower還沒來得及同步數(shù)據(jù),leader就掛了,然后某個(gè)follower成為了leader,他就少了一部分?jǐn)?shù)據(jù)。
C:消費(fèi)者弄丟了數(shù)據(jù)
消費(fèi)者消費(fèi)到了這個(gè)數(shù)據(jù),然后消費(fèi)之自動(dòng)提交了offset,讓kafka知道你已經(jīng)消費(fèi)了這個(gè)消息,當(dāng)你準(zhǔn)備處理這個(gè)消息時(shí),自己掛掉了,那么這條消息就丟了。

3.如何防止消息丟失
(1)rabbitmq
A:生產(chǎn)者丟失消息
①:可以選擇使用rabbitmq提供是事物功能,就是生產(chǎn)者在發(fā)送數(shù)據(jù)之前開啟事物,然后發(fā)送消息,如果消息沒有成功被rabbitmq接收到,那么生產(chǎn)者會(huì)受到異常報(bào)錯(cuò),這時(shí)就可以回滾事物,然后嘗試重新發(fā)送;如果收到了消息,那么就可以提交事物。
channel.txSelect();//開啟事物
try{
//發(fā)送消息
}catch(Exection e){
channel.txRollback();//回滾事物
//重新提交
}
缺點(diǎn):rabbitmq事物已開啟,就會(huì)變?yōu)橥阶枞僮?,生產(chǎn)者會(huì)阻塞等待是否發(fā)送成功,太耗性能會(huì)造成吞吐量的下降。
②:可以開啟confirm模式。在生產(chǎn)者哪里設(shè)置開啟了confirm模式之后,每次寫的消息都會(huì)分配一個(gè)唯一的id,然后如何寫入了rabbitmq之中,rabbitmq會(huì)給你回傳一個(gè)ack消息,告訴你這個(gè)消息發(fā)送OK了;如果rabbitmq沒能處理這個(gè)消息,會(huì)回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id,如果超過一定時(shí)間還沒接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)。
//開啟confirm
channel.confirm();
//發(fā)送成功回調(diào)
public void ack(String messageId){
}
// 發(fā)送失敗回調(diào)
public void nack(String messageId){
//重發(fā)該消息
}
二者不同
事務(wù)機(jī)制是同步的,你提交了一個(gè)事物之后會(huì)阻塞住,但是confirm機(jī)制是異步的,發(fā)送消息之后可以接著發(fā)送下一個(gè)消息,然后rabbitmq會(huì)回調(diào)告知成功與否。
一般在生產(chǎn)者這塊避免丟失,都是用confirm機(jī)制。
B:rabbitmq自己弄丟了數(shù)據(jù)
設(shè)置消息持久化到磁盤。設(shè)置持久化有兩個(gè)步驟:
①創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
②發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2,這樣消息就會(huì)被設(shè)為持久化方式,此時(shí)rabbitmq就會(huì)將消息持久化到磁盤上。
必須要同時(shí)開啟這兩個(gè)才可以。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來,只有消息持久化到了磁盤之后,才會(huì)通知生產(chǎn)者ack,這樣就算是在持久化之前rabbitmq掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)。
C:消費(fèi)者弄丟了數(shù)據(jù)
使用rabbitmq提供的ack機(jī)制,首先關(guān)閉rabbitmq的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后,在代碼里手動(dòng)調(diào)用ack。這樣就可以避免消息還沒有處理完就ack。
(2)kafka
A:消費(fèi)端弄丟了數(shù)據(jù)
關(guān)閉自動(dòng)提交offset,在自己處理完畢之后手動(dòng)提交offset,這樣就不會(huì)丟失數(shù)據(jù)。
B:kafka弄丟了數(shù)據(jù)
一般要求設(shè)置4個(gè)參數(shù)來保證消息不丟失:
①給topic設(shè)置 replication.factor參數(shù):這個(gè)值必須大于1,表示要求每個(gè)partition必須至少有2個(gè)副本。
②在kafka服務(wù)端設(shè)置min.isync.replicas參數(shù):這個(gè)值必須大于1,表示 要求一個(gè)leader至少感知到有至少一個(gè)follower在跟自己保持聯(lián)系正常同步數(shù)據(jù),這樣才能保證leader掛了之后還有一個(gè)follower。
③在生產(chǎn)者端設(shè)置acks=all:表示 要求每條每條數(shù)據(jù),必須是寫入所有replica副本之后,才能認(rèn)為是寫入成功了
④在生產(chǎn)者端設(shè)置retries=MAX(很大的一個(gè)值,表示無限重試):表示 這個(gè)是要求一旦寫入事變,就無限重試
C:生產(chǎn)者弄丟了數(shù)據(jù)
如果按照上面設(shè)置了ack=all,則一定不會(huì)丟失數(shù)據(jù),要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒滿足這個(gè)條件,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無限次。
上一篇《如何保證消息不重復(fù)消費(fèi)》
下一篇《如何保證消息按順序執(zhí)行》