RabbitMQ使用和高可用方案

簡單介紹一下RabbitMQ安裝和使用,這里使用docker安裝RabbitMQ3.7版本。

docker安裝rabbitmq

  1. 下載rabbitmq鏡像,官方rabbitmq有很多版本,這里推薦下載帶management的版本,帶管理界面
>docker pull rabbitmq:3.7-management
  1. 運行rabbitmq
    查看鏡像
>docker images
REPOSITORY                TAG                 IMAGE ID            CREATED             SIZE
rabbitmq                  3.7-management      c497955c219a        12 days ago         180MB
redis                     5.0.9               5120d23bad51        2 weeks ago         98.3MB
containerize/elastichd    latest              c2202f76db37        2 years ago         28.1MB
mobz/elasticsearch-head   5                   b19a5c98e43b        3 years ago         824MB

可以看到rabbitmq的鏡像id是c497955c219a,我們使用鏡像Id安裝

> docker run --name rabbitmq-3.7 -d -p 15672:15672 -p 5672:5672 --restart=always  c497955c219a

可以看到很快就安裝好了

> docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
34d6319d5720        c497955c219a        "docker-entrypoint.s…"   21 minutes ago      Up 21 minutes       4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq-3.7
  1. 訪問http://主機IP:15672,就可以看到rabbitmq管理界面了,默認用戶名和密碼都是guest


    rabbitmq管理界面

rabbitmq使用

1. 在使用rabbitmq之前,首先得對AMQP協(xié)議有所了解。
  • AMQP協(xié)議模型


    AMQP協(xié)議模型
  • AMQP消息流轉過程


    消息流轉過程

Publisher:消息發(fā)送者,將消息發(fā)送到Exchange并指明Routing Key,以便Message Queue可以正確的收到消息
Consumer:消息接受者,從Message Queue獲取消息,一個Consumer可以訂閱多個Queue, 來接受Queue中的消息
Server: 一個具體的MQ服務實例
Virtual host: 虛擬主機,一個server下面可以有多個虛擬主機,通常用于隔離不同的項目,一個Virtual host下面通常會有多個Exchange、Message Queue
Exchange:交換器,從Producer接受消息, 根據Bindings中配置的Routing key, 把消息分派到對應的Message Queue中
Routing key:路由鍵,用于Exchange判斷哪些消息需要發(fā)送對應的Message Queue
Bindings: 描述了Exchange和Queue之間的關系。Exchange 根據消息內容 (routing key),和Binding配置來決定把消息分派到哪個Queue中
Message Queue: 存儲消息, 并把消息傳遞給最終的 Consumer

2. 創(chuàng)建exchange和queue

首先需要創(chuàng)建一個exchange和queue才能收發(fā)消息,直接登錄到管理界面進入Exchanges菜單(這里創(chuàng)建一個type為topic的exchange)


image.png
  • Name是Exchage的名字
  • Type是Exchage的Routingkey的綁定類型(direct,topic,fanout,header)
  • Durability:消息是否持久化
  • Auto delete:如果設置為yes則當exchange最后一個綁定的隊列被刪除后,就會自動刪除
  • Internal:如果設置為yes,是RabbitMQ的內部使用,不提供給外部,自己編寫erlang語言做擴展時使用
  • Arguments:擴展AMQP的自定義參數

再創(chuàng)建一個queue


創(chuàng)建queue

新建routingkey,綁定exchange和queue的關系


綁定關系
3. 操作rabbitmq

使用springboot操作rabbitmq非常簡單,首先加入maven依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置rabbitmq地址

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000

發(fā)送消息,可以直接使用RabbitTemplate發(fā)送消息。

   @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test0(){
        //傳入exchange 和 routingkey 就可以投遞消息到綁定的隊列,消息可以發(fā)字符串也可以發(fā)序列化對象
        rabbitTemplate.convertAndSend("order-exchange","order.update","hello rabbit mq");
    }

接收消息也非常簡單,添加listener配置。

spring:
  rabbitmq:
    host: 39.99.219.219
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    listener: #消費端配置
      simple:
        concurrency: 5 #初始化并發(fā)數
        max-concurrency: 10 #最大并發(fā)數據
        auto-startup: true #自動開啟監(jiān)聽
        prefetch: 1 #每個并發(fā)連接同一時間最多處理幾個消息,限流設置
        acknowledge-mode: manual #簽收模式,設置為手動

接收消息只需要在方法上加@RabbitListener和@RabbitHandler注解。

 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
            key = "order.update"
        )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println("************消息接收開始***********");
        System.out.println("msg: "+msg);
    }
4. 消息接收的應答模式ACK和NACK的使用

為了保證消息可以正確被消費MQ提供了ACK和NACK的機制。

  • ACK是手動簽收消息的標識,如果消息的簽收模式設置成為了手工模式,在MQ沒有接收到ACK信息時都是Unacked的狀態(tài),并且消息還在隊列中,這個時候消息不會重試,不會再主動發(fā)給消費者
  • NACK:將消息重回隊列,如果我們發(fā)現異常,就可以調用NACK來將消息重回隊列,他會重回到隊尾重新發(fā)給消費者

可以看到上面的例子打印出了隊列信息但卻無法把這個隊列消費掉,當你重新啟動的時候又會讀到之前未消費完的消息。


未消費完的消息

