一、安裝
使用docker安裝rabbitMQ
docker pull rabbitmq:3.7.7-management
docker images
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 鏡像id
打開瀏覽器訪問 賬號(hào): admin 密碼: admin
http://127.0.0.1:15672
windows安裝擴(kuò)展,根據(jù)自己的php選擇下載
https://pecl.php.net/package/amqp/
php_amqp.dll 文件放到php目錄的ext文件夾下
php.ini里面添加
extension=php_amqp.dll
rabbitmq.4.dll 放到php根目錄下
重啟查看phpinfo 是否成功顯示amqp

二、使用
新建文件消費(fèi)者 consumer.php
vhost 查看

image.png
<?php
//聲明連接參數(shù)
$config = array(
'host' => '127.0.0.1',
'vhost' => 'my_vhost',
'port' => 5672,
'login' => 'admin',
'password' => 'admin'
);
//連接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
//在連接內(nèi)創(chuàng)建一個(gè)通道
$ch = new AMQPChannel($cnn);
//創(chuàng)建一個(gè)交換機(jī)
$ex = new AMQPExchange($ch);
//聲明路由鍵
$routingKey = 'key_1';
//聲明交換機(jī)名稱
$exchangeName = 'exchange_1';
//聲明隊(duì)列名稱
$queueName = 'queue_1';
//設(shè)置交換機(jī)名稱
$ex->setName($exchangeName);
//設(shè)置交換機(jī)類型
//AMQP_EX_TYPE_DIRECT:直連交換機(jī)
//AMQP_EX_TYPE_FANOUT:扇形交換機(jī)
//AMQP_EX_TYPE_HEADERS:頭交換機(jī)
//AMQP_EX_TYPE_TOPIC:主題交換機(jī)
$ex->setType(AMQP_EX_TYPE_DIRECT);
//設(shè)置交換機(jī)持久
$ex->setFlags(AMQP_DURABLE);
//聲明交換機(jī)
$ex->declareExchange();
//創(chuàng)建一個(gè)消息隊(duì)列
$q = new AMQPQueue($ch);
//設(shè)置隊(duì)列名稱
$q->setName($queueName);
//設(shè)置隊(duì)列持久
$q->setFlags(AMQP_DURABLE);
//聲明消息隊(duì)列
$q->declareQueue();
//交換機(jī)和隊(duì)列通過$routingKey進(jìn)行綁定
$q->bind($ex->getName(), $routingKey);
//接收消息并進(jìn)行處理的回調(diào)方法
function receive($envelope, $queue)
{
//休眠兩秒,
sleep(2);
//echo消息內(nèi)容
echo $envelope->getBody() . "\n";
//顯式確認(rèn),隊(duì)列收到消費(fèi)者顯式確認(rèn)后,會(huì)刪除該消息
$queue->ack($envelope->getDeliveryTag());
}
//設(shè)置消息隊(duì)列消費(fèi)者回調(diào)方法,并進(jìn)行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隱式確認(rèn),不推薦
新建文件生產(chǎn)者 send.php
<?php
$config = array(
'host' => 'localhost',
'vhost' => 'my_vhost',
'port' => 5672,
'login' => 'admin',
'password' => 'admin'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由鍵,一定要和消費(fèi)者端一致
$routingKey = 'key_1';
//交換機(jī)名稱,一定要和消費(fèi)者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//創(chuàng)建10個(gè)消息
for ($i = 1; $i <= 10; $i++) {
//消息內(nèi)容
$msg = array(
'data' => 'message_' . $i,
'hello' => 'world',
);
//發(fā)送消息到交換機(jī),并返回發(fā)送結(jié)果
//delivery_mode:2聲明消息持久,持久的隊(duì)列+持久的消息在RabbitMQ重啟后才不會(huì)丟失
echo "<pre>";
print_r("Send Message:" . $ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2)) . "\n");
}
服務(wù)器上命令模式下執(zhí)行 consumer.php 消費(fèi)測(cè)試
php consumer.php
推薦使用supervisor守護(hù)進(jìn)程工具執(zhí)行 consumer.php
客戶端訪問 send.php 生產(chǎn)
http://localhost/rabbitmq/send.php
php代碼可以使用 php-amqplib 庫
https://github.com/php-amqplib/php-amqplib
新建composer.json
{
"require": {
"php-amqplib/php-amqplib":"2.8.*"
}
}
composer安裝
composer install
send.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin','my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
consumer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();