本文所有內(nèi)容均個(gè)人從RabbitMQ官網(wǎng)教程中翻譯,若圖片文字的引用有任何侵權(quán)的地方,聯(lián)系我,我會(huì)立馬刪除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
發(fā)布/訂閱
(使用php-amqplib)
在上一個(gè)教程,我們創(chuàng)建了一個(gè)工作隊(duì)列。Work Queue(工作隊(duì)列)是在每一個(gè)任務(wù)都分發(fā)給一個(gè)確切的處理程序的假設(shè)上建立的。在這一部分,我們將會(huì)做一些完全不同的事情——我們將會(huì)發(fā)送一條消息去多個(gè)Consumers(消費(fèi)者)上。這種模式被稱為“發(fā)布/訂閱”。
為了說明這種模式,我們將會(huì)建立一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將會(huì)包含兩個(gè)程序——第一個(gè)會(huì)發(fā)出日志消息,而另一個(gè)則會(huì)接收并打印這些消息。
在我們的日志系統(tǒng)里,每一個(gè)正在運(yùn)行的接收程序副本都會(huì)獲得(所有)消息。那樣我們就可以運(yùn)行一個(gè)接收程序把日志引導(dǎo)到磁盤;同時(shí)我們可以運(yùn)行另外一個(gè)接收程序把日志打印到屏幕上來看。
基本上,被發(fā)布的消息將會(huì)在所有接收程序間進(jìn)行廣播。
交換
在本教程的上一部分,我們通過一個(gè)Queue(隊(duì)列)發(fā)送和接收消息?,F(xiàn)在是時(shí)候介紹一下 RabbitMQ 的全消息模型(Full Messaging Model)了。
讓我們快速地復(fù)習(xí)前面的教程涵蓋的內(nèi)容:
1.一個(gè)Producer(生產(chǎn)者)就是一個(gè)發(fā)送消息的程序。
2.一個(gè)Queue(隊(duì)列)就是一個(gè)保存消息的緩沖(buffer)。
3.一個(gè)Consumer(消費(fèi)者)就是一個(gè)接收消息的用戶程序。
RabbitMQ的消息模型的主要思想就是:Producer(生產(chǎn)者)從不會(huì)直接將消息發(fā)送到Queue(隊(duì)列)中。實(shí)際上,很多時(shí)候Producer(生產(chǎn)者)甚至完全不知道一條消息將會(huì)被發(fā)送到Queue(隊(duì)列)。
相反地,Producer(生產(chǎn)者)只能發(fā)送消息到一個(gè)交換機(jī)(exchange)。一個(gè)交換機(jī)就是一個(gè)非常簡(jiǎn)單的東西。一方面(或者是叫一端更形象?)從Producer(生產(chǎn)者)處接收消息,另一方面(端?)它把這些消息推進(jìn)Queue(隊(duì)列)中。交換機(jī)必須準(zhǔn)確地知道對(duì)它接收的每一條消息做什么。應(yīng)該將這條消息追加到一個(gè)特別的Queue(隊(duì)列)嗎?應(yīng)該將這條消息追加到多個(gè)Queue(隊(duì)列)嗎?或者應(yīng)該把這條消息丟棄掉嗎?做這些事情的規(guī)則是通過定義交換類型來確定的。

