1. 消息隊列介紹
1.1 MQ定義
消息隊列的目的是為了實現(xiàn)各個應用程序之間的通訊, 應用程序基于MQ實現(xiàn)消息的發(fā)送和接收, 實現(xiàn)應用程序之間的通訊
這樣多個應用程序可以運行在不同的主機上, 通過MQ就可以實現(xiàn)跨網(wǎng)絡的通信, 因此MQ實現(xiàn)了業(yè)務的解耦和異步機制
1.2 MQ使用場合
消息隊列作為高并發(fā)系統(tǒng)的核心組件之一, 能夠幫助業(yè)務系統(tǒng)結(jié)構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性, 消息隊列主要具有以下特點:
1. 削峰填谷: 主要解決瞬時寫壓力大于應用程序處理能力, 導致消息丟失, 系統(tǒng)崩潰等問題
案例: 當應用程序處理能力低, 而瞬時并發(fā)寫請求過多時, 客戶端請求就會出現(xiàn)排隊, 并且很可能斷開連接, 造成信息丟失.
解決方案: 通過消息隊列去存儲用戶請求, 利用生產(chǎn)者/消費者模型, 讓生產(chǎn)者把消息寫入到MQ, 由消費者去MQ消費消息, 處理請求.
暫時沒有被消費的請求, 就在MQ中排隊, 做臨時存儲
2. 系統(tǒng)解耦: 解決不同重要程度, 不同能力級別系統(tǒng)之間依賴, 避免一死全死
案例: 按照不同的功能, 把一個服務, 拆分成多個系統(tǒng), 比如登錄, 查詢, 支付等. 這樣, 當某一個系統(tǒng)服務崩潰, 不會影響其他的服務.
比如, 當支付系統(tǒng)崩潰時, 查詢系統(tǒng)并不會受到影響
3. 蓄流壓測: 線上有些鏈路不好壓測, 可以通過在MQ中堆積一定量消息, 再放開來壓測
案例: 通過開發(fā)應用程序模擬用戶請求, 把請求全部先寫入到MQ, 之后再開啟后端服務, 來檢查后端服務壓力處理能力
2 RabbitMQ
2.1 RabbitMQ簡介
RabbitMQ 采用 Erlang 語言開發(fā),Erlang 語言由 Ericson 設計,Erlang 在分布式編
程和故障恢復方面表現(xiàn)出色,電信領域被廣泛使用。
RabbitMQ架構(gòu)

- Broker: 接收和分發(fā)消息的應用,一個RabbitMQ Server 就是 Message Broker。
- Virtual host: 出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛
擬的分組中,類似于網(wǎng)絡中的 namespace 概念,當多個不同的用戶使用同一個
RabbitMQ server 提供的服務時,可以劃分出多個 vhost,每個用戶在自己的 vhost
創(chuàng)建 exchange/queue 等。通常每個業(yè)務會有一套RabbitMQ服務器, 因此, 一般RabbitMQ不會混用. 即使多套業(yè)務使用同一個RabbitMQ, 只要隊列不沖突即可 - Connection: publisher/consumer 和 broker 之間的 TCP 連接。
- Channel: 如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候
建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection
內(nèi)部建立的邏輯連接,如果應用程序支持多線程,通常每個 thread 創(chuàng)建單獨的
channel 進行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker
識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection
極大減少了操作系統(tǒng)建立 TCP connection 的開銷。 - Exchange: message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing
key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish?subscribe) and fanout (multicast)。 - Queue: 消息最終被送到這里等待 consumer 取走。
- Binding: exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。
Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)。
RabbitMQ 優(yōu)勢
基于 erlang 語言開發(fā),具有高并發(fā)優(yōu)點、支持分布式
具有消息確認機制、消息持久化機制,消息可靠性和集群可靠性高
簡單易用、運行穩(wěn)定、跨平臺、多語言
開源
Queue 的特性
消息基于先進先出的原則進行順序消費
消息可以持久化到磁盤節(jié)點服務器
消息可以緩存到內(nèi)存節(jié)點服務器提高性能
2.2 RabbitMQ中的生產(chǎn)者消費者示例

