rabbitmq 簡介
RabbitMQ 是一個用 erlang 開發(fā)的 AMQP(Advanced Message Queue)的開源實現(xiàn),AMQP(高級消息隊列協(xié)議)是一個網(wǎng)絡(luò)協(xié)議。它支持符合要求的客戶端應(yīng)用和消息中間代理之間進行通訊。
AMQP 模型簡介

消息(message)被發(fā)布者(publisher)發(fā)送給交換機(exchange),交換機常常被比喻成郵局或者郵箱。然后交換機將收到的消息根據(jù)路由規(guī)則分發(fā)給綁定的隊列(queue)。最后AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
AMQP 是一個可編的程協(xié)議
AMQP 是一個可編程協(xié)議,某種意義上說 AMQP 的實體和路由規(guī)則是由應(yīng)用本身定義的,而不是由消息代理定義。包括像聲明隊列和交換機,定義他們之間的綁定,訂閱隊列等等關(guān)于協(xié)議本身的操作。
這雖然能讓開發(fā)人員自由發(fā)揮,但也需要他們注意潛在的定義沖突。當然這在實踐中很少會發(fā)生,如果發(fā)生,會以配置錯誤(misconfiguration)的形式表現(xiàn)出來。
應(yīng)用程序(Applications)聲明AMQP實體,定義需要的路由方案,或者刪除不再需要的AMQP實體。
Exchange
交換機是用來發(fā)送消息的AMQP實體。交換機拿到一個消息之后將它路由給一個或零個隊列。它使用哪種路由算法是由交換機類型和被稱作綁定(bindings)的規(guī)則所決定的。AMQP 的代理提供了四種交換機。
| Name(交換機類型) | Default pre-declared names(預(yù)聲明的默認名稱) |
|---|---|
| Direct exchange(直連交換機) | (Empty string) and amq.direct |
| Fanout exchange(扇型交換機) | amq.fanout |
| Topic exchange(主題交換機) | amq.topic |
| Headers exchange(頭交換機) | amq.match (and amq.headers in RabbitMQ) |
下列示例代碼可參考官方文檔:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
默認 exchanage
exchanage 可以使用空字符串代替,消息會根據(jù)指定的 routing_key 分發(fā)到與 routing_key 同名的隊列。

消息發(fā)送端 python 代碼:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = "Hello World!"
count = 1
while True:
channel.basic_publish(exchange='',
routing_key='task_queue',
body=str(count) + message,
properties=pika.BasicProperties(
delivery_mode=2
))
count += 1
print("[x] Send %d %r" % (count, message))
time.sleep(1)
消息接收端 python 代碼:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] Worker1 Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(5)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Direct exchange(直連交換機)
消息發(fā)送到與 exchange 綁定的 queue 上,且 routing_key 必須精確匹配。

