延遲隊列
在我們的上一篇文章使用delayedQueue實現(xiàn)你本地的延遲隊列
中提到了延遲隊列的作用.
但是我們知道,利用delayedQueue實現(xiàn)的是一個單機的,而且是內(nèi)存中的延遲隊列,他并沒有一個集群的支持,并且需要在對泵機的時候,消息消費異常的時候做相應的邏輯處理。
那么這樣做的話,我們需要的工作量還是很大的,有沒有什么東西是讓我們不做這一部分的工作也能實現(xiàn)延遲隊列的功能?
當然有了。答案是:rabbitMq
利用rabbitMq來實現(xiàn)延遲隊列的功能
那么如何利用rabbitMq來實現(xiàn)延遲隊列的功能呢?
請先注意一點,RabbitMQ本身沒有直接支持延遲隊列功能,但是可以通過以下特性模擬出延遲隊列的功能。那么這是通過哪些特性呢,那就讓我們來認識一下這兩個特性吧.
-
Per-Queue Message TTL
RabbitMQ可以對消息和隊列設置TTL(過期時間)。
RabbitMQ針對隊列中的消息過期時間(Time To Live, TTL)有兩種方法可以設置。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息進行單獨設置,每條消息TTL可以不同。如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead message,消費者將無法再收到該消息。
-
Dead Letter Exchanges
利用DLX,當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有以下幾種情況:
- 消息被拒絕(basic.reject or basic.nack)并且requeue=false
- 消息TTL過期
- 隊列達到最大長度
DLX也是一下正常的Exchange同一般的Exchange沒有區(qū)別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發(fā)布到設置的Exchange中去,進而被路由到另一個隊列,publish可以監(jiān)聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認的功能。
結合以上兩個特性,就可以模擬出延遲消息的功能.
基于x-dead-letter-routing-key的單條消息延遲隊列的java代碼實現(xiàn)
生產(chǎn)者(發(fā)送)端代碼:
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//隊列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception
{
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設置MabbitMQ所在主機ip或者主機名
factory.setHost("localhost");
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建一個頻道
Channel channel = connection.createChannel();
//指定一個隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//發(fā)送的消息
String message = "hello world!"+System.currentTimeMillis();;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration("2000").deliveryMode(2).build();
//往隊列中發(fā)出一條消息 這時候要發(fā)送的隊列不應該是QUEUE_NAME,這樣才能進行轉發(fā)的
channel.basicPublish("", "DELAY_QUEUE", properties, message.getBytes());
System.out.println(" [x] Sent '" + message + "'" );
//關閉頻道和連接
channel.close();
connection.close();
}
}
消費者(接受)端代碼:
import java.util.HashMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
// 隊列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", QUEUE_NAME);
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創(chuàng)建隊列。
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//創(chuàng)建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一個阻塞方法(內(nèi)部實現(xiàn)其實是阻塞隊列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'"+ "' [當前系統(tǒng)時間戳]" +System.currentTimeMillis());
}
}
}
參考資料
http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
https://www.cloudamqp.com/docs/delayed-messages.html
http://www.netfoucs.com/article/xtjsxtj/73636.html#