消息隊(duì)列之 RabbitMQ

關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)候把這塊的知識(shí)整理記錄一下了。

市面上的消息隊(duì)列產(chǎn)品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年底阿里巴巴捐贈(zèng)給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數(shù)據(jù)庫(kù)也支持 MQ 功能??傊@塊知名的產(chǎn)品就有十幾種,就我自己的使用經(jīng)驗(yàn)和興趣只打算談?wù)?RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此之前先看下消息隊(duì)列的相關(guān)概念。

什么叫消息隊(duì)列

消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)雜,可能包含嵌入對(duì)象。

消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回,由消息系統(tǒng)來確保消息的可靠傳遞。消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的。這樣發(fā)布者和使用者都不用知道對(duì)方的存在。

為何用消息隊(duì)列

從上面的描述中可以看出消息隊(duì)列是一種應(yīng)用間的異步協(xié)作機(jī)制,那什么時(shí)候需要使用 MQ 呢?

以常見的訂單系統(tǒng)為例,用戶點(diǎn)擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:扣減庫(kù)存、生成相應(yīng)單據(jù)、發(fā)紅包、發(fā)短信通知。在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行,隨著業(yè)務(wù)的發(fā)展訂單量增長(zhǎng),需要提升系統(tǒng)服務(wù)的性能,這時(shí)可以將一些不需要立即生效的操作拆分出來異步執(zhí)行,比如發(fā)放紅包、發(fā)短信通知等。這種場(chǎng)景下就可以用 MQ ,在下單的主流程(比如扣減庫(kù)存、生成相應(yīng)單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨(dú)線程拉取MQ的消息(或者由 MQ 推送消息),當(dāng)發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時(shí),執(zhí)行相應(yīng)的業(yè)務(wù)邏輯。

以上是用于業(yè)務(wù)解耦的情況,其它常見場(chǎng)景包括最終一致性、廣播、錯(cuò)峰流控等等。

RabbitMQ 特點(diǎn)

RabbitMQ 是一個(gè)由 Erlang 語言開發(fā)的 AMQP 的開源實(shí)現(xiàn)。

AMQP :Advanced Message Queue,高級(jí)消息隊(duì)列協(xié)議。它是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。

RabbitMQ 最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。具體特點(diǎn)包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)、發(fā)布確認(rèn)。

  2. 靈活的路由(Flexible Routing)
    在消息進(jìn)入隊(duì)列之前,通過 Exchange 來路由消息的。對(duì)于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起,也通過插件機(jī)制實(shí)現(xiàn)自己的 Exchange 。

  3. 消息集群(Clustering)
    多個(gè) RabbitMQ 服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯 Broker 。

  4. 高可用(Highly Available Queues)
    隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用。

  5. 多種協(xié)議(Multi-protocol)
    RabbitMQ 支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT 等等。

  6. 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面。

  8. 跟蹤機(jī)制(Tracing)
    如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制,使用者可以找出發(fā)生了什么。

  9. 插件機(jī)制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件。

RabbitMQ 中的概念模型

消息模型

所有 MQ 產(chǎn)品從模型抽象上來說都是一樣的過程:
消費(fèi)者(consumer)訂閱某個(gè)隊(duì)列。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊(duì)列(queue)中,最后將消息發(fā)送到監(jiān)聽的消費(fèi)者。

消息流
RabbitMQ 基本概念

上面只是最簡(jiǎn)單抽象的描述,具體到 RabbitMQ 則有更詳細(xì)的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協(xié)議的一個(gè)開源實(shí)現(xiàn),所以其內(nèi)部實(shí)際上也是 AMQP 中的基本概念:

RabbitMQ 內(nèi)部結(jié)構(gòu)
  1. Message
    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
  2. Publisher
    消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
  3. Exchange
    交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
  4. Binding
    綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
  5. Queue
    消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
  6. Connection
    網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
  7. Channel
    信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
  8. Consumer
    消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
  9. Virtual Host
    虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。
  10. Broker
    表示消息隊(duì)列服務(wù)器實(shí)體。
