用rebbitMq來實現(xiàn)你的延遲隊列功能

延遲隊列

在我們的上一篇文章使用delayedQueue實現(xiàn)你本地的延遲隊列
中提到了延遲隊列的作用.

但是我們知道,利用delayedQueue實現(xiàn)的是一個單機的,而且是內(nèi)存中的延遲隊列,他并沒有一個集群的支持,并且需要在對泵機的時候,消息消費異常的時候做相應的邏輯處理。

那么這樣做的話,我們需要的工作量還是很大的,有沒有什么東西是讓我們不做這一部分的工作也能實現(xiàn)延遲隊列的功能?

當然有了。答案是:rabbitMq

利用rabbitMq來實現(xiàn)延遲隊列的功能

那么如何利用rabbitMq來實現(xiàn)延遲隊列的功能呢?

請先注意一點,RabbitMQ本身沒有直接支持延遲隊列功能,但是可以通過以下特性模擬出延遲隊列的功能。那么這是通過哪些特性呢,那就讓我們來認識一下這兩個特性吧.

  • Per-Queue Message TTL

    RabbitMQ可以對消息和隊列設置TTL(過期時間)。

    RabbitMQ針對隊列中的消息過期時間(Time To Live, TTL)有兩種方法可以設置。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息進行單獨設置,每條消息TTL可以不同。如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead message,消費者將無法再收到該消息。

  • Dead Letter Exchanges

    利用DLX,當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有以下幾種情況:

    • 消息被拒絕(basic.reject or basic.nack)并且requeue=false
    • 消息TTL過期
    • 隊列達到最大長度

    DLX也是一下正常的Exchange同一般的Exchange沒有區(qū)別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發(fā)布到設置的Exchange中去,進而被路由到另一個隊列,publish可以監(jiān)聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認的功能。

結合以上兩個特性,就可以模擬出延遲消息的功能.

基于x-dead-letter-routing-key的單條消息延遲隊列的java代碼實現(xiàn)

生產(chǎn)者(發(fā)送)端代碼:


import java.util.HashMap;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    //隊列名稱  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] argv) throws Exception  
    {  
        
        /** 
         * 創(chuàng)建連接連接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        //設置MabbitMQ所在主機ip或者主機名  
        factory.setHost("localhost");  
        //創(chuàng)建一個連接  
        Connection connection = factory.newConnection();
        //創(chuàng)建一個頻道  
        Channel channel = connection.createChannel();  
        
        
 
        
        //指定一個隊列  
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
        //發(fā)送的消息  
        String message = "hello world!"+System.currentTimeMillis();;  

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
        AMQP.BasicProperties properties = builder.expiration("2000").deliveryMode(2).build();  
        //往隊列中發(fā)出一條消息     這時候要發(fā)送的隊列不應該是QUEUE_NAME,這樣才能進行轉發(fā)的
        channel.basicPublish("", "DELAY_QUEUE", properties, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'" );  
        //關閉頻道和連接  
        channel.close();  
        connection.close();  
     }  
}

消費者(接受)端代碼:


import java.util.HashMap;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    // 隊列名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
        // channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);

        HashMap<String, Object> arguments = new HashMap<String, Object>();  
        arguments.put("x-dead-letter-exchange", "amq.direct");  
        arguments.put("x-dead-letter-routing-key", QUEUE_NAME);  
        channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments); 
        
        //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創(chuàng)建隊列。  
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 
      //創(chuàng)建隊列消費者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        //指定消費隊列  
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true)  
        {  
            //nextDelivery是一個阻塞方法(內(nèi)部實現(xiàn)其實是阻塞隊列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(" [x] Received '" + message + "'"+ "'   [當前系統(tǒng)時間戳]" +System.currentTimeMillis());  
        }  
    }
}

參考資料

http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
https://www.cloudamqp.com/docs/delayed-messages.html
http://www.netfoucs.com/article/xtjsxtj/73636.html#

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。 ...
    Java架構閱讀 2,266評論 3 24
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評論 19 139
  • 背景 何為延遲隊列? 顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就...
    wooyoo閱讀 3,533評論 0 17
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器。支持消息的持久化、事務、擁塞控...
    jiangmo閱讀 10,507評論 2 34
  • 1.什么是消息隊列 消息隊列允許應用間通過消息的發(fā)送與接收的方式進行通信,當消息接收方服務忙或不可用時,其提供了一...
    zhuke閱讀 4,628評論 0 12

友情鏈接更多精彩內(nèi)容