本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯,若圖片文字的引用有任何侵權(quán)的地方,聯(lián)系我,我會立馬刪除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
發(fā)布/訂閱
(使用php-amqplib)
在上一個教程,我們創(chuàng)建了一個工作隊列。Work Queue(工作隊列)是在每一個任務(wù)都分發(fā)給一個確切的處理程序的假設(shè)上建立的。在這一部分,我們將會做一些完全不同的事情——我們將會發(fā)送一條消息去多個Consumers(消費者)上。這種模式被稱為“發(fā)布/訂閱”。
為了說明這種模式,我們將會建立一個簡單的日志系統(tǒng)。它將會包含兩個程序——第一個會發(fā)出日志消息,而另一個則會接收并打印這些消息。
在我們的日志系統(tǒng)里,每一個正在運行的接收程序副本都會獲得(所有)消息。那樣我們就可以運行一個接收程序把日志引導(dǎo)到磁盤;同時我們可以運行另外一個接收程序把日志打印到屏幕上來看。
基本上,被發(fā)布的消息將會在所有接收程序間進行廣播。
交換
在本教程的上一部分,我們通過一個Queue(隊列)發(fā)送和接收消息。現(xiàn)在是時候介紹一下 RabbitMQ 的全消息模型(Full Messaging Model)了。
讓我們快速地復(fù)習前面的教程涵蓋的內(nèi)容:
1.一個Producer(生產(chǎn)者)就是一個發(fā)送消息的程序。
2.一個Queue(隊列)就是一個保存消息的緩沖(buffer)。
3.一個Consumer(消費者)就是一個接收消息的用戶程序。
RabbitMQ的消息模型的主要思想就是:Producer(生產(chǎn)者)從不會直接將消息發(fā)送到Queue(隊列)中。實際上,很多時候Producer(生產(chǎn)者)甚至完全不知道一條消息將會被發(fā)送到Queue(隊列)。
相反地,Producer(生產(chǎn)者)只能發(fā)送消息到一個交換機(exchange)。一個交換機就是一個非常簡單的東西。一方面(或者是叫一端更形象?)從Producer(生產(chǎn)者)處接收消息,另一方面(端?)它把這些消息推進Queue(隊列)中。交換機必須準確地知道對它接收的每一條消息做什么。應(yīng)該將這條消息追加到一個特別的Queue(隊列)嗎?應(yīng)該將這條消息追加到多個Queue(隊列)嗎?或者應(yīng)該把這條消息丟棄掉嗎?做這些事情的規(guī)則是通過定義交換類型來確定的。