AMQP 中的消息路由

AMQP 中消息的路由過程和 Java 開發(fā)者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產(chǎn)者把消息發(fā)布到 Exchange 上,消息最終到達(dá)隊(duì)列并被消費(fèi)者接收,而 Binding 決定交換器的消息應(yīng)該發(fā)送到那個(gè)隊(duì)列。

AMQP 的消息路由過程
Exchange 類型

Exchange分發(fā)消息時(shí)根據(jù)類型的不同分發(fā)策略有區(qū)別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

  1. direct


    direct 交換器

    消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發(fā)到對(duì)應(yīng)的隊(duì)列中。路由鍵與隊(duì)列名完全匹配,如果一個(gè)隊(duì)列綁定到交換機(jī)要求路由鍵為“dog”,則只轉(zhuǎn)發(fā) routing key 標(biāo)記為“dog”的消息,不會(huì)轉(zhuǎn)發(fā)“dog.puppy”,也不會(huì)轉(zhuǎn)發(fā)“dog.guard”等等。它是完全匹配、單播的模式。

  2. fanout


    fanout 交換器

    每個(gè)發(fā)到 fanout 類型交換器的消息都會(huì)分到所有綁定的隊(duì)列上去。fanout 交換器不處理路由鍵,只是簡(jiǎn)單的將隊(duì)列綁定到交換器上,每個(gè)發(fā)送到交換器的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換器綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。fanout 類型轉(zhuǎn)發(fā)消息是最快的。

  3. topic
    topic 交換器

    topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個(gè)模式進(jìn)行匹配,此時(shí)隊(duì)列需要綁定到一個(gè)模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點(diǎn)隔開。它同樣也會(huì)識(shí)別兩個(gè)通配符:符號(hào)“#”和符號(hào)“”。#匹配0個(gè)或多個(gè)單詞,匹配不多不少一個(gè)單詞。

RabbitMQ 安裝

一般來說安裝 RabbitMQ 之前要安裝 Erlang ,可以去Erlang官網(wǎng)下載。接著去RabbitMQ官網(wǎng)下載安裝包,之后解壓縮即可。根據(jù)操作系統(tǒng)不同官網(wǎng)提供了相應(yīng)的安裝說明:WindowsDebian / Ubuntu、RPM-based Linux、Mac

如果是Mac 用戶,個(gè)人推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:

brew update

接著安裝 rabbitmq 服務(wù)器:

brew install rabbitmq

這樣 RabbitMQ 就安裝好了,安裝過程中會(huì)自動(dòng)其所依賴的 Erlang 。

RabbitMQ 運(yùn)行和管理

  1. 啟動(dòng)
    啟動(dòng)很簡(jiǎn)單,找到安裝后的 RabbitMQ 所在目錄下的 sbin 目錄,可以看到該目錄下有6個(gè)以 rabbitmq 開頭的可執(zhí)行文件,直接執(zhí)行 rabbitmq-server 即可,下面將 RabbitMQ 的安裝位置以 . 代替,啟動(dòng)命令就是:
./sbin/rabbitmq-server

啟動(dòng)正常的話會(huì)看到一些啟動(dòng)過程信息和最后的 completed with 7 plugins,這也說明啟動(dòng)的時(shí)候默認(rèn)加載了7個(gè)插件。


正常啟動(dòng)
  1. 后臺(tái)啟動(dòng)
    如果想讓 RabbitMQ 以守護(hù)程序的方式在后臺(tái)運(yùn)行,可以在啟動(dòng)的時(shí)候加上 -detached 參數(shù):
./sbin/rabbitmq-server -detached
  1. 查詢服務(wù)器狀態(tài)
    sbin 目錄下有個(gè)特別重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的幾乎一站式解決方案,絕大部分的運(yùn)維命令它都可以提供。
    查詢 RabbitMQ 服務(wù)器的狀態(tài)信息可以用參數(shù) status :
./sbin/rabbitmqctl status

