ThinkPHP的AMQP庫(支持RabbitMQ)

1.安裝依賴庫

composer require php-amqplib/php-amqplib
地址:https://github.com/php-amqplib/php-amqplib

2. mq生產(chǎn)者.php
include(__DIR__ . '../../public/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Created by PhpStorm.
 * User: pandeng
 * Date: 2017-07-26
 * Time: 21:51
 */
class MessageQueue
{
    const exchange = 'router';
    const queue = 'msgs';
    public static  function pushMessage($data)
    {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        $channel = $connection->channel();

        $channel->queue_declare(self::queue, false, true, false, false);
        $channel->exchange_declare(self::exchange, 'direct', false, true, false);
        $channel->queue_bind(self::queue, self::exchange);
        $messageBody = $data;
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message, self::exchange);
        $channel->close();
        $connection->close();
        return "ok";
    }
}
3.消費(fèi)者.php


namespace app\index\controller;
include(__DIR__ . '../../../../public/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;

use think\Controller;
use think\Log;
use think\Request;
use think\Db;

class MessageConsume extends Controller
{
    const exchange = 'router';
    const queue = 'msgs';
    const consumerTag = 'consumer';

    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
        write_log("closed",3);
    }

    function process_message($message)
    {
        if ($message->body !== 'quit') {
            $obj = json_decode($message->body);
            if (!isset($obj->id)) {
                echo 'error data\n';
                write_log("error data:" . $message->body, 2);
            } else {
                try {
                    write_log("data:" . json_encode($message));
                } catch (\Think\Exception  $e) {
                    write_log($e->getMessage(), 2);
                    write_log(json_encode($message), 2);
                } catch (\PDOException $pe) {
                    write_log($pe->getMessage(), 2);
                    write_log(json_encode($message), 2);
                }
            }
        }
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }

    /**
     * 啟動
     *
     * @return \think\Response
     */
    public function start()
    {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        $channel = $connection->channel();
        $channel->queue_declare(self::queue, false, true, false, false);
        $channel->exchange_declare(self::exchange, 'direct', false, true, false);
        $channel->queue_bind(self::queue, self::exchange);
        $channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, 'process_message'));

        register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        write_log("starting",3);
    }

}
4. 啟動消費(fèi)者(守護(hù)進(jìn)程)

nohup php index.php index/Message_Consume/start &

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容