當(dāng)需要調(diào)用遠(yuǎn)端計(jì)算機(jī)的函數(shù)并等待結(jié)果,這模式通常被稱為遠(yuǎn)程過程調(diào)用或RPC。
BasicProperties:
消息屬性 這AMQP協(xié)議預(yù)先確定了消息中的14個(gè)屬性。常用的有:
-
deliveryMode
將一個(gè)消息標(biāo)記為持久化(值為2)或者瞬態(tài)的(其他值)。之前發(fā)送隊(duì)列消息時(shí)的用法:channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); contentType
用來描述媒體類型的編碼。例如常常使用的JSON編碼:application/json。replyTo
通常來命名回收隊(duì)列的名字。correlationId
對(duì)RPC加速響應(yīng)請(qǐng)求是很有用的。
相關(guān)性ID(Correlation Id):
為每一個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回收隊(duì)列,這個(gè)效率十分低下。每一個(gè)客戶端創(chuàng)建一個(gè)單一的回收隊(duì)列。 用Correlation Id判斷隊(duì)列中的響應(yīng)是屬于哪個(gè)請(qǐng)求的。
當(dāng)在回收隊(duì)列中接收消息時(shí),依據(jù)這個(gè)屬性值,刻意將每個(gè)響應(yīng)匹配到對(duì)應(yīng)的請(qǐng)求上。如果是未知的Correlation Id值,就丟棄這個(gè)消息,因?yàn)樗粚儆谌魏我粋€(gè)我們的請(qǐng)求。
請(qǐng)求RPC服務(wù):

服務(wù)端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//公平分發(fā)機(jī)制
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
客戶端:
package testrabbitmq;
import com.rabbitmq.client.*;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
}
- contentType
用來描述媒體類型的編碼。例如常常使用的JSON編碼,這是一個(gè)好的慣例,設(shè)置這個(gè)屬性為:application/json。 - replyTo
通常來命名回收隊(duì)列的名字。 - correlationId
對(duì)RPC加速響應(yīng)請(qǐng)求是很有用的。
相關(guān)性ID(Correlation Id):
為每一個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回收隊(duì)列,這個(gè)效率十分低下。每一個(gè)客戶端創(chuàng)建一個(gè)單一的回收隊(duì)列。 用Correlation Id判斷隊(duì)列中的響應(yīng)是屬于哪個(gè)請(qǐng)求的。
當(dāng)在回收隊(duì)列中接收消息時(shí),依據(jù)這個(gè)屬性值,刻意將每個(gè)響應(yīng)匹配到對(duì)應(yīng)的請(qǐng)求上。如果是未知的Correlation Id值,就丟棄這個(gè)消息,因?yàn)樗粚儆谌魏我粋€(gè)我們的請(qǐng)求。
請(qǐng)求RPC服務(wù):

服務(wù)端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
public static void main(String[] argv) throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//公平分發(fā)機(jī)制
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
客戶端:
package testrabbitmq;
import com.rabbitmq.client.*;
/**
* Created by zzhblh on 2016/8/29.
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
}
TTL
- Queue Message TTL
設(shè)置某隊(duì)列中所有消息的TTL,通過在 queue.declare 中設(shè)置 x-message-ttl 參數(shù),可以控制被 publish 到 queue 中的 message 被丟棄前能夠存活的時(shí)間。值得注意的是,當(dāng)一個(gè) message 被路由到多個(gè) queue 中時(shí),他們之間不會(huì)互相影響。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);//存活時(shí)間最大為 60 秒
channel.queueDeclare("myqueue", false, false, false, args);
- Per-Message TTL
TTL 設(shè)置可以具體到每一條 message 本身,只要在通過 basic.publish 命令發(fā)送 message 時(shí)設(shè)置 expiration 字段。expiration 字段必須為字符串類型。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());
對(duì)于第一種設(shè)置隊(duì)列TTL屬性的方法,一旦消息過期,就會(huì)從隊(duì)列中抹去。而第二種方法里,即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,在過期 message 到達(dá) queue 的頭部時(shí)被真正的丟棄。
- Queue TTL
queue 被自動(dòng)刪除前可以處于未使用狀態(tài)的時(shí)間。未使用的意思是 queue 上沒有任何 consumer ,queue 沒有被重新聲明,并且在過期時(shí)間段內(nèi)未調(diào)用過 basic.get 命令。在服務(wù)器重啟后,持久化的 queue(本來還未過期的) 的超時(shí)時(shí)間將重新計(jì)算。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
DLX(死信)
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX。消息變成死信一向有以下幾種情況:
- 消息被拒絕(basic.reject or basic.nack)并且requeue=false
- 消息TTL過期
- 隊(duì)列達(dá)到最大長(zhǎng)度
DLX也是一下正常的Exchange同一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性,當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange中去,進(jìn)而被路由到另一個(gè)隊(duì)列, publish可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
你也可以為這個(gè)DLX指定routing key,如果沒有特殊指定,則使用原隊(duì)列的routing key
args.put("x-dead-letter-routing-key", "some-routing-key");
http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
http://blog.csdn.net/jiao_fuyou/article/details/22923935