生產(chǎn)者發(fā)送消息到 broker server(RabbitMQ),在 Broker 內(nèi)部,用戶創(chuàng)建Exchange/Queue,通過 Binding 規(guī)則將兩者聯(lián)系在一起,Exchange 分發(fā)消息,根據(jù)類型/binding 的不同分發(fā)策略有區(qū)別,消息最后來到 Queue 中,等待消費者取走。
2.3 數(shù)據(jù)的存儲
RabbitMQ支持內(nèi)存節(jié)點和磁盤節(jié)點, 區(qū)別就是內(nèi)存節(jié)點的吞吐量高, 而磁盤節(jié)點可以保證數(shù)據(jù)的安全性. 一般建議至少兩個節(jié)點, 一個內(nèi)存節(jié)負責正常的數(shù)據(jù)讀取, 而磁盤節(jié)點從內(nèi)存節(jié)點同步數(shù)據(jù), 一旦內(nèi)存節(jié)點宕機, 還可以通過磁盤節(jié)點進行數(shù)據(jù)的還原
3 RabbitMQ部署
3.1 RabbitMQ 單機部署
注意: RabbitMQ由Erlang語言編寫, 因此, 不同版本之間要一對一對應, 否則會造成RabbitMQ無法安裝

- 小型業(yè)務, 測試環(huán)境: 4C-8G-60G
- 大型業(yè)務: 16C-32G-60G+
Ubuntu1804安裝單機版RabbitMQ, 3.8.18
服務器環(huán)境
10.0.0.239
hostname: mq1.node
3.1.1 更新apt源
[root@mq1:~]# apt update
3.1.2 官方提供安裝腳本
#!/bin/sh
sudo apt-get install curl gnupg debian-keyring debian-archive-keyring apt-transport-https -y
## Team RabbitMQ's main signing key
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"
## Launchpad PPA that provides modern Erlang releases
sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F77F1EDA57EBB1CC"
## PackageCloud RabbitMQ repository
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo apt-key add -
## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF
## Update package indices
sudo apt-get update -y
## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
3.1.3 查看RabbitMQ可用版本
[root@mq1:~]# apt-cache madison rabbitmq-server
3.1.4 安裝RabbitMQ, 3.8.18最新版
apt -y install rabbitmq-server
3.1.5 RabbitMQ啟動
systemctl start rabbitmq-server
systemctl enable --now rabbitmq-server
3.1.6 RabbitMQ監(jiān)聽端口
5672: 消費者訪問的端口
15672: web管理端口
25672: 集群狀態(tài)通信端口
3.2 RabbitMQ插件管理
3.2.1 開啟web界面管理插件
rabbitmq-plugins enable rabbitmq_management
3.2.2 登錄web管理界面
注意: rabbitmq從3.3.0開始, 禁止使用guest/guest權(quán)限通過除localhost外的訪問, 直接訪問報錯如下

創(chuàng)建允許遠程登錄用戶, 創(chuàng)建后開發(fā)測試人員即可用這個賬號在代碼中遠程連接RabbitMQ
# create a user
rabbitmqctl add_user admin admin
# tag the user with "administrator" for full management UI and HTTP API access
rabbitmqctl set_user_tags admin administrator
再次登錄

給admin管理員添加管理vhost權(quán)限



