1.RabbitMQ的概述與重要概念
RabbitMQ是流行的開源消息隊列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級消息隊列協(xié)議)的標準實現(xiàn)。
MQ的主要用途包含如下:
- 異步處理,比如注冊發(fā)送郵件和短信
- 應用解耦,比如電商系統(tǒng)中,訂單服務和庫存服務解耦
- 流量削峰,比如秒殺,搶紅包活動
重要的概念可以通過如下圖進行了解:

- Message 由消息頭和消息體組成,消息頭由一組可選屬性組成,消息體是不透明的
- Publisher 消息的生產者,表示一個向Exchange發(fā)布消息的客戶端應用程序
- Exchange 交換器,接收Publisher發(fā)送的消息并路由給Queue
- Binding 綁定,關聯(lián)Exchange和Queue
- Queue 消息隊列,用于保存消息直到發(fā)送給Consumer,Message會一直在隊列里直至被消費者取走
- Connection TCP連接
- Channel 信道,是建立在真實的TCP連接里的虛擬連接,對于操作系統(tǒng)來說建立和銷毀一次TCP是非常昂貴的開銷,因為引入信道來復用一條TCP連接
- Consumer 消費者,表示從一個Queue中取得消息的一個客戶端應用程序
- Broker RabbitMQ服務器實體
- Virtual Host 虛擬主機,表示一個mini版的RabbitMQ服務器,擁有自己的Queue、Exchange、Binding和權限機制,默認的vhost是/,必須在連接時指定
MQ對比

2.RabbitMQ Window安裝介紹
RabbitMQ安裝依賴Erlang,因此安裝之前需要先安裝Erlang環(huán)境,如下:
otp_win64_22.3.exe Erlang的Window安裝包,安裝包中沒有Erlang關鍵字
rabbitmq-server-3.8.3.exe
Erlang和RabbitMQ有對應的版本關系,請點擊查看官網信息

3.插件安裝
軟件安裝完成之后,需要安裝管理界面插件。打開RabbitMQ Command Prompt命令行界面,輸入如下命令:
rabbitmq-plugins enable rabbitmq_management
安裝完成之后,打開網址:http://localhost:15672/
默認賬號為guest,密碼為guest,進入系統(tǒng)之后創(chuàng)建admin賬號,并修改guest密碼
4.RabbitMQ 運維篇
4.1 單機模式(開發(fā)測試環(huán)境推薦)
單機模式參考Window安裝即可(暫不提供Linux版本)。
4.2 普通集群模式
組成集群需要兩步操作,該操作同樣適用于鏡像模式
- 該模式下需要保證不同機器之間的erlang cooike一致,可將其中一臺機器的erlang cookie拷貝到其他機器上。
- 將節(jié)點加入集群,假如有三個節(jié)點,可在節(jié)點2,3兩臺機器上加入到節(jié)點1,如下:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app
查看集群狀態(tài) rabbitmqctl cluster_status
普通集群模式中每個節(jié)點都有相同的元數(shù)據(jù),即相同的隊列結構。但是消息(實際數(shù)據(jù))只存在其中一個節(jié)點上,因此若消費者連接到非數(shù)據(jù)節(jié)點的時候,消息會先傳遞給消費者連接的節(jié)點,再提供給消費者。因此該模式下由兩個重要的特點,1. 若存儲消息節(jié)點宕機了,整個集群不可用,因此此模式并非高可用;2. 節(jié)點之間可能存在大量的數(shù)據(jù)傳遞,占用帶寬高。即使如此,若使用此種模式,客戶端應盡快均勻散布到各個節(jié)點上。
原理圖如下:

4.3 鏡像集群模式(生產環(huán)境必須)
在創(chuàng)建普通集群的基礎上,設置策略(policy),該操作可通過web ui設置,如下:

也可以通過命令設置
// 為每個以“rock.wechat”開頭的隊列設置所有節(jié)點的鏡像,并且設置為自動同步模式
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
該模式是一個HA方案(高可用方案),RabbitMQ是沒有中心的,不會因為一個節(jié)點掛了導致整個集群不可用,解決了普通模式中的問題,與普通模式不同的是,消息會主動在鏡像節(jié)點之間同步,而不是在客戶端獲取數(shù)據(jù)時再拉取數(shù)據(jù)。
因數(shù)據(jù)在不同節(jié)點之間主動同步,因此帶寬要求更高,降低了系統(tǒng)的性能。這種模式適合對消息可靠性要求較高的場合中使用。
原理圖如下:

