RabbitMQ消息可靠性分析和應(yīng)用

RabbitMQ流程簡(jiǎn)介(帶Exchange)

? ? ? ?RabbitMQ使用一些機(jī)制來(lái)保證可靠性,如持久化、消費(fèi)確認(rèn)及發(fā)布確認(rèn)等。

? ? ? ?先看以下這個(gè)圖:


? ? ? ?P為生產(chǎn)者,X為中轉(zhuǎn)站(Exchange),紅色部分為消息隊(duì)列,C1、C2為消費(fèi)者。

? ? ? ?整個(gè)流程分成三部分:第一,生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站;第二,中轉(zhuǎn)站按定義的規(guī)則轉(zhuǎn)發(fā)消息到消息隊(duì)列;第三,消費(fèi)者從消息隊(duì)列獲取消息進(jìn)行消費(fèi)(處理)。

RabbitMQ消息可靠性分析和應(yīng)用

? ? ? ?應(yīng)用代碼均使用C#客戶端代碼實(shí)現(xiàn)。

一、發(fā)布確認(rèn)

? ? ? ?生產(chǎn)者生產(chǎn)消息,發(fā)送到中轉(zhuǎn)站的過(guò)程中,可能會(huì)因?yàn)榫W(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等問(wèn)題造成消息丟失。為了確保生產(chǎn)者發(fā)送的消息不會(huì)丟失,RabbitMQ提供了發(fā)布確認(rèn)(Publisher Confirms)機(jī)制,從而提高消息的可靠性(注意:發(fā)布確認(rèn)機(jī)制不能和事務(wù)機(jī)制一起使用)。

? ? ? ?單條消息發(fā)布確認(rèn):

1

2

3

4

5

6

7

8

9

10

channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制

stringmessage =?"msg";

varbody = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties:?null,

????????body: body

????????);

boolisPublished = channel.WaitForConfirms();//通道(channel)里消息發(fā)送成功返回true

? ? ? ?使用channel.ConfirmSelect,一旦信道進(jìn)入確認(rèn)模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始)。消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),生產(chǎn)者從而知道消息發(fā)送成功。

? ? ? ?多條消息發(fā)布確認(rèn):

1

2

3

4

5

6

7

8

9

10

11

12

13

channel.ConfirmSelect();//發(fā)布確認(rèn)機(jī)制

foreach(varitemMsg?inlstMsg)

{

????byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);

????//發(fā)布消息

????channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties:?null,

????????body: sendBytes

????????);

}

boolisAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均發(fā)送才返回true

? ? ? ?注意:多消息發(fā)布確認(rèn)機(jī)制情況下,倘若要發(fā)送100條消息,發(fā)送90條后,突然網(wǎng)絡(luò)故障,后面的消息發(fā)送失敗了,那么isAllPublished返回的是false,而前面90條消息已經(jīng)發(fā)送到消息隊(duì)列了。我們還不知道哪些消息是發(fā)送失敗的,所以很多條消息發(fā)布確認(rèn),建議分幾次發(fā)送或多通道發(fā)送。

? ? ? ?此外,需要確保在中轉(zhuǎn)站(Exchange)的消息可以順利到達(dá)消息隊(duì)列。

? ? ? ?(1)首先需要定義匹配的Exchange和Queue,根據(jù)Exchange的類型和routingKey確定轉(zhuǎn)發(fā)的關(guān)系。

? ? ? ?(2)設(shè)置BasicPublish方法中mandatory參數(shù)為true,然后監(jiān)聽(tīng)Exchange中沒(méi)有匹配的隊(duì)列的消息,然后進(jìn)行相操作。

? ? ? ?(3)確保消息隊(duì)列有足夠內(nèi)存存儲(chǔ)消息。

? ? ? ?RabbitMQ默認(rèn)配置vm_memory_high_watermark為0.4。意思是控制消息占40%內(nèi)存左右。vm_memory_high_watermark_paging_ratio為0.5,當(dāng)消息占用內(nèi)存超過(guò)50%,RabbitMQ會(huì)把消息轉(zhuǎn)移到磁盤上以釋放內(nèi)存。當(dāng)磁盤剩余空間小于閥值disk_free_limit(默認(rèn)為50M),所有生產(chǎn)者阻塞,避免充滿磁盤,導(dǎo)致所有的寫操作失敗。

? ? ? ?RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.

? ? ? ?%APPDATA% 一般為 C:\Users\%USERNAME%\AppData\Roaming(Windows環(huán)境)

二、持久化

