PHP中RabbitMQ之amqp擴展實現
參考地址:https://blog.csdn.net/yeyun666/article/details/85112742
在安裝完成後我們就可以開始我們的RabbitMQ之旅了,本Demo示例只創建了一個直連交換機,共有四個文件:Consum.php (消費者),Publish.php (生產者),RabbitMqParernt.php (自己封裝的RabbitMQ的方法),以及test.php (測試數據)
RabbitMqParernt.php如下所示
<?php
/**
 * Base class wrapping the php-amqp extension for one exchange and, optionally,
 * one queue bound to it.
 *
 * The constructor connects immediately: it opens the connection and channel,
 * sets up the exchange and, when a queue name was supplied, the queue and its
 * binding. Subclasses implement doProcess() to handle a message body.
 */
abstract class RabbitMqParernt
{
    // Default connection settings; individual keys can be overridden via setConfig().
    public $config = array(
        'host' => '127.0.0.1', // broker host
        'port' => 5672,        // broker port
        'username' => 'guest', // account name (passed to php-amqp as 'login')
        'password' => 'guest', // account password
        'vhost' => '/',        // virtual host
    );

    public $exchangeName = ''; // exchange name
    public $queueName = '';    // queue name (empty for publish-only instances)
    public $routeKey = '';     // routing key used for both bind and publish
    public $exchangeType = ''; // exchange type; empty means direct

    public $channel;    // AMQPChannel
    public $connection; // AMQPConnection
    public $exchange;   // AMQPExchange
    public $queue;      // AMQPQueue, only set when $queueName is non-empty

