場(chǎng)景2:?jiǎn)伟l(fā)送多接收
使用場(chǎng)景:一個(gè)發(fā)送端,多個(gè)接收端,如分布式的任務(wù)派發(fā)。為了保證消息發(fā)送的可靠性,不丟失消息,使消息持久化了。同時(shí)為了防止接收端在處理消息時(shí)down掉,只有在消息處理完成后才發(fā)送ack消息。

準(zhǔn)備
我們并沒有像調(diào)整圖片大小、渲染PDF這樣真實(shí)的任務(wù),所以我們將會(huì)發(fā)送一些字符串代表復(fù)雜的任務(wù),通過使用Thread.sleep()來假裝我們的業(yè)務(wù)系統(tǒng)很繁忙,小數(shù)點(diǎn)的數(shù)量代表這個(gè)任務(wù)的復(fù)雜性:每一個(gè)小數(shù)點(diǎn)代表一秒,比如,Hello...代表這個(gè)任務(wù)需要三秒。
我們稍微修改了一下上一篇的程序,來讓他可以發(fā)送任意數(shù)量的消息,這個(gè)程序?qū)?huì)給我們的工作隊(duì)列安排任務(wù),所以我們給他命名為NewTash.java


我們之前的Recv.java也需要修改一下,他要根據(jù)消息體里的小數(shù)點(diǎn)數(shù)量來增加工作的秒數(shù)。他會(huì)傳遞消息并且執(zhí)行他。改名為Worker.java


循環(huán)調(diào)度
使用任務(wù)隊(duì)列的好處之一就是可以輕易的平行工作(即部署分布式worker),當(dāng)面對(duì)積壓了非常大量的消息的情況時(shí),只需要增加worker的部署數(shù)量就可以輕易解決了。
我們嘗試同時(shí)跑兩個(gè)worker,即消息的接受者,他們會(huì)怎么樣共同工作呢?

運(yùn)行消息發(fā)送者,發(fā)送五條消息

消息接受者的情況如下:

worker1接收到了1、3、5條消息,worker2接受到了2、4條消息。
默認(rèn)情況下,rabbitMq會(huì)按照順序的方式給每個(gè)消費(fèi)者發(fā)送消息,平均每個(gè)消費(fèi)者都能得到相同數(shù)量的消息,這種方式叫做round-robin(循環(huán)??)。讀者可以自行嘗試三個(gè)甚至更多的消費(fèi)者的情況。
消息答復(fù)
Worker接收到消息后,完成他的業(yè)務(wù)代碼需要一些時(shí)間,你可能想知道在一個(gè)消費(fèi)者接收到一個(gè)消息,然后執(zhí)行業(yè)務(wù)代碼到一部分的時(shí)候掛掉了會(huì)怎么辦。根據(jù)我們現(xiàn)在的代碼來說,rabbitMq把消息傳遞給消費(fèi)者后就會(huì)把這些消息刪除掉,在這種情況下,如果你干掉了一個(gè)worker,我們就會(huì)失去這個(gè)worker正在執(zhí)行的,以及所有rabbitMq派發(fā)給他并且還沒來得及執(zhí)行的消息。
但是我們并不想失去任何的任務(wù)消息,如果一個(gè)worker掛掉了,我們想把這個(gè)worker頭上的任務(wù)消息派發(fā)到其他的worker頭上。
為了確保消息不會(huì)丟失,rabbitMq支持消息答復(fù)。當(dāng)一個(gè)消息被消費(fèi)者接收到并且執(zhí)行完成后,消費(fèi)者會(huì)發(fā)送一個(gè)ack給rabbitMq服務(wù)器告訴他我已經(jīng)執(zhí)行完成了,你可以把這條消息刪除了。
如果一個(gè)消費(fèi)者沒有返回消息答復(fù)就掛掉了(信道關(guān)閉,連接關(guān)閉或者TCP鏈接丟失),rabbitMq就會(huì)明白,這個(gè)消息還沒有被完成,rebbitMq就會(huì)重新把這條消息放入隊(duì)列,如果在這個(gè)時(shí)間有其他的消費(fèi)者在線,那么rabbitMq就會(huì)迅速的吧這條消息傳遞給其他的消費(fèi)者,這樣就確保了沒有消息會(huì)丟失。
就算執(zhí)行一個(gè)消息用了非常長(zhǎng)的時(shí)間,也不會(huì)有任何問題。
手動(dòng)消息答復(fù)默認(rèn)是開啟的,前面的例子我們通過autoAck=ture把他們關(guān)閉了。我們現(xiàn)在要把它設(shè)置為false,然后從一個(gè)worker那里發(fā)送一個(gè)合適的答復(fù)。