該命令將輸出服務(wù)器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名稱、內(nèi)存等等

  1. 關(guān)閉 RabbitMQ 節(jié)點(diǎn)
    我們知道 RabbitMQ 是用 Erlang 語言寫的,在Erlang 中有兩個(gè)概念:節(jié)點(diǎn)和應(yīng)用程序。節(jié)點(diǎn)就是 Erlang 虛擬機(jī)的每個(gè)實(shí)例,而多個(gè) Erlang 應(yīng)用程序可以運(yùn)行在同一個(gè)節(jié)點(diǎn)之上。節(jié)點(diǎn)之間可以進(jìn)行本地通信(不管他們是不是運(yùn)行在同一臺(tái)服務(wù)器之上)。比如一個(gè)運(yùn)行在節(jié)點(diǎn)A上的應(yīng)用程序可以調(diào)用節(jié)點(diǎn)B上應(yīng)用程序的方法,就好像調(diào)用本地函數(shù)一樣。如果應(yīng)用程序由于某些原因奔潰,Erlang 節(jié)點(diǎn)會(huì)自動(dòng)嘗試重啟應(yīng)用程序。
    如果要關(guān)閉整個(gè) RabbitMQ 節(jié)點(diǎn)可以用參數(shù) stop :
./sbin/rabbitmqctl stop

它會(huì)和本地節(jié)點(diǎn)通信并指示其干凈的關(guān)閉,也可以指定關(guān)閉不同的節(jié)點(diǎn),包括遠(yuǎn)程節(jié)點(diǎn),只需要傳入?yún)?shù) -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node 默認(rèn) node 名稱是 rabbit@server ,如果你的主機(jī)名是 server.example.com ,那么 node 名稱就是 rabbit@server.example.com 。

  1. 關(guān)閉 RabbitMQ 應(yīng)用程序
    如果只想關(guān)閉應(yīng)用程序,同時(shí)保持 Erlang 節(jié)點(diǎn)運(yùn)行則可以用 stop_app:
./sbin/rabbitmqctl stop_app

這個(gè)命令在后面要講的集群模式中將會(huì)很有用。

  1. 啟動(dòng) RabbitMQ 應(yīng)用程序
./sbin/rabbitmqctl start_app
  1. 重置 RabbitMQ 節(jié)點(diǎn)
./sbin/rabbitmqctl reset

該命令將清除所有的隊(duì)列。

  1. 查看已聲明的隊(duì)列
./sbin/rabbitmqctl list_queues
  1. 查看交換器
./sbin/rabbitmqctl list_exchanges

該命令還可以附加參數(shù),比如列出交換器的名稱、類型、是否持久化、是否自動(dòng)刪除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
  1. 查看綁定
./sbin/rabbitmqctl list_bindings

Java 客戶端訪問

