(一)RabbitMQ基礎(chǔ)

RabbitMQ 實戰(zhàn)教程

1.MQ引言

修改ip地址

image-20210206195309632

1.1 什么是MQ

MQ(Message Quene) : 翻譯為 消息隊列,通過典型的 生產(chǎn)者消費者模型,生產(chǎn)者不斷向消息隊列中生產(chǎn)消息,消費者不斷的從隊列中獲取消息。因為消息的生產(chǎn)和消費都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,輕松的實現(xiàn)系統(tǒng)間解耦。別名為 消息中間件 通過利用高效可靠的消息傳遞機制進行平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。

1.2 MQ有哪些

當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQRabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發(fā)RocketMQ等。

1.3 不同MQ特點

# 1.ActiveMQ
  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。它是一個完全支持JMS規(guī)范的的消息中間件。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為老牌的消息中間件,在中小型企業(yè)頗受歡迎!
 
 # 2.Kafka
  Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache頂級項目。Kafka主要特點是基于Pull的模式來處理消息消費,
  追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復(fù)制,不支持事務(wù),對消息的重復(fù)、丟失、錯誤沒有嚴格要求,
  適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
 
 # 3.RocketMQ
  RocketMQ是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點。RocketMQ思路起
  源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團被廣泛應(yīng)用于交易、充值、流計算、消
  息推送、日志流式處理、binglog分發(fā)等場景。
 
 # 4.RabbitMQ
  RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn)。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理,一般應(yīng)用在大數(shù)據(jù)日志處理或?qū)崟r性(少量延遲),可靠性(少量丟數(shù)據(jù))要求稍低的場景使用,比如ELK日志收集。


2.RabbitMQ 的引言

2.1 RabbitMQ

基于AMQP協(xié)議,erlang語言開發(fā),是部署最廣泛的開源消息中間件,是最受歡迎的開源消息中間件之一。

image-20190925215603036

官網(wǎng): https://www.rabbitmq.com/

