C1-RabbitMQ(消息隊列)--- 分解 C 2021年8月8日 20:42:34

分布式框架中間件總綱

http://www.itdecent.cn/p/00aa796bb5b8

友情鏈接(消息三解序)

1、RabbitMQ(消息隊列)--- 分解 A
2、RabbitMQ(消息隊列)--- 分解 B
3、RabbitMQ(消息隊列)--- 分解 C
4、RabbitMQ(消息隊列)--- 面試題

本章目錄

一、發(fā)布確認高級
???????1、發(fā)布確認 springboot 版本
???????2、回退消息
???????3、備份交換機
二、RabbitMQ 其他知識點
???????1、冪等性
???????2、優(yōu)先級隊列
???????3、惰性隊列
三、RabbitMQ 集群
???????1、clustering
???????2、鏡像隊列
???????3、Haproxy+Keepalive 實現(xiàn)高可用負載均衡
???????4、Federation Exchange
???????5、Federation Queue
???????6、Shovel

一、發(fā)布確認高級

引出問題:
在生產(chǎn)環(huán)境中由于一些不明原因,導致 rabbitmq 重啟,在RabbitMQ 重啟期間生產(chǎn)者消息投遞失敗,導致消息丟失,需要手動處理和恢復。于是,我們開始思考,如何才能進行 RabbitMQ 的消息可靠投遞呢? 特別是在這樣比較極端的情況,RabbitMQ 集群不可用的時候,無法投遞的消息該如何處理呢?

1、發(fā)布確認 springboot 版本

1、消息確認機制方案


image.png

2、代碼架構圖


image.png

3、配置文件

spring.rabbitmq.publisher-confirm-type=correlated
? NONE
禁用發(fā)布確認模式,是默認值
? CORRELATED
發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法
? SIMPLE
經(jīng)測試有兩種效果,
其一效果和 CORRELATED 值一樣會觸發(fā)回調(diào)方法,
其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 節(jié)點返回發(fā)送結果,根據(jù)返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法發(fā)送消息到 broker

spring.rabbitmq.host=106.52.23.202
spring.rabbitmq.port=5672
spring.rabbitmq.username=mykk
spring.rabbitmq.password=abc666
spring.rabbitmq.publisher-confirm-type=correlated #開啟回調(diào)

4、配置類

package com.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";


    // 聲明交換機
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange (CONFIRM_EXCHANGE_NAME);
    }

    // 聲明隊列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable (CONFIRM_QUEUE_NAME).build ();
    }

    // 綁定
    @Bean
    public Binding queueBinding(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                @Qualifier("confirmQueue")Queue queue){
        return  BindingBuilder.bind (queue).to (confirmExchange).with (CONFIRM_ROUTING_KEY);
    }
}

5、回調(diào)接口