RabbitMQ 支持多種語言訪問,以 Java 為例看下一般使用 RabbitMQ 的步驟。

  1. maven工程的pom文件中添加依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
  1. 消息生產(chǎn)者
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //設(shè)置 RabbitMQ 地址
        factory.setHost("localhost");
        //建立到代理服務(wù)器到連接
        Connection conn = factory.newConnection();
        //獲得信道
        Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //發(fā)布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}
  1. 消息消費(fèi)者
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //建立到代理服務(wù)器到連接
        Connection conn = factory.newConnection();
        //獲得信道
        final Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //聲明隊(duì)列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //綁定隊(duì)列,通過鍵 hola 將隊(duì)列和交換器綁定起來
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            //消費(fèi)消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消費(fèi)的路由鍵:" + routingKey);
                    System.out.println("消費(fèi)的內(nèi)容類型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //確認(rèn)消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消費(fèi)的消息體內(nèi)容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
  1. 啟動(dòng) RabbitMQ 服務(wù)器
./sbin/rabbitmq-server
  1. 運(yùn)行 Consumer
    先運(yùn)行 Consumer ,這樣當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候能在消費(fèi)者后端看到消息記錄。
  2. 運(yùn)行 Producer
    接著運(yùn)行 Producer ,發(fā)布一條消息,在 Consumer 的控制臺(tái)能看到接收的消息:


    Consumer 控制臺(tái)

RabbitMQ 集群

RabbitMQ 最優(yōu)秀的功能之一就是內(nèi)建集群,這個(gè)功能設(shè)計(jì)的目的是允許消費(fèi)者和生產(chǎn)者在節(jié)點(diǎn)崩潰的情況下繼續(xù)運(yùn)行,以及通過添加更多的節(jié)點(diǎn)來線性擴(kuò)展消息通信吞吐量。RabbitMQ 內(nèi)部利用 Erlang 提供的分布式通信框架 OTP 來滿足上述需求,使客戶端在失去一個(gè) RabbitMQ 節(jié)點(diǎn)連接的情況下,還是能夠重新連接到集群中的任何其他節(jié)點(diǎn)繼續(xù)生產(chǎn)、消費(fèi)消息。

RabbitMQ 集群中的一些概念

RabbitMQ 會(huì)始終記錄以下四種類型的內(nèi)部元數(shù)據(jù):

  1. 隊(duì)列元數(shù)據(jù)
    包括隊(duì)列名稱和它們的屬性,比如是否可持久化,是否自動(dòng)刪除
  2. 交換器元數(shù)據(jù)
    交換器名稱、類型、屬性
  3. 綁定元數(shù)據(jù)
    內(nèi)部是一張表格記錄如何將消息路由到隊(duì)列
  4. vhost 元數(shù)據(jù)
    為 vhost 內(nèi)部的隊(duì)列、交換器、綁定提供命名空間和安全屬性

在單一節(jié)點(diǎn)中,RabbitMQ 會(huì)將所有這些信息存儲(chǔ)在內(nèi)存中,同時(shí)將標(biāo)記為可持久化的隊(duì)列、交換器、綁定存儲(chǔ)到硬盤上。存到硬盤上可以確保隊(duì)列和交換器在節(jié)點(diǎn)重啟后能夠重建。而在集群模式下同樣也提供兩種選擇:存到硬盤上(獨(dú)立節(jié)點(diǎn)的默認(rèn)設(shè)置),存在內(nèi)存中。

如果在集群中創(chuàng)建隊(duì)列,集群只會(huì)在單個(gè)節(jié)點(diǎn)而不是所有節(jié)點(diǎn)上創(chuàng)建完整的隊(duì)列信息(元數(shù)據(jù)、狀態(tài)、內(nèi)容)。結(jié)果是只有隊(duì)列的所有者節(jié)點(diǎn)知道有關(guān)隊(duì)列的所有信息,因此當(dāng)集群節(jié)點(diǎn)崩潰時(shí),該節(jié)點(diǎn)的隊(duì)列和綁定就消失了,并且任何匹配該隊(duì)列的綁定的新消息也丟失了。還好RabbitMQ 2.6.0之后提供了鏡像隊(duì)列以避免集群節(jié)點(diǎn)故障導(dǎo)致的隊(duì)列內(nèi)容不可用。

RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的數(shù)據(jù)和狀態(tài)都是必須在所有節(jié)點(diǎn)上復(fù)制的,例外就是上面所說的消息隊(duì)列。RabbitMQ 節(jié)點(diǎn)可以動(dòng)態(tài)的加入到集群中。

當(dāng)在集群中聲明隊(duì)列、交換器、綁定的時(shí)候,這些操作會(huì)直到所有集群節(jié)點(diǎn)都成功提交元數(shù)據(jù)變更后才返回。集群中有內(nèi)存節(jié)點(diǎn)和磁盤節(jié)點(diǎn)兩種類型,內(nèi)存節(jié)點(diǎn)雖然不寫入磁盤,但是它的執(zhí)行比磁盤節(jié)點(diǎn)要好。內(nèi)存節(jié)點(diǎn)可以提供出色的性能,磁盤節(jié)點(diǎn)能保障配置信息在節(jié)點(diǎn)重啟后仍然可用,那集群中如何平衡這兩者呢?

RabbitMQ 只要求集群中至少有一個(gè)磁盤節(jié)點(diǎn),所有其他節(jié)點(diǎn)可以是內(nèi)存節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)加入火離開集群時(shí),它們必須要將該變更通知到至少一個(gè)磁盤節(jié)點(diǎn)。如果只有一個(gè)磁盤節(jié)點(diǎn),剛好又是該節(jié)點(diǎn)崩潰了,那么集群可以繼續(xù)路由消息,但不能創(chuàng)建隊(duì)列、創(chuàng)建交換器、創(chuàng)建綁定、添加用戶、更改權(quán)限、添加或刪除集群節(jié)點(diǎn)。換句話說集群中的唯一磁盤節(jié)點(diǎn)崩潰的話,集群仍然可以運(yùn)行,但知道該節(jié)點(diǎn)恢復(fù),否則無法更改任何東西。