3.2.3 RabbitMQ生產(chǎn)者和消費者測試
生成者測試
利用Python腳本批量寫入消息
#coding:utf-8
import pika
#RabbitMQ用戶名密碼
cert = pika.PlainCredentials("admin","admin")
#連接到服務器
conn = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.239",5672,"/",cert))
#創(chuàng)建頻道
channel = conn.channel()
#聲明消息隊列,如果不存在就創(chuàng)建, 存在就將消息在此隊列中創(chuàng)建, 如果將消息發(fā)送到不存在的隊列, 則RabbitMQ會自動清除這些消息
channel.queue_declare(queue="hello")
#exchange告訴消息去往的隊列, routing key是隊列名, body是要傳遞的消息內(nèi)容
for i in range(100000): #通過循環(huán)寫入10萬條消息
# num = "%s" % i
channel.basic_publish(exchange="",
routing_key="hello",
body="hello world %d!" % i)
print("消息%s寫入成功!" % i)
#消息寫入完成,關閉連接
conn.close()
消費者測試
利用Python腳本批量消費信息
#coding:utf-8
import pika
#RabbitMQ用戶名密碼
cert = pika.PlainCredentials("admin","admin")
#連接到服務器
conn = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.239",5672,"/",cert))
#創(chuàng)建頻道
channel = conn.channel()
#聲明消息隊列,如果不存在就創(chuàng)建
channel.queue_declare(queue="hello")
#定義一個回調(diào)函數(shù), 將信息打印出來
def callback(ch,method,properties,body):
print("[x] Received %r" % body)
channel.basic_consume("hello",callback,
auto_ack=False,
exclusive=False,
consumer_tag=None,
arguments=None)
print(" [*] Waiting for messages. To exit press CTRL+C")
#開始接收消息, 并進入阻塞狀態(tài), 隊列里有信息才會調(diào)用callback進行處理, 按ctrl+c退出
channel.start_consuming()
還原虛擬機, 準備集群搭建
如果處理消息剩余數(shù)量過多?
由于下游服務器性能問題, 有時會導致有很多剩余數(shù)量積攢, 無法被消費, 此時需要添加下游節(jié)點,消費者服務器, 連接到消息隊列, 把消息盡快消費掉, 否則用戶訪問等待時間會過長
3.3 RabbitMQ 集群部署
RabbitMQ集群分為二種方式:
- 普通模式: 創(chuàng)建好RabbitMQ集群之后的默認模式, 數(shù)據(jù)不會主動拷貝到其他節(jié)點, 當消費者訪問的服務器沒有需要的數(shù)據(jù)時, 當前服務器會從存有需要的數(shù)據(jù)的節(jié)點拉取數(shù)據(jù), 之后返回給用戶. 此種方式涉及數(shù)據(jù)的臨時拷貝, 會影響性能. 此外, 如果存放數(shù)據(jù)的節(jié)點宕機, 數(shù)據(jù)就會無法訪問
- 鏡像模式: 把需要的隊列做成鏡像隊列, 消息會在每個服務器之間復制. 和MySQL主從方式一樣. 消息寫到某個mq服務器后, 會立即同步到其他服務器
普通集群模式:
queue 創(chuàng)建之后,如果沒有其它 policy,消息實體只存在于其中一個節(jié)點
A、B 兩個 RabbitMQ 節(jié)點僅有相同的元數(shù)據(jù),即隊列結(jié)構(gòu),但隊列的數(shù)據(jù)僅保存有一份
即創(chuàng)建該隊列的 rabbitmq 節(jié)點(A 節(jié)點),當消息進入 A 節(jié)點的 Queue 中后,如果consumer 從 B 節(jié)點拉取RabbitMQ 會臨時在 A、B 間進行
消息傳輸,把 A 中的消息實體取出并經(jīng)過 B 發(fā)送給 consumer,所以 consumer 可以連接每一個節(jié)點,從中取消息
該模式存在一個問題就是當 A 節(jié)點故障后,B節(jié)點無法取到 A 節(jié)點中還未消費的消息實體。
鏡像集群模式:
把需要的隊列做成鏡像隊列,存在于多個節(jié)點,屬于 RabbitMQ 的 HA 方案(鏡像模式是在普通模式的基礎上,增加一些鏡像策略)
該模式解決了普通模式中的數(shù)據(jù)丟失問題,其實質(zhì)和普通模式不同之處在于,消息實體會主動在鏡像節(jié)點間同步
而不是在 consumer 取數(shù)據(jù)時臨時拉取,該模式帶來的副作用也很明顯,除了降低系統(tǒng)性能外,如果鏡像隊列數(shù)量過多,加之
大量的消息進入,集群內(nèi)部的網(wǎng)絡帶寬將會被這種同步通訊大大消耗掉,所以在對可靠性要求較高的場合中適用,一個隊列想做成鏡像隊列,需要先設置 policy,
然后客戶端創(chuàng)建隊列的時候,rabbitmq 集群根據(jù)“隊列名稱”自動設置是普通集群模式或鏡像隊列。
RabbitMQ集群兩種節(jié)點類型:
內(nèi)存節(jié)點: 只將數(shù)據(jù)保存到內(nèi)存
磁盤節(jié)點: 保存數(shù)據(jù)到內(nèi)存和磁盤
內(nèi)存節(jié)點雖然不寫入磁盤, 但是它執(zhí)行比磁盤節(jié)點要好, 集群中, 只需要一個磁盤節(jié)點來保存數(shù)據(jù)就足夠了, 如果集群中只有內(nèi)存節(jié)點, 那么不能全部停止它們, 否則所有數(shù)據(jù)消息在服務器全部停機之后都會丟失. 需要分批次停止服務器, 比如, 先重啟一個服務器, 重啟后該服務器上的數(shù)據(jù)丟失, 但是服務啟動后會從另一個服務器將數(shù)據(jù)拷貝到本機, 然后再重啟另一個服務器, 重啟后, 數(shù)據(jù)也會被拷貝到第二個服務器, 這樣實現(xiàn)重啟不丟失數(shù)據(jù)
推薦設計架構(gòu):
在一個RabbitMQ集群里, 有三臺或以上機器, 其中一臺使用磁盤模式, 其他節(jié)點使用內(nèi)存模式, 內(nèi)存節(jié)點訪問速度更快, 由于磁盤IO相對較慢, 磁盤節(jié)點可作為數(shù)據(jù)備份使用.
Ubuntu1804部署集群版RabbitMQ
實驗環(huán)境
10.0.0.239 mq1.node
10.0.0.229 mq2.node
10.0.0.219 mq3.node
mq1和mq2作為內(nèi)存節(jié)點, 添加到mq3, 以mq3作為集群名稱
3.3.1 三臺服務器分別配置主機名解析
RabbitMQ只會識別主機名, 不會識別域名
注意: RabbitMQ集群的主機名不能隨意修改, 否則會導致無法解析, RabbitMQ無法啟動
vim /etc/hosts
10.0.0.239 mq1 mq1.node
10.0.0.229 mq2 mq2.node
10.0.0.219 mq3 mq3.node
3.3.2 各服務器安裝RabbitMQ-3.8.18
參考單機部署
安裝rabbitmq-server最新版
#!/bin/sh
sudo apt-get install curl gnupg debian-keyring debian-archive-keyring apt-transport-https -y
## Team RabbitMQ's main signing key
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"
## Launchpad PPA that provides modern Erlang releases
sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F77F1EDA57EBB1CC"
## PackageCloud RabbitMQ repository
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo apt-key add -
## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF
## Update package indices
sudo apt-get update -y
## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
apt install rabbitmq-server -y
3.3.3 啟動rabbitmq-server
systemctl start rabbitmq-server
systemctl enable --now rabbitmq-server
3.3.4 同步.erlang.cookie文件
RabbitMQ的集群是依賴于Erlang的集群來工作的, 所以必須先構(gòu)建起Erlang的集群環(huán)境, 而Erlang的集群中,各節(jié)點是通過一個magic cookie來實現(xiàn)的, 如果是apt安裝的RabbitMQ, 這個cookie存放在/var/lib/rabbitmq/.erlang.cookie中, 文件是400權(quán)限, 所以必須保證各節(jié)點cookie保持一致, 否則節(jié)點之間就無法通信
cookie文件會在安裝RabbitMQ是自動創(chuàng)建, 但是cookie內(nèi)容是隨機的, 因此要確保集群之間一致, 可以關閉所有RabbitMQ服務后, 將cookie文件從一個節(jié)點, 拷貝到其余節(jié)點, 實現(xiàn)統(tǒng)一
root@mq1:~# cat /var/lib/rabbitmq/.erlang.cookie
AVWTQKJALSKIEEILTHBS
root@mq2:~# cat /var/lib/rabbitmq/.erlang.cookie
TLJEYYIRVHOGQKJPLNSN
root@mq3:~# cat /var/lib/rabbitmq/.erlang.cookie
YFPNURCLZZGXBEFPTSJH
各服務器關閉RabbitMQ
systemctl stop rabbitmq-server
將cookie文件, 從mq1-10.0.0.239,同步到其他兩個節(jié)點
scp /var/lib/rabbitmq/.erlang.cookie 10.0.0.229:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie 10.0.0.219:/var/lib/rabbitmq/.erlang.cookie
各服務器啟動RabbitMQ
systemctl start rabbitmq-server
3.3.5 查看當前集群狀態(tài)
在未創(chuàng)建集群時, 每個mq都是單機的狀態(tài), 而且默認都是磁盤節(jié)點
rabbitmqctl cluster_status
Disk Nodes
rabbit@mq1
Disk Nodes
rabbit@mq2
Disk Nodes
rabbit@mq3
3.3.6 創(chuàng)建RabbitMQ集群
- 集群可以在任意mq節(jié)點創(chuàng)建, 之后將其余節(jié)點添加到本集群
- 創(chuàng)建集群前, 每個節(jié)點最好是空的環(huán)境
- 創(chuàng)建集群前, 要停止節(jié)點的app服務, 清空元數(shù)據(jù)
實例: 將mq1, mq2 添加到mq3節(jié)點作為內(nèi)存節(jié)點, 停止mq1和mq2上的app服務, 清空元數(shù)據(jù), 讓數(shù)據(jù)從mq3進行同步, mq3作為磁盤節(jié)點
將mq1作為內(nèi)存節(jié)點, 添加到mq3
[root@mq1:~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@mq1 ...
[root@mq1:~]# rabbitmqctl reset # 清空當前服務器元數(shù)據(jù)
Resetting node rabbit@mq1 ...
[root@mq1:~]# rabbitmqctl join_cluster rabbit@mq3 --ram #將mq1添加到集群當中, 并成為內(nèi)存節(jié)點, 不加--ram默認是磁盤節(jié)點, rabbit是協(xié)議, mq3是主機名, 不包含域名
Clustering node rabbit@mq1 with rabbit@mq3
[root@mq1:~]# rabbitmqctl start_app
Starting node rabbit@mq1 ...
將mq2作為內(nèi)存節(jié)點, 添加到mq3
[root@mq2:~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@mq2 ...
[root@mq2:~]# rabbitmqctl reset
Resetting node rabbit@mq2 ...
[root@mq2:~]# rabbitmqctl join_cluster rabbit@mq3 --ram #將mq2添加到集群當中, 并成為內(nèi)存節(jié)點, 不加--ram默認是磁盤節(jié)點
Clustering node rabbit@mq2 with rabbit@mq3
[root@mq2:~]# rabbitmqctl start_app
Starting node rabbit@mq2 ...
3.3.7 驗證集群狀態(tài)
- 創(chuàng)建集群后, 每臺節(jié)點上的集群狀態(tài)都是相同的
- 默認創(chuàng)建的集群是普通模式, 數(shù)據(jù)不會在各個節(jié)點之間同步, 而是當有需要時, 從有數(shù)據(jù)的節(jié)點拉取
root@mq1:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq1 ...
Basics
Cluster name: rabbit@mq1
Disk Nodes
rabbit@mq3
RAM Nodes
rabbit@mq1
rabbit@mq2
Running Nodes
rabbit@mq1
rabbit@mq2
rabbit@mq3
Versions
rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2
Maintenance status
Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
root@mq2:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq2 ...
Basics
Cluster name: rabbit@mq2
Disk Nodes
rabbit@mq3
RAM Nodes
rabbit@mq1
rabbit@mq2
Running Nodes
rabbit@mq1
rabbit@mq2
rabbit@mq3
Versions
rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2
Maintenance status
Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
root@mq3:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq3 ...
Basics
Cluster name: rabbit@mq3
Disk Nodes
rabbit@mq3
RAM Nodes
rabbit@mq1
rabbit@mq2
Running Nodes
rabbit@mq1
rabbit@mq2
rabbit@mq3
Versions
rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2
Maintenance status
Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Feature flags
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
3.3.8 將集群設置為鏡像模式
只需在任意節(jié)點執(zhí)行以下命令
[root@mq1:~]# rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'
Setting policy "ha-all" for pattern "#" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
- 設置為鏡像模式后, 在任意節(jié)點寫入數(shù)據(jù), 都會主動同步到其余的節(jié)點
- 為了提高效率和可用性, 可以在ram節(jié)點前加負載均衡, 讓用戶的請求發(fā)往ram節(jié)點, disk節(jié)點只用來保存數(shù)據(jù)
3.3.9 通過web界面驗證集群狀態(tài)
不啟動web插件的rabbitmq服務器, 會在web節(jié)點提示節(jié)點統(tǒng)計信息不可用
先在mq1啟動插件
[root@mq1:~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq1:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq1...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.

創(chuàng)建管理用戶
- 只需要在一個節(jié)點創(chuàng)建即可, 創(chuàng)建后會同步到其余節(jié)點
[root@mq1:~]# rabbitmqctl add_user admin admin
Adding user "admin" ...
[root@mq1:~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
在其余節(jié)點啟用web管理插件
[root@mq2:~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq2:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq2...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
root@mq3:~# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq3:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq3...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.

測試在其余兩個節(jié)點, 可以使用admin賬戶登錄


3.3.10 集群測試
生產(chǎn)者在mq3寫入數(shù)據(jù), 消費者分別從mq1和mq2消費數(shù)據(jù)
- 在10.0.0.219寫入數(shù)據(jù), 在10.0.0.229查看

- 在10.0.0.239和10.0.0.229同時消費數(shù)據(jù)


3.4 RabbitMQ常用命令
創(chuàng)建vhost
[root@mq1:~]# rabbitmqctl add_vhost rabbit
Adding vhost "rabbit" ...
列出所有vhosts
[root@mq1:~]# rabbitmqctl list_vhosts
Listing vhosts ...
name
admin_host
rabbit
/
列出所有隊列
[root@mq1:~]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
刪除指定vhost
[root@mq1:~]# rabbitmqctl delete_vhost admin_host
Deleting vhost "admin_host" ...
添加rabbitmq賬戶, 設置其密碼為123456
[root@mq1:~]# rabbitmqctl add_user rabbitmq 123456
Adding user "rabbitmq" ...
更改rabbitmq用戶密碼為111111
[root@mq1:~]# rabbitmqctl change_password rabbitmq 111111
Changing password for user "rabbitmq" ...
設置rabbitmq用戶對rabbit的vhost有讀寫權(quán)限, 配置正則, 讀正則和寫正則
[root@mq1:~]# rabbitmqctl set_permissions -p rabbit rabbitmq ".*" ".*" ".*"
Setting permissions for user "rabbitmq" in vhost "rabbit" ...
3.5 RabbitMQ集群監(jiān)控
3.5.1 集群狀態(tài)監(jiān)控
#!/usr/bin/python3
#coding:utf-8
import subprocess
running_list = []
error_list = []
false = "false"
true = "true"
def get_status():
obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True,stdout=subprocess.PIPE)
data = obj.stdout.read()
data1 = eval(data)
for i in data1:
if i.get("running") == "true":
running_list.append(i.get("name"))
else:
error_list.append(i.get("name"))
def count_server():
if len(running_list) < 3: # 可以判斷錯誤列表大于 0 或者運行列表小于 3,3為總計的節(jié)點數(shù)量
print(100) # 100 就是集群內(nèi)有節(jié)點運行不正常了
else:
print(50) # 50 為所有節(jié)點全部運行正常
def main():
get_status()
count_server()
if __name__ == "__main__":
main()
3.5.2 內(nèi)存使用監(jiān)控
vim rabbitmq_memory.py
#!/usr/bin/python3
#coding:utf-8
import subprocess
import sys
running_list = []
error_list = []
false = "false"
true = "true"
def get_status():
obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True,stdout=subprocess.PIPE)
data = obj.stdout.read()
data1 = eval(data)
#print(data1)
for i in data1:
if i.get("name") == sys.argv[1]:
print(i.get("mem_used"))
def main():
get_status()
if __name__ == "__main__":
main()
[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq1
87326720
[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq2
86876160
[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq3
89997312
3.5.3 RabbitMQ API
https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_9/priv/www/api/index.html
API使用舉例
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/nodes
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/overview
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/cluster-name
{"name":"rabbit@mq3"}
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/healthchecks/node
{"status":"ok"} #如果節(jié)點正常, 會返回status:ok
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.229:15672/api/healthchecks/node #如果節(jié)點異常, 不會返回數(shù)據(jù), 可以通過返回值, 利用zabbix進行監(jiān)控
root@mq_src:~# apt -y install autoconf make gcc socat ncurses-dev