php 消息隊(duì)列rabbitMQ

一、安裝

使用docker安裝rabbitMQ
docker pull rabbitmq:3.7.7-management

docker images

docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 鏡像id

打開瀏覽器訪問 賬號(hào): admin 密碼: admin
http://127.0.0.1:15672
windows安裝擴(kuò)展,根據(jù)自己的php選擇下載
https://pecl.php.net/package/amqp/
php_amqp.dll 文件放到php目錄的ext文件夾下
php.ini里面添加
extension=php_amqp.dll
rabbitmq.4.dll 放到php根目錄下
重啟查看phpinfo 是否成功顯示amqp

二、使用

新建文件消費(fèi)者 consumer.php
vhost 查看
image.png
<?php

//聲明連接參數(shù)
$config = array(
    'host' => '127.0.0.1',
    'vhost' => 'my_vhost', 
    'port' => 5672,
    'login' => 'admin',
    'password' => 'admin'
);

//連接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}

//在連接內(nèi)創(chuàng)建一個(gè)通道
$ch = new AMQPChannel($cnn);

//創(chuàng)建一個(gè)交換機(jī)
$ex = new AMQPExchange($ch);

//聲明路由鍵
$routingKey = 'key_1';

//聲明交換機(jī)名稱
$exchangeName = 'exchange_1';

//聲明隊(duì)列名稱
$queueName = 'queue_1';

//設(shè)置交換機(jī)名稱
$ex->setName($exchangeName);

//設(shè)置交換機(jī)類型
//AMQP_EX_TYPE_DIRECT:直連交換機(jī)
//AMQP_EX_TYPE_FANOUT:扇形交換機(jī)
//AMQP_EX_TYPE_HEADERS:頭交換機(jī)
//AMQP_EX_TYPE_TOPIC:主題交換機(jī)
$ex->setType(AMQP_EX_TYPE_DIRECT);

//設(shè)置交換機(jī)持久
$ex->setFlags(AMQP_DURABLE);

//聲明交換機(jī)
$ex->declareExchange();

//創(chuàng)建一個(gè)消息隊(duì)列
$q = new AMQPQueue($ch);

//設(shè)置隊(duì)列名稱
$q->setName($queueName);

//設(shè)置隊(duì)列持久
$q->setFlags(AMQP_DURABLE);

//聲明消息隊(duì)列
$q->declareQueue();

//交換機(jī)和隊(duì)列通過$routingKey進(jìn)行綁定
$q->bind($ex->getName(), $routingKey);

//接收消息并進(jìn)行處理的回調(diào)方法
function receive($envelope, $queue)
{
    //休眠兩秒,
    sleep(2);
    //echo消息內(nèi)容
    echo $envelope->getBody() . "\n";
    //顯式確認(rèn),隊(duì)列收到消費(fèi)者顯式確認(rèn)后,會(huì)刪除該消息
    $queue->ack($envelope->getDeliveryTag());
}

//設(shè)置消息隊(duì)列消費(fèi)者回調(diào)方法,并進(jìn)行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隱式確認(rèn),不推薦
新建文件生產(chǎn)者 send.php
<?php

$config = array(
    'host' => 'localhost',
    'vhost' => 'my_vhost',
    'port' => 5672,
    'login' => 'admin',
    'password' => 'admin'
);

$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}

$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);

//消息的路由鍵,一定要和消費(fèi)者端一致
$routingKey = 'key_1';

//交換機(jī)名稱,一定要和消費(fèi)者端一致,
$exchangeName = 'exchange_1';

$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();

//創(chuàng)建10個(gè)消息
for ($i = 1; $i <= 10; $i++) {

    //消息內(nèi)容
    $msg = array(
        'data'  => 'message_' . $i,
        'hello' => 'world',
    );

    //發(fā)送消息到交換機(jī),并返回發(fā)送結(jié)果
    //delivery_mode:2聲明消息持久,持久的隊(duì)列+持久的消息在RabbitMQ重啟后才不會(huì)丟失
    echo "<pre>";
    print_r("Send Message:" . $ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2)) . "\n");

}

服務(wù)器上命令模式下執(zhí)行 consumer.php 消費(fèi)測(cè)試
php consumer.php
推薦使用supervisor守護(hù)進(jìn)程工具執(zhí)行 consumer.php
客戶端訪問 send.php 生產(chǎn)

http://localhost/rabbitmq/send.php


php代碼可以使用 php-amqplib 庫

https://github.com/php-amqplib/php-amqplib

新建composer.json
{
    "require": {
        "php-amqplib/php-amqplib":"2.8.*"
    }
}
composer安裝
composer install 
send.php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin','my_vhost');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

consumer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容