    /**
     * Stores the names, applies optional config overrides and connects.
     *
     * @param string $exchangeName Exchange to publish to / bind against.
     * @param string $queueName    Queue to consume from; pass '' when only publishing.
     * @param string $routeKey     Routing key.
     * @param string $exchangeType Exchange type; '' falls back to AMQP_EX_TYPE_DIRECT.
     * @param array  $config       Overrides merged into the default $config.
     *
     * @throws Exception When $config is not an array or the connection fails.
     */
    public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = '', $config = array())
    {
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        $this->routeKey = $routeKey;
        $this->exchangeType = $exchangeType;

        if (!empty($config)) {
            $this->setConfig($config);
        }

        $this->createConnet();
    }

    /**
     * Overrides connection settings key by key; keys not present keep their value.
     *
     * @param array $config Settings to override.
     *
     * @throws Exception When $config is not an array.
     */
    public function setConfig($config)
    {
        if (!is_array($config)) {
            throw new Exception('config不是一個數組');
        }

        // array_replace overwrites matching keys and appends new ones without
        // renumbering, which is exactly what a per-key assignment loop does.
        $this->config = array_replace($this->config, $config);
    }

    /**
     * Opens the connection and channel, then sets up the exchange and
     * (when a queue name is configured) the queue.
     *
     * @throws Exception When connect() reports failure.
     */
    public function createConnet()
    {
        $this->connection = new AMQPConnection($this->connectionCredentials());
        if (!$this->connection->connect()) {
            throw new Exception('RabbitMQ創建連接失敗');
        }

        $this->channel = new AMQPChannel($this->connection);
        $this->createExchange();

        // Publishers pass an empty queue name; only consumers need a queue.
        if (!empty($this->queueName)) {
            $this->createQueue();
        }
    }

    /**
     * Builds the credentials array handed to AMQPConnection.
     *
     * php-amqp reads the account name from the 'login' key, so the 'username'
     * entry of $config is copied there unless the caller already set 'login'.
     *
     * @return array
     */
    protected function connectionCredentials()
    {
        $credentials = $this->config;
        if (isset($credentials['username']) && !isset($credentials['login'])) {
            $credentials['login'] = $credentials['username'];
        }

        return $credentials;
    }

    /**
     * Creates the durable exchange object and declares it on the broker.
     *
     * Uses $exchangeType when one was given, otherwise a direct exchange.
     */
    public function createExchange()
    {
        $type = empty($this->exchangeType) ? AMQP_EX_TYPE_DIRECT : $this->exchangeType;

        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setName($this->exchangeName);
        $this->exchange->setType($type);
        $this->exchange->setFlags(AMQP_DURABLE);

        // An unnamed exchange cannot be declared, so skip the declaration
        // when no exchange name was configured.
        if (!empty($this->exchangeName)) {
            $this->exchange->declareExchange();
        }
    }

    /**
     * Creates the durable queue, declares it and binds it to the exchange
     * with the configured routing key.
     */
    public function createQueue()
    {
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($this->queueName);
        $this->queue->setFlags(AMQP_DURABLE);
        // The queue must exist on the broker before it can be bound.
        $this->queue->declareQueue();
        $this->queue->bind($this->exchangeName, $this->routeKey);
    }

    /**
     * Starts consuming from the queue and routes every message to doProcess().
     *
     * @param bool $flag Truthy: consume with AMQP_AUTOACK (no explicit ack).
     *                   Falsy: ack each message after doProcess() returns.
     *
     * @throws Exception When this instance was built without a queue.
     */
    public function dealMq($flag)
    {
        if ($this->queue === null) {
            throw new Exception('dealMq requires a queue; construct with a non-empty queue name');
        }

        if ($flag) {
            // Automatic acknowledgement.
            $this->queue->consume(function ($envelope) {
                $this->getMsg($envelope, $this->queue);
            }, AMQP_AUTOACK);
        } else {
            // Manual acknowledgement, sent by processMessage().
            $this->queue->consume(function ($envelope) {
                $this->processMessage($envelope, $this->queue);
            });
        }
    }

    /**
     * Auto-ack path: passes the message body to doProcess().
     *
     * @param AMQPEnvelope $envelope Delivered message.
     * @param AMQPQueue    $queue    Unused here; kept for a signature parallel to processMessage().
     */
    public function getMsg($envelope, $queue)
    {
        $msg = $envelope->getBody();
        $this->doProcess($msg);
    }

    /**
     * Manual-ack path: passes the message body to doProcess(), then acks it.
     *
     * @param AMQPEnvelope $envelope Delivered message.
     * @param AMQPQueue    $queue    Queue used to send the acknowledgement.
     */
    public function processMessage($envelope, $queue)
    {
        $msg = $envelope->getBody();
        $this->doProcess($msg);
        // Ack only after doProcess() returned, so an exception leaves the message un-acked.
        $queue->ack($envelope->getDeliveryTag());
    }

    /**
     * Handles one message body; implemented by the consumer subclass.
     *
     * @param string $msg Message body.
     */
    abstract public function doProcess($msg);

    /**
     * Publishes a message to the exchange using the configured routing key.
     *
     * @param string $message Message body.
     */
    public function sendMessage($message)
    {
        $this->exchange->publish($message, $this->routeKey);
    }

    /**
     * Closes the channel and disconnects from the broker.
     */
    public function closeConnect()
    {
        $this->channel->close();
        $this->connection->disconnect();
    }
}
Consum.php 如下所示
<?php
include_once('RabbitMqParernt.php');
/**
 * Consumer bound to exchange 'exchange', queue 'queue', routing key 'routeKey'.
 */
class Consum extends RabbitMqParernt
{
    /**
     * Connects with the fixed exchange/queue/routing-key names and the
     * parent's default exchange type and connection settings.
     */
    public function __construct()
    {
        parent::__construct('exchange', 'queue', 'routeKey');
    }

    /**
     * Prints the message body as received.
     *
     * @param string $msg Message body.
     */
    public function doProcess($msg)
    {
        echo $msg;
    }
}
// Build the consumer; its constructor connects to the broker.
$worker = new Consum();
// Consume in auto-ack mode. Pass false instead to ack each message manually
// after it has been processed.
$worker->dealMq(true);
Publish.php如下所示
<?php
include_once('RabbitMqParernt.php');
/**
 * Publisher for exchange 'exchange' with routing key 'routeKey'.
 *
 * Passes an empty queue name to the parent, so no queue is set up.
 */
class Publish extends RabbitMqParernt
{
    /**
     * Connects with the fixed exchange and routing-key names and no queue.
     */
    public function __construct()
    {
        parent::__construct('exchange', '', 'routeKey');
    }

    /**
     * Intentionally empty: present only to satisfy the abstract parent method.
     *
     * @param string $msg Message body (ignored).
     */
    public function doProcess($msg)
    {
    }
}
---------------------