進(jìn)程之間的通信在開(kāi)發(fā)過(guò)程中十分常見(jiàn),那么如何保證進(jìn)程之間消息通信的可靠性,下面分別從分布式系統(tǒng)(消息中間件RabbitMQ)和單機(jī)系統(tǒng)(ZeroMQ)來(lái)說(shuō)明他們?cè)谙鬏斨?,是如何保證消息的不丟失的。
1、RabbitMQ的可靠性傳輸
1.1 RabbitMQ出現(xiàn)消息丟失的情況及其解決辦法

如圖所示,RabbitMQ丟失消息的情況可以發(fā)送在任何一個(gè)節(jié)點(diǎn)。
1.1.1 生產(chǎn)者沒(méi)有成功把消息發(fā)送到MQ
a、丟失的原因:因?yàn)榫W(wǎng)絡(luò)傳輸?shù)牟环€(wěn)定性,當(dāng)生產(chǎn)者在向MQ發(fā)送消息的過(guò)程中,MQ沒(méi)有成功接收到消息,但是生產(chǎn)者卻以為MQ成功接收到了消息,不會(huì)再次重復(fù)發(fā)送該消息,從而導(dǎo)致消息的丟失。
b、解決辦法: 有兩個(gè)解決辦法:事務(wù)機(jī)制和confirm機(jī)制,最常用的是confirm機(jī)制。
事務(wù)機(jī)制:
? ? ? RabbitMQ 提供了事務(wù)功能,生產(chǎn)者發(fā)送數(shù)據(jù)之前開(kāi)啟 RabbitMQ 事務(wù)channel.txSelect,然后發(fā)送消息,如果消息沒(méi)有成功被 RabbitMQ 接收到,那么生產(chǎn)者會(huì)收到異常報(bào)錯(cuò),此時(shí)就可以回滾事務(wù)channel.txRollback,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)channel.txCommit。偽代碼如下:

confirm機(jī)制:
? ? ? RabbitMQ可以開(kāi)啟 confirm 模式,在生產(chǎn)者那里設(shè)置開(kāi)啟 confirm 模式之后,生產(chǎn)者每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id,如果消息成功寫(xiě)入 RabbitMQ 中,RabbitMQ 會(huì)給生產(chǎn)者回傳一個(gè) ack 消息,告訴你說(shuō)這個(gè)消息 ok 了。如果 RabbitMQ 沒(méi)能處理這個(gè)消息,會(huì)回調(diào)你的一個(gè) nack 接口,告訴你這個(gè)消息接收失敗,生產(chǎn)者可以發(fā)送。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài),如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào),那么可以重發(fā)。
注意:RabbitMQ的事務(wù)機(jī)制是同步的,很耗型能,會(huì)降低RabbitMQ的吞吐量。confirm機(jī)制是異步的,生成者發(fā)送完一個(gè)消息之后,不需要等待RabbitMQ的回調(diào),就可以發(fā)送下一個(gè)消息,當(dāng)RabbitMQ成功接收到消息之后會(huì)自動(dòng)異步的回調(diào)生產(chǎn)者的一個(gè)接口返回成功與否的消息。
1.1.2 RabbitMQ接收到消息之后丟失了消息
a、丟失的原因:RabbitMQ接收到生產(chǎn)者發(fā)送過(guò)來(lái)的消息,是存在內(nèi)存中的,如果沒(méi)有被消費(fèi)完,此時(shí)RabbitMQ宕機(jī)了,那么再次啟動(dòng)的時(shí)候,原來(lái)內(nèi)存中的那些消息都丟失了。
b、解決辦法:開(kāi)啟RabbitMQ的持久化。當(dāng)生產(chǎn)者把消息成功寫(xiě)入RabbitMQ之后,RabbitMQ就把消息持久化到磁盤(pán)。結(jié)合上面的說(shuō)到的confirm機(jī)制,只有當(dāng)消息成功持久化磁盤(pán)之后,才會(huì)回調(diào)生產(chǎn)者的接口返回ack消息,否則都算失敗,生產(chǎn)者會(huì)重新發(fā)送。存入磁盤(pán)的消息不會(huì)丟失,就算RabbitMQ掛掉了,重啟之后,他會(huì)讀取磁盤(pán)中的消息,不會(huì)導(dǎo)致消息的丟失。
c、持久化的配置:
第一點(diǎn)是創(chuàng)建 queue 的時(shí)候?qū)⑵湓O(shè)置為持久化,這樣就可以保證 RabbitMQ 持久化 queue 的元數(shù)據(jù),但是它是不會(huì)持久化 queue 里的數(shù)據(jù)的。
第二個(gè)是發(fā)送消息的時(shí)候?qū)⑾⒌?deliveryMode 設(shè)置為 2,就是將消息設(shè)置為持久化的,此時(shí) RabbitMQ 就會(huì)將消息持久化到磁盤(pán)上去。
注意:持久化要起作用必須同時(shí)設(shè)置這兩個(gè)持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會(huì)從磁盤(pán)上重啟恢復(fù) queue,恢復(fù)這個(gè) queue 里的數(shù)據(jù)。
1.1.3 消費(fèi)者弄丟了消息
a、丟失的原因:如果RabbitMQ成功的把消息發(fā)送給了消費(fèi)者,那么RabbitMQ的ack機(jī)制會(huì)自動(dòng)的返回成功,表明發(fā)送消息成功,下次就不會(huì)發(fā)送這個(gè)消息。但如果就在此時(shí),消費(fèi)者還沒(méi)處理完該消息,然后宕機(jī)了,那么這個(gè)消息就丟失了。
b、解決的辦法:簡(jiǎn)單來(lái)說(shuō),就是必須關(guān)閉 RabbitMQ 的自動(dòng) ack,可以通過(guò)一個(gè) api 來(lái)調(diào)用就行,然后每次在自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把。這樣的話,如果你還沒(méi)處理完,不就沒(méi)有 ack了?那 RabbitMQ 就認(rèn)為你還沒(méi)處理完,這個(gè)時(shí)候 RabbitMQ 會(huì)把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會(huì)丟的。
1.2? 如何防止重復(fù)消費(fèi)
先說(shuō)為什么會(huì)重復(fù)消費(fèi):正常情況下,消費(fèi)者在消費(fèi)消息的時(shí)候,消費(fèi)完畢后,會(huì)發(fā)送一個(gè)確認(rèn)消息給消息隊(duì)列,消息隊(duì)列就知道該消息被消費(fèi)了,就會(huì)將該消息從消息隊(duì)列中刪除;但是因?yàn)榫W(wǎng)絡(luò)傳輸?shù)鹊裙收希_認(rèn)信息沒(méi)有傳送到消息隊(duì)列,導(dǎo)致消息隊(duì)列不知道自己已經(jīng)消費(fèi)過(guò)該消息了,再次將消息分發(fā)給其他的消費(fèi)者。
解決思路是:
? ? ? ? 保證消息的唯一性,就算是多次傳輸,不要讓消息的多次消費(fèi)帶來(lái)影響;保證消息等冪性;
? ? ? ? 在消息生產(chǎn)時(shí),MQ內(nèi)部針對(duì)每條生產(chǎn)者發(fā)送的消息生成一個(gè)inner-msg-id,作為去重和冪等的依據(jù)(消息投遞失敗并重傳),避免重復(fù)的消息進(jìn)入隊(duì)列;
? ? ? ? 在消息消費(fèi)時(shí),要求消息體中必須要有一個(gè)bizId(對(duì)于同一業(yè)務(wù)全局唯一,如支付ID、訂單ID、帖子ID等)作為去重和冪等的依據(jù),避免同一條消息被重復(fù)消費(fèi)。
? ? ? 這個(gè)問(wèn)題針對(duì)業(yè)務(wù)場(chǎng)景來(lái)答分以下幾點(diǎn):
1.? 如果消息是做數(shù)據(jù)庫(kù)的insert操作,給這個(gè)消息做一個(gè)唯一主鍵,那么就算出現(xiàn)重復(fù)消費(fèi)的情況,就會(huì)導(dǎo)致主鍵沖突,避免數(shù)據(jù)庫(kù)出現(xiàn)臟數(shù)據(jù)。
2.? 如果消息是做redis的set的操作,不用解決,因?yàn)闊o(wú)論set幾次結(jié)果都是一樣的,set操作本來(lái)就算冪等操作。
3.? 如果以上兩種情況還不行,可以準(zhǔn)備一個(gè)第三方介質(zhì),來(lái)做消費(fèi)記錄。以redis為例,給消息分配一個(gè)全局id,只要消費(fèi)過(guò)該消息,將<id,message>以K-V形式寫(xiě)入redis。那消費(fèi)者開(kāi)始消費(fèi)前,先去redis中查詢有沒(méi)消費(fèi)記錄即可。
1.3 RabbitMQ的優(yōu)缺點(diǎn)
a、優(yōu)點(diǎn):支持集群部署,支持消息的持久化,使用erlang語(yǔ)言開(kāi)發(fā),并發(fā)性能高,吞吐量大。
b、缺點(diǎn):重量級(jí)消息隊(duì)列,需要單獨(dú)部署服務(wù)。
2、ZeroMQ的可靠性傳輸
2.1 ZeroMQ有什么不同
ZeroMQ(簡(jiǎn)稱ZMQ)是一個(gè)基于消息隊(duì)列的多線程網(wǎng)絡(luò)庫(kù),其對(duì)套接字類型、連接處理、幀、甚至路由的底層細(xì)節(jié)進(jìn)行抽象,提供跨越多種傳輸協(xié)議的套接字。
ZMQ不是單獨(dú)的服務(wù),而是一個(gè)嵌入式庫(kù),它封裝了網(wǎng)絡(luò)通信、消息隊(duì)列、線程調(diào)度等功能,向上層提供簡(jiǎn)潔的API,應(yīng)用程序通過(guò)加載庫(kù)文件,調(diào)用API函數(shù)來(lái)實(shí)現(xiàn)高性能網(wǎng)絡(luò)通信。
2.2 ZeroMQ如何保證消息的可靠性
ZMQ保證消息可靠性的原理跟RabbitMQ基本類似。除此之外還有其他的特點(diǎn):
1. 它在后臺(tái)線程異步的處理I/O。這些后臺(tái)線程使用無(wú)鎖數(shù)據(jù)結(jié)構(gòu)與程序線程交流,所以并發(fā)ZMQ程序不需要鎖、信號(hào)量、或其它等待狀態(tài)。
2. 組件可以動(dòng)態(tài)的來(lái)來(lái)去去,而ZMQ會(huì)自動(dòng)重連。這意味著你可以按任意順序啟動(dòng)組件。你可以創(chuàng)建“面向服務(wù)架構(gòu)”(SOAs),服務(wù)可以隨時(shí)加入和離開(kāi)網(wǎng)絡(luò)。
3. 當(dāng)需要時(shí)它自動(dòng)將消息排入隊(duì)列。以智能的方式,消息排入隊(duì)列前推送消息到盡可能靠近接收者。
4. 它有幾種辦法處理滿溢隊(duì)列(稱為“高水位線”)。當(dāng)隊(duì)列填滿時(shí),ZMQ自動(dòng)阻塞發(fā)送者,或丟棄消息,取決于你用的消息傳遞方式(所謂的“模式”)。
5. 它讓你的程序用任意傳輸方式來(lái)相互交談:TCP、多播、進(jìn)程內(nèi)、進(jìn)程間。更改傳輸方式時(shí)無(wú)需更改代碼。
6. 安全處理低速/阻塞的讀者,使用的是取決于消息傳遞模式的不同策略。
7. 它讓你路由消息使用各種模式如請(qǐng)求-應(yīng)答和發(fā)布-訂閱。這些模式是你創(chuàng)建拓?fù)洹⒕W(wǎng)絡(luò)結(jié)構(gòu)的方式。
8. 它讓你用一個(gè)調(diào)用就能創(chuàng)建代理來(lái)做隊(duì)列、轉(zhuǎn)發(fā)、或捕獲消息。代理可以降低網(wǎng)絡(luò)的互聯(lián)復(fù)雜度。
9. 它使用簡(jiǎn)單的線上組幀,轉(zhuǎn)發(fā)整個(gè)消息并精確重現(xiàn)其發(fā)送時(shí)的樣子。如果你寫(xiě)入一個(gè)10K的消息,就能接收一個(gè)10K的消息。
10. 它不在消息上強(qiáng)加任何格式。消息就是零到千兆大小的二進(jìn)制大對(duì)象。想要描述數(shù)據(jù)時(shí)你可以在其上選擇一些其它產(chǎn)品,例如谷歌的協(xié)議緩沖(protocol buffers)、外部數(shù)據(jù)表示法(XDR)、或其它。
11. 它智能的處理網(wǎng)絡(luò)錯(cuò)誤。有時(shí)它會(huì)重試,有時(shí)它告知你一個(gè)操作失敗了。
2.3 ZeroMQ的優(yōu)缺點(diǎn)
a、優(yōu)點(diǎn):輕量級(jí),不需要單獨(dú)部署服務(wù),速度快,作為單機(jī)C++和JAVA進(jìn)程之間可靠性通信的首選。
b、缺點(diǎn):消息隊(duì)列不支持持久化,不支持集群,不支持高可用。