本文所有內(nèi)容均個(gè)人從RabbitMQ官網(wǎng)教程中翻譯,若圖片文字的引用有任何侵權(quán)的地方,聯(lián)系我,我會立馬刪除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
工作隊(duì)列
(使用php-amqplib)

在第一個(gè)教程中我們寫了程序去通過一個(gè)已經(jīng)被named(被命名)的Queue(隊(duì)列)發(fā)送和接收消息。在這個(gè)教程中,我們將會創(chuàng)建一個(gè) Work Queue(工作隊(duì)列) 用于在多個(gè)處理程序中分配耗時(shí)任務(wù)。
Work Queues(工作隊(duì)列)(又名:Task Queues(任務(wù)隊(duì)列))的主要思想是避免立即處理資源密集型(resource-intensive)任務(wù),并且必須等待它完成。相反地,我們計(jì)劃讓任務(wù)稍后完成。我們把一個(gè)任務(wù)封裝成消息,并且發(fā)送它到Queue(隊(duì)列)。一個(gè)在后臺運(yùn)行的工作線程將會把任務(wù)出隊(duì)并最終執(zhí)行它。當(dāng)你運(yùn)行非常多的處理程序,這些(多個(gè))任務(wù)將會被均分到它們之間。
這個(gè)概念對于不可能在短暫的HTTP請求中處理一個(gè)復(fù)雜運(yùn)算的網(wǎng)站應(yīng)用來說十分有用。
準(zhǔn)備
在此教程的上一部分我們發(fā)送了一個(gè)包含 "Hello World!" 的消息?,F(xiàn)在我們將會發(fā)送字符串來代替(模擬)復(fù)雜的任務(wù)。我們并沒有一個(gè)真實(shí)的類似于縮放圖片或者渲染pdf的(復(fù)雜)任務(wù),所以讓我們通過使用 sleep() 函數(shù)假裝我們很忙來偽造此類復(fù)雜任務(wù)。我們以字符串中的點(diǎn)(.)來描述它的復(fù)雜度;每一個(gè)點(diǎn)(.)將會占用“工作”的一秒中。例如,一個(gè)通過Hello...來描述的偽裝任務(wù)將會花費(fèi)三秒鐘來處理。
為了允許發(fā)送來自命令行的任意消息我們將會稍微修改上一個(gè)例子的 send.php 代碼。這個(gè)程序?qū)才湃蝿?wù)到我們的工作Queue(隊(duì)列)中,我們把他命名為 new_task.php:
$data = implode(' ',array_slice($argv,1));
if(empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage($data);
$channel->basic_publish($msg,'','hello');
echo " [x] Sent ", $data, "\n";
我們舊的 receive.php 腳本同樣需要一點(diǎn)修改:它需要去將消息中的每一個(gè)點(diǎn)(.)偽裝成任務(wù)中花費(fèi)一秒的時(shí)間。它將會把消息從Queue(隊(duì)列)中出隊(duì)并且執(zhí)行它,所以我們將它命名為 worker.php :
$callback = function($msg){
echo "[x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body,'.'));
echo " [x] Done","\n";
}
$channel->basic_consume('hello','',false,true,false,false,$callback);
注意我們偽裝的任務(wù)將會模擬執(zhí)行時(shí)間。
想在第一個(gè)教程中那樣運(yùn)行他們
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
循環(huán)調(diào)度
使用任務(wù)隊(duì)列(Task Queue) 的一個(gè)優(yōu)點(diǎn)就是它可以輕松地并行化任務(wù)。如果我們積壓了大量的任務(wù),我們只需要添加更多的工作程序,這可以很輕松的進(jìn)行擴(kuò)展規(guī)模。
首先,讓我們嘗試同時(shí)運(yùn)行兩個(gè) worker.php 腳本。他們都將會從Queue(隊(duì)列)中獲得消息,但確切的情況又是如何呢?讓我們來看看:
你需要打開三個(gè)控制臺。其中兩個(gè)運(yùn)行 worker.php 腳本。這些控制臺將會成為我們的兩個(gè)Consumer(消費(fèi)者)——C1和C2。
# shell 1
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
# shell 2
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
在第三個(gè)控制臺我們將會發(fā)送一些新的任務(wù)。一旦你運(yùn)行了Consumers(消費(fèi)者)你就可以推送一些消息:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
讓我們看一下任務(wù)交付情況:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默認(rèn)情況下,RabbitMQ 將會按順序地發(fā)送每一個(gè)消息到下一個(gè)Consumer(消費(fèi)者)。平均每個(gè)消費(fèi)者都會得到相同數(shù)量的消息。這個(gè)分發(fā)消息的方式叫做輪詢調(diào)度 (round-robin)。使用三個(gè)或更多的處理程序來嘗試它吧。
消息確認(rèn)
執(zhí)行一個(gè)任務(wù)要花費(fèi)一些時(shí)間。你可能會想如果其中一個(gè)Consumer(消費(fèi)者)開始了一個(gè)長任務(wù)并且只完成部分任務(wù)就掛掉了會發(fā)生什么。目前我們的代碼,一旦 RabbitMQ 發(fā)送了一個(gè)消息去客戶端它會立馬標(biāo)記他們?yōu)閯h除。在這一情況下,如果你殺掉其中一個(gè)工作程序我們將會失去那些只是在處理中(但尚未完成)的消息。我們同樣失去了所有發(fā)送了給這個(gè)程序但還未處理的消息。
但是我們不想丟失任何任務(wù)。如果一個(gè)工作程序掛掉,我們想把這個(gè)任務(wù)發(fā)送到其他的工作程序。
為了確保消息永不消失,RabbitMQ 支持 消息確認(rèn)。Consumer將會反回一個(gè)應(yīng)答去告訴 RabbitMQ 一個(gè)特定的消息已經(jīng)被接收、處理,這時(shí)RabbitMQ 就可以自由地刪除這一消息了。
如果一個(gè)Consumer(消費(fèi)者)在反回應(yīng)答(ack)信號前掛掉了(例如它的channel(頻道) 關(guān)閉了,連接關(guān)閉了,或者TCP連接丟失了),RabbitMQ 將會知道該消息并沒有被完全處理并且會重新把它插入Queue(隊(duì)列)。如果這時(shí)有其他的Consumers(消費(fèi)者)在線,它會立馬重新發(fā)送這條消息去其他的Consumer(消費(fèi)者)。這一種方式即使處理程序意外掛掉你也能確保沒有消息丟失。
這里沒有任何消息會因?yàn)槌瑫r(shí)而丟失;當(dāng)Consumer(消費(fèi)者) 掛掉 RabbitMQ 將會重新發(fā)送消息,即使處理一個(gè)消息要很長很長的時(shí)間。(一直等到能發(fā)送到下一個(gè)消費(fèi)者?)
消息確認(rèn)默認(rèn)是關(guān)閉的。當(dāng)設(shè)置 basic_consume函數(shù)的第四個(gè)參數(shù)為 false(true代表沒有消息確認(rèn))并且當(dāng)工作程序在完成一個(gè)任務(wù)的時(shí)候返回一個(gè)適當(dāng)?shù)膽?yīng)答信號將會開啟消息確認(rèn)。
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用這段代碼我們將能確保沒有任何消失會丟失,即使你用ctrl+c 殺掉了一個(gè)正在處理消息的工作程序。很快在該工作程序掛掉之后,所有未被應(yīng)答的消息將會被重新發(fā)送。
忘了確認(rèn)
錯(cuò)過ack是一個(gè)很常見的錯(cuò)誤。它是一個(gè)簡單的錯(cuò)誤,但是后果都會很嚴(yán)重。當(dāng)你的客戶端退出時(shí)候消息將會被重新發(fā)送(這可能會像是隨機(jī)地重新發(fā)送給),但是如果不能夠釋放一些未被應(yīng)答的消息, RabbitMQ 將會占用越來越多的內(nèi)存。
為了調(diào)試這種錯(cuò)誤你可以使用rabbitmqctl把messages_unacknowledged(未被確認(rèn)的消息)都打印出來:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows平臺,省略掉sudo即可:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我們已經(jīng)學(xué)習(xí)了怎樣做才能確保即使Consumer(消費(fèi)者)掛掉,任務(wù)也不會丟失。但是如果是 RabbitMQ 服務(wù)停止了,我們的任務(wù)也同樣會丟失。
當(dāng) RabbitMQ 退出或者崩潰,它會把隊(duì)列與消息統(tǒng)統(tǒng)忘掉,除非你告訴它不要這樣做。為了確保消息不會丟失(即使RabbitMQ退出或崩潰),我們需要做兩件事情:我們需要將Queue(隊(duì)列)和消息都標(biāo)記為持久化的。
首先我們需要確保 RabbitMQ 將永不會丟失我們的Queue(隊(duì)列)。為了實(shí)現(xiàn)這個(gè),我們需要聲明它為durable(持久的)。所以我們把(聲明隊(duì)列時(shí)候的)第三個(gè)參數(shù) queue_declare 置為 true:
$channel->queue_declare('hello', false, true, false, false);
盡管這條命令它本身是正確無誤的,但是就目前我們(RabbitMQ)的配置來看,這并不會真正生效。這是因?yàn)槲覀円呀?jīng)定義了一個(gè)并不持久化(not durable)的叫做 hello的 Queue(隊(duì)列)。RabbitMQ將不會允許你對已經(jīng)存在的Queue(隊(duì)列)重新定義,并且當(dāng)任何程序嘗試這樣做的時(shí)候會返回一個(gè) error 錯(cuò)誤。但這有一個(gè)快速的解決方法——讓我們重新定義一個(gè)不同名字的Queue(隊(duì)列),例如 task_queus:
$channel->queue_declare('task_queue', false, true, false, false);
Producer(生產(chǎn)者)和Consumer(消費(fèi)者)的代碼中這個(gè)標(biāo)記都要被設(shè)為 true.
這時(shí)候我們就能確保即使 RabbitMQ 重啟, task_queue 也不會丟失?,F(xiàn)在我們需要通過設(shè)置 AMQPMessage 參數(shù)數(shù)組中的 delivery_mode = 2 這一消息屬性來確保我們的消息是持久化的。
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
消息持久化需要注意的地方
標(biāo)記消息為持久花的并不能完全保重一條消息都不會丟失。盡管它高數(shù)了 RabbitMQ 把消息保存到磁盤,在 RabbitMQ 接收了一條消息但并未將它保存(到磁盤)也會由一段很短的時(shí)間(可能發(fā)生錯(cuò)誤)。同樣 RabbitMQ 不會對所有消息都執(zhí)行
fsync(2)——它可能只是保存在緩存中而不是真正地寫入到磁盤中。這種持久化的保證不是很強(qiáng)大,但對于我們的 Simple Task Queue (簡單任務(wù)隊(duì)列) 來說也已經(jīng)完全足夠了。如果你需要一個(gè)更加強(qiáng)大持久化,你可以是Publisher Confirms(發(fā)送者確認(rèn))
公平調(diào)度(Fair dispatch)
你可能已經(jīng)注意到目前的調(diào)度并不能完全按照我們所希望的進(jìn)行。例如在有兩個(gè)處理程序( Consumer(消費(fèi)者) )的時(shí)候,所有奇數(shù)的消息都十分繁雜但其他都都很輕松,這樣,其中一個(gè)處理程序就會一直繁忙,然而另外一個(gè)幾乎什么都不用做。然而,RabbitMQ 并不知道這種情況,它將會繼續(xù)均勻地分發(fā)所有消息。
導(dǎo)致這種情況的原因是 RabbitMQ 在消息進(jìn)入Queue(隊(duì)列)的時(shí)候僅僅是分發(fā)這些消息。它(RabbitMQ)并沒有留意一個(gè)Consumer(消費(fèi)者)的未被應(yīng)答消息數(shù)量。它只是盲目地將第n條消息發(fā)送到第n個(gè)Consumer(消費(fèi)者)。

為了解決這種問題,我們使用 basic_qos 函數(shù)并設(shè)置該函數(shù)的 (第二個(gè)參數(shù))prefetch_count = 1。這會告訴 RabbitMQ 不要在同一時(shí)間分發(fā)不止一條消息到一個(gè)處理程序上。換句話說就是,不要在處理程序處理并應(yīng)答了上一條消息前再分發(fā)一個(gè)新的消息給它。相反地,它會把這條新的消息分發(fā)到下一個(gè)并不繁忙的處理程序。
$channel->basic_qos(null, 1, null);
關(guān)于Queue(隊(duì)列) 的大小
如果所有的處理程序都在繁忙,你的隊(duì)列將會被填滿。你將會想關(guān)注這一點(diǎn),添加更多的處理程序,或者更換一種策略。
讓他們一起運(yùn)行
最后我們的 new_task.php 文件是這樣的:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array(
'delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT
)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
以及我們的 worker.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
在這章教程中我們學(xué)習(xí)到使用消息確認(rèn)以及 prefetch 你可以設(shè)立一個(gè)工作隊(duì)列。持久化配置讓讓任務(wù)在即使 RabbitMQ 重啟的情況下也能留存。