官方教程: https://www.rabbitmq.com/#getstarted

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n21" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # AMQP 協(xié)議
AMQP(advanced message queuing protocol)`在2003年時被提出,最早用于解決金融領(lǐng)不同平臺之間的消息傳遞交互問題。顧名思義,AMQP是一種協(xié)議,更準(zhǔn)確的說是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實現(xiàn)了AMQP的provider天然性就是跨平臺的。以下是AMQP協(xié)議模型:</pre>

image-20200311182438041

2.2 RabbitMQ 的安裝

2.2.1 下載

官網(wǎng)下載地址: https://www.rabbitmq.com/download.html

image-20190925220115235

最新版本: 3.7.18

2.2.2 下載的安裝包
image-20190925220343521

注意:這里的安裝包是centos7安裝的包

2.2.3 安裝步驟


 1.將rabbitmq安裝包上傳到linux系統(tǒng)中
  erlang-22.0.7-1.el7.x86_64.rpm
  rabbitmq-server-3.7.18-1.el7.noarch.rpm
 
 # 2.安裝Erlang依賴包
  rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
 
 # 3.安裝RabbitMQ安裝包(需要聯(lián)網(wǎng))
  yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
  注意:默認安裝完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目錄中,需要 
  將配置文件復(fù)制到/etc/rabbitmq/目錄中,并修改名稱為rabbitmq.config
 # 4.復(fù)制配置文件
  cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
 
 # 5.查看配置文件位置
  ls /etc/rabbitmq/rabbitmq.config
 
 # 6.修改配置文件(參見下圖:)
  vim /etc/rabbitmq/rabbitmq.config 
image.png

將上圖中配置文件中紅色部分去掉%%,以及最后的,逗號 修改為下圖:允許來賓用戶在任意地方訪問

image.png
7.執(zhí)行如下命令,啟動rabbitmq中的插件管理
  rabbitmq-plugins enable rabbitmq_management

  出現(xiàn)如下說明:
  Enabling plugins on node rabbit@localhost:
  rabbitmq_management
  The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
  Applying plugin configuration to rabbit@localhost...
  The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
 
  set 3 plugins.
  Offline change; changes will take effect at broker restart.
 
 # 8.啟動RabbitMQ的服務(wù)
  systemctl start rabbitmq-server
  systemctl restart rabbitmq-server
  systemctl stop rabbitmq-server

 
 # 9.查看服務(wù)狀態(tài)(見下圖:)
  systemctl status rabbitmq-server
  ● rabbitmq-server.service - RabbitMQ broker
  Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
  Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
  Main PID: 2904 (beam.smp)
  Status: "Initialized"
  CGroup: /system.slice/rabbitmq-server.service
  ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
  MBlmbcs...
  ├─3220 erl_child_setup 32768
  ├─3243 inet_gethost 4
  └─3244 inet_gethost 4
  .........
image.png
10.關(guān)閉防火墻服務(wù)
  systemctl disable firewalld
  Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
  Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
  systemctl stop firewalld 
 
 # 11.訪問web管理界面
  http://10.15.0.8:15672/
image-20190926194738708
 12.登錄管理界面
  username:  guest
  password:  guest
image-20190926194954822

3. RabiitMQ 配置

3.1RabbitMQ 管理命令行

# 1.服務(wù)啟動相關(guān)
  systemctl start|restart|stop|status rabbitmq-server
 
 # 2.管理命令行  用來在不使用web管理界面情況下命令操作RabbitMQ
  rabbitmqctl  help  可以查看更多命令
 
 # 3.插件管理命令行
  rabbitmq-plugins enable|list|disable </pre>

3.2 web管理界面介紹

3.2.1 overview概覽

image-20191126162026720
  • connections:無論生產(chǎn)者還是消費者,都需要與RabbitMQ建立連接后才可以完成消息的生產(chǎn)和消費,在這里可以查看連接情況

  • channels:通道,建立連接后,會形成通道,消息的投遞獲取依賴通道。

  • Exchanges:交換機,用來實現(xiàn)消息的路由

  • Queues:隊列,即消息隊列,消息存放在隊列中,等待消費,消費后被移除隊列。

3.2.2 Admin用戶和虛擬主機管理

1. 添加用戶
image-20191126162617280

上面的Tags選項,其實是指定用戶的角色,可選的有以下幾個:

  • 超級管理員(administrator)

    可登陸管理控制臺,可查看所有的信息,并且可以對用戶,策略(policy)進行操作。

  • 監(jiān)控者(monitoring)

    可登陸管理控制臺,同時可以查看rabbitmq節(jié)點的相關(guān)信息(進程數(shù),內(nèi)存使用情況,磁盤使用情況等)

  • 策略制定者(policymaker)

    可登陸管理控制臺, 同時可以對policy進行管理。但無法查看節(jié)點的相關(guān)信息(上圖紅框標(biāo)識的部分)。

  • 普通管理者(management)

    僅可登陸管理控制臺,無法看到節(jié)點信息,也無法對策略進行管理。

  • 其他

    無法登陸管理控制臺,通常就是普通的生產(chǎn)者和消費者。

2. 創(chuàng)建虛擬主機
 虛擬主機
  為了讓各個用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是一個獨立的訪問路徑,不同用戶使用不同路徑,各自有自己的隊列、交換機,互相不會影響。</pre>
image-20191126163023153
3. 綁定虛擬主機和用戶

創(chuàng)建好虛擬主機,我們還要給用戶添加訪問權(quán)限:

點擊添加好的虛擬主機:

image-20191126163506795

進入虛擬機設(shè)置界面:

image-20191126163631889

4.RabbitMQ 的第一個程序

4.0 AMQP協(xié)議的回顧

image-20200312140114784

4.1 RabbitMQ支持的消息模型

image-20191126165434784

image.png

4.2 引入依賴

 <dependencies>
  <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.10.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
  <dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.25</version>
  <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
  <dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.9</version>
  </dependency>
  </dependencies>

4.3 第一種模型(直連)

image-20191126165840602

在上圖的模型中,有以下概念:

  • P:生產(chǎn)者,也就是要發(fā)送消息的程序

  • C:消費者:消息的接受者,會一直等待消息到來。

  • queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費者從其中取出消息。

1. 開發(fā)生產(chǎn)者
  //創(chuàng)建連接工廠
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  //創(chuàng)建通道
  Channel channel = connection.createChannel();
  //參數(shù)1: 是否持久化  參數(shù)2:是否獨占隊列 參數(shù)3:是否自動刪除  參數(shù)4:其他屬性
  channel.queueDeclare("hello",true,false,false,null);
  channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
  channel.close();
  connection.close();
##### 2\. 開發(fā)消費者
 //創(chuàng)建連接工廠
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare("hello", true, false, false, null);
  channel.basicConsume("hello",true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println(new String(body));
  }
  });
##### 3\. 參數(shù)的說明

  channel.queueDeclare("hello",true,false,false,null);
  '參數(shù)1':用來聲明通道對應(yīng)的隊列
  '參數(shù)2':用來指定是否持久化隊列
  '參數(shù)3':用來指定是否獨占隊列
  '參數(shù)4':用來指定是否自動刪除隊列
  '參數(shù)5':對隊列的額外配置</pre>

4.4 第二種模型(work quene)

Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務(wù)是不會被重復(fù)執(zhí)行的。

image-20200314221002008

角色:

  • P:生產(chǎn)者:任務(wù)的發(fā)布者

  • C1:消費者-1,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢

  • C2:消費者-2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快

1. 開發(fā)生產(chǎn)者
channel.queueDeclare("hello", true, false, false, null);
 for (int i = 0; i < 10; i++) {
  channel.basicPublish("", "hello", null, (i+"====>:我是消息").getBytes());
 }
2.開發(fā)消費者-1
 channel.queueDeclare("hello",true,false,false,null);
 channel.basicConsume("hello",true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者1: "+new String(body));
  }
 });
3.開發(fā)消費者-2
 channel.queueDeclare("hello",true,false,false,null);
 channel.basicConsume("hello",true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  try {
  Thread.sleep(1000);   //處理消息比較慢 一秒處理一個消息
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  System.out.println("消費者2: "+new String(body));
  }
 });
4.測試結(jié)果
image.png
image.png

總結(jié):默認情況下,RabbitMQ將按順序?qū)⒚總€消息發(fā)送給下一個使用者。平均而言,每個消費者都會收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)。

5.消息自動確認機制

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

 channel.basicQos(1);//一次只接受一條未確認的消息
 //參數(shù)2:關(guān)閉自動確認消息
 channel.basicConsume("hello",false,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者1: "+new String(body));
  channel.basicAck(envelope.getDeliveryTag(),false);//手動確認消息
  }
 });
  • 設(shè)置通道一次只能消費一個消息

  • 關(guān)閉消息的自動確認,開啟手動確認消息

image.png

4.5 第三種模型(fanout)

fanout 扇出 也稱為廣播

image-20191126213115873

在廣播模式下,消息發(fā)送流程是這樣的:

  • 可以有多個消費者

  • 每個消費者有自己的queue(隊列)

  • 每個隊列都要綁定到Exchange(交換機)

  • 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機,交換機來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定。

  • 交換機把消息發(fā)送給綁定過的所有隊列

  • 隊列的消費者都能拿到消息。實現(xiàn)一條消息被多個消費者消費

1. 開發(fā)生產(chǎn)者
 //聲明交換機
 channel.exchangeDeclare("logs","fanout");//廣播 一條消息多個消費者同時消費
 //發(fā)布消息
 channel.basicPublish("logs","",null,"hello".getBytes());
2. 開發(fā)消費者-1
//綁定交換機
 channel.exchangeDeclare("logs","fanout");
 //創(chuàng)建臨時隊列
 String queue = channel.queueDeclare().getQueue();
 //將臨時隊列綁定exchange
 channel.queueBind(queue,"logs","");
 //處理消息
 channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者1: "+new String(body));
  }
 });
3. 開發(fā)消費者-2
 //綁定交換機
 channel.exchangeDeclare("logs","fanout");
 //創(chuàng)建臨時隊列
 String queue = channel.queueDeclare().getQueue();
 //將臨時隊列綁定exchange
 channel.queueBind(queue,"logs","");
 //處理消息
 channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者2: "+new String(body));
  }
 });
4.開發(fā)消費者-3
 //綁定交換機
 channel.exchangeDeclare("logs","fanout");
 //創(chuàng)建臨時隊列
 String queue = channel.queueDeclare().getQueue();
 //將臨時隊列綁定exchange
 channel.queueBind(queue,"logs","");
 //處理消息
 channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者3: "+new String(body));
  }
 });
5. 測試結(jié)果
image.png
image.png
image.png

4.6 第四種模型(Routing)

4.6.1 Routing 之訂閱模型-Direct(直連)

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。

  • Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

流程:

image-20191126220145375

圖解:

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時,會指定一個routing key。

  • X:Exchange(交換機),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊列

  • C1:消費者,其所在隊列指定了需要routing key 為 error 的消息

  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

1. 開發(fā)生產(chǎn)者
//聲明交換機  參數(shù)1:交換機名稱 參數(shù)2:交換機類型 基于指令的Routing key轉(zhuǎn)發(fā)
 channel.exchangeDeclare("logs_direct","direct");
 String key = "";
 //發(fā)布消息
 channel.basicPublish("logs_direct",key,null,("指定的route key"+key+"的消息").getBytes());
2.開發(fā)消費者-1
 //聲明交換機
 channel.exchangeDeclare("logs_direct","direct");
 //創(chuàng)建臨時隊列
 String queue = channel.queueDeclare().getQueue();
 //綁定隊列和交換機
 channel.queueBind(queue,"logs_direct","error");
 channel.queueBind(queue,"logs_direct","info");
 channel.queueBind(queue,"logs_direct","warn");
 
 //消費消息
 channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者1: "+new String(body));
  }
 });
3.開發(fā)消費者-2
 //聲明交換機
 channel.exchangeDeclare("logs_direct","direct");
 //創(chuàng)建臨時隊列
 String queue = channel.queueDeclare().getQueue();
 //綁定隊列和交換機
 channel.queueBind(queue,"logs_direct","error");
 //消費消息
 channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  System.out.println("消費者2: "+new String(body));
  }
 });
4.測試生產(chǎn)者發(fā)送Route key為error的消息時

[圖片上傳失敗...(image-e0aa80-1612771575087)]

[圖片上傳失敗...(image-d6b9d7-1612771575087)]

5.測試生產(chǎn)者發(fā)送Route key為info的消息時

[圖片上傳失敗...(image-416aff-1612771575086)]

[圖片上傳失敗...(image-7c7340-1612771575086)]


4.6.2 Routing 之訂閱模型-Topic

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!這種模型Routingkey 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

[圖片上傳失敗...(image-f499b2-1612771575086)]

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n219" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 統(tǒng)配符

  • (star) can substitute for exactly one word. 匹配不多不少恰好1個詞

(hash) can substitute for zero or more words. 匹配一個或多個詞

如:

audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs</pre>

1.開發(fā)生產(chǎn)者

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n221" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //生命交換機和交換機類型 topic 使用動態(tài)路由(通配符方式)
channel.exchangeDeclare("topics","topic");
String routekey = "user.save";//動態(tài)路由key
//發(fā)布消息
channel.basicPublish("topics",routekey,null,("這是路由中的動態(tài)訂閱模型,route key: ["+routekey+"]").getBytes());</pre>

2.開發(fā)消費者-1

Routing Key中使用*通配符方式

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n224" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //聲明交換機
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時隊列
String queue = channel.queueDeclare().getQueue();
//綁定隊列與交換機并設(shè)置獲取交換機中動態(tài)路由
channel.queueBind(queue,"topics","user.*");

//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1: "+new String(body));
}
});</pre>

3.開發(fā)消費者-2

Routing Key中使用#通配符方式

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n227" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> //聲明交換機
channel.exchangeDeclare("topics","topic");
//創(chuàng)建臨時隊列
String queue = channel.queueDeclare().getQueue();
//綁定隊列與交換機并設(shè)置獲取交換機中動態(tài)路由
channel.queueBind(queue,"topics","user.#");

//消費消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2: "+new String(body));
}
});</pre>

4.測試結(jié)果

[圖片上傳失敗...(image-b45d2f-1612771575086)]

[圖片上傳失敗...(image-1dd8bf-1612771575086)]

5. SpringBoot中使用RabbitMQ

5.0 搭建初始環(huán)境

1. 引入依賴

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="xml" cid="n234" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency></pre>

2. 配置配置文件

<pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="yml" cid="n236" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 10.15.0.9
port: 5672
username: ems
password: 123
virtual-host: /ems</pre>

RabbitTemplate 用來簡化操作 使用時候直接在項目中注入即可使用

5.1 第一種hello world模型使用

  1. 開發(fā)生產(chǎn)者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n242" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testHello(){
    rabbitTemplate.convertAndSend("hello","hello world");
    }</pre>

  2. 開發(fā)消費者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n245" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
    @RabbitListener(queuesToDeclare = @Queue("hello"))
    public class HelloCustomer {

    @RabbitHandler
    public void receive1(String message){
    System.out.println("message = " + message);
    }
    }</pre>

5.2 第二種work模型使用

  1. 開發(fā)生產(chǎn)者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n250" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWork(){
    for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("work","hello work!");
    }
    }</pre>

  2. 開發(fā)消費者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n253" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
    public class WorkCustomer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
    System.out.println("work message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
    System.out.println("work message2 = " + message);
    }
    }</pre>

    說明:默認在Spring AMQP實現(xiàn)中Work這種方式就是公平調(diào)度,如果需要實現(xiàn)能者多勞需要額外配置

5.3 Fanout 廣播模型

  1. 開發(fā)生產(chǎn)者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n260" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanout() throws InterruptedException {
    rabbitTemplate.convertAndSend("logs","","這是日志廣播");
    }</pre>

  2. 開發(fā)消費者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n264" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
    public class FanoutCustomer {

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue,
    exchange = @Exchange(name="logs",type = "fanout")
    ))
    public void receive1(String message){
    System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue, //創(chuàng)建臨時隊列
    exchange = @Exchange(name="logs",type = "fanout") //綁定交換機類型
    ))
    public void receive2(String message){
    System.out.println("message2 = " + message);
    }
    }</pre>

5.4 Route 路由模型

  1. 開發(fā)生產(chǎn)者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n270" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDirect(){
    rabbitTemplate.convertAndSend("directs","error","error 的日志信息");
    }</pre>

  2. 開發(fā)消費者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n273" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
    public class DirectCustomer {

    @RabbitListener(bindings ={
    @QueueBinding(
    value = @Queue(),
    key={"info","error"},
    exchange = @Exchange(type = "direct",name="directs")
    )})
    public void receive1(String message){
    System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings ={
    @QueueBinding(
    value = @Queue(),
    key={"error"},
    exchange = @Exchange(type = "direct",name="directs")
    )})
    public void receive2(String message){
    System.out.println("message2 = " + message);
    }
    }
    </pre>

5.5 Topic 訂閱模型(動態(tài)路由模型)

  1. 開發(fā)生產(chǎn)者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n278" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Autowired
    private RabbitTemplate rabbitTemplate;

    //topic
    @Test
    public void testTopic(){
    rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
    }</pre>

  2. 開發(fā)消費者

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="java" cid="n282" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> @Component
    public class TopCustomer {
    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue,
    key = {"user.*"},
    exchange = @Exchange(type = "topic",name = "topics")
    )
    })
    public void receive1(String message){
    System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
    @QueueBinding(
    value = @Queue,
    key = {"user.#"},
    exchange = @Exchange(type = "topic",name = "topics")
    )
    })
    public void receive2(String message){
    System.out.println("message2 = " + message);
    }
    }</pre>


6. MQ的應(yīng)用場景

6.1 異步處理

場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種 1.串行的方式 2.并行的方式

  • 串行方式: 將注冊信息寫入數(shù)據(jù)庫后,發(fā)送注冊郵件,再發(fā)送注冊短信,以上三個任務(wù)全部完成后才返回給客戶端。 這有一個問題是,郵件,短信并不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.
這里寫圖片描述
  • 并行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送郵件的同時,發(fā)送短信,以上三個任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時間。
這里寫圖片描述
  • 消息隊列:假設(shè)三個業(yè)務(wù)節(jié)點分別使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并行已經(jīng)提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網(wǎng)站沒有任何影響,客戶端沒有必要等著其發(fā)送完成才顯示注冊成功,應(yīng)該是寫入數(shù)據(jù)庫后就返回. 消息隊列: 引入消息隊列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理

    img

由此可以看出,引入消息隊列后,用戶的響應(yīng)時間就等于寫入數(shù)據(jù)庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列后處理后,響應(yīng)時間是串行的3倍,是并行的2倍。

6.2 應(yīng)用解耦

場景:雙11是購物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口.

這里寫圖片描述

這種做法有一個缺點:

當(dāng)庫存系統(tǒng)出現(xiàn)故障時,訂單就會失敗。 訂單系統(tǒng)和庫存系統(tǒng)高耦合. 引入消息隊列

這里寫圖片描述
  • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

  • 庫存系統(tǒng):訂閱下單的消息,獲取下單消息,進行庫操作。 就算庫存系統(tǒng)出現(xiàn)故障,消息隊列也能保證消息的可靠投遞,不會導(dǎo)致消息丟失.

6.3 流量削峰

場景: 秒殺活動,一般會因為流量過大,導(dǎo)致應(yīng)用掛掉,為了解決這個問題,一般在應(yīng)用前端加入消息隊列。

作用:

1.可以控制活動人數(shù),超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)

2.可以緩解短時間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)

這里寫圖片描述

1.用戶的請求,服務(wù)器收到之后,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面.

2.秒殺業(yè)務(wù)根據(jù)消息隊列中的請求信息,再做后續(xù)處理.


7. RabbitMQ的集群

7.1 集群架構(gòu)

7.1.1 普通集群(副本集群)

All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster --摘自官網(wǎng)

默認情況下:RabbitMQ代理操作所需的所有數(shù)據(jù)/狀態(tài)都將跨所有節(jié)點復(fù)制。這方面的一個例外是消息隊列,默認情況下,消息隊列位于一個節(jié)點上,盡管它們可以從所有節(jié)點看到和訪問

  1. 架構(gòu)圖
image-20200320094147471

核心解決問題: 當(dāng)集群中某一時刻master節(jié)點宕機,可以對Quene中信息,進行備份

  1. 集群搭建

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n336" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 0.集群規(guī)劃
    node1: 10.15.0.3 mq1 master 主節(jié)點
    node2: 10.15.0.4 mq2 repl1 副本節(jié)點
    node3: 10.15.0.5 mq3 repl2 副本節(jié)點

    1.克隆三臺機器主機名和ip映射

    vim /etc/hosts加入:
    10.15.0.3 mq1
    10.15.0.4 mq2
    10.15.0.5 mq3
    node1: vim /etc/hostname 加入: mq1
    node2: vim /etc/hostname 加入: mq2
    node3: vim /etc/hostname 加入: mq3

    2.三個機器安裝rabbitmq,并同步cookie文件,在node1上執(zhí)行:

    scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
    scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/

    3.查看cookie是否一致:

    node1: cat /var/lib/rabbitmq/.erlang.cookie
    node2: cat /var/lib/rabbitmq/.erlang.cookie
    node3: cat /var/lib/rabbitmq/.erlang.cookie

    4.后臺啟動rabbitmq所有節(jié)點執(zhí)行如下命令,啟動成功訪問管理界面:

    rabbitmq-server -detached

    5.在node2和node3執(zhí)行加入集群命令:

    1.關(guān)閉 rabbitmqctl stop_app
    2.加入集群 rabbitmqctl join_cluster rabbit@mq1
    3.啟動服務(wù) rabbitmqctl start_app

    6.查看集群狀態(tài),任意節(jié)點執(zhí)行:

    rabbitmqctl cluster_status

    7.如果出現(xiàn)如下顯示,集群搭建成功:

    Cluster status of node rabbit@mq3 ...
    [{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
    {running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
    {cluster_name,<<"rabbit@mq1">>},
    {partitions,[]},
    {alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]

    8.登錄管理界面,展示如下狀態(tài):</pre>

    image-20200320095613586

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n338" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 9.測試集群在node1上,創(chuàng)建隊列</pre>

    image-20200320095743935

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n340" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 10.查看node2和node3節(jié)點:</pre>

    image-20200320095827688
    image-20200320095843370

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n343" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit;"> # 11.關(guān)閉node1節(jié)點,執(zhí)行如下命令,查看node2和node3:
    rabbitmqctl stop_app</pre>

    image-20200320100000347
    image-20200320100010968

7.1.2 鏡像集群

This guide covers mirroring (queue contents replication) of classic queues --摘自官網(wǎng)

By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. --摘自官網(wǎng)

鏡像隊列機制就是將隊列在三個節(jié)點之間設(shè)置主從關(guān)系,消息會在三個節(jié)點之間進行自動同步,且如果其中一個節(jié)點不可用,并不會導(dǎo)致消息丟失或服務(wù)不可用的情況,提升MQ集群的整體高可用性。

  1. 集群架構(gòu)圖
    image-20200320113423235
  2. 配置集群架構(gòu)

    <pre spellcheck="false" class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" lang="markdown" cid="n360" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 0px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"> # 0.策略說明
    rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
    -p Vhost: 可選參數(shù),針對指定vhost下的queue進行設(shè)置
    Name: policy的名稱
    Pattern: queue的匹配模式(正則表達式)
    Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode
    ha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodes
    all:表示在集群中所有的節(jié)點上進行鏡像
    exactly:表示在指定個數(shù)的節(jié)點上進行鏡像,節(jié)點的個數(shù)由ha-params指定
    nodes:表示在指定的節(jié)點上進行鏡像,節(jié)點名稱通過ha-params指定
    ha-params:ha-mode模式需要用到的參數(shù)
    ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic和manual
    priority:可選參數(shù),policy的優(yōu)先級

    1.查看當(dāng)前策略

    rabbitmqctl list_policies

    2.添加策略

    rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    說明:策略正則表達式為 “^” 表示所有匹配所有隊列名稱 ^hello:匹配hello開頭隊列

    3.刪除策略

    rabbitmqctl clear_policy ha-all

    4.測試集群</pre>


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容