一、前言
RabbitMQ其實是我最早接觸的一個MQ框架,我記得當時是在大學的時候跑到圖書館一個人去看,由于RabbitMQ官網(wǎng)的英文還不算太難,因此也是參考官網(wǎng)學習的,一共有6章,當時是用Node來開發(fā)的,當時花了一下午看完了,也理解了。而現(xiàn)在回過頭來再看,發(fā)現(xiàn)已經(jīng)忘記了個差不多了,現(xiàn)在再回過頭來繼續(xù)看看,然乎記之。以防再忘,讀者看時最好有一定的MQ基礎(chǔ)。
二、RabbitMQ
首先我們需要知道的是RabbitMQ它是基于高級隊列協(xié)議(AMQP)的,它是Elang編寫的,下面將圍繞RabbitMQ隊列、交換機、RPC三個重點進行展開。
2.1、隊列
存儲消息的地方,多個生產(chǎn)者可以將消息發(fā)送到一個隊列,多個消費者也可以消費同一個隊列的消息。
注意:當多個消費者監(jiān)聽一個隊列,此時生產(chǎn)者發(fā)送消息到隊列只有一個消費者被消費,并且消費端的消費方式是按照消費端在內(nèi)部啟動的順序輪詢(round-robin)。
2.2、消費者
消費消息的一方
public class Send {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();
}
}
}
public class Recv {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}catch (Exception e){
e.printStackTrace();
}
}
}
2.3、小結(jié)
1、Rabbit是如何保證消息被消費的?
答:通過ack機制。每當一個消息被消費端消費的時候,消費端可以發(fā)送一個ack給RabbitMQ,這樣RabbitMQ就知道了該條消息已經(jīng)被完整消費并且可以被delete了。;如果一條消息被消費但是沒有發(fā)送ack,那么此時RabbitMQ將會認為需要重新消費該消息,如果此時還有其它的消費者,那么此時RabbitMQ將會把這條消息交給它處理。
注意:開啟ack機制的是autoAck=
false;
2、消息如何進行持久化?
- 將queue持久化,即設(shè)置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二個參數(shù)durable為true
- 設(shè)置消息持久化,即設(shè)置MessageProperties.PERSISTENT_TEXT_PLAIN
注意:消息持久化并不一定保證消息不會被丟失
3、RabbitMQ如何避免兩個消費者一個非常忙一個非常閑的情況?
通過如下設(shè)置,保證一個消費者一次只能消費一個消息,只有當它消費完成并且返回ack給RabbitMQ之后才給它派發(fā)新的消息。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount)
4、RabbitMQ異常情況下如何保證消息不會被重復(fù)消費?
需要業(yè)務(wù)自身實現(xiàn)密等性,RabbitMQ沒有提供比較好的方式去保證。
2.2、交換機
在RabbitMQ中,生產(chǎn)者其實從來不會發(fā)送消息到隊列,甚至,它不知道消息被發(fā)送到了哪個隊列。那它被發(fā)送到了哪里呢?就是本節(jié)的重點:交換機,下面就是它在RabbitMQ中的介紹圖。(X就是交換機)生產(chǎn)者發(fā)送消息給交換機,然后由交換機將消息轉(zhuǎn)發(fā)給隊列。
從上圖就產(chǎn)生一個問題:X怎么將消息發(fā)給queue呢?它是把消息發(fā)給所有queue還是發(fā)給一個指定的queue或者丟棄消息呢?這就是看交換機的類型了。下面一起談?wù)勥@幾種類型
2.2.1、fanout
fanout:廣播模式,這個比較好理解,就是所有的隊列都能收到交換機的消息。

如上面,兩個隊列都能收到交換機的消息。
2.2.2、direct
這個模式相當于發(fā)布/訂閱模式的一種,當交換機類型為direct的時候,此時我們需要設(shè)置兩個參數(shù):
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二個參數(shù),我們可以把它稱呼為routeKey
- channel.queueBind(queueName, EXCHANGE_NAME, "");第三個參數(shù),我們把它稱呼為bindKey
有了這兩個參數(shù),我們就可以指定我們訂閱哪些消息了。

如圖,Q1訂閱了orange的消息,Q2訂閱了black、green的消息。
2.2.3、topic
其實topic和direct有一點類似,它相當于對direct作了增強。在direct中,我們上面所說的bind routeKey為black、green的它是有限制的,它只能絕對的等于routeKey,但是有時候我們的需求不是這樣,我們可能想要的是正則匹配即可,那么Topic就派上用場了。

當類型為topic時,它的bindKey對應(yīng)字符串需要是以“.”分割,同時RabbitMQ還提供了兩個符號:
- 星號(*):表示1個單詞
- 井號(#):表示0、多個單詞
上圖的意思是:所有第二個單詞為orange的消息發(fā)送個Q1,所有最后一個單詞為rabbit或者第一個單詞為lazy的消息發(fā)送給Q2。
2.2.4、header
這一種類型官方demo沒有過多解釋,這里也不研究了。
2.3、RPC
RabbitMQ 還可以實現(xiàn)RPC(遠程過程調(diào)用)。什么是RPC,簡單來說就是local調(diào)用remote方法。對應(yīng)于RabbitMQ中則是Client發(fā)送一個request message,Server處理完成之后將其返回給Client。這里就有了一個疑問?Server是如何將response返回給Client的,這里RabbitMQ定義了一個概念:Callback Queue。
Callback Queue
注意這個隊列是獨一無二的String replyQueueName = channel.queueDeclare().getQueue();。
首先我們需要明白一點的是為什么需要這個queue?我們知道在RabbitMQ作消息隊列的時候,Client只需要將消息投放到queue中,然后Server從queue去取就可以了。但是在RabbitMQ作為RPC的時候多了一點就是,Client還需要返回結(jié)果,這時Server端怎么知道把消息發(fā)送給Client,這就是Callback Queue的用處了。
Correlation Id
在上面我們知道Server返回數(shù)據(jù)給Client是通過Callback Queue的,那么是為每一個request都創(chuàng)建一個queue嗎?這未免太過浪費資源,RabbitMQ有更好的方案。在我們發(fā)送request,綁定一個唯一ID(correlationId),然后在消息被處理返回的時候取出這個ID和發(fā)出去的ID進行匹配。這樣來說一個Callback Queue是Client級別而不是request級別的了。
實現(xiàn)
上面介紹了RabbitMQ實現(xiàn)RPC最重要的兩個概念,具體代碼比較簡單還是貼下把。
client 端
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) throws Exception{
RPCClient fibonacciRpc = new RPCClient();
try {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
服務(wù)端
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
三、總結(jié)
這次回頭再看RabbitMQ,再次重新理解了以下RabbitMQ,有些東西還是要慢慢嚼的。當然這些也都是官網(wǎng)的入門例子,后續(xù)有機會的話再深入研究。