安裝rabbitmq和php擴(kuò)展

一、安裝rabbitmq服務(wù)端

1、安裝erlang

試了幾次erlang官方的包安裝后,在安裝rabbitmq時(shí)都報(bào)錯(cuò)要依賴R14B02,干脆使用rabbitmq官方提供的包:https://github.com/rabbitmq/erlang-rpm 點(diǎn)擊下載,終于不報(bào)錯(cuò)了。

rabbitmq官方提供安裝包的步驟:
1、下載后進(jìn)入安裝包,直接輸入命令:make
2、安裝過(guò)程需要3分鐘左右,編譯過(guò)程中需要用到rpm-build和autoconf,沒(méi)有安裝的需要安裝。安裝完成后,會(huì)在目錄RPMS/x86_64/下找到erlang的安裝包,rpm -ivh *** 安裝即可。

2、安裝rabbitmq

我直接使用rabbitmq 的rpm安裝包。如果這時(shí)還報(bào)錯(cuò)socat找不到,下載socat 的安裝包安裝即可。
注意:如果使用yum安裝socat,需安裝epel。

3、測(cè)試rabbitmq是否安裝成功
service rabbitmq-server start #開啟rabbitmq
service rabbitmq-server status #查看rabbitmq狀態(tài)

二、安裝rabbitmq的php擴(kuò)展

1、安裝擴(kuò)展依賴庫(kù)#####

注意:擴(kuò)展是C寫的,由于C與RabbitMQ通信一般需要依賴rabbitmq-c庫(kù)(也就是librabbitmq),所以編譯擴(kuò)展前需要先裝依賴庫(kù)。不同版本的擴(kuò)展,對(duì)php版本和librabbitmq兼容性不一樣。
rabbitmq-c依賴庫(kù)下載: https://github.com/alanxz/rabbitmq-c/archive/v0.8.0.tar.gz

mkdir build && cd build # 這一步是在rabbitmq-c的根目錄下創(chuàng)建一個(gè)build子目錄
cmake -DCMAKE_INSTALL_PREFIX=/usr/local/librabbitmq ..  # 這一步是讓cmake根據(jù)../CMakeList.txt,即rabbitmq-c的根目錄下的CMakeList.txt創(chuàng)建Makefile文件,Makefile文件會(huì)被創(chuàng)建到build目錄中
cmake --build . # 這一步是真正的build rabbitmq-c庫(kù)的,注意,不要漏掉結(jié)尾的點(diǎn) '.'
make
make install

2、安裝amqp擴(kuò)展

amqp擴(kuò)展下載: http://pecl.php.net/get/amqp-1.9.0.tgz

tar zvxf amqp-1.9.0.tgz #解壓
cd amqp-1.9.9 #打開目錄

./configure --with-php-config=/usr/local/php56/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/librabbitmq

make
make install

修改php.ini

extension=amqp.so #extension_dir自己定義

重啟php-fpm

service php-fpm restart #重啟

如無(wú)意外,則可以在phpinfo頁(yè)面看到如下所示:

圖一

三、監(jiān)控及代碼測(cè)試

啟動(dòng)rabbitmq監(jiān)控頁(yè)面

rabbitmq-plugins enable rabbitmq_management #啟動(dòng)監(jiān)控頁(yè)面插件

生產(chǎn)者代碼 (創(chuàng)建一個(gè)rabbit_publisher.php的文件)
創(chuàng)建連接-->創(chuàng)建channel-->創(chuàng)建交換機(jī)對(duì)象-->發(fā)送消息

<?php 
//配置信息
$conn_args = array(
    'host' => '192.168.1.93',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機(jī)名
//$q_name = 'q_linvo'; //無(wú)需隊(duì)列名
$k_route = 'key_1'; //路由key

//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//消息內(nèi)容
$message = "TEST MESSAGE! 測(cè)試消息!";

//創(chuàng)建交換機(jī)對(duì)象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);

//發(fā)送消息
//$channel->startTransaction(); //開始事務(wù)
for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事務(wù)

$conn->disconnect();

消費(fèi)者代碼(創(chuàng)建一個(gè)rabbit_consumer.php的文件)
創(chuàng)建連接-->創(chuàng)建channel-->創(chuàng)建交換機(jī)-->創(chuàng)建隊(duì)列-->綁定交換機(jī)/隊(duì)列/路由鍵-->接收消息

<?php 
//配置信息
$conn_args = array(
    'host' => '192.168.1.93',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機(jī)名
$q_name = 'q_linvo'; //隊(duì)列名
$k_route = 'key_1'; //路由key

//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創(chuàng)建交換機(jī)
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";

//創(chuàng)建隊(duì)列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";

//綁定交換機(jī)與隊(duì)列,并指定路由鍵
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答
}
$conn->disconnect();

/**
* 消費(fèi)回調(diào)函數(shù)
* 處理消息
*/
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理消息
    $queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答
}

/**
 * 消費(fèi)回調(diào)函數(shù)
 * 處理消息
 */
function processMessage($envelope, $queue) {
    var_dump($envelope->getRoutingKey);
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理消息
}

需要注意的地方是:
queue對(duì)象有兩個(gè)方法可用于取消息:consume和get。前者是阻塞的,無(wú)消息時(shí)會(huì)被掛起,適合循環(huán)中使用;
后者則是非阻塞的,取消息時(shí)有則取,無(wú)則返回false。
測(cè)試截圖
運(yùn)行消費(fèi)者:


運(yùn)行生產(chǎn)者,發(fā)消息:


消費(fèi)者接收到消息:


執(zhí)行兩個(gè)文件,再打開RabbitMQ的管理中心 http://127.0.0.1:15672/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容