這里只有少數(shù)有效的交換類型:direct,topic,headers和fanout。我們將會(huì)集中(介紹)最后一個(gè)fanout。讓我們創(chuàng)建一個(gè)這種類型的交換機(jī)并稱他為logs。
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout交換機(jī)是非常簡(jiǎn)單的。從它的名稱就能猜到(反正我是不知道怎么翻譯好-_-),它只是將它獲取到的所有消息廣播到它所知道的所有Queue(隊(duì)列)中。這與我們的日志系統(tǒng)需求十分吻合。
列出交換機(jī)
你可以運(yùn)行十分有用的
rabbitmqctl來列出服務(wù)器上的所有交換機(jī):sudo rabbitmqctl list_exchanges在列出的隊(duì)列中會(huì)由一些
amq.*交換機(jī)以及默認(rèn)的(未被命名)的交換機(jī)。這是默認(rèn)創(chuàng)建的,但此時(shí)你似乎并不需要使用他們。默認(rèn)的交換機(jī)
在教程的上一部分我們對(duì)交換機(jī)一無所知,卻仍然可以發(fā)送消息到Queues(隊(duì)列)中。這很可能是因?yàn)槲覀兪褂昧艘粋€(gè)通過空字符串("")識(shí)別的默認(rèn)的交換機(jī)。
重新回顧我們之前發(fā)送一個(gè)消息的時(shí)候:
$channel->basic_publish($msg, '', 'hello');在此處,我們使用了默認(rèn)的,或者說是無名的交換機(jī):消息會(huì)被路由到
routing_key指定的Queue(隊(duì)列),如果這個(gè)隊(duì)列存在的化。routing_key就是basic_publish的第三個(gè)參數(shù)。
現(xiàn)在,讓我們發(fā)送消息到一個(gè)被命名的交換機(jī)上:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
臨時(shí)隊(duì)列
你可能記得之前我們使用的Queue(隊(duì)列)都是擁有了一個(gè)指定的名稱(還記得hello和task_queue嗎?)。在我們需要將工作程序指向?qū)?yīng)的Queue(隊(duì)列)時(shí)可以命名一個(gè)Queue(隊(duì)列)對(duì)于我們來說是十分重要的。當(dāng)你想要在Producer(生產(chǎn)者)與Consumer(消費(fèi)者)之間共享Queue(隊(duì)列)時(shí)候?yàn)?strong>Queue(隊(duì)列)命名是十分重要的。
但是這對(duì)我們的日志記錄器來說這并不重要。我們打算監(jiān)聽所有的日志消息嗎,而不是僅僅是其中一部分。同樣我們只對(duì)當(dāng)前流動(dòng)的消息感興趣,而不是舊的消息。為了解決這一情況,我們需要兩樣?xùn)|西。
首先,無論何時(shí),我們連接到RabbitMQ時(shí)候都需要一個(gè)新的,并且是空的Queue(隊(duì)列)。為了達(dá)到這一目的,我們可以使用隨機(jī)的名稱來創(chuàng)建一個(gè)Queue(隊(duì)列),或者,更好的選擇是——讓RabbitMQ服務(wù)器為我們選擇一個(gè)隨機(jī)的Queue(隊(duì)列)。
其次,一旦我們的Consumer(消費(fèi)者)斷開了鏈接,對(duì)應(yīng)的Queue(隊(duì)列)應(yīng)該被自動(dòng)刪除。
在php-amqplib客戶端中,當(dāng)我們傳給Queue(隊(duì)列)名稱參數(shù)一個(gè)空字符串時(shí)候,我們能創(chuàng)建一個(gè)非持久化隊(duì)列(non-durable Queue),并反回了一個(gè)(自動(dòng))生成的隊(duì)列名稱:
list($queue_name,,) = $channel->queue_declare('');
當(dāng)這個(gè)方法反回,$queue_name變量包含了一個(gè)由RabbitMQ生成的隨機(jī)名稱。例如,它看起來會(huì)像是這樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
當(dāng)這個(gè)連接宣布關(guān)閉了,對(duì)應(yīng)的Queue(隊(duì)列)將會(huì)被刪除,因?yàn)樗宦暶鳛楠?dú)有的(exclusive)。
綁定

我們已經(jīng)創(chuàng)建了一個(gè)fanout交換機(jī)以及一個(gè)Queue(隊(duì)列)。現(xiàn)在我們需要告訴交換機(jī)去發(fā)送消息到我們的Queue(隊(duì)列)。交換機(jī)與隊(duì)列之間的關(guān)系成為綁定(binding)。
$channel->queue_bind($queue_name, 'logs');
從現(xiàn)在起,logs交換機(jī)將會(huì)追加消息去我們的隊(duì)列。
列出綁定
如你所想,通過以下方式你可以把正在使用的綁定(bindings)。
rabbitmqctl list_bindings
將他們放在一起

發(fā)出日志的這個(gè)Producer(生產(chǎn)者)程序與我們之前教程的不會(huì)相差太多。最重要的改變就是我們現(xiàn)在希望發(fā)送消息到我們的logs交換機(jī)而不是無名的那個(gè)。下面就是emit_log.php腳本的代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明交換機(jī)
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
// 發(fā)送消息到交換機(jī)上(而不是指定隊(duì)列)
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
這如你所看到的,在建立連接之后我們聲明了一個(gè)交換機(jī)。這一步十分重要因?yàn)榘l(fā)送消息到不存在的交換機(jī)是被禁止的,
如果已經(jīng)沒有Queue(隊(duì)列)綁定到交換機(jī),交換機(jī)的消息將會(huì)被丟失,但對(duì)于我們來說可以接受;如果已經(jīng)沒有Consumer(消費(fèi)者)監(jiān)聽該交換機(jī)的消息了,我們可以安全地刪除這些消息。
reveive_logs.php的代碼如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明對(duì)應(yīng)的交換機(jī)
$channel->exchange_declare('logs', 'fanout', false, false, false);
// 創(chuàng)建一個(gè)非持久化的隊(duì)列并獲取自動(dòng)生成的隊(duì)列名稱
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 綁定隊(duì)列到交換機(jī)
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你想保存這些日志到文件,只需要打開一個(gè)控制臺(tái),并輸入:
php reveive_logs.php > logs_from_rabbit.log
如果你想在屏幕上看到這些日志,新建一個(gè)新的終端并運(yùn)行:
php reveive_logs.php
當(dāng)然,你還需要發(fā)送日志:
php emit_log.php
使用rabbitmqctl list_bindings你可以驗(yàn)證這些代碼已經(jīng)如我們所想地創(chuàng)建了綁定與Queue(隊(duì)列)。如果是運(yùn)行著兩個(gè)receive_logs.php程序,你將會(huì)看到類似下面的情況:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
這結(jié)果的解析很直接了當(dāng):數(shù)據(jù)從logs交換機(jī)發(fā)送到了兩個(gè)以服務(wù)器分配的名稱命名的Queue(隊(duì)列)上。這正式我們所期望的。