? ? ? ?消息存放到消息隊(duì)列后,在不配置消息持久化的情況下,若服務(wù)器重啟、關(guān)閉或宕機(jī)等,消息都會(huì)丟失。配置持久化可以有效提高消息的可靠性。持久化需要同時(shí)配置消息持久化和隊(duì)列持久化。單配置消息持久化,隊(duì)列消失了,消息沒(méi)有地方存放;單配置隊(duì)列持久化,隊(duì)列還在,消息沒(méi)了。

? ? ? ?隊(duì)列持久化在定義隊(duì)列時(shí)候配置

1

2

3

4

5

6

7

8

//定義隊(duì)列

channel.QueueDeclare(

????queue:?"Mark_Queue",?//隊(duì)列名稱

????durable:?true,?//隊(duì)列磁盤持久化??????????????????

????exclusive:?false,//是否排他的,false。如果一個(gè)隊(duì)列聲明為排他隊(duì)列,該隊(duì)列首次聲明它的連接可見(jiàn),并在連接斷開(kāi)時(shí)自動(dòng)刪除

????autoDelete:?false,//是否自動(dòng)刪除,一般設(shè)成false

????arguments:?null

????);

  消息持久化在發(fā)布消息時(shí)候配置

1

2

3

4

5

6

7

8

9

10

//消息持久化,把DeliveryMode設(shè)成2

IBasicProperties properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2;

????//發(fā)布消息

????channel.BasicPublish(

????????exchange:?"MarkTopicChange",

????????routingKey:?"MarkRouteKey.one",

????????basicProperties: properties,

????????body: sendBytes

????????);

? ? ? ?如何配置了事務(wù)機(jī)制或發(fā)布確認(rèn)(publisher confirm)機(jī)制,服務(wù)端的返回Basic.Ack是在消息落盤之后執(zhí)行的,進(jìn)一步的提高了消息的可靠性。

? ? ? ?為了防止磁盤損壞帶來(lái)的消息丟失,可以配置鏡像隊(duì)列,這里不作介紹。

三、消費(fèi)確認(rèn)

為了確保消息被消費(fèi)者消費(fèi),RabbitMQ提供消費(fèi)確認(rèn)模式(consumer Acknowledgements)。自動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功接收到消息后,自動(dòng)通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。這很大程度上滿足不了我們,假如消費(fèi)者接收到消息后,服務(wù)器宕機(jī),消息還沒(méi)處理完成,這樣就會(huì)造成消息丟失。手動(dòng)確認(rèn)模式,當(dāng)消費(fèi)者成功處理完消息后,手動(dòng)發(fā)消息通知RabbitMQ,把消息隊(duì)列中相應(yīng)消息刪除。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

consumer.Received += (model, ea) =>

{

????varbody = ea.Body;

????varmessage = Encoding.UTF8.GetString(body);

????varroutingKey = ea.RoutingKey;

????Console.WriteLine(" [x] Received '{0}':'{1}'",

??????????????????????routingKey,

??????????????????????message);


//確認(rèn)該消息已被消費(fèi),發(fā)刪除消息給RabbitMQ,把消息隊(duì)列中的消息刪除

channel.BasicAck(ea.DeliveryTag,?false);

//消費(fèi)消息失敗,拒絕此消息,重回隊(duì)列,讓它可以繼續(xù)發(fā)送到其他消費(fèi)者

//channel.BasicReject(ea.DeliveryTag, true);

//消費(fèi)消息失敗,拒絕多條消息,重回隊(duì)列,讓它們可以繼續(xù)發(fā)送到其他消費(fèi)者

//channel.BasicNack(ea.DeliveryTag, true, true);

};

//手動(dòng)確認(rèn)消息,把a(bǔ)utoAck設(shè)成false

channel.BasicConsume(queue:?"Mark_Queue",

?????????????????????autoAck:?false,

?????????????????????consumer: consumer);

? ? ? ?這里值得注意的是,消息處理完成后,一定要把處理完成的消息發(fā)送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ會(huì)一直等待,從而造成內(nèi)存泄露。若處理消息過(guò)程中發(fā)生異常,可以使用channel.BasicReject(ea.DeliveryTag, true)來(lái)拒絕此消息,讓它重回隊(duì)列。若RabbitMQ收不到消費(fèi)者任何確認(rèn)消息的信號(hào)(包括確認(rèn)信號(hào),拒絕信號(hào)燈),直到此消費(fèi)者斷開(kāi)連接,消息才能重回隊(duì)列,繼續(xù)發(fā)送到其他消費(fèi)者。

? ? ? ?提醒一下,假如消費(fèi)者消費(fèi)消息的方法不支持并發(fā)(取決于需求),可以限制消費(fèi)者每次只接收一條消息。

1channel.BasicQos(0, 1,?false);

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容