rabbitmq指南

rabbitmq 簡介

RabbitMQ 是一個用 erlang 開發(fā)的 AMQP(Advanced Message Queue)的開源實現(xiàn),AMQP(高級消息隊列協(xié)議)是一個網(wǎng)絡(luò)協(xié)議。它支持符合要求的客戶端應(yīng)用和消息中間代理之間進行通訊。

AMQP 模型簡介

image

消息(message)被發(fā)布者(publisher)發(fā)送給交換機(exchange),交換機常常被比喻成郵局或者郵箱。然后交換機將收到的消息根據(jù)路由規(guī)則分發(fā)給綁定的隊列(queue)。最后AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。

AMQP 是一個可編的程協(xié)議

AMQP 是一個可編程協(xié)議,某種意義上說 AMQP 的實體和路由規(guī)則是由應(yīng)用本身定義的,而不是由消息代理定義。包括像聲明隊列和交換機,定義他們之間的綁定,訂閱隊列等等關(guān)于協(xié)議本身的操作。

這雖然能讓開發(fā)人員自由發(fā)揮,但也需要他們注意潛在的定義沖突。當然這在實踐中很少會發(fā)生,如果發(fā)生,會以配置錯誤(misconfiguration)的形式表現(xiàn)出來。

應(yīng)用程序(Applications)聲明AMQP實體,定義需要的路由方案,或者刪除不再需要的AMQP實體。

Exchange

交換機是用來發(fā)送消息的AMQP實體。交換機拿到一個消息之后將它路由給一個或零個隊列。它使用哪種路由算法是由交換機類型和被稱作綁定(bindings)的規(guī)則所決定的。AMQP 的代理提供了四種交換機。

Name(交換機類型) Default pre-declared names(預(yù)聲明的默認名稱)
Direct exchange(直連交換機) (Empty string) and amq.direct
Fanout exchange(扇型交換機) amq.fanout
Topic exchange(主題交換機) amq.topic
Headers exchange(頭交換機) amq.match (and amq.headers in RabbitMQ)

下列示例代碼可參考官方文檔:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

默認 exchanage

exchanage 可以使用空字符串代替,消息會根據(jù)指定的 routing_key 分發(fā)到與 routing_key 同名的隊列。

image

消息發(fā)送端 python 代碼:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "Hello World!"

count = 1

while True:
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=str(count) + message,
                          properties=pika.BasicProperties(
                              delivery_mode=2
                          ))

    count += 1
    print("[x] Send %d %r" % (count, message))
    time.sleep(1)

消息接收端 python 代碼:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] Worker1 Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    time.sleep(5)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

Direct exchange(直連交換機)

消息發(fā)送到與 exchange 綁定的 queue 上,且 routing_key 必須精確匹配。

image

發(fā)布端 python 代碼:

import pika
import time
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定義一個 Direct 類型的 exchanage
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 發(fā)送消息給 exchange,如果 queue 不存在消息會丟失
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print ("[x] Sent %r:%r" % (severity, message))
connection.close()

接受端 python 代碼:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

channel.queue_declare(queue='aaaa', durable=True)


severities = sys.argv[1:]
if not severities:
    sys.exit(1)

for severity in severities:
    # exchange 跟 queue 綁定,且 routing_key 必須精確匹配才能接收
    channel.queue_bind(exchange='direct_logs',
                       queue='aaaa',
                       routing_key=severity)

