RabbitMQ延時(shí)隊(duì)列實(shí)現(xiàn)(PHP)

安裝環(huán)境

Mac環(huán)境安裝Docker,使用Docker安裝RabbitMQ

本教程中使用php-amqplib,并且使用Composer依賴管理:php-amqplib
項(xiàng)目中添加一個(gè) composer.json文件:

 {
     "require": {
      "php-amqplib/php-amqplib": "2.6.1"
     }
 }

Rabbit官方文檔參考

AMQP 協(xié)議
RabbitMQ-PHP版本

Docker安裝RabbitMQ

// 搜索RabbitMQ鏡像
 docker search rabbitmq

// 獲取RabbitMQ鏡像
 docker pull rabbitmq

// 運(yùn)行RabbitMQ鏡像 | 5672 15672 為開(kāi)放端口 | -v 掛載目錄方便代碼修改自動(dòng)上傳 | -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123  設(shè)置后臺(tái)登錄賬號(hào)密碼
docker run -d -it --name my-rabbit -p 5672:5672 -p 15672:15672 -v /mnt:/home -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 rabbitmq

啟動(dòng)訪問(wèn) http://127.0.0.1:15672 地址訪問(wèn)RabbitMQ管理后臺(tái),然后發(fā)現(xiàn)嘎~打不開(kāi)??!需要執(zhí)行以下命令

// 查看Docker中RabbitMQ容器
 docker ps -l

// 進(jìn)入Docker中RabbitMQ容器 | 容器ID為 docker ps -l 結(jié)果中`CONTAINER ID`列
 docker exec -it 容器ID /bin/bash

// 打開(kāi)RabbitMQ后臺(tái)管理訪問(wèn)設(shè)置
 rabbitmqctl start_app
 rabbitmq-plugins enable rabbitmq_management
 rabbitmqctl stop

成功訪問(wèn)!?。?!


image

image

RabbitMQ安裝RabbitMQ延遲隊(duì)列插件

RabbitMQ官方提供了延遲隊(duì)列插件rabbitmq_delayed_message_exchange-3.9.0.ez, 下載前請(qǐng)確認(rèn)自己的RabbitMQ版本,下載對(duì)應(yīng)版本的插件,選擇ez格式文件

image.png

將插件放入宿主機(jī)與Docker的掛載目錄中,最終目的就是將插件放入RabbitMQ鏡像的/plugins目錄中


// 回顧Docker啟動(dòng)命令,Docker中宿主機(jī)/mnt目錄與RabbitMQ鏡像的/home掛載
 docker run -d -it --name my-rabbit -p 5672:5672 -p 15672:15672 -v /mnt:/home -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 rabbitmq

// 將下載的插件移動(dòng)到/mnt目錄中
 mv rabbitmq_delayed_message_exchange-3.9.0.ez /mnt

// 把rabbitmq_delayed_message_exchange-3.9.0.ez移動(dòng)至/plugins中
 docker exec -it 容器ID /bin/bash
 cd /nmt
 mv rabbitmq_delayed_message_exchange-3.9.0.ez/plugins
 
// 執(zhí)行啟用插件命令
 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 exit 
 docker restart 容器ID

重新登錄管理后臺(tái) ,出現(xiàn)如圖所示,即插件安裝成功

image.png

PHP項(xiàng)目代碼

RabbitMQ類

class RabbitMQController {
    private $host = "127.0.0.1";
    private $port = 5672;
    private $user = "root";
    private $psss = 123;

    private $msg;
    private $channel;
    private $connection;

    //  過(guò)期時(shí)間
    const TIMEOUT_5_S   = 5;     // 5s
    const TIMEOUT_10_S  = 10;    // 10s

    private $exchange_logs      = "logs";
    private $exchange_direct    = "direct";
    private $exchange_delayed   = "delayed";

    private $queue_delayed      = "delayedQueue";

    CONST EXCHANGETYPE_FANOUT   = "fanout";
    CONST EXCHANGETYPE_DIRECT   = "direct";
    CONST EXCHANGETYPE_DELAYED  = "x-delayed-message";

    public function __construct($type = false) {
        $this->connection   = new AMQPStreamConnection($this->host,$this->port,$this->user ,$this->psss);
        $this->channel      = $this->connection->channel();
        // 聲明Exchange
        $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false,false,false,new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
        $this->channel->queue_declare($this->queue_delayed, false, true, false, false);
        $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed,$this->queue_delayed);
    }

    /**
     * delay creat message
     */
    public function createMessageDelay($msg,$time) {
        $delayConfig = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
        ];
        $msg  = new AMQPMessage($msg,$delayConfig);
        return $msg;
    }

    /**
     * delay send message
     */
    public function sendDelay($msg,$time =  self::TIMEOUT_10_S) {
        $msg = $this->createMessageDelay($msg,$time);
        $this->channel->basic_publish($msg,$this->exchange_delayed,$this->queue_delayed);
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * delay consum
     */
    public function consumDelay(){
        $callback = function($msg){
            echo ' [x] ', $msg->body, "\n";
            $this->channel->basic_ack($msg->delivery_info['delivery_tag'],false);
        };
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        $this->channel->close();
        $this->connection->close();
    }

測(cè)試類

    public function actionSendDelay($msg,$time){
        $Rabbit = new RabbitMQController("x-delayed-message");
        $Rabbit->sendDelay($msg,$time);
    }

    public function actionConsumDelay(){
        $Rabbit = new RabbitMQController("x-delayed-message");
        $Rabbit->consumDelay();
    }
image.png

image.png

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容