PHP初次使用rabbitMQ

消息隊列

此處使用window環(huán)境,Linux環(huán)境下,添加

rabbitMQ是什么 傳送門在此

資源URL

PHP的rabbitMQ擴展
rabbitMQ官網(wǎng)
rabbitMQ官網(wǎng)下載地址
erlang官網(wǎng)
erlang下載


概念

幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。

  • Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
  • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  • Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
  • Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
  • vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
  • producer:消息生產者,就是投遞消息的程序。
  • consumer:消息消費者,就是接受消息的程序。
  • channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
    消息隊列的使用過程大概如下:
    (1)客戶端連接到消息隊列服務器,打開一個channel。
    (2)客戶端聲明一個exchange,并設置相關屬性。
    (3)客戶端聲明一個queue,并設置相關屬性。
    (4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
    (5)客戶端投遞消息到exchange。
    exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
    exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
    RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括3個部分:
     ?。?)exchange持久化,在聲明時指定durable => 1
     ?。?)queue持久化,在聲明時指定durable => 1
      (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
    如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

安裝環(huán)境

  1. 先安裝erlang包,然后添加環(huán)境變量 PATH erlang安裝路徑\bin
  2. 然后安裝rabbitMQ,然后添加環(huán)境變量 PATH rabbitMQ安裝路徑\sbin
  3. 以管理員身份進入rabbitMQ安裝目錄/sbin依次執(zhí)行命令
  • rabbitmq-plugins.bat enable rabbitmq_management
  • rabbitmq-service.bat stop
  • rabbitmq-service.bat install
  • rabbitmq-service.bat start

執(zhí)行后,訪問127.0.0.1:15672 查看rabbitMQ是否安裝成功, 登錄賬號密碼是 guest

  1. PHP環(huán)境搭建
  • 配置 php.ini extension=php_amqp.dll; 并將rabbitmq.*.dll 放到和php.ini的同級目錄下
  • 重啟php-fpm
  • 打印phpinfo() 查看amqp擴展
    安裝END

customer.php

$conn_args = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
];

$exchangeName= 'exchange'; #交換機名
$queueName = 'queue'; #隊列名
$keyRoute = 'keyRoute'; #路由key
//創(chuàng)建鏈接
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die('連接失敗');
}
//創(chuàng)建頻道
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);  #交換機名
$exchange->setType(AMQP_EX_TYPE_DIRECT); #類型
$exchange->setFlags(AMQP_DURABLE); #持久化

echo "交換機狀態(tài):".$exchange->declareExchange()."\n";

//創(chuàng)建隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); #持久化
echo "消息總數(shù):".$queue->declareQueue()."\n";
//綁定交換機與隊列,并指定路由鍵
$queue->bind($exchangeName, $keyRoute);
//阻塞模式接收消息
echo "Message:\n";
while(True){
    $queue->consume(function($envelope, $queue){
        $msg = $envelope->getBody();
        echo $msg."\n"; //處理消息
        $queue->ack($envelope->getDeliveryTag()); //手動發(fā)送ACK應答
    });
    //$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}
$conn->disconnect();

publisher.php

$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);
$exchangeName = 'exchange'; //交換機名
$keyRoute = 'keyRoute'; //路由key

//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
date_default_timezone_set("Asia/Shanghai");

//創(chuàng)建交換機對象
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_DURABLE);
//發(fā)送消息
//$channel->startTransaction(); //開始事務
for ($i = 0; $i < 5; ++$i) {
    sleep(1);//休眠1秒
    //消息內容
    $message = "測試數(shù)據(jù)!" . date("H:i:s");
    echo "Send Message:" . $exchange->publish($message, $keyRoute) . "\n";
}
//$channel->commitTransaction(); //提交事務

$conn->disconnect();

命令行A下先執(zhí)行 php customer.php,然后新cmd下執(zhí)行 php publisher.php
命令行A將有輸出

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容