RabbitMQ官網(wǎng)教程翻譯(PHP版本)_3

本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯,若圖片文字的引用有任何侵權(quán)的地方,聯(lián)系我,我會立馬刪除。

This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.

發(fā)布/訂閱

(使用php-amqplib

在上一個教程,我們創(chuàng)建了一個工作隊列。Work Queue(工作隊列)是在每一個任務(wù)都分發(fā)給一個確切的處理程序的假設(shè)上建立的。在這一部分,我們將會做一些完全不同的事情——我們將會發(fā)送一條消息去多個Consumers(消費者)上。這種模式被稱為“發(fā)布/訂閱”。

為了說明這種模式,我們將會建立一個簡單的日志系統(tǒng)。它將會包含兩個程序——第一個會發(fā)出日志消息,而另一個則會接收并打印這些消息。

在我們的日志系統(tǒng)里,每一個正在運行的接收程序副本都會獲得(所有)消息。那樣我們就可以運行一個接收程序把日志引導(dǎo)到磁盤;同時我們可以運行另外一個接收程序把日志打印到屏幕上來看。

基本上,被發(fā)布的消息將會在所有接收程序間進行廣播。

交換

在本教程的上一部分,我們通過一個Queue(隊列)發(fā)送和接收消息。現(xiàn)在是時候介紹一下 RabbitMQ 的全消息模型(Full Messaging Model)了。

讓我們快速地復(fù)習前面的教程涵蓋的內(nèi)容:
1.一個Producer(生產(chǎn)者)就是一個發(fā)送消息的程序。
2.一個Queue(隊列)就是一個保存消息的緩沖(buffer)。
3.一個Consumer(消費者)就是一個接收消息的用戶程序。

RabbitMQ的消息模型的主要思想就是:Producer(生產(chǎn)者)從不會直接將消息發(fā)送到Queue(隊列)中。實際上,很多時候Producer(生產(chǎn)者)甚至完全不知道一條消息將會被發(fā)送到Queue(隊列)。

相反地,Producer(生產(chǎn)者)只能發(fā)送消息到一個交換機(exchange)。一個交換機就是一個非常簡單的東西。一方面(或者是叫一端更形象?)從Producer(生產(chǎn)者)處接收消息,另一方面(端?)它把這些消息推進Queue(隊列)中。交換機必須準確地知道對它接收的每一條消息做什么。應(yīng)該將這條消息追加到一個特別的Queue(隊列)嗎?應(yīng)該將這條消息追加到多個Queue(隊列)嗎?或者應(yīng)該把這條消息丟棄掉嗎?做這些事情的規(guī)則是通過定義交換類型來確定的。

Produce->Exchange->Queues

這里只有少數(shù)有效的交換類型:direct,topic,headersfanout。我們將會集中(介紹)最后一個fanout。讓我們創(chuàng)建一個這種類型的交換機并稱他為logs。

$channel->exchange_declare('logs', 'fanout', false, false, false);

fanout交換機是非常簡單的。從它的名稱就能猜到(反正我是不知道怎么翻譯好-_-),它只是將它獲取到的所有消息廣播到它所知道的所有Queue(隊列)中。這與我們的日志系統(tǒng)需求十分吻合。


列出交換機

你可以運行十分有用的 rabbitmqctl 來列出服務(wù)器上的所有交換機:

sudo rabbitmqctl list_exchanges

在列出的隊列中會由一些 amq.* 交換機以及默認的(未被命名)的交換機。這是默認創(chuàng)建的,但此時你似乎并不需要使用他們。

默認的交換機

在教程的上一部分我們對交換機一無所知,卻仍然可以發(fā)送消息到Queues(隊列)中。這很可能是因為我們使用了一個通過空字符串("")識別的默認的交換機。

重新回顧我們之前發(fā)送一個消息的時候:

$channel->basic_publish($msg, '', 'hello');

在此處,我們使用了默認的,或者說是無名的交換機:消息會被路由到routing_key指定的Queue(隊列),如果這個隊列存在的化。routing_key就是basic_publish 的第三個參數(shù)。

現(xiàn)在,讓我們發(fā)送消息到一個被命名的交換機上:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

臨時隊列

你可能記得之前我們使用的Queue(隊列)都是擁有了一個指定的名稱(還記得hellotask_queue嗎?)。在我們需要將工作程序指向?qū)?yīng)的Queue(隊列)時可以命名一個Queue(隊列)對于我們來說是十分重要的。當你想要在Producer(生產(chǎn)者)Consumer(消費者)之間共享Queue(隊列)時候為Queue(隊列)命名是十分重要的。

