RabbitMQ讀書筆記

1.RabbitMQ的概述與重要概念

RabbitMQ是流行的開源消息隊列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級消息隊列協(xié)議)的標準實現(xiàn)。
MQ的主要用途包含如下:

  • 異步處理,比如注冊發(fā)送郵件和短信
  • 應用解耦,比如電商系統(tǒng)中,訂單服務和庫存服務解耦
  • 流量削峰,比如秒殺,搶紅包活動

重要的概念可以通過如下圖進行了解:


Rabbit模型.png
  • Message 由消息頭和消息體組成,消息頭由一組可選屬性組成,消息體是不透明的
  • Publisher 消息的生產者,表示一個向Exchange發(fā)布消息的客戶端應用程序
  • Exchange 交換器,接收Publisher發(fā)送的消息并路由給Queue
  • Binding 綁定,關聯(lián)Exchange和Queue
  • Queue 消息隊列,用于保存消息直到發(fā)送給Consumer,Message會一直在隊列里直至被消費者取走
  • Connection TCP連接
  • Channel 信道,是建立在真實的TCP連接里的虛擬連接,對于操作系統(tǒng)來說建立和銷毀一次TCP是非常昂貴的開銷,因為引入信道來復用一條TCP連接
  • Consumer 消費者,表示從一個Queue中取得消息的一個客戶端應用程序
  • Broker RabbitMQ服務器實體
  • Virtual Host 虛擬主機,表示一個mini版的RabbitMQ服務器,擁有自己的Queue、Exchange、Binding和權限機制,默認的vhost是/,必須在連接時指定

MQ對比

MQ對比.png

2.RabbitMQ Window安裝介紹

RabbitMQ安裝依賴Erlang,因此安裝之前需要先安裝Erlang環(huán)境,如下:

otp_win64_22.3.exe Erlang的Window安裝包,安裝包中沒有Erlang關鍵字
rabbitmq-server-3.8.3.exe

Erlang和RabbitMQ有對應的版本關系,請點擊查看官網信息

Erlang和RabbitMQ版本對應關系部分截圖.png

3.插件安裝

軟件安裝完成之后,需要安裝管理界面插件。打開RabbitMQ Command Prompt命令行界面,輸入如下命令:

rabbitmq-plugins enable rabbitmq_management

安裝完成之后,打開網址:http://localhost:15672/
默認賬號為guest,密碼為guest,進入系統(tǒng)之后創(chuàng)建admin賬號,并修改guest密碼

4.RabbitMQ 運維篇

4.1 單機模式(開發(fā)測試環(huán)境推薦)

單機模式參考Window安裝即可(暫不提供Linux版本)。

4.2 普通集群模式

組成集群需要兩步操作,該操作同樣適用于鏡像模式

  1. 該模式下需要保證不同機器之間的erlang cooike一致,可將其中一臺機器的erlang cookie拷貝到其他機器上。
  2. 將節(jié)點加入集群,假如有三個節(jié)點,可在節(jié)點2,3兩臺機器上加入到節(jié)點1,如下:
rabbitmqctl stop_app 
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app

查看集群狀態(tài) rabbitmqctl cluster_status

普通集群模式中每個節(jié)點都有相同的元數(shù)據(jù),即相同的隊列結構。但是消息(實際數(shù)據(jù))只存在其中一個節(jié)點上,因此若消費者連接到非數(shù)據(jù)節(jié)點的時候,消息會先傳遞給消費者連接的節(jié)點,再提供給消費者。因此該模式下由兩個重要的特點,1. 若存儲消息節(jié)點宕機了,整個集群不可用,因此此模式并非高可用;2. 節(jié)點之間可能存在大量的數(shù)據(jù)傳遞,占用帶寬高。即使如此,若使用此種模式,客戶端應盡快均勻散布到各個節(jié)點上。

原理圖如下:


RabbitMQ普通集群.png
4.3 鏡像集群模式(生產環(huán)境必須)

在創(chuàng)建普通集群的基礎上,設置策略(policy),該操作可通過web ui設置,如下:


policy設置.png

也可以通過命令設置

// 為每個以“rock.wechat”開頭的隊列設置所有節(jié)點的鏡像,并且設置為自動同步模式
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

該模式是一個HA方案(高可用方案),RabbitMQ是沒有中心的,不會因為一個節(jié)點掛了導致整個集群不可用,解決了普通模式中的問題,與普通模式不同的是,消息會主動在鏡像節(jié)點之間同步,而不是在客戶端獲取數(shù)據(jù)時再拉取數(shù)據(jù)。
因數(shù)據(jù)在不同節(jié)點之間主動同步,因此帶寬要求更高,降低了系統(tǒng)的性能。這種模式適合對消息可靠性要求較高的場合中使用。
原理圖如下:


RabbitMQ鏡像集群.png

5.RabbitMQ 實戰(zhàn)篇

5.1 管理規(guī)范
5.1.1 命名規(guī)范

exchange:以ex開頭,規(guī)則為ex.業(yè)務域.應用名稱.消息類型

ex.businame.appname.msgtype

queue:以q開頭,規(guī)則為q.業(yè)務域.應用名稱.消息類型

q.businame.appname.msgtype
5.1.2 用戶管理規(guī)范
  • 提供給應用使用的用戶類型為none
  • 只授權用戶特定的exchange(寫)和queue(讀)訪問權限,這樣代碼就無法創(chuàng)建交換器和隊列
5.1.3 其他規(guī)范
  • 隊列和交換器由MQ管理員與研發(fā)人員溝通規(guī)則后,統(tǒng)一由MQ管理員進行創(chuàng)建。
  • 代碼中禁止進行創(chuàng)建交換器和隊列的操作(若用戶管理規(guī)范,此操作無法執(zhí)行)。
5.2 環(huán)境準備
  1. maven依賴配置
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>
  1. 新建一個標準的maven結構工程,編寫MQ工具類
public class ConnectionUtil {

    public static Connection get(String username, String pwd) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername(username);
        factory.setPassword(pwd);
        try {
            return factory.newConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

}
5.3 應用實例

abbitMQ常用的Exchange Type有三種

  1. direct 當消息的Routing key與Binding key完全匹配時,將消息路由到Queue中
  2. fanout 將消息廣播到與Exchange綁定的所有Queue,效率最高
  3. topic Binding key使用模式,“#”匹配一個或多個詞,“*”只匹配一個詞,當消息的Routing key模糊匹配該模式才進行路由
5.3.1 Direct模式
public class DirectSend {

    public static final String EXCHANGE_NAME = "exchange-test-direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "哈哈123";
        channel.basicPublish(EXCHANGE_NAME, "delete", null, msg.getBytes("utf-8"));
        System.out.println("[X] send: " + msg);

        channel.close();
        connection.close();
    }

}
public class DirectRec2 {

    //    public static final String EXCHANGE_NAME = "exchange-test-direct";
    public static final String QUEUE_NAME = "queue-test-direct-2";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.basicQos(1);  // 同一時刻服務器只會發(fā)送一條消息給消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y2] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.2 Fanout模式
public class SubscribeSend {

    public static final String EXCHANGE_NAME = "exchange-test-fanout-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello world";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
        System.out.println("[X] send: " + msg);

        channel.close();
        connection.close();
    }

}
public class SubscribeRec2 {
    public static final String QUEUE_NAME = "queue-test-fanout-02";

    //    public static final String EXCHANGE_NAME = "exchange-test-01";
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y2] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.3 Topic模式
public class TopicSend {

    private final static String EXCHANGE_NAME = "exchange-test-topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();

        // 聲明exchange
//        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息內容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}
public class TopRec1 {

    public static final String QUEUE_NAME = "queue-test-topic-01";

//    public static final String EXCHANGE_NAME = "exchange-test-topic";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}
5.3.4 ACK消息確認(推薦使用方式)
public class MultMqSend {

    private final static String EX_NAME = "exchange-test-ack-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.get("producer-a", "producer-a");
        Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello";
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EX_NAME, "rk", null, (msg + i).getBytes("UTF-8"));
            System.out.println("[X] send " + (msg + i));
        }
        channel.close();
        connection.close();
    }

}
public class MultiMqRecManualConfirm1 {

    private final static String QUEUE_NAME = "queue-test-ack-01";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一時刻服務器只會發(fā)一條消息給消費者
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }


}
public class MultiMqRecManualConfirm2 {

    private final static String QUEUE_NAME = "queue-test-ack-01";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
        final Channel channel = connection.createChannel();
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一時刻服務器只會發(fā)一條消息給消費者
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[Y1] receive msg: " + msg);
                //休眠
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 將消費者綁定到隊列,并設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }


}

confirm模式解決了公平輪訓的問題,哪個消費者處理更快,處理的消息更多(能者多勞)。這個案例解決了消費者消費消息可靠性問題,但是沒有解決發(fā)送者發(fā)送消息可靠性問題。

5.3.5 basicQo和basicAck關系

兩者是配套使用的。

// channel.basicQos(1)指該消費者在接收到隊列里的消息但沒有返回確認結果之前,
// 隊列不會將新的消息分發(fā)給該消費者。隊列中沒有被消費的消息不會被刪除,還是存在于隊列中。
channel.basicQos(1);  
// 確認消息
channel.basicAck(envelope.getDeliveryTag(), false);
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容