注意:本文僅為學習記錄,代碼不能直接運行,有任何問題請留言。
1. 安裝RabbitMQ
拉取鏡像
docker pull rabbitmq:3.7.7-management
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
http://ip:15672

image.png
2. composer安裝php-amqplib
composer require php-amqplib/php-amqplib
3. TP5 實現
再次封裝php-amqplib
<?php
/**
* User: yuzhao
* Description: RabbitMq 工具
*/
namespace app\common\tool;
use app\common\config\SelfConfig;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Small convenience wrapper around php-amqplib for one configured queue.
 *
 * Each instance opens its own connection and channel, declares the exchange
 * and queue named in the configuration and binds them together. An instance
 * is single-use: both wMq() and rMq() close the connection before returning.
 */
class RabbitMQTool
{
    /**
     * Channel opened on the connection; all AMQP operations go through it.
     *
     * @var mixed
     */
    private $channel;

    /**
     * Configuration entry of the selected queue (exchange_name, queue_name, ...).
     *
     * @var array
     */
    private $mqConf;

    /**
     * Underlying broker connection. Declared explicitly so that it is not
     * created as a dynamic property (deprecated since PHP 8.2).
     *
     * @var mixed
     */
    private $conn;

    /**
     * Validate the configuration, connect to the broker and declare the topology.
     *
     * The process is terminated with die() when the configuration section or
     * the requested queue alias is missing (behaviour kept from the original).
     *
     * @param string $mqName Queue alias, a key of Source.rabbit_mq.rabbit_mq_queue.
     */
    public function __construct($mqName)
    {
        // Load the complete RabbitMQ configuration section.
        $rabbitMqConf = SelfConfig::getConfig('Source.rabbit_mq');
        if (!isset($rabbitMqConf['rabbit_mq_queue'])) {
            die('沒有定義Source.rabbit_mq');
        }
        // Validate the alias BEFORE connecting so that a bad alias does not
        // leave an open connection behind.
        if (!isset($rabbitMqConf['rabbit_mq_queue'][$mqName])) {
            die('沒有定義'.$mqName);
        }
        $this->mqConf = $rabbitMqConf['rabbit_mq_queue'][$mqName];

        // Open the connection and a channel on it.
        $this->conn = new AMQPStreamConnection(
            $rabbitMqConf['host'],
            $rabbitMqConf['port'],
            $rabbitMqConf['user'],
            $rabbitMqConf['pwd'],
            $rabbitMqConf['vhost']
        );
        $this->channel = $this->conn->channel();

        // Declare a durable direct exchange (passive=false, durable=true, auto_delete=false).
        $this->channel->exchange_declare($this->mqConf['exchange_name'], 'direct', false, true, false);
        // Declare a durable queue (passive=false, durable=true, exclusive=false, auto_delete=false).
        $this->channel->queue_declare($this->mqConf['queue_name'], false, true, false, false);
        // Bind the queue to the exchange (no routing key is given).
        $this->channel->queue_bind($this->mqConf['queue_name'], $this->mqConf['exchange_name']);
    }

    /**
     * Create a new, connected instance for the given queue alias.
     *
     * @param string $mqName Queue alias from the configuration.
     * @return RabbitMQTool
     */
    public static function instance($mqName)
    {
        return new RabbitMQTool($mqName);
    }

    /**
     * Publish one message (JSON-encoded $data) to the configured exchange.
     *
     * The connection is always closed afterwards, so the instance cannot be
     * reused for another call.
     *
     * @param mixed $data Payload; must be encodable by json_encode().
     * @return bool True when the message was handed to the broker, false when
     *              encoding or publishing failed.
     */
    public function wMq($data)
    {
        try {
            $payload = json_encode($data, JSON_UNESCAPED_UNICODE);
            if ($payload === false) {
                // Do not publish a bogus body when the data cannot be encoded.
                return false;
            }
            // delivery_mode 2 marks the message as persistent.
            $msg = new AMQPMessage($payload, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
            $this->channel->basic_publish($msg, $this->mqConf['exchange_name']);
            return true;
        } catch (\Throwable $e) {
            return false;
        } finally {
            $this->closeConn();
        }
    }

    /**
     * Fetch up to $num messages from the configured queue.
     *
     * Messages are pulled one at a time with basic_get (auto-acknowledged),
     * so only the messages actually returned are removed from the queue.
     * The call does not block: when the queue holds fewer than $num messages
     * the available ones are returned, and an empty array is returned for an
     * empty queue. The connection is closed before returning.
     *
     * @param int $num Maximum number of messages to fetch.
     * @return array List of decoded message bodies (json_decode(..., true)).
     * @throws \Throwable Whatever the AMQP library raises while fetching or closing.
     */
    public function rMq($num = 1)
    {
        $rData = [];
        $limit = (int) $num;
        try {
            for ($i = 0; $i < $limit; $i++) {
                // Second argument true = no_ack: the broker considers the
                // message delivered as soon as it is handed out.
                $msg = $this->channel->basic_get($this->mqConf['queue_name'], true);
                if ($msg === null) {
                    // Queue drained.
                    break;
                }
                $rData[] = json_decode($msg->body, true);
            }
        } finally {
            $this->closeConn();
        }
        return $rData;
    }

    /**
     * Close the channel and then the connection.
     *
     * The connection is closed even when closing the channel throws.
     */
    public function closeConn()
    {
        try {
            $this->channel->close();
        } finally {
            $this->conn->close();
        }
    }
}
入隊列
<?php
/**
* User: yuzhao
* Description:
*/
namespace app\test\controller;
use app\common\tool\RabbitMQTool;
use think\Controller;
/**
 * Demo controller showing how a message is pushed onto a queue.
 */
class TestController extends Controller
{
    /**
     * Publish one sample message to the queue configured under the alias "test".
     */
    public function test()
    {
        $payload = ['name' => '123'];
        $producer = RabbitMQTool::instance('test');
        $producer->wMq($payload);
    }
}
啟動消費隊列
<?php
/**
* User: yuzhao
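* Description: Start the MQ consumers: php xxx/public/index.php /daemon/start_Mq/main <queue alias> [-d (run as daemon) | -s (kill the running processes)]. The number of worker processes is read from the queue's process_num setting.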
*/
namespace app\daemon\controller;
use app\common\config\SelfConfig;
use app\common\tool\RabbitMQTool;
/**
 * CLI entry point that forks worker processes which poll one queue and hand
 * each batch of messages to the configured consumer class.
 *
 * Arguments are read from the global $argv: $argv[2] is the queue alias and
 * the optional $argv[3] is "-d" (daemonize) or "-s" (stop running processes).
 */
class StartMqController
{
    /**
     * Namespace prefix of the consumer classes ("\app\daemon\deal\").
     *
     * @var string|null
     */
    private $dealPath = null;

    /**
     * PIDs of the worker processes forked by main() that are still running.
     *
     * @var array
     */
    private $childsPid = [];

    /**
     * Build the namespace prefix used to locate consumer classes.
     */
    public function __construct()
    {
        // Turn the path-like notation into a namespace prefix.
        $this->dealPath = str_replace('/', '\\', "/app/daemon/deal/");
    }

    /**
     * Create a new controller instance.
     *
     * @return StartMqController
     */
    public static function instance()
    {
        return new StartMqController();
    }

    /**
     * Main flow: parse arguments, load the queue configuration, fork the
     * workers and keep the parent alive while reaping exited children.
     *
     * @throws \ErrorException
     */
    public function main()
    {
        global $argv;

        // The queue alias ($argv[2]) is mandatory; check it before it is used.
        if (!is_array($argv) || count($argv) < 3) {
            die('缺少參數(shù)');
        }

        // Optional flag.
        if (isset($argv[3])) {
            switch ($argv[3]) {
                case '-d': // start as a daemon
                    $this->daemonStart();
                    break;
                case '-s': // stop the running processes, then quit
                    $this->killEasyExport($argv[2]);
                    die();
            }
        }

        // Load the configuration of the requested queue.
        $rabbitMqConf = SelfConfig::getConfig('Source.rabbit_mq');
        if (!isset($rabbitMqConf['rabbit_mq_queue'][$argv[2]])) {
            die('沒有配置:'.$argv[2]);
        }
        $mqConf = $rabbitMqConf['rabbit_mq_queue'][$argv[2]];

        // Instantiate the consumer that will process the messages.
        $dealClass = $this->dealPath.$mqConf['consumer'];
        $dealObj = new $dealClass;

        $processNum = $this->resolveProcessNum($mqConf);

        if (!isset($mqConf['deal_num']) || !is_numeric($mqConf['deal_num'])) {
            die('處理?xiàng)l數(shù)設(shè)置有誤');
        }

        // Fork the workers.
        for ($i = 0; $i < $processNum; $i++) {
            $pid = pcntl_fork();
            if ($pid < 0) {
                exit();
            } elseif (0 == $pid) {
                // Child: consume forever.
                $this->downMqData($dealObj, $argv, $mqConf);
                exit();
            } else {
                // Parent: remember the child.
                $this->childsPid[] = $pid;
            }
        }

        // Parent stays alive; reap exited children so they do not linger as zombies.
        while (true) {
            while (($done = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
                $this->childsPid = array_values(array_diff($this->childsPid, [$done]));
            }
            sleep(1);
        }
    }

    /**
     * Determine how many worker processes to fork.
     *
     * @param array $mqConf Queue configuration entry.
     * @return int The configured process_num when it is numeric and within
     *             1..10, otherwise the default of 1.
     */
    private function resolveProcessNum($mqConf)
    {
        if (!isset($mqConf['process_num']) || !is_numeric($mqConf['process_num'])) {
            return 1;
        }
        $configured = (int) $mqConf['process_num'];
        if ($configured < 1 || $configured > 10) {
            return 1;
        }
        return $configured;
    }

    /**
     * Worker loop: fetch a batch of messages and pass it to the consumer,
     * pausing one second between polls.
     *
     * @param object $dealObj Consumer instance; its deal() method receives the batch.
     * @param array  $argv    CLI arguments; $argv[2] is the queue alias.
     * @param array  $mqConf  Queue configuration entry; deal_num is the batch size.
     * @throws \ErrorException
     */
    private function downMqData($dealObj, $argv, $mqConf)
    {
        while (true) {
            // A fresh RabbitMQTool is needed each round because rMq() closes its connection.
            $mqData = RabbitMQTool::instance($argv[2])->rMq($mqConf['deal_num']);
            if (!empty($mqData)) {
                // Nothing to process when the batch is empty.
                $dealObj->deal($mqData);
            }
            sleep(1);
        }
    }

    /**
     * Send SIGINT to every process whose command line contains $startFile.
     *
     * NOTE(review): matching is by substring of the `ps aux` output, so a
     * short alias can match unrelated processes; a PID file would be safer.
     *
     * @param string $startFile Text to look for in the process list (the queue alias).
     */
    private function killEasyExport($startFile)
    {
        if ((string) $startFile === '') {
            // An empty pattern would match every process on the machine.
            echo "not run\n";
            return;
        }

        // The value comes from the command line: quote it for the shell and
        // match it as a fixed string rather than as a regular expression.
        $needle = escapeshellarg($startFile);
        exec("ps aux | grep -F -- {$needle} | grep -v grep | awk '{print \$2}'", $lines);

        // This very process also matches (the alias is one of its arguments); skip it.
        $self = getmypid();
        $targets = array_filter(
            array_map('intval', $lines),
            function ($pid) use ($self) {
                return $pid > 0 && $pid !== $self;
            }
        );

        if (count($targets) === 0) {
            echo "not run\n";
            return;
        }

        // PIDs are integers at this point, so the command line is safe to build.
        exec('kill -s INT ' . implode(' ', $targets));
        echo "[$startFile] stop success";
    }

    /**
     * Detach from the terminal using the fork / setsid / fork sequence.
     * Only the final child returns from this method; the intermediate
     * processes exit.
     */
    private function daemonStart()
    {
        // Daemon mode needs the pcntl extension.
        if (!function_exists('pcntl_fork')) {
            exit('Daemonize needs pcntl, the pcntl extension was not found');
        }
        umask(0);

        $pid = pcntl_fork();
        if ($pid < 0) {
            exit('fork error.');
        } elseif ($pid > 0) {
            // Original process exits.
            exit();
        }

        // Become leader of a new session.
        if (!posix_setsid()) {
            exit('setsid error.');
        }

        $pid = pcntl_fork();
        if ($pid < 0) {
            exit('fork error');
        } elseif ($pid > 0) {
            // Session leader exits.
            exit;
        }
        // The remaining child continues as the daemon.
    }
}
自定義配置文件
<?php
/**
* User: yuzhao
* Description:
*/
// RabbitMQ settings; the tool and the daemon read this section through
// SelfConfig::getConfig('Source.rabbit_mq').
return [
    'rabbit_mq' => [
        // Placeholder: replace with the broker's host name or IP address.
        // (Quoted so that it is a string instead of an undefined constant.)
        'host' => 'ip',
        'port' => 5672,
        'user' => 'root',
        'pwd' => 'xxx',
        'vhost' => 'my_vhost',
        // One entry per queue alias.
        'rabbit_mq_queue' => [
            'test' => [
                'exchange_name' => 'ex_test', // exchange name
                'queue_name' => 'que_test', // queue name
                'process_num' => 3, // number of worker processes per machine
                'deal_num' => '50', // number of messages handled per batch
                'consumer' => 'DealTest' // consumer class name, appended to \app\daemon\deal\
            ]
        ]
    ]
];
4. 學習地址
https://www.cnblogs.com/yufeng218/p/9452621.html
https://blog.csdn.net/demon3182/article/details/77335206
https://blog.csdn.net/u010472499/article/details/78366614
https://segmentfault.com/a/1190000012308675