RabbitMQ/RPC/TTL/死信隊(duì)列

當(dāng)需要調(diào)用遠(yuǎn)端計(jì)算機(jī)的函數(shù)并等待結(jié)果,這模式通常被稱為遠(yuǎn)程過程調(diào)用或RPC。

BasicProperties:
消息屬性 這AMQP協(xié)議預(yù)先確定了消息中的14個(gè)屬性。常用的有:

  • deliveryMode
    將一個(gè)消息標(biāo)記為持久化(值為2)或者瞬態(tài)的(其他值)。之前發(fā)送隊(duì)列消息時(shí)的用法:

      channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    
  • contentType
    用來描述媒體類型的編碼。例如常常使用的JSON編碼:application/json。

  • replyTo
    通常來命名回收隊(duì)列的名字。

  • correlationId
    對(duì)RPC加速響應(yīng)請(qǐng)求是很有用的。

相關(guān)性ID(Correlation Id):
為每一個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回收隊(duì)列,這個(gè)效率十分低下。每一個(gè)客戶端創(chuàng)建一個(gè)單一的回收隊(duì)列。 用Correlation Id判斷隊(duì)列中的響應(yīng)是屬于哪個(gè)請(qǐng)求的。
當(dāng)在回收隊(duì)列中接收消息時(shí),依據(jù)這個(gè)屬性值,刻意將每個(gè)響應(yīng)匹配到對(duì)應(yīng)的請(qǐng)求上。如果是未知的Correlation Id值,就丟棄這個(gè)消息,因?yàn)樗粚儆谌魏我粋€(gè)我們的請(qǐng)求。

請(qǐng)求RPC服務(wù):

當(dāng)客戶端啟動(dòng),它會(huì)創(chuàng)建一個(gè)匿名的獨(dú)占的回收隊(duì)列。 對(duì)于一個(gè)RPC請(qǐng)求,客戶端會(huì)發(fā)送一個(gè)消息到rpc_queue隊(duì)列中,其中有兩個(gè)屬性:replyTo(回復(fù)隊(duì)列)和correlationId(每一個(gè)請(qǐng)求都是唯一值)。

服務(wù)端:

package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分發(fā)機(jī)制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客戶端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}
  • contentType
    用來描述媒體類型的編碼。例如常常使用的JSON編碼,這是一個(gè)好的慣例,設(shè)置這個(gè)屬性為:application/json。
  • replyTo
    通常來命名回收隊(duì)列的名字。
  • correlationId
    對(duì)RPC加速響應(yīng)請(qǐng)求是很有用的。

相關(guān)性ID(Correlation Id):
為每一個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回收隊(duì)列,這個(gè)效率十分低下。每一個(gè)客戶端創(chuàng)建一個(gè)單一的回收隊(duì)列。 用Correlation Id判斷隊(duì)列中的響應(yīng)是屬于哪個(gè)請(qǐng)求的。
當(dāng)在回收隊(duì)列中接收消息時(shí),依據(jù)這個(gè)屬性值,刻意將每個(gè)響應(yīng)匹配到對(duì)應(yīng)的請(qǐng)求上。如果是未知的Correlation Id值,就丟棄這個(gè)消息,因?yàn)樗粚儆谌魏我粋€(gè)我們的請(qǐng)求。

請(qǐng)求RPC服務(wù):

當(dāng)客戶端啟動(dòng),它會(huì)創(chuàng)建一個(gè)匿名的獨(dú)占的回收隊(duì)列。 對(duì)于一個(gè)RPC請(qǐng)求,客戶端會(huì)發(fā)送一個(gè)消息到rpc_queue隊(duì)列中,其中有兩個(gè)屬性:replyTo(回復(fù)隊(duì)列)和correlationId(每一個(gè)請(qǐng)求都是唯一值)。

服務(wù)端:

package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分發(fā)機(jī)制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客戶端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}

TTL

  • Queue Message TTL
    設(shè)置某隊(duì)列中所有消息的TTL,通過在 queue.declare 中設(shè)置 x-message-ttl 參數(shù),可以控制被 publish 到 queue 中的 message 被丟棄前能夠存活的時(shí)間。值得注意的是,當(dāng)一個(gè) message 被路由到多個(gè) queue 中時(shí),他們之間不會(huì)互相影響。
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-message-ttl", 60000);//存活時(shí)間最大為 60 秒
channel.queueDeclare("myqueue", false, false, false, args);
  • Per-Message TTL
    TTL 設(shè)置可以具體到每一條 message 本身,只要在通過 basic.publish 命令發(fā)送 message 時(shí)設(shè)置 expiration 字段。expiration 字段必須為字符串類型。
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());

對(duì)于第一種設(shè)置隊(duì)列TTL屬性的方法,一旦消息過期,就會(huì)從隊(duì)列中抹去。而第二種方法里,即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,在過期 message 到達(dá) queue 的頭部時(shí)被真正的丟棄。

  • Queue TTL
    queue 被自動(dòng)刪除前可以處于未使用狀態(tài)的時(shí)間。未使用的意思是 queue 上沒有任何 consumer ,queue 沒有被重新聲明,并且在過期時(shí)間段內(nèi)未調(diào)用過 basic.get 命令。在服務(wù)器重啟后,持久化的 queue(本來還未過期的) 的超時(shí)時(shí)間將重新計(jì)算。
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 1800000);  
channel.queueDeclare("myqueue", false, false, false, args);  

DLX(死信)
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX。消息變成死信一向有以下幾種情況:

  • 消息被拒絕(basic.reject or basic.nack)并且requeue=false
  • 消息TTL過期
  • 隊(duì)列達(dá)到最大長(zhǎng)度

DLX也是一下正常的Exchange同一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性,當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange中去,進(jìn)而被路由到另一個(gè)隊(duì)列, publish可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能。

channel.exchangeDeclare("some.exchange.name", "direct");  
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-dead-letter-exchange", "some.exchange.name");  
channel.queueDeclare("myqueue", false, false, false, args); 

你也可以為這個(gè)DLX指定routing key,如果沒有特殊指定,則使用原隊(duì)列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key"); 

http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
http://blog.csdn.net/jiao_fuyou/article/details/22923935

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,507評(píng)論 2 34
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件,它能夠在應(yīng)用之間提供可靠的消息傳輸。在易用性,擴(kuò)展性,高可用性...
    點(diǎn)融黑幫閱讀 3,129評(píng)論 3 41
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,233評(píng)論 3 51
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過消息的發(fā)送與接收的方式進(jìn)行通信,當(dāng)消息接收方服務(wù)忙或不可用時(shí),其提供了一...
    zhuke閱讀 4,628評(píng)論 0 12

友情鏈接更多精彩內(nèi)容