print (' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):

    print (" [x] %r:%r" % (method.routing_key, body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='aaaa')

channel.start_consuming()

Fanout exchange(扇型交換機)

將消息發(fā)送給綁定到 exchange 上的所有 queue。就是發(fā)布/訂閱模式。

image

發(fā)布端 python 代碼:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 定義一個 faout 類型的 exchanage
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

count = 1

message = 'Hello World!'

while True:
    # faout 會忽略 routing_key,所以這里為空
    channel.basic_publish(
        exchange='logs',
        routing_key='',
        body=str(count) + message
    )

    count += 1

    print("[x] Sent %d %r" %(count, message))

    time.sleep(3)

接收端 python 代碼:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# exchange 與 queue 綁定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

def callback(ch, method, properties, body):
    print("[x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Topic exchange(主題交換機)

與 Direct exchange 類似,routing_key 可使用通配符匹配。

image

安裝向?qū)?/h2>

這里直接使用二進制包的方式安裝:https://www.rabbitmq.com/install-generic-unix.html

  1. 系統(tǒng)需要安裝 Erlang 。
  2. 內(nèi)核參數(shù)系統(tǒng) limits 調(diào)整。
    • 設(shè)置打開文件最大數(shù),推薦至少 65536
  3. 下載二進制包 rabbitmq-server-generic-unix-3.7.8.tar.xz
  4. 解壓至 /user/local 并將 sbin 目錄添加至 $PATH 中。
  5. 啟動,默認的數(shù)據(jù)目錄在 ./var 下,sbin/rabbitmq-server 或者 sbin/rabbitmq-server -detached 后臺運行。
  6. 停止,sbin/rabbitmqctl shutdown。
  7. 配置文件 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf。
  8. 開啟 WEB UI rabbitmq-plugins enable rabbitmq_management。

rabbitmq 相關(guān) 文件&目錄 路徑

https://www.rabbitmq.com/relocate.html

你可以通過環(huán)境變量來設(shè)置 rmq 相關(guān)文件或目錄的位置,但是大多數(shù)情況下使用默認的即可。

Deb/RPM 包安裝的情況下(${install_prefix} 為空)

Name Location
RABBITMQ_BASE (Not used - Windows only)
RABBITMQ_CONFIG_FILE ${install_prefix}/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE ${install_prefix}/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE ${install_prefix}/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR /usr/lib/rabbitmq/plugins:$RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE ${install_prefix}/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

二進制包安裝的情況 (${RABBITMQ_HOME} 是指二進制包解壓的目錄)

Name Location
RABBITMQ_BASE (Not used)
RABBITMQ_CONFIG_FILE $RABBITMQ_HOME/etc/rabbitmq/rabbitmq
RABBITMQ_MNESIA_BASE $RABBITMQ_HOME/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR RABBITMQ_MNESIA_BASE/RABBITMQ_NODENAME
RABBITMQ_LOG_BASE $RABBITMQ_HOME/var/log/rabbitmq
RABBITMQ_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME.log
RABBITMQ_SASL_LOGS RABBITMQ_LOG_BASE/RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR $RABBITMQ_HOME/plugins
RABBITMQ_ENABLED_PLUGINS_FILE $RABBITMQ_HOME/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE $RABBITMQ_MNESIA_DIR.pid

備份&恢復(fù)

https://www.rabbitmq.com/backup.html#rabbitmq-definitions

rmq 中有兩種類型的數(shù)據(jù):

  • definitions (metadata, schema/topology)【Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category.】
  • message 數(shù)據(jù)

exporting definitions

有兩種方式:

  • rabbitmqadmin export rabbit.definitions.json (# => Exported configuration for localhost to "rabbit.config")
  • GET /api/definitions,需要開啟 rabbitmq_management 插件。

import definitions

有兩種方式:

  • rabbitmqadmin -q import rabbit.definitions.json
  • POST /api/definitions

手動備份

  1. 通過 rabbitmqctl eval 'rabbit_mnesia:dir().' 查出數(shù)據(jù)目錄并備份,如果是備份 message 數(shù)據(jù),則需要將 node 停止,如果集群中隊列是 mirror 則需要將整個集群停止。
  2. (可選)如果 node 的名字改變了需要使用 rabbitmqctl rename_cluster_node <oldnode> <newnode>

手動恢復(fù)

  1. 將上訴備份的目錄拷貝至相應(yīng)的目錄。

集群

https://www.rabbitmq.com/clustering.html

Virtual hosts, exchanges, users, and permissions are automatically mirrored across all nodes in a cluster. Queues may be located on a single node, or mirrored across multiple nodes. A client connecting to any node in a cluster can see all queues in the cluster, even if they are not located on that node.

通常一些分布式系統(tǒng)會有 master 跟 node 節(jié)點,但是 rabbitmq 并不是這樣。在集群中所有節(jié)點都是平等的 (equal peer) 。集群中的節(jié)點使用 /var/lib/rabbitmq/.erlang.cookie 來允許它們彼此之間通訊,改文件權(quán)限必須是 600,

前提

  1. 集群中的各個節(jié)點使用域名通訊,確保各個節(jié)點之間的 hostname 都能夠解析。

  2. 以下端口確?;ネǎ?/p>

    • 4369: epmd, a peer discovery service used by RabbitMQ nodes and CLI tools
    • 5672, 5671: used by AMQP 0-9-1 and 1.0 clients without and with TLS
    • 25672: used for inter-node and CLI tools communication (Erlang distribution server port) and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). Unless external connections on these ports are really necessary (e.g. the cluster uses federation or CLI tools are used on machines outside the subnet), these ports should not be publicly exposed. See networking guide for details.
    • 35672-35682: used by CLI tools (Erlang distribution client ports) for communication with nodes and is allocated from a dynamic range (computed as server distribution port + 10000 through server distribution port + 10010). See networking guide for details.
    • 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
    • 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
    • 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
    • 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
    • 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)

手動創(chuàng)建集群

三個節(jié)點名字為 rabbit1 rabbit2 rabbit3

  1. 分別啟動三臺 rabbitmq

    rabbit1$ rabbitmq-server -detached
    rabbit2$ rabbitmq-server -detached
    rabbit3$ rabbitmq-server -detached
    
  2. 將其他兩個節(jié)點(如:rabbit2 rabbit3)加入 rabbit1 組成集群

    # rabbit2 節(jié)點操作
    rabbit2$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit2 ...done.
    
    rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
    Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
    
    rabbit2$ rabbitmqctl start_app
    Starting node rabbit@rabbit2 ...done.
    
    # rabbit3 節(jié)點操作
    rabbit3$ rabbitmqctl stop_app
    Stopping node rabbit@rabbit3 ...done.
    
    rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
    Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
    
    rabbit3$ rabbitmqctl start_app
    Starting node rabbit@rabbit3 ...done.
    
  3. 查看集群狀態(tài)

    rabbit1$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit1 ...
    [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
     {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]
    ...done.
    
  4. 重啟節(jié)點,已加入集群的節(jié)點可以任意重啟,當節(jié)點恢復(fù)后它就會從其他節(jié)點同步數(shù)據(jù)。

    • 當一個節(jié)點重啟時,他會聯(lián)系對等體 10 次,每次超時 30s,如果通訊成功,則啟動成功,并且同步對等體數(shù)據(jù)。
    • 當一個節(jié)點關(guān)閉時沒有其他的對等體了(最后一個關(guān)閉的節(jié)點),它啟動的時候不會作為一個獨立節(jié)點,它將等待對等體加入。
    • 當所有節(jié)點關(guān)閉后,集群也就關(guān)閉了。當再次啟動一個節(jié)點,在指定的時間內(nèi)(默認為 5min, 可以通過下面的配置文件修改),其他節(jié)點再次啟動就會自動加入原來的集群。
      # wait for 60 seconds instead of 30
      mnesia_table_loading_retry_timeout = 60000
      
      # retry 15 times instead of 10
      mnesia_table_loading_retry_limit = 15
      

rabbitmq-peer-discovery-k8s 插件自動創(chuàng)建集群

https://github.com/rabbitmq/rabbitmq-peer-discovery-k8s

可以使用 helm 的 rabbitmq-ha chart 在 kubernetes 集群中快速部署一套 rabbitmq cluster.

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 什么叫消息隊列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,441評論 0 24
  • 本文大綱 RabbitMQ 歷史 RabbitMQ 應(yīng)用場景 RabbitMQ 系統(tǒng)架構(gòu) RabbitMQ 基本概...
    Java_Explorer閱讀 16,805評論 1 40
  • 整體架構(gòu) 部署步驟 基于 Docker 基本概念內(nèi)存節(jié)點只保存狀態(tài)到內(nèi)存,例外情況是:持久的 queue 的內(nèi)容將...
    mvictor閱讀 12,909評論 5 30
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,692評論 19 139
  • RabbitMQ采用Erlang編寫,需安裝語言庫才能運行RabbitMQ代理服務(wù)器。AMQP:高級消息隊列協(xié)議。...
    JAVA覓音閣閱讀 4,018評論 0 7

友情鏈接更多精彩內(nèi)容