可以看一下下面的例子,每個消息進來重試三次,第四次才簽收完成。

   int flag = 1;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
            key = "order.update"
        )
    )
    @RabbitHandler
    public void onListenMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println("************消息接收開始***********");
        System.out.println("msg: "+msg);
        Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //ACK進行簽收,第一個參數是標識,第二個參數是批量接收為fasle
        //channel.basicAck(deliverTag,false);
        if(flag>3){
            //說明執(zhí)行了3次都沒有成功
            //消息確認
            channel.basicAck(deliverTag,false);
        }else {
            flag = flag+1;
            //前兩個參數和上面ACK一樣,第三個參數是否重回隊列
            channel.basicNack(deliverTag, false, true);
        }
    }
}
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
************Select消息接收開始***********
mq msg: hello rabbit mq
5. 發(fā)送確認和發(fā)送失敗退回

為了保證消息能一定投遞成功,AMQP提供了消息確認和消息失敗退回。

  1. 先加入配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    #開啟發(fā)送確認
    publisher-confirms: true
    #開啟發(fā)送失敗退回
    publisher-returns: true

2.發(fā)送消息的時候添加一個消息的唯一標識,在消息回調的時候可以通過唯一標識判斷消息投遞是否成功

   @Test
   public void test01(){
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("ms0001");
        rabbitTemplate.convertAndSend("order-exchange","order.select","this is a msg",correlationData);
   }
  1. 創(chuàng)建一個類并實現RabbitTemplate.ConfirmCallback和 RabbitTemplate.ReturnCallback接口,來接收消息投遞回調
@Component
public class OrderSenderCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 每次發(fā)送完消息都會調這個方法
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("==========消息publish成功的ID: " + (correlationData != null ? correlationData.getId() : "null"));
        System.out.println("==========消息是否發(fā)布成功: " + ack);
        System.out.println("==========失敗的異常信息: " + cause);
    }

    /**
     * 每次消息發(fā)送失敗會調用這個方法
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("******replayCode: " + replyCode);
        System.out.println("******replayText: " + replyText);
        System.out.println("******exchange: " + exchange);
        System.out.println("******routingkey: " + routingKey);
    }
}

消息隊列高可用方案

1. 生產端的可靠性投遞

假如用MQ所做的業(yè)務不允許消息丟失,那么生產端就要保證消息能夠百分之百投遞,消費端就要保證消息百分之百被消費。那么該怎么做呢?

  1. 消費端需要開啟手動應答,用ACK和NACK的機制來確保消息的消費應答
  2. 生產者發(fā)送消息后一定要有一個確認應答來確認消息的發(fā)送狀態(tài)
  3. RabbitMQ本身需要做HA高可用
  4. 做一個完善的消息補償機制
  • RabbitMQHA高可用方案
    鏡像模式和普通模式的區(qū)別就是,隊列的數據都鏡像了一份到所有的節(jié)點上。這樣任何一個節(jié)點失效,不會影響整個集群的使用。
rabiitmq集群
  • 消息補償機制
  1. 一開始要有一個業(yè)務DB和MQDB,將業(yè)務數據存入DB
  2. 業(yè)務落庫后就通過Sender發(fā)送消息給Broker,發(fā)送消息后就將消息體和消息狀態(tài)記錄到MQDB中(發(fā)送中,發(fā)送完成),并且會接收到Broker返回的消息投遞確認狀態(tài)(需要代碼支持)
  3. 消息發(fā)送方接收Broker返回的消息和網絡狀態(tài)的異常
  4. 如果消費發(fā)送確認失敗應該調用業(yè)務來將MQDB消息的狀態(tài)更新為發(fā)送失敗,如果發(fā)送狀態(tài)成功就將MQDB中的記錄更新為成功
  5. 使用分布式定時Job來對我們的消息進行判斷,只要是消息狀態(tài)不是我們發(fā)送成功的都需要進行重新發(fā)送
  6. 重試一定要有次數限制,當達到現在的次數就不再重試通知人工處理
消息補償機制
2. 消費端冪等性問題解決

為了確保發(fā)送端的投遞成功,我們都會進行消息的補償,有可能消息補償的過程中會多發(fā)送幾次消息導致重復,這個時候就需要提前考慮消費端的冪等問題。

  • 唯一ID+業(yè)務碼在數據庫中做主鍵來做去重

    • 優(yōu)點:實現簡單方便
    • 缺點:會對數據庫產生異常壓力,并且只能用來做insert的冪等
  • 為執(zhí)行的內容做前置條件,類似于mysql的樂觀鎖
    給數據更新前增加一個前置的條件,需要將拿到的前置條件做為更新的條件之一來做操作,如果在你之前已經更新了這個前置條件,那么你的更新就會不成功。這樣同樣也會對數據庫產生一定的壓力

  • 利用Redis原子性
    第一個進入并拿到鎖的線程在鎖內部先判斷是否已消費,如果沒有消費則操作并記錄這個Measge_ID已消費,并發(fā)過程中的線程如果拿不到鎖就直接返回,這個線程能拿到鎖,但拿到后要去查詢這個Measge_ID是否消費過,如果已經消費了,就中止消費

3.消費端的消息可靠性保障

如果生產端能夠保證消息的可靠性那么消費端就一定能收到消息,需要注意的有兩點

  • 開啟消息手動簽收,如果消費成功就手動發(fā)送ACK響應,表明這個消息消費成功
  • 出現異??梢酝ㄟ^NACK將消息重回隊尾變成Ready狀態(tài)然后再次消費
  • NACK要有重試次數限制,當超過次數就將這個消息發(fā)送到一個人工補償消息隊列或持久化對象中等待人工補償
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容