消息隊列
此處使用window環(huán)境,Linux環(huán)境下,添加
資源URL
PHP的rabbitMQ擴展
rabbitMQ官網(wǎng)
rabbitMQ官網(wǎng)下載地址
erlang官網(wǎng)
erlang下載
概念
幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
- Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
- Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
- Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
- Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
- vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
- producer:消息生產者,就是投遞消息的程序。
- consumer:消息消費者,就是接受消息的程序。
- channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,并設置相關屬性。
(3)客戶端聲明一個queue,并設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括3個部分:
?。?)exchange持久化,在聲明時指定durable => 1
?。?)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
安裝環(huán)境
- 先安裝erlang包,然后添加環(huán)境變量 PATH erlang安裝路徑\bin
- 然后安裝rabbitMQ,然后添加環(huán)境變量 PATH rabbitMQ安裝路徑\sbin
- 以管理員身份進入rabbitMQ安裝目錄/sbin依次執(zhí)行命令
- rabbitmq-plugins.bat enable rabbitmq_management
- rabbitmq-service.bat stop
- rabbitmq-service.bat install
- rabbitmq-service.bat start
執(zhí)行后,訪問127.0.0.1:15672 查看rabbitMQ是否安裝成功, 登錄賬號密碼是 guest
- PHP環(huán)境搭建
- 配置 php.ini extension=php_amqp.dll; 并將rabbitmq.*.dll 放到和php.ini的同級目錄下
- 重啟php-fpm
- 打印phpinfo() 查看amqp擴展
安裝END
customer.php
$conn_args = [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
];
$exchangeName= 'exchange'; #交換機名
$queueName = 'queue'; #隊列名
$keyRoute = 'keyRoute'; #路由key
//創(chuàng)建鏈接
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die('連接失敗');
}
//創(chuàng)建頻道
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName); #交換機名
$exchange->setType(AMQP_EX_TYPE_DIRECT); #類型
$exchange->setFlags(AMQP_DURABLE); #持久化
echo "交換機狀態(tài):".$exchange->declareExchange()."\n";
//創(chuàng)建隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); #持久化
echo "消息總數(shù):".$queue->declareQueue()."\n";
//綁定交換機與隊列,并指定路由鍵
$queue->bind($exchangeName, $keyRoute);
//阻塞模式接收消息
echo "Message:\n";
while(True){
$queue->consume(function($envelope, $queue){
$msg = $envelope->getBody();
echo $msg."\n"; //處理消息
$queue->ack($envelope->getDeliveryTag()); //手動發(fā)送ACK應答
});
//$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}
$conn->disconnect();
publisher.php
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
$exchangeName = 'exchange'; //交換機名
$keyRoute = 'keyRoute'; //路由key
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
date_default_timezone_set("Asia/Shanghai");
//創(chuàng)建交換機對象
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_DURABLE);
//發(fā)送消息
//$channel->startTransaction(); //開始事務
for ($i = 0; $i < 5; ++$i) {
sleep(1);//休眠1秒
//消息內容
$message = "測試數(shù)據(jù)!" . date("H:i:s");
echo "Send Message:" . $exchange->publish($message, $keyRoute) . "\n";
}
//$channel->commitTransaction(); //提交事務
$conn->disconnect();
命令行A下先執(zhí)行 php customer.php,然后新cmd下執(zhí)行 php publisher.php
命令行A將有輸出