但是這對我們的日志記錄器來說這并不重要。我們打算監(jiān)聽所有的日志消息嗎,而不是僅僅是其中一部分。同樣我們只對當前流動的消息感興趣,而不是舊的消息。為了解決這一情況,我們需要兩樣東西。

首先,無論何時,我們連接到RabbitMQ時候都需要一個新的,并且是空的Queue(隊列)。為了達到這一目的,我們可以使用隨機的名稱來創(chuàng)建一個Queue(隊列),或者,更好的選擇是——讓RabbitMQ服務(wù)器為我們選擇一個隨機的Queue(隊列)。

其次,一旦我們的Consumer(消費者)斷開了鏈接,對應(yīng)的Queue(隊列)應(yīng)該被自動刪除。

php-amqplib客戶端中,當我們傳給Queue(隊列)名稱參數(shù)一個空字符串時候,我們能創(chuàng)建一個非持久化隊列(non-durable Queue),并反回了一個(自動)生成的隊列名稱:

list($queue_name,,) = $channel->queue_declare('');

當這個方法反回,$queue_name變量包含了一個由RabbitMQ生成的隨機名稱。例如,它看起來會像是這樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

當這個連接宣布關(guān)閉了,對應(yīng)的Queue(隊列)將會被刪除,因為它被聲明為獨有的(exclusive)。

綁定

Producer->Change->(bingding)->Queue

我們已經(jīng)創(chuàng)建了一個fanout交換機以及一個Queue(隊列)?,F(xiàn)在我們需要告訴交換機去發(fā)送消息到我們的Queue(隊列)。交換機與隊列之間的關(guān)系成為綁定(binding)。

$channel->queue_bind($queue_name, 'logs');

從現(xiàn)在起,logs交換機將會追加消息去我們的隊列。

列出綁定

如你所想,通過以下方式你可以把正在使用的綁定(bindings)。

rabbitmqctl list_bindings

將他們放在一起

通過交換機與隊列綁定實現(xiàn)”發(fā)布/訂閱“”模式

發(fā)出日志的這個Producer(生產(chǎn)者)程序與我們之前教程的不會相差太多。最重要的改變就是我們現(xiàn)在希望發(fā)送消息到我們的logs交換機而不是無名的那個。下面就是emit_log.php腳本的代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 聲明交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);

// 發(fā)送消息到交換機上(而不是指定隊列)
$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();
?>

(emit_log.php源碼)

這如你所看到的,在建立連接之后我們聲明了一個交換機。這一步十分重要因為發(fā)送消息到不存在的交換機是被禁止的,

如果已經(jīng)沒有Queue(隊列)綁定到交換機,交換機的消息將會被丟失,但對于我們來說可以接受;如果已經(jīng)沒有Consumer(消費者)監(jiān)聽該交換機的消息了,我們可以安全地刪除這些消息。

reveive_logs.php的代碼如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 聲明對應(yīng)的交換機
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 創(chuàng)建一個非持久化的隊列并獲取自動生成的隊列名稱
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 綁定隊列到交換機
$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

(reveive_logs.php源碼)

如果你想保存這些日志到文件,只需要打開一個控制臺,并輸入:

php reveive_logs.php > logs_from_rabbit.log

如果你想在屏幕上看到這些日志,新建一個新的終端并運行:

php reveive_logs.php

當然,你還需要發(fā)送日志:

php emit_log.php

使用rabbitmqctl list_bindings你可以驗證這些代碼已經(jīng)如我們所想地創(chuàng)建了綁定與Queue(隊列)。如果是運行著兩個receive_logs.php程序,你將會看到類似下面的情況:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

這結(jié)果的解析很直接了當:數(shù)據(jù)從logs交換機發(fā)送到了兩個以服務(wù)器分配的名稱命名的Queue(隊列)上。這正式我們所期望的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容