發(fā)布端 python 代碼:
import pika
import time
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 定義一個 Direct 類型的 exchanage
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 發(fā)送消息給 exchange,如果 queue 不存在消息會丟失
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print ("[x] Sent %r:%r" % (severity, message))
connection.close()
接受端 python 代碼:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
channel.queue_declare(queue='aaaa', durable=True)
severities = sys.argv[1:]
if not severities:
sys.exit(1)
for severity in severities:
# exchange 跟 queue 綁定,且 routing_key 必須精確匹配才能接收
channel.queue_bind(exchange='direct_logs',
queue='aaaa',
routing_key=severity)
print (' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print (" [x] %r:%r" % (method.routing_key, body,))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue='aaaa')
channel.start_consuming()
Fanout exchange(扇型交換機)
將消息發(fā)送給綁定到 exchange 上的所有 queue。就是發(fā)布/訂閱模式。

發(fā)布端 python 代碼:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 定義一個 faout 類型的 exchanage
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
count = 1
message = 'Hello World!'
while True:
# faout 會忽略 routing_key,所以這里為空
channel.basic_publish(
exchange='logs',
routing_key='',
body=str(count) + message
)
count += 1
print("[x] Sent %d %r" %(count, message))
time.sleep(3)
接收端 python 代碼:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# exchange 與 queue 綁定
channel.queue_bind(exchange='logs',
queue=queue_name)
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
Topic exchange(主題交換機)
與 Direct exchange 類似,routing_key 可使用通配符匹配。

安裝向?qū)?/h2>
這里直接使用二進制包的方式安裝:https://www.rabbitmq.com/install-generic-unix.html
- 系統(tǒng)需要安裝 Erlang 。
- 內(nèi)核參數(shù)系統(tǒng) limits 調(diào)整。
- 設(shè)置打開文件最大數(shù),推薦至少 65536
- 下載二進制包 rabbitmq-server-generic-unix-3.7.8.tar.xz
- 解壓至
/user/local 并將 sbin 目錄添加至 $PATH 中。
- 啟動,默認的數(shù)據(jù)目錄在
./var 下,sbin/rabbitmq-server 或者 sbin/rabbitmq-server -detached 后臺運行。
- 停止,
sbin/rabbitmqctl shutdown。
- 配置文件
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf。
- 開啟 WEB UI
rabbitmq-plugins enable rabbitmq_management。
rabbitmq 相關(guān) 文件&目錄 路徑
這里直接使用二進制包的方式安裝:https://www.rabbitmq.com/install-generic-unix.html
- 設(shè)置打開文件最大數(shù),推薦至少 65536
/user/local 并將 sbin 目錄添加至 $PATH 中。./var 下,sbin/rabbitmq-server 或者 sbin/rabbitmq-server -detached 后臺運行。sbin/rabbitmqctl shutdown。$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf。rabbitmq-plugins enable rabbitmq_management。你可以通過環(huán)境變量來設(shè)置 rmq 相關(guān)文件或目錄的位置,但是大多數(shù)情況下使用默認的即可。
Deb/RPM 包安裝的情況下(${install_prefix} 為空)
| Name | Location |
|---|---|
| RABBITMQ_BASE | (Not used - Windows only) |
| RABBITMQ_CONFIG_FILE | ${install_prefix}/etc/rabbitmq/rabbitmq |
| RABBITMQ_MNESIA_BASE | ${install_prefix}/var/lib/rabbitmq/mnesia |
| RABBITMQ_MNESIA_DIR |
|
| RABBITMQ_LOG_BASE | ${install_prefix}/var/log/rabbitmq |
| RABBITMQ_LOGS |
|
| RABBITMQ_SASL_LOGS |
|
| RABBITMQ_PLUGINS_DIR | /usr/lib/rabbitmq/plugins:$RABBITMQ_HOME/plugins |
| RABBITMQ_ENABLED_PLUGINS_FILE | ${install_prefix}/etc/rabbitmq/enabled_plugins |
| RABBITMQ_PID_FILE | $RABBITMQ_MNESIA_DIR.pid |
二進制包安裝的情況 (${RABBITMQ_HOME} 是指二進制包解壓的目錄)
| Name | Location |
|---|---|
| RABBITMQ_BASE | (Not used) |
| RABBITMQ_CONFIG_FILE | $RABBITMQ_HOME/etc/rabbitmq/rabbitmq |
| RABBITMQ_MNESIA_BASE | $RABBITMQ_HOME/var/lib/rabbitmq/mnesia |
| RABBITMQ_MNESIA_DIR |
|
| RABBITMQ_LOG_BASE | $RABBITMQ_HOME/var/log/rabbitmq |
| RABBITMQ_LOGS |
|
| RABBITMQ_SASL_LOGS |
|
| RABBITMQ_PLUGINS_DIR | $RABBITMQ_HOME/plugins |
| RABBITMQ_ENABLED_PLUGINS_FILE | $RABBITMQ_HOME/etc/rabbitmq/enabled_plugins |
| RABBITMQ_PID_FILE | $RABBITMQ_MNESIA_DIR.pid |
備份&恢復(fù)
https://www.rabbitmq.com/backup.html#rabbitmq-definitions
rmq 中有兩種類型的數(shù)據(jù):
- definitions (metadata, schema/topology)【Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category.】
- message 數(shù)據(jù)
exporting definitions
有兩種方式:
-
rabbitmqadmin export rabbit.definitions.json(# => Exported configuration for localhost to "rabbit.config") -
GET /api/definitions,需要開啟 rabbitmq_management 插件。
import definitions
有兩種方式:
rabbitmqadmin -q import rabbit.definitions.jsonPOST /api/definitions
手動備份
- 通過
rabbitmqctl eval 'rabbit_mnesia:dir().'查出數(shù)據(jù)目錄并備份,如果是備份 message 數(shù)據(jù),則需要將 node 停止,如果集群中隊列是 mirror 則需要將整個集群停止。 - (可選)如果 node 的名字改變了需要使用
rabbitmqctl rename_cluster_node <oldnode> <newnode>
手動恢復(fù)
- 將上訴備份的目錄拷貝至相應(yīng)的目錄。
集群
https://www.rabbitmq.com/clustering.html
Virtual hosts, exchanges, users, and permissions are automatically mirrored across all nodes in a cluster. Queues may be located on a single node, or mirrored across multiple nodes. A client connecting to any node in a cluster can see all queues in the cluster, even if they are not located on that node.
通常一些分布式系統(tǒng)會有 master 跟 node 節(jié)點,但是 rabbitmq 并不是這樣。在集群中所有節(jié)點都是平等的 (equal peer) 。集群中的節(jié)點使用 /var/lib/rabbitmq/.erlang.cookie 來允許它們彼此之間通訊,改文件權(quán)限必須是 600,
前提
集群中的各個節(jié)點使用域名通訊,確保各個節(jié)點之間的
hostname都能夠解析。-
以下端口確?;ネǎ?/p>
- 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
- 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
- 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). Unless external connections on these ports are really necessary (e.g. the cluster uses federation or CLI tools are used on machines outside the subnet), these ports should not be publicly exposed. See networking guide for details.
- 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as server distribution port + 10000 through server distribution port + 10010). See networking guide for details.
- 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
- 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
- 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
- 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
- 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)
手動創(chuàng)建集群
三個節(jié)點名字為 rabbit1 rabbit2 rabbit3
-
分別啟動三臺 rabbitmq
rabbit1$ rabbitmq-server -detached rabbit2$ rabbitmq-server -detached rabbit3$ rabbitmq-server -detached -
將其他兩個節(jié)點(如:
rabbit2rabbit3)加入rabbit1組成集群# rabbit2 節(jié)點操作 rabbit2$ rabbitmqctl stop_app Stopping node rabbit@rabbit2 ...done. rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1 Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done. rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done. # rabbit3 節(jié)點操作 rabbit3$ rabbitmqctl stop_app Stopping node rabbit@rabbit3 ...done. rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2 Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done. rabbit3$ rabbitmqctl start_app Starting node rabbit@rabbit3 ...done. -
查看集群狀態(tài)
rabbit1$ rabbitmqctl cluster_status Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done. -
重啟節(jié)點,已加入集群的節(jié)點可以任意重啟,當節(jié)點恢復(fù)后它就會從其他節(jié)點同步數(shù)據(jù)。
- 當一個節(jié)點重啟時,他會聯(lián)系對等體 10 次,每次超時 30s,如果通訊成功,則啟動成功,并且同步對等體數(shù)據(jù)。
- 當一個節(jié)點關(guān)閉時沒有其他的對等體了(最后一個關(guān)閉的節(jié)點),它啟動的時候不會作為一個獨立節(jié)點,它將等待對等體加入。
- 當所有節(jié)點關(guān)閉后,集群也就關(guān)閉了。當再次啟動一個節(jié)點,在指定的時間內(nèi)(默認為 5min, 可以通過下面的配置文件修改),其他節(jié)點再次啟動就會自動加入原來的集群。
# wait for 60 seconds instead of 30 mnesia_table_loading_retry_timeout = 60000 # retry 15 times instead of 10 mnesia_table_loading_retry_limit = 15
rabbitmq-peer-discovery-k8s 插件自動創(chuàng)建集群
https://github.com/rabbitmq/rabbitmq-peer-discovery-k8s
可以使用 helm 的 rabbitmq-ha chart 在 kubernetes 集群中快速部署一套 rabbitmq cluster.