PHP中RabbitMQ之a(chǎn)mqp擴(kuò)展實(shí)現(xiàn)

PHP中RabbitMQ之a(chǎn)mqp擴(kuò)展實(shí)現(xiàn)

參考地址:https://blog.csdn.net/yeyun666/article/details/85112742

在安裝完成后我們就可以開始我們的RabbitMQ之旅了,本Demo示例只創(chuàng)建了一個(gè)直連交換機(jī),共有四個(gè)文件Consum.php?(消費(fèi)者),Publish.php?(生產(chǎn)者) ,RabbitMqParernt.php (自己封裝的RabbitMQ的方法) ,以及test.php (測(cè)試數(shù)據(jù))

RabbitMqParernt.php如下所

<?php

abstract class RabbitMqParernt

{

? ? //rabbitMQ配置信息(默認(rèn)配置)

? ? public $config = array(

? ? ? ? 'host'=>'127.0.0.1',? //host

'port'=>5672,? ? ? ? //端口

'username'=>'guest',? //賬號(hào)

'password'=>'guest',? //密碼

'vhost'=>'/'? ? ? ? ? //虛擬主機(jī)

);

public $exchangeName = ''; //交換機(jī)

? ? public $queueName = '';? ? //隊(duì)列名

? ? public $routeKey = '';? ? //路由鍵

public $exchangeType = '';? //交換機(jī)類型

public $channel;? ? ? //信道

? ? public $connection; ? //連接

public $exchange;? ? //交換機(jī)

? ? public $queue;? ? ? ? //隊(duì)列

//初始化RabbitMQ($config數(shù)組是用來修改rabbitMQ的配置信息的)

public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = '',$config = array())

{

$this->exchangeName = $exchangeName;

? ? ? ? $this->queueName = $queueName;

? ? ? ? $this->routeKey = $routeKey;

$this->exchangeType = $exchangeType;

if(!empty($config))

{

$this->setConfig($config);

}

$this->createConnet();

}

? ? //對(duì)RabbitMQ的配置重新進(jìn)行配置

public function setConfig($config)

{

if (!is_array($config))

{

? ? ? ? ? ? throw new Exception('config不是一個(gè)數(shù)組');

? ? ? ? }

foreach($config as $key => $value)

{

$this->config[$key] = $value;

}

}

//創(chuàng)建連接與信道

public function createConnet()

{

//創(chuàng)建連接

$this->connection = new AMQPConnection($this->config);

if(!$this->connection->connect())

{

throw new Exception('RabbitMQ創(chuàng)建連接失敗');

}

//創(chuàng)建信道

$this->channel = new AMQPChannel($this->connection);

//創(chuàng)建交換機(jī)

$this->createExchange();

//生產(chǎn)時(shí)不需要隊(duì)列,故隊(duì)列名為空,只有消費(fèi)時(shí)需要隊(duì)列名

if(!empty($this->queueName))

{

$this->createQueue();

}

}

//創(chuàng)建交換機(jī)

public function createExchange()

{

$this->exchange = new AMQPExchange($this->channel);? ?

$this->exchange->setName($this->exchangeName);

? ? ? ? $this->exchange->setType(AMQP_EX_TYPE_DIRECT);?

$this->exchange->setFlags(AMQP_DURABLE);

}

//創(chuàng)建隊(duì)列,綁定交換機(jī)

public function createQueue()

{

$this->queue = new AMQPQueue($this->channel);

$this->queue->setName($this->queueName);

$this->queue->setFlags(AMQP_DURABLE);

$this->queue->bind($this->exchangeName, $this->routeKey);

}

public function dealMq($flag)

{

if($flag)

{

$this->queue->consume(function($envelope){$this->getMsg($envelope, $this->queue);},AMQP_AUTOACK);//自動(dòng)ACK應(yīng)答

}

else

{

$this->queue->consume(function($envelope){$this->processMessage($envelope, $this->queue);});

}

}

public function getMsg($envelope, $queue)

{

$msg = $envelope->getBody();

$this->doProcess($msg);

}

public function processMessage($envelope, $queue)

{

$msg = $envelope->getBody();?

? ? ? ? $this->doProcess($msg);

? ? ? ? $queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答

}

//處理消息的真正函數(shù),在消費(fèi)者里使用

abstract public function doProcess($msg);? ?

? ? //發(fā)送消息

public function sendMessage($message)

{ ?

? ? ? ? $this->exchange->publish($message, $this->routeKey);? ?

}

//關(guān)閉連接

public function closeConnect()

{

$this->channel->close();

$this->connection->disconnect();

}

}

?Consum.php 如下所示

<?php

include_once('RabbitMqParernt.php');

class Consum extends RabbitMqParernt

{

? ? public function __construct()

? ? {

? ? ? ? parent::__construct('exchange', 'queue', 'routeKey');

? ? }

? ? public function doProcess($msg)

? ? {

? ? ? ? echo $msg;

? ? }

}

$consum = new Consum();

//$consum->dealMq(false);

$consum->dealMq(true);

Publish.php如下所示

<?php

include_once('RabbitMqParernt.php');

class Publish extends RabbitMqParernt

{

? ? public function __construct()

? ? {

? ? ? ? parent::__construct('exchange', '', 'routeKey');

? ? }

? ? public function doProcess($msg)

? ? {

? ? }

}

---------------------

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容