這里只有少數(shù)有效的交換類型:direct,topic,headers和fanout。我們將會集中(介紹)最后一個fanout。讓我們創(chuàng)建一個這種類型的交換機并稱他為logs。
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout交換機是非常簡單的。從它的名稱就能猜到(反正我是不知道怎么翻譯好-_-),它只是將它獲取到的所有消息廣播到它所知道的所有Queue(隊列)中。這與我們的日志系統(tǒng)需求十分吻合。
列出交換機
你可以運行十分有用的
rabbitmqctl來列出服務(wù)器上的所有交換機:sudo rabbitmqctl list_exchanges在列出的隊列中會由一些
amq.*交換機以及默認的(未被命名)的交換機。這是默認創(chuàng)建的,但此時你似乎并不需要使用他們。默認的交換機
在教程的上一部分我們對交換機一無所知,卻仍然可以發(fā)送消息到Queues(隊列)中。這很可能是因為我們使用了一個通過空字符串("")識別的默認的交換機。
重新回顧我們之前發(fā)送一個消息的時候:
$channel->basic_publish($msg, '', 'hello');在此處,我們使用了默認的,或者說是無名的交換機:消息會被路由到
routing_key指定的Queue(隊列),如果這個隊列存在的化。routing_key就是basic_publish的第三個參數(shù)。
現(xiàn)在,讓我們發(fā)送消息到一個被命名的交換機上:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
臨時隊列
你可能記得之前我們使用的Queue(隊列)都是擁有了一個指定的名稱(還記得hello和task_queue嗎?)。在我們需要將工作程序指向?qū)?yīng)的Queue(隊列)時可以命名一個Queue(隊列)對于我們來說是十分重要的。當你想要在Producer(生產(chǎn)者)與Consumer(消費者)之間共享Queue(隊列)時候為Queue(隊列)命名是十分重要的。
但是這對我們的日志記錄器來說這并不重要。我們打算監(jiān)聽所有的日志消息嗎,而不是僅僅是其中一部分。同樣我們只對當前流動的消息感興趣,而不是舊的消息。為了解決這一情況,我們需要兩樣東西。
首先,無論何時,我們連接到RabbitMQ時候都需要一個新的,并且是空的Queue(隊列)。為了達到這一目的,我們可以使用隨機的名稱來創(chuàng)建一個Queue(隊列),或者,更好的選擇是——讓RabbitMQ服務(wù)器為我們選擇一個隨機的Queue(隊列)。
其次,一旦我們的Consumer(消費者)斷開了鏈接,對應(yīng)的Queue(隊列)應(yīng)該被自動刪除。
在php-amqplib客戶端中,當我們傳給Queue(隊列)名稱參數(shù)一個空字符串時候,我們能創(chuàng)建一個非持久化隊列(non-durable Queue),并反回了一個(自動)生成的隊列名稱:
list($queue_name,,) = $channel->queue_declare('');
當這個方法反回,$queue_name變量包含了一個由RabbitMQ生成的隨機名稱。例如,它看起來會像是這樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
當這個連接宣布關(guān)閉了,對應(yīng)的Queue(隊列)將會被刪除,因為它被聲明為獨有的(exclusive)。
綁定

我們已經(jīng)創(chuàng)建了一個fanout交換機以及一個Queue(隊列)?,F(xiàn)在我們需要告訴交換機去發(fā)送消息到我們的Queue(隊列)。交換機與隊列之間的關(guān)系成為綁定(binding)。
$channel->queue_bind($queue_name, 'logs');
從現(xiàn)在起,logs交換機將會追加消息去我們的隊列。
列出綁定
如你所想,通過以下方式你可以把正在使用的綁定(bindings)。
rabbitmqctl list_bindings
將他們放在一起

發(fā)出日志的這個Producer(生產(chǎn)者)程序與我們之前教程的不會相差太多。最重要的改變就是我們現(xiàn)在希望發(fā)送消息到我們的logs交換機而不是無名的那個。下面就是emit_log.php腳本的代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
// 發(fā)送消息到交換機上(而不是指定隊列)
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
這如你所看到的,在建立連接之后我們聲明了一個交換機。這一步十分重要因為發(fā)送消息到不存在的交換機是被禁止的,
如果已經(jīng)沒有Queue(隊列)綁定到交換機,交換機的消息將會被丟失,但對于我們來說可以接受;如果已經(jīng)沒有Consumer(消費者)監(jiān)聽該交換機的消息了,我們可以安全地刪除這些消息。
reveive_logs.php的代碼如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明對應(yīng)的交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);
// 創(chuàng)建一個非持久化的隊列并獲取自動生成的隊列名稱
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 綁定隊列到交換機
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你想保存這些日志到文件,只需要打開一個控制臺,并輸入:
php reveive_logs.php > logs_from_rabbit.log
如果你想在屏幕上看到這些日志,新建一個新的終端并運行:
php reveive_logs.php
當然,你還需要發(fā)送日志:
php emit_log.php
使用rabbitmqctl list_bindings你可以驗證這些代碼已經(jīng)如我們所想地創(chuàng)建了綁定與Queue(隊列)。如果是運行著兩個receive_logs.php程序,你將會看到類似下面的情況:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
這結(jié)果的解析很直接了當:數(shù)據(jù)從logs交換機發(fā)送到了兩個以服務(wù)器分配的名稱命名的Queue(隊列)上。這正式我們所期望的。