今天和大家分享一個(gè)簡(jiǎn)單的rabbit實(shí)現(xiàn)延遲任務(wù)的方法,rabbit實(shí)現(xiàn)延遲隊(duì)列有兩種方式,一種是隊(duì)列或者消息的TTL(Time To Live),另一種是rabbit的rabbitmq-delayed-message-exchange插件,今天我和大家分享下TTL的使用方法。
rabbit有Per-Queue Message TTL和Per-Message TTL兩種設(shè)置超時(shí)的方式,分別指針對(duì)消息和隊(duì)列的,給消息添加過(guò)期時(shí)間相對(duì)比較靈活,這樣不用每一種過(guò)期時(shí)間都去建立一個(gè)隊(duì)列去監(jiān)聽(tīng),給消息設(shè)置過(guò)期時(shí)間方法
$msg = new AMQPMessage('hello expiration!');
$msg->set("expiration","5000"); //關(guān)鍵一點(diǎn):超時(shí)時(shí)間必須設(shè)置成字符串,否則不會(huì)生效!單位是ms
好了這會(huì)兒大家會(huì)想超時(shí)之后的處理在哪寫呢,總有個(gè)超時(shí)的回調(diào)或者什么吧,rabbit可以給隊(duì)列設(shè)置一個(gè)x-dead-letter-exchange,Dead letter routing key,意思是消息超時(shí)后的轉(zhuǎn)發(fā)隊(duì)列。
$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));
waitSendQueue上的消息如果超時(shí)了會(huì)轉(zhuǎn)發(fā)給expireExchange,那我只要去監(jiān)聽(tīng)expireExchange上的消息,拿到消息去處理業(yè)務(wù),就完成了我們的延遲任務(wù),那么接下來(lái)上代碼.。
客戶端:
/**
* Created by PhpStorm.
* User: qyc
* Date: 2017/8/14
* Time: 下午2:18
*/
require_once dirname(__DIR__) . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$con = new AMQPStreamConnection('localhost', 5672, "guest", "guest");
$channel = $con->channel();
//定義等待exchange
$channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);
//定義過(guò)期exchange
$channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定義過(guò)期queue
$channel->queue_declare("expireQueue",false,false,false,false,false);
//定義等待queue
$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));
$channel->queue_bind("waitSendQueue","waitSendExchange");
$channel->queue_bind("expireQueue","expireExchange");
$msg = new AMQPMessage('hello expiration!');
/*
* 設(shè)置超時(shí)時(shí)間
*/
$msg->set("expiration","5000");
/**
* 向等待exchage發(fā)布消息
*/
$channel->basic_publish($msg, 'waitSendExchange');
echo 'send1:' . date('Y-m-d H:i:s') . "\n";
$channel->close();
$con->close();
服務(wù)端
/**
* Created by PhpStorm.
* User: qyc
* Date: 2017/8/14
* Time: 下午2:30
*/
require_once dirname(__DIR__) . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$con = new AMQPStreamConnection("localhost",5672,"guest","guest");
$channel = $con->channel();
$channel = $con->channel();
//定義等待exchange
$channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);
//定義過(guò)期exchange
$channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定義過(guò)期queue
$channel->queue_declare("expireQueue",false,false,false,false,false);
//定義等待queue
$channel->queue_declare("waitSendQueue",false,false,false,false,false,new AMQPTable(array
("x-dead-letter-exchange"=>"expireExchange")));
$channel->queue_bind("waitSendQueue","waitSendExchange");
$channel->queue_bind("expireQueue","expireExchange");
$callback = function ($msg){
echo "\n".' [x] ', $msg->body, "\n";
echo "receive2:".date("Y-m-d H:i:s")."\n";
};
/**
* 訂閱超時(shí)queue
*/
$channel->basic_consume("expireQueue","",false,true,false,false,$callback);
while (count($channel->callbacks)){
$channel->wait();
}
$channel->close();
$connection->close();
結(jié)果:
發(fā)送:

image.png
接受:

image.png