這樣編碼的話,就算你用Ctrl+C殺掉一個(gè)正在處理消息的worker進(jìn)程也不會(huì)丟失任何消息,worker掛掉之后,沒有答復(fù)的消息就會(huì)被自動(dòng)重新傳遞。
消息持久化
我們已經(jīng)學(xué)到了如何確保就算消費(fèi)者掛掉消息也不會(huì)丟失。但是如果我們的ribbitMq服務(wù)器停了的話,我們的消息任務(wù)仍然會(huì)丟失。
當(dāng)rabbitMq服務(wù)器停止或者崩潰的時(shí)候,它就會(huì)忘掉所有的隊(duì)列和消息,除非你告訴它不要這么做。。要確保消息不會(huì)丟失我們要做兩件事:我們需要使隊(duì)列和消息持久化。
首先,我們要確保rbbitMq不會(huì)丟失我們的隊(duì)列,我們要做的是聲明隊(duì)列為可持久化的。

盡管命令是正確的,基于我們目前的設(shè)置他也不會(huì)生效。因?yàn)槲覀円呀?jīng)定義了一個(gè)叫hello的非持久化的隊(duì)列。rabbitMq不會(huì)允許你用不同的參數(shù)重新定義一個(gè)已經(jīng)存在的隊(duì)列,如果你這么做,會(huì)返回一個(gè)錯(cuò)誤。我們這有一個(gè)快速的變通方案-我們什么一個(gè)不同名字的隊(duì)列。如task_queue:

這個(gè)隊(duì)列聲明的變化需要同時(shí)應(yīng)用于生產(chǎn)者和消費(fèi)者的代碼。
這個(gè)時(shí)候我們就相當(dāng)確定這個(gè)叫task_queue的隊(duì)列就算rabbitMq重啟也不會(huì)丟失了?,F(xiàn)在我們需要通過設(shè)置MessageProperties的值為PERSISTENT_TEXT_PLAIN.把我們的消息標(biāo)記成可持久化的。

注意:把消息標(biāo)記成持久化的并不能完全保證消息不會(huì)丟失。盡管他告訴了rabbitMq要把這條消息保存到磁盤上,但是仍然有少數(shù)情況rabbitMq接收到消息還沒來得及保存它。需要更強(qiáng)壯的保證機(jī)制publisher confirms.
公平的分配機(jī)制
你可能注意到了現(xiàn)在的派發(fā)機(jī)制并沒有像我們想象的那樣工作。比如有兩個(gè)worker的情況下,當(dāng)所有奇數(shù)的消息都很重、偶數(shù)消息都很輕的時(shí)候,一個(gè)worker就會(huì)不斷地繁忙工作,但是另一個(gè)就幾乎不工作。但是這些對(duì)于rabbitMq服務(wù)器來說是不可知的。
這種情況是因?yàn)閞abbitMq服務(wù)器并不去查看每個(gè)消費(fèi)者未答復(fù)的消息的數(shù)量,它只是盲目的派發(fā)消息到消費(fèi)者。

為了避免這種情況,我們可以使用basicQos方法,設(shè)置prefetchCount=1。這就告訴rabbitMq不要把多個(gè)消息同時(shí)派發(fā)給同一個(gè)worker。換句話說就是,在一個(gè)worker沒有完成和答復(fù)前一個(gè)消息之前,不要給他派發(fā)新消息。相應(yīng)的,它會(huì)把新消息派發(fā)給一個(gè)不忙的worker。

注意隊(duì)列的大?。?/p>
當(dāng)所有的worker都非常忙碌時(shí),你的隊(duì)列可能會(huì)被填滿,所以你要注意你的隊(duì)列大小,適當(dāng)?shù)脑黾痈嗟膚orker,或者使用其他的策略。
綜合起來,代碼如下。



翻譯自:http://www.rabbitmq.com/tutorials/tutorial-two-java.html
參考:https://www.cnblogs.com/luxiaoxun/p/3918054.html