package com.rabbit.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback {

    /**
     * 交換機無論是否接受消息都會進入回調(diào)
     *
     * CorrelationData 消息相關數(shù)據(jù)
     * ack             交換機是否接受消息
     * cause ack 為 true 成功返回null ,akc 為 false 失敗 返回原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId ( ) : "";
        if (ack) {
            log.info ("回調(diào)成功:交換機已經(jīng)收到 id 為:{}的消息", id);
        } else {
            log.info ("回調(diào)失?。航粨Q機還未收到 id 為:{}消息,由于原因:{}", id, cause);
        }
    }

}

6、消息生產(chǎn)者

package com.rabbit.controller;

import com.rabbit.callback.MyCallback;
import com.rabbit.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallback myCallback;

    // 依賴注入 rabbitTemplate 之后再設置它的回調(diào)對象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback (myCallback);
    }

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // 指定消息的id 為 1
        CorrelationData correlationData1 = new CorrelationData ("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend (ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message, correlationData1);
        log.info ("發(fā)送消息內(nèi)容:{}", message);

        // 模擬發(fā)不出去(發(fā)送的交換機名字 改錯)
        rabbitTemplate.convertAndSend ("123"+ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message, correlationData1);
        log.info ("發(fā)送消息內(nèi)容:{},測試交換機問題", message);
        // 默認隊列出問題
        CorrelationData correlationData2 = new CorrelationData ("2");
        routingKey = "key2";// 改為一個沒有綁定過的key2
        rabbitTemplate.convertAndSend (ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message , correlationData2);

        log.info ("發(fā)送消息內(nèi)容:{},測試隊列問題", message);

    }
}

image.png

7、消息消費者

package com.rabbit.consumer;

import com.rabbit.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String (message.getBody ( ));
        log.info("接受到隊列 confirm.queue 消息:{}",msg);
    }
}

8、結果分析


image.png

可以看到,發(fā)送了兩條消息,第一條消息的 RoutingKey 為 "key1",第二條消息的 RoutingKey 為
"key2",兩條消息都成功被交換機接收,也收到了交換機的確認回調(diào),但消費者只收到了一條消息,因為
第二條消息的 RoutingKey 與隊列的 BindingKey 不一致,也沒有其它隊列能接收這個消息,所有第二條
消息被直接丟棄了。

2、回退消息

1、Mandatory 參數(shù)
在僅開啟了生產(chǎn)者確認機制的情況下,交換機接收到消息后,會直接給消息生產(chǎn)者發(fā)送確認消息,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄,此時生產(chǎn)者是不知道消息被丟棄這個事件的。那么如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設置 mandatory 參數(shù)可以在當消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者(簡單來說就是解決上面的routingKey錯誤,導致隊列發(fā)送的消息被丟棄,解決的就是隊列發(fā)送的消息不被丟棄而是回退,有點類似事務的回滾)

2、回調(diào)接口(改寫:初始化主要在這里)

package com.rabbit.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 依賴注入 rabbitTemplate 之后再設置它的回調(diào)對象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback (this);
        rabbitTemplate.setReturnCallback (this);
    }

    /**
     * 當消息無法路由的時候的回調(diào)方法
     *
     * @param message
     * @param replyCode     失敗碼
     * @param replyText     失敗原因
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error (" 消 息 {}, 被 交 換 機 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",
                new String (message.getBody ( )), exchange, replyText, routingKey);
    }



    /**
     * 交換機無論是否接受消息都會進入回調(diào)
     * <p>
     * CorrelationData 消息相關數(shù)據(jù)
     * ack             交換機是否接受消息
     * cause ack 為 true 成功返回null ,akc 為 false 失敗 返回原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId ( ) : "";
        if (ack) {
            log.info ("回調(diào)成功:交換機已經(jīng)收到 id 為:{}的消息", id);
        } else {
            log.info ("回調(diào)失敗:交換機還未收到 id 為:{}消息,由于原因:{}", id, cause);
        }
    }

}

3、消息生產(chǎn)者代碼(改寫:取消注入回調(diào),不在此處初始化)

package com.rabbit.controller;

import com.rabbit.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // 指定消息的id 為 1
        CorrelationData correlationData1 = new CorrelationData ("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend (ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message, correlationData1);
        log.info ("發(fā)送消息內(nèi)容:{}", message);

        // 模擬發(fā)不出去(發(fā)送的交換機名字 改錯)
//        rabbitTemplate.convertAndSend ("123"+ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message, correlationData1);
//        log.info ("發(fā)送消息內(nèi)容:{},測試交換機問題", message);

        // 默認隊列出問題
        CorrelationData correlationData2 = new CorrelationData ("2");
        routingKey = "key2";// 改為一個沒有綁定過的key2
        rabbitTemplate.convertAndSend (ConfirmConfig.CONFIRM_EXCHANGE_NAME, routingKey, message , correlationData2);

        log.info ("發(fā)送消息內(nèi)容:{},測試隊列問題", message);

    }
}

4、配置文件

spring.rabbitmq.host=106.52.23.202
spring.rabbitmq.port=5672
spring.rabbitmq.username=mykk
spring.rabbitmq.password=aaa666
#回調(diào)開啟
spring.rabbitmq.publisher-confirm-type=correlated
#回退開啟
spring.rabbitmq.publisher-returns=true

5、結果分析:捕獲到隊列不可路由的數(shù)據(jù)(消息)


image.png
3、備份交換機

個人小結:上面提到了交換機出問題,就無法接受到消息,通過回調(diào)可以檢測到,但是需要重新發(fā)送消息,可以通過備份交換機的隊列去發(fā)送。

比較官方的理論:
有了 mandatory 參數(shù)和回退消息,我們獲得了對無法投遞消息的感知能力,有機會在生產(chǎn)者的消息
無法被投遞時發(fā)現(xiàn)并處理。但有時候,我們并不知道該如何處理這些無法路由的消息,最多打個日志,然
后觸發(fā)報警,再來手動處理。而通過日志來處理這些無法路由的消息是很不優(yōu)雅的做法,特別是當生產(chǎn)者
所在的服務有多臺機器的時候,手動復制日志會更加麻煩而且容易出錯。而且設置 mandatory 參數(shù)會增
加生產(chǎn)者的復雜性,需要添加處理這些被退回的消息的邏輯。如果既不想丟失消息,又不想增加生產(chǎn)者的
復雜性,該怎么做呢?前面在設置死信隊列的文章中,我們提到,可以為隊列設置死信交換機來存儲那些
處理失敗的消息,可是這些不可路由消息根本沒有機會進入到隊列,因此無法使用死信隊列來保存消息。
在 RabbitMQ 中,有一種備份交換機的機制存在,可以很好的應對這個問題。什么是備份交換機呢?備份
交換機可以理解為 RabbitMQ 中交換機的“備胎”,當我們?yōu)槟骋粋€交換機聲明一個對應的備份交換機時,就
是為它創(chuàng)建一個備胎,當交換機接收到一條不可路由消息時,將會把這條消息轉發(fā)到備份交換機中,由備
份交換機來進行轉發(fā)和處理,通常備份交換機的類型為 Fanout ,這樣就能把所有消息都投遞到與其綁定
的隊列中,然后我們在備份交換機下綁定一個隊列,這樣所有那些原交換機無法被路由的消息,就會都進
入這個隊列了。當然,我們還可以建立一個報警隊列,用獨立的消費者來進行監(jiān)測和報警。

1、代碼架構圖(已下代碼根據(jù)此圖編寫)


image.png

2、修改配置

package com.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfigBack {

    // 交換機
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // 隊列
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    // 備份交換機
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    // 備份隊列
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    // 報警隊列
    public static final String WARNING_QUEUE_NAME = "warning.queue";


    // 聲明備份交換機(扇出)
    @Bean
    public FanoutExchange backupExchange(){
        return new FanoutExchange (BACKUP_EXCHANGE_NAME);
    }

    // 聲明備份隊列
    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable (BACKUP_QUEUE_NAME).build ();
    }
    // 聲明報警隊列
    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable (WARNING_QUEUE_NAME).build ();
    }

    // 備份隊列綁定備份交換機
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                @Qualifier("backupQueue")Queue backupQueue){
        return  BindingBuilder.bind (backupQueue).to (backupExchange);// 無綁定 routingKey,扇出(廣播)類型沒有必要有
    }

    // 報警隊列綁定備份交換機
    @Bean
    public Binding warningQueueBindingBackupExchange(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                @Qualifier("warningQueue")Queue warningQueue){
        return  BindingBuilder.bind (warningQueue).to (backupExchange);// 無綁定 routingKey,扇出(廣播)類型沒有必要有
    }





    // 聲明交換機(改寫:若無法投遞需要轉發(fā)到備份交換機上面)
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        //return new DirectExchange (CONFIRM_EXCHANGE_NAME);
        // 代碼意思:直接交換機 confirm.exchange ,持久化,并且無法投遞的時候轉發(fā)到備份交換機
        return ExchangeBuilder.directExchange (CONFIRM_EXCHANGE_NAME).durable (true)
                .withArgument ("alternate-exchange",BACKUP_EXCHANGE_NAME).build ();
    }

    // ------------------------------以上為核心擴展部分--------------------------------------------------

    // 聲明隊列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable (CONFIRM_QUEUE_NAME).build ();
    }

    // 綁定
    @Bean
    public Binding queueBinding(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                @Qualifier("confirmQueue")Queue queue){
        return  BindingBuilder.bind (queue).to (confirmExchange).with (CONFIRM_ROUTING_KEY);
    }
}

3、報警消費者

package com.rabbit.consumer;

import com.rabbit.config.ConfirmConfigBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WarningConsumer {
    @RabbitListener(queues = ConfirmConfigBack.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message){
        log.error("報警發(fā)現(xiàn)不可路由消息:{}", new String (message.getBody ()));
    }
}

4、注意事項:需要刪除掉之前定義好的交換機(confrim.exchange)因為配置關系已經(jīng)變了


image.png

5、輸出結果


image.png

mandatory 參數(shù)與備份交換機可以一起使用的時候,如果兩者同時開啟,備份交換機優(yōu)先級高。

二、RabbitMQ 其他知識點

1、冪等性

1、概念
用戶對于同一操作發(fā)起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產(chǎn)生了副作用。
舉個最簡單的例子,那就是支付,用戶購買商品后支付,支付扣款成功,但是返回結果的時候網(wǎng)絡異常,
此時錢已經(jīng)扣了,用戶再次點擊按鈕,此時會進行第二次扣款,返回結果成功,用戶查詢余額發(fā)現(xiàn)多扣錢
了,流水記錄也變成了兩條。在以前的單應用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務中即可,發(fā)生錯誤
立即回滾,但是再響應客戶端的時候也有可能出現(xiàn)網(wǎng)絡中斷或者異常等等

2、消息重復消費
消費者在消費 MQ 中的消息時,MQ 已把消息發(fā)送給消費者,消費者在給MQ 返回 ack 時網(wǎng)絡中斷,
故 MQ 未收到確認信息,該條消息會重新發(fā)給其他的消費者,或者在網(wǎng)絡重連后再次發(fā)送給該消費者,但
實際上該消費者已成功消費了該條消息,造成消費者消費了重復的消息。

3、解決思路
MQ 消費者的冪等性的解決一般使用全局 ID 或者寫個唯一標識比如時間戳 或者 UUID 或者訂單消費
者消費 MQ 中的消息也可利用 MQ 的該 id 來判斷,或者可按自己的規(guī)則生成一個全局唯一 id,每次消費消
息時用該 id 先判斷該消息是否已消費過。

4、消費端的冪等性保障
在海量訂單生成的業(yè)務高峰期,生產(chǎn)端有可能就會重復發(fā)生了消息,這時候消費端就要實現(xiàn)冪等性,
這就意味著我們的消息永遠不會被消費多次,即使我們收到了一樣的消息。業(yè)界主流的冪等性有兩種操作:a.
唯一 ID+指紋碼機制,利用數(shù)據(jù)庫主鍵去重, b.利用 redis 的原子性去實現(xiàn)

5、唯一ID+指紋碼機制
指紋碼:我們的一些規(guī)則或者時間戳加別的服務給到的唯一信息碼,它并不一定是我們系統(tǒng)生成的,基
本都是由我們的業(yè)務規(guī)則拼接而來,但是一定要保證唯一性,然后就利用查詢語句進行判斷這個 id 是否存
在數(shù)據(jù)庫中,優(yōu)勢就是實現(xiàn)簡單就一個拼接,然后查詢判斷是否重復;劣勢就是在高并發(fā)時,如果是單個數(shù)
據(jù)庫就會有寫入性能瓶頸當然也可以采用分庫分表提升性能,但也不是我們最推薦的方式。

6、Redis原子性(最佳方案)
利用 redis 執(zhí)行 setnx 命令,天然具有冪等性。從而實現(xiàn)不重復消費

2、優(yōu)先級隊列

1、場景
在我們系統(tǒng)中有一個訂單催付的場景,我們的客戶在天貓下的訂單,淘寶會及時將訂單推送給我們,如
果在用戶設定的時間內(nèi)未付款那么就會給用戶推送一條短信提醒,很簡單的一個功能對吧,但是,tmall
商家對我們來說,肯定是要分大客戶和小客戶的對吧,比如像蘋果,小米這樣大商家一年起碼能給我們創(chuàng)
造很大的利潤,所以理應當然,他們的訂單必須得到優(yōu)先處理,而曾經(jīng)我們的后端系統(tǒng)是使用 redis 來存
放的定時輪詢,大家都知道 redis 只能用 List 做一個簡簡單單的消息隊列,并不能實現(xiàn)一個優(yōu)先級的場景,所以訂單量大了后采用 RabbitMQ 進行改造和優(yōu)化,如果發(fā)現(xiàn)是大客戶的訂單給一個相對比較高的優(yōu)先級,否則就是默認優(yōu)先級。(簡稱,開后門)

2、如何添加
方式一:客戶端操作


image.png

方式二:代碼控制,在隊列層面

        Map<String, Object> params = new HashMap ( );
        params.put ("x-max-priority", 10);
        channel.queueDeclare ("hello", true, false, false, params);

方式三:代碼控制,在消息層面

        AMQP.BasicProperties properties = new
                AMQP.BasicProperties ( ).builder ( ).priority (5).build ( );

注意事項:
要讓隊列實現(xiàn)優(yōu)先級需要做的事情有如下事情:隊列需要設置為優(yōu)先級隊列,消息需要設置消息的優(yōu)先級,消費者需要等待消息已經(jīng)發(fā)送到隊列中才去消費因為,這樣才有機會對消息進行排序

3、惰性隊列

1、場景:
RabbitMQ 從 3.6.0 版本開始引入了惰性隊列的概念。惰性隊列會盡可能的將消息存入磁盤中,而在消
費者消費到相應的消息時才會被加載到內(nèi)存中,它的一個重要的設計目標是能夠支持更長的隊列,即支持
更多的消息存儲。當消費者由于各種各樣的原因(比如消費者下線、宕機亦或者是由于維護而關閉等)而致
使長時間內(nèi)不能消費消息造成堆積時,惰性隊列就很有必要了。

默認情況下,當生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時候,隊列中的消息會盡可能的存儲在內(nèi)存之中,
這樣可以更加快速的將消息發(fā)送給消費者。即使是持久化的消息,在被寫入磁盤的同時也會在內(nèi)存中駐留
一份備份。當 RabbitMQ 需要釋放內(nèi)存的時候,會將內(nèi)存中的消息換頁至磁盤中,這個操作會耗費較長的
時間,也會阻塞隊列的操作,進而無法接收新的消息。雖然 RabbitMQ 的開發(fā)者們一直在升級相關的算法,
但是效果始終不太理想,尤其是在消息量特別大的時候。

2、兩種模式
隊列具備兩種模式:default 和 lazy。默認的為default 模式,在3.6.0 之前的版本無需做任何變更。lazy
模式即為惰性隊列的模式,可以通過調(diào)用 channel.queueDeclare 方法的時候在參數(shù)中設置,也可以通過
Policy 的方式設置,如果一個隊列同時使用這兩種方式設置的話,那么 Policy 的方式具備更高的優(yōu)先級。

如果要通過聲明的方式改變已有隊列的模式的話,那么只能先刪除隊列,然后再重新聲明一個新的。
在隊列聲明的時候可以通過“x-queue-mode”參數(shù)來設置隊列的模式,取值為“default”和“l(fā)azy”。下面示
例中演示了一個惰性隊列的聲明細節(jié):

        Map<String, Object> args = new HashMap<String, Object> ( );
        args.put ("x-queue-mode", "lazy");
        channel.queueDeclare ("myqueue", false, false, false, args);

3、內(nèi)存開銷問題


image.png

在發(fā)送 1 百萬條消息,每條消息大概占 1KB 的情況下,普通隊列占用內(nèi)存是 1.2GB,而惰性隊列僅僅
占用 1.5MB

三、RabbitMQ 集群(因沒機子沒硬盤搞集群,暫更新)

1、clustering
2、鏡像隊列
3、Haproxy+Keepalive 實現(xiàn)高可用負載均衡
4、Federation Exchange
5、Federation Queue
6、Shovel

四、遇到的問題

1、rabbitmq宕機:Management API returned status code 500


image.png

重啟命令不是一下博客的:因為有改過
rabbitmqctl start_app,其他參考以下
解決方案:
https://blog.csdn.net/qq_38890137/article/details/104403234

2、補充:查看liunx剩余內(nèi)存:free -h

image.png

五、總結了幾點

1、交換機出問題可以通過回調(diào),但是隊列無法區(qū)分;所以隊列無法路由只能通過回退操作。

2、
image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容