rabbit實(shí)現(xiàn)延遲任務(wù)

今天和大家分享一個(gè)簡(jiǎn)單的rabbit實(shí)現(xiàn)延遲任務(wù)的方法,rabbit實(shí)現(xiàn)延遲隊(duì)列有兩種方式,一種是隊(duì)列或者消息的TTL(Time To Live),另一種是rabbit的rabbitmq-delayed-message-exchange插件,今天我和大家分享下TTL的使用方法。

rabbit有Per-Queue Message TTL和Per-Message TTL兩種設(shè)置超時(shí)的方式,分別指針對(duì)消息和隊(duì)列的,給消息添加過(guò)期時(shí)間相對(duì)比較靈活,這樣不用每一種過(guò)期時(shí)間都去建立一個(gè)隊(duì)列去監(jiān)聽(tīng),給消息設(shè)置過(guò)期時(shí)間方法

$msg = new AMQPMessage('hello expiration!');
$msg->set("expiration","5000");    //關(guān)鍵一點(diǎn):超時(shí)時(shí)間必須設(shè)置成字符串,否則不會(huì)生效!單位是ms

好了這會(huì)兒大家會(huì)想超時(shí)之后的處理在哪寫呢,總有個(gè)超時(shí)的回調(diào)或者什么吧,rabbit可以給隊(duì)列設(shè)置一個(gè)x-dead-letter-exchange,Dead letter routing key,意思是消息超時(shí)后的轉(zhuǎn)發(fā)隊(duì)列。

$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));     

waitSendQueue上的消息如果超時(shí)了會(huì)轉(zhuǎn)發(fā)給expireExchange,那我只要去監(jiān)聽(tīng)expireExchange上的消息,拿到消息去處理業(yè)務(wù),就完成了我們的延遲任務(wù),那么接下來(lái)上代碼.。
客戶端:

/**
 * Created by PhpStorm.
 * User: qyc
 * Date: 2017/8/14
 * Time: 下午2:18
 */

require_once dirname(__DIR__) . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$con = new AMQPStreamConnection('localhost', 5672, "guest", "guest");

$channel = $con->channel();
//定義等待exchange
$channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);
//定義過(guò)期exchange
$channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定義過(guò)期queue
$channel->queue_declare("expireQueue",false,false,false,false,false);
//定義等待queue
$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));
$channel->queue_bind("waitSendQueue","waitSendExchange");
$channel->queue_bind("expireQueue","expireExchange");

$msg = new AMQPMessage('hello expiration!');
/*
 * 設(shè)置超時(shí)時(shí)間
 */
$msg->set("expiration","5000");

/**
 * 向等待exchage發(fā)布消息
 */
$channel->basic_publish($msg, 'waitSendExchange');

echo 'send1:' . date('Y-m-d H:i:s') . "\n";

$channel->close();
$con->close();

服務(wù)端

/**
 * Created by PhpStorm.
 * User: qyc
 * Date: 2017/8/14
 * Time: 下午2:30
 */

require_once dirname(__DIR__) . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$con  = new AMQPStreamConnection("localhost",5672,"guest","guest");
$channel = $con->channel();

$channel = $con->channel();
//定義等待exchange
$channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);
//定義過(guò)期exchange
$channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定義過(guò)期queue
$channel->queue_declare("expireQueue",false,false,false,false,false);
//定義等待queue
$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));
$channel->queue_bind("waitSendQueue","waitSendExchange");
$channel->queue_bind("expireQueue","expireExchange");

$callback = function ($msg){
    echo "\n".' [x] ', $msg->body, "\n";
    echo "receive2:".date("Y-m-d H:i:s")."\n";
};
/**
 * 訂閱超時(shí)queue
 */
$channel->basic_consume("expireQueue","",false,true,false,false,$callback);

while (count($channel->callbacks)){
    $channel->wait();
}

$channel->close();
$connection->close();

結(jié)果:
發(fā)送:


image.png

接受:

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容