5.RabbitMQ 實戰(zhàn)篇
5.1 管理規(guī)范
5.1.1 命名規(guī)范
exchange:以ex開頭,規(guī)則為ex.業(yè)務域.應用名稱.消息類型
ex.businame.appname.msgtype
queue:以q開頭,規(guī)則為q.業(yè)務域.應用名稱.消息類型
q.businame.appname.msgtype
5.1.2 用戶管理規(guī)范
- 提供給應用使用的用戶類型為none
- 只授權用戶特定的exchange(寫)和queue(讀)訪問權限,這樣代碼就無法創(chuàng)建交換器和隊列
5.1.3 其他規(guī)范
- 隊列和交換器由MQ管理員與研發(fā)人員溝通規(guī)則后,統(tǒng)一由MQ管理員進行創(chuàng)建。
- 代碼中禁止進行創(chuàng)建交換器和隊列的操作(若用戶管理規(guī)范,此操作無法執(zhí)行)。
5.2 環(huán)境準備
- maven依賴配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
- 新建一個標準的maven結構工程,編寫MQ工具類
public class ConnectionUtil {
public static Connection get(String username, String pwd) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername(username);
factory.setPassword(pwd);
try {
return factory.newConnection();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
5.3 應用實例
abbitMQ常用的Exchange Type有三種
- direct 當消息的Routing key與Binding key完全匹配時,將消息路由到Queue中
- fanout 將消息廣播到與Exchange綁定的所有Queue,效率最高
- topic Binding key使用模式,“#”匹配一個或多個詞,“*”只匹配一個詞,當消息的Routing key模糊匹配該模式才進行路由
5.3.1 Direct模式
public class DirectSend {
public static final String EXCHANGE_NAME = "exchange-test-direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String msg = "哈哈123";
channel.basicPublish(EXCHANGE_NAME, "delete", null, msg.getBytes("utf-8"));
System.out.println("[X] send: " + msg);
channel.close();
connection.close();
}
}
public class DirectRec2 {
// public static final String EXCHANGE_NAME = "exchange-test-direct";
public static final String QUEUE_NAME = "queue-test-direct-2";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.basicQos(1); // 同一時刻服務器只會發(fā)送一條消息給消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y2] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.2 Fanout模式
public class SubscribeSend {
public static final String EXCHANGE_NAME = "exchange-test-fanout-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "hello world";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
System.out.println("[X] send: " + msg);
channel.close();
connection.close();
}
}
public class SubscribeRec2 {
public static final String QUEUE_NAME = "queue-test-fanout-02";
// public static final String EXCHANGE_NAME = "exchange-test-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y2] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.3 Topic模式
public class TopicSend {
private final static String EXCHANGE_NAME = "exchange-test-topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// 聲明exchange
// channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息內容
String message = "Hello World!!";
channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class TopRec1 {
public static final String QUEUE_NAME = "queue-test-topic-01";
// public static final String EXCHANGE_NAME = "exchange-test-topic";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.4 ACK消息確認(推薦使用方式)
public class MultMqSend {
private final static String EX_NAME = "exchange-test-ack-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello";
for (int i = 0; i < 100; i++) {
channel.basicPublish(EX_NAME, "rk", null, (msg + i).getBytes("UTF-8"));
System.out.println("[X] send " + (msg + i));
}
channel.close();
connection.close();
}
}
public class MultiMqRecManualConfirm1 {
private final static String QUEUE_NAME = "queue-test-ack-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發(fā)一條消息給消費者
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
public class MultiMqRecManualConfirm2 {
private final static String QUEUE_NAME = "queue-test-ack-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發(fā)一條消息給消費者
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
confirm模式解決了公平輪訓的問題,哪個消費者處理更快,處理的消息更多(能者多勞)。這個案例解決了消費者消費消息可靠性問題,但是沒有解決發(fā)送者發(fā)送消息可靠性問題。
5.3.5 basicQo和basicAck關系
兩者是配套使用的。
// channel.basicQos(1)指該消費者在接收到隊列里的消息但沒有返回確認結果之前,
// 隊列不會將新的消息分發(fā)給該消費者。隊列中沒有被消費的消息不會被刪除,還是存在于隊列中。
channel.basicQos(1);
// 確認消息
channel.basicAck(envelope.getDeliveryTag(), false);