RabbitMQ 集群配置和啟動(dòng)

如果是在一臺(tái)機(jī)器上同時(shí)啟動(dòng)多個(gè) RabbitMQ 節(jié)點(diǎn)來組建集群的話,只用上面介紹的方式啟動(dòng)第二、第三個(gè)節(jié)點(diǎn)將會(huì)因?yàn)楣?jié)點(diǎn)名稱和端口沖突導(dǎo)致啟動(dòng)失敗。所以在每次調(diào)用 rabbitmq-server 命令前,設(shè)置環(huán)境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定唯一的節(jié)點(diǎn)名稱和端口。下面的例子端口號(hào)從5672開始,每個(gè)新啟動(dòng)的節(jié)點(diǎn)都加1,節(jié)點(diǎn)也分別命名為test_rabbit_1、test_rabbit_2、test_rabbit_3。

啟動(dòng)第1個(gè)節(jié)點(diǎn):

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

啟動(dòng)第2個(gè)節(jié)點(diǎn):

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

啟動(dòng)第2個(gè)節(jié)點(diǎn)前建議將 RabbitMQ 默認(rèn)激活的插件關(guān)掉,否則會(huì)存在使用了某個(gè)插件的端口號(hào)沖突,導(dǎo)致節(jié)點(diǎn)啟動(dòng)不成功。

現(xiàn)在第2個(gè)節(jié)點(diǎn)和第1個(gè)節(jié)點(diǎn)都是獨(dú)立節(jié)點(diǎn),它們并不知道其他節(jié)點(diǎn)的存在。集群中除第一個(gè)節(jié)點(diǎn)外后加入的節(jié)點(diǎn)需要獲取集群中的元數(shù)據(jù),所以要先停止 Erlang 節(jié)點(diǎn)上運(yùn)行的 RabbitMQ 應(yīng)用程序,并重置該節(jié)點(diǎn)元數(shù)據(jù),再加入并且獲取集群的元數(shù)據(jù),最后重新啟動(dòng) RabbitMQ 應(yīng)用程序。

停止第2個(gè)節(jié)點(diǎn)的應(yīng)用程序:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

重置第2個(gè)節(jié)點(diǎn)元數(shù)據(jù):

./sbin/rabbitmqctl -n test_rabbit_2 reset

第2節(jié)點(diǎn)加入第1個(gè)節(jié)點(diǎn)組成的集群:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost

啟動(dòng)第2個(gè)節(jié)點(diǎn)的應(yīng)用程序

./sbin/rabbitmqctl -n test_rabbit_2 start_app

第3個(gè)節(jié)點(diǎn)的配置過程和第2個(gè)節(jié)點(diǎn)類似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost

./sbin/rabbitmqctl -n test_rabbit_3 start_app
RabbitMQ 集群運(yùn)維

停止某個(gè)指定的節(jié)點(diǎn),比如停止第2個(gè)節(jié)點(diǎn):

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

查看節(jié)點(diǎn)3的集群狀態(tài):

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容