分布式--RabbitMQ集成SpringBoot、消息可靠性、死信隊(duì)列、延遲交換機(jī)、集群

接著上篇分布式--RabbitMQ入門(mén)

一、SpringBoot中使用RabbitMQ

1. 導(dǎo)入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2. yml配置

spring:
  rabbitmq:
    host: 192.168.42.4
    port: 5672
    username: aruba
    password: aruba
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual # 手動(dòng)ack
      simple:
        prefetch: 1 # 流控
        concurrency: 10 # 多線程監(jiān)控

3. 配置交換機(jī)和隊(duì)列

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "MY-MQ-EX";
    public static final String QUEUE_NAME = "MY-MQ-QUEUE";
    public static final String ROUTING_KEY = "key.#";

    /**
     * 注入交換機(jī)
     *
     * @return
     */
    @Bean
    public Exchange exchangeProvider() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 注入隊(duì)列
     *
     * @return
     */
    @Bean
    public Queue queueProvider() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 注入交換機(jī)隊(duì)列綁定關(guān)系
     *
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
        return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
    }
}

4. 發(fā)送消息

SpringBoot中使用RabbitTemplate自動(dòng)注入,即可發(fā)送消息,并對(duì)方法都進(jìn)行了封裝

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "發(fā)送消息");
    }

    /**
     * 攜帶信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "發(fā)送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }
    
}

5. 訂閱消息

在方法上使用@RabbitListener注解,即可指定訂閱隊(duì)列。
入?yún)⑻砑?code>Channel,就可以和之前一樣發(fā)送ack。
將消息封裝成了Message,可以獲取其攜帶信息。

@Component
public class MQListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("隊(duì)列的消息為:" + msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一標(biāo)識(shí)為:" + correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

運(yùn)行結(jié)果:

二、消息可靠性

由于RabbitMQ在發(fā)送消息和訂閱消息時(shí),都是通過(guò)網(wǎng)絡(luò)傳輸,其間必然會(huì)出現(xiàn)由網(wǎng)絡(luò)問(wèn)題產(chǎn)生的消息丟失情況,要保證消息的可靠性從下面四點(diǎn)出發(fā):

  • 保證消息發(fā)送到交換機(jī)
  • 保證消息路由到隊(duì)列
  • 保證隊(duì)列中消息的持久化
  • 保證消費(fèi)者正常消費(fèi)消息

1. Client-API方式

1.1 保證消息發(fā)送到交換機(jī)

Publisher Confirms就是為了保證消息發(fā)送到交換機(jī)的機(jī)制,一般使用異步的方式:

        //4. 開(kāi)啟confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功發(fā)送到交換機(jī)");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("沒(méi)有送達(dá)交換機(jī)");
            }
        });
1.2 保證消息路由到隊(duì)列

addReturnListener方法可以確認(rèn)消息是否路由到了隊(duì)列,如果回調(diào)了說(shuō)明沒(méi)有路由到隊(duì)列
發(fā)送消息時(shí),指定mandatory參數(shù)為true

        //5. 設(shè)置return回調(diào),確認(rèn)消息是否路由到了隊(duì)列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交換機(jī)沒(méi)有路由到隊(duì)列");
            }
        });

        //參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù)  消息
        channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保證隊(duì)列中消息的持久化

首先保證隊(duì)列的持久化,再保證消息的持久化

        //3. 構(gòu)建隊(duì)列  參數(shù):隊(duì)列名 是否持久化 是否排外(只允許一個(gè)消費(fèi)者) 長(zhǎng)時(shí)間未使用是否自動(dòng)刪除 其他參數(shù)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //6. 發(fā)送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù)  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保證消費(fèi)者正常消費(fèi)消息

保證消費(fèi)者正常消費(fèi)消息只需要手動(dòng)ack即可,生產(chǎn)者完整代碼:

public class Publisher {

    private static final String QUEUE_NAME = "confirm";

    @Test
    public void publisher() throws Exception {
        //1. 獲取連接對(duì)象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 創(chuàng)建信道
        Channel channel = connection.createChannel();

        //3. 構(gòu)建隊(duì)列  參數(shù):隊(duì)列名 是否持久化 是否排外(只允許一個(gè)消費(fèi)者) 長(zhǎng)時(shí)間未使用是否自動(dòng)刪除 其他參數(shù)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4. 開(kāi)啟confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功發(fā)送到交換機(jī)");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("沒(méi)有送達(dá)交換機(jī)");
            }
        });

        //5. 設(shè)置return回調(diào),確認(rèn)消息是否路由到了隊(duì)列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交換機(jī)沒(méi)有路由到隊(duì)列");
            }
        });

        //6. 發(fā)送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù)  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
    }

}

2. SpringBoot方式

2.1 配置Confirm

yml中開(kāi)啟confirm

spring:
  rabbitmq:
    publisher-confirm-type: correlated

RabbitTemplate設(shè)置回調(diào):

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送達(dá)到交換機(jī)");
                } else {
                    System.out.println("消息沒(méi)有送達(dá)到交換機(jī)");
                }
            }
        });
2.2 配置Return

yml中開(kāi)啟return

spring:
  rabbitmq:
    publisher-returns: true

RabbitTemplate設(shè)置回調(diào):

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交換機(jī):%s 路由消息失敗", returned.getExchange()));
            }
        });
2.3 消息持久化

設(shè)置Message的攜帶信息:

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "發(fā)送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });

完整代碼:

    /**
     * 攜帶信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送達(dá)到交換機(jī)");
                } else {
                    System.out.println("消息沒(méi)有送達(dá)到交換機(jī)");
                }
            }
        });
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交換機(jī):%s 路由消息失敗", returned.getExchange()));
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "發(fā)送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }

三、死信隊(duì)列

死信隊(duì)列是存放本來(lái)應(yīng)該死亡的消息的隊(duì)列,用于對(duì)這些消息的特殊處理(如:重新入隊(duì)、持久化到數(shù)據(jù)庫(kù)),具體有以下幾種消息會(huì)被存放進(jìn)死信隊(duì)列:

  • 消費(fèi)者拒絕的消息,并requeue設(shè)置為false(不重新入隊(duì)列)
  • 消息的生存時(shí)間到了,還在隊(duì)列中的信息
  • 隊(duì)列設(shè)置了整體的消息生存時(shí)間,到了生存時(shí)間的消息
  • 到達(dá)隊(duì)列中消息最大數(shù),再路由過(guò)來(lái)的消息

1. 構(gòu)建交換機(jī)

死信隊(duì)列需要一個(gè)死信交換機(jī),并把正常消息的隊(duì)列綁定死信交換機(jī):

@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
    public static final String NORMAL_QUEUE_NAME = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE_NAME = "dead-ex";
    public static final String DEAD_QUEUE_NAME = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue normalQueue() {
        // 綁定死信交換機(jī)
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") //準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
                .build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}

2. 死信隊(duì)列的實(shí)現(xiàn)方式

2.1 拒絕消息入死信隊(duì)列

對(duì)正常隊(duì)列消息進(jìn)行監(jiān)聽(tīng),來(lái)做相應(yīng)的處理,首先是拒絕消息,并且要把requeue設(shè)為false

@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
    public void normalListener(Message msg, Channel channel) throws IOException {
        System.out.println("接收到正常隊(duì)列消息:" + new String(msg.getBody()));
        channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
//        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
    }

}

嘗試發(fā)送一個(gè)消息:

    @Test
    public void sendNormal() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈嘍");
    }

運(yùn)行結(jié)果:

2.2 消息生存時(shí)間

發(fā)送消息時(shí),通過(guò)消息的額外參數(shù)MessagePropertiessetExpiration方法設(shè)置過(guò)期時(shí)間:

    @Test
    public void sendExpire() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈嘍",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // 該消息10s后過(guò)期
                        message.getMessageProperties().setExpiration("10000");
                        return message;
                    }
                });
    }

記得把上面消息的監(jiān)聽(tīng)注釋掉,否則會(huì)消費(fèi)消息

運(yùn)行結(jié)果:

2.3 隊(duì)列消息的整體生存時(shí)間

管理頁(yè)面把之前的正常隊(duì)列刪除,在重新創(chuàng)建時(shí),為正常隊(duì)列設(shè)置ttl

設(shè)置ttl

    @Bean
    public Queue normalQueue() {
        // 綁定死信交換機(jī)
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .ttl(5000) // 整體消息過(guò)期時(shí)間為5s
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
                .build();
    }

發(fā)送正常消息,運(yùn)行結(jié)果:

2.4 達(dá)到隊(duì)列最大數(shù)

同樣先刪除正常隊(duì)列,后調(diào)用maxLength為隊(duì)列設(shè)置最大消息數(shù):

    @Bean
    public Queue normalQueue() {
        // 綁定死信交換機(jī)
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整體消息過(guò)期時(shí)間為5s
                .maxLength(1) // 設(shè)置消息最大數(shù)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
                .build();
    }

發(fā)送兩次正常消息,運(yùn)行結(jié)果:

四、延遲交換機(jī)

死信隊(duì)列的問(wèn)題:由于死信隊(duì)列只會(huì)監(jiān)聽(tīng)隊(duì)列頭的過(guò)期時(shí)間,一旦隊(duì)列頭的消息過(guò)期時(shí)間比后面排隊(duì)的消息過(guò)期時(shí)間長(zhǎng),那么后面消息的過(guò)期時(shí)間并不會(huì)生效,而是等待隊(duì)列頭的過(guò)期時(shí)間到了后,才一并進(jìn)入死信隊(duì)列

刪除正常隊(duì)列,恢復(fù)配置:

    @Bean
    public Queue normalQueue() {
        // 綁定死信交換機(jī)
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整體消息過(guò)期時(shí)間為5s
//                .maxLength(1) // 設(shè)置消息最大數(shù)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
                .build();
    }

發(fā)送兩次消息,第一次過(guò)期時(shí)間為30s,第二次為2s:

    @Test
    public void sendExpire30() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈嘍",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("30000");
                        return message;
                    }
                });
    }

    @Test
    public void sendExpire2() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈嘍",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("2000");
                        return message;
                    }
                });
    }

結(jié)果,過(guò)了幾秒后,隊(duì)列中還是兩個(gè)消息:

解決方法:根據(jù)時(shí)間創(chuàng)建多個(gè)隊(duì)列或者使用延遲交換機(jī)

延遲交換機(jī)是一個(gè)插件,默認(rèn)并不帶,原理就是將消息暫時(shí)放在交換機(jī)中,由交換機(jī)根據(jù)消息過(guò)期時(shí)間的先后來(lái)路由到隊(duì)列,缺點(diǎn):由于消息在交換機(jī)中,重啟會(huì)導(dǎo)致消息的丟失

1. 插件下載和使用

根據(jù)自己的RabbitMQ版本進(jìn)行下載:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins

啟動(dòng)插件:

cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重啟服務(wù)或系統(tǒng)后,多了一個(gè)x-delayed-message的交換機(jī)類型:

2. 配置延遲交換機(jī)

使用CustomExchange構(gòu)造x-delayed-message類型交換機(jī),并使用其他參數(shù)x-delayed-type指定使用哪種原型交換機(jī)類型,這邊使用的是topic

@Configuration
public class DelayExchangeConfig {

    public static final String EXCHANGE_NAME = "delay-exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_ROUTIN_KEY = "delay.#";

    @Bean
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // 使用哪種原型交換機(jī)類型
        args.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
        return exchange;
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }


    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
    }
    
}

3. 發(fā)送消息

MessageProperties使用setDelay方法為消息設(shè)置延遲:

    @Test
    public void sendDelay30() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈嘍",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(30000);
                        return message;
                    }
                });
    }

    @Test
    public void sendDelay5() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈嘍",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(5000);
                        return message;
                    }
                });
    }

消息在交換機(jī)進(jìn)行等待后,首先入隊(duì)列的為5秒延遲的,后面入隊(duì)列的為30秒延遲的:

五、集群

1. 配置主機(jī)名

RabbitMQ集群的搭建要配置主機(jī)名:HOSTNAME,先修改network配置文件

vi /etc/sysconfig/network

追加HOSTNAME:

HOSTNAME=rabbit1

再修改hosts文件:

vi /etc/hosts

追加內(nèi)容:

192.168.42.4 rabbit1

重啟系統(tǒng)后,RabbitMQ先前配置的管理賬號(hào)會(huì)丟失,需要重新配置

2. 克隆虛擬機(jī)

2.1 從機(jī)主機(jī)名配置

克隆后,對(duì)從機(jī)進(jìn)行主機(jī)名的配置,network配置文件:

hosts文件,中需要添加集群主節(jié)點(diǎn)的ip和hostname:

2.2 建立集群關(guān)聯(lián)

啟動(dòng)RabbitMQ服務(wù)后,管理界面的節(jié)點(diǎn)會(huì)帶上主機(jī)名:

接下來(lái),配置從機(jī)加入到主節(jié)點(diǎn)集群中,執(zhí)行以下命令即可:

cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/ 
./rabbitmqctl stop_app
./rabbitmqctl reset 
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app

加入成功后,管理界面中就會(huì)出現(xiàn)多個(gè)節(jié)點(diǎn):

3. 配置鏡像模式

目前集群是普通模式,隊(duì)列中的消息只會(huì)存在于一個(gè)節(jié)點(diǎn)上,而不會(huì)同步到其他隊(duì)列,一旦該節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)將無(wú)法訪問(wèn)消息。
鏡像模式是指,集群中所有節(jié)點(diǎn)都有一份單獨(dú)的拷貝,即使單一節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)中依然存在消息的拷貝,這樣才能實(shí)現(xiàn)高可用

在管理界面進(jìn)行配置鏡像策略:

新建一個(gè)隊(duì)列,并查看詳情:

項(xiàng)目地址:

https://gitee.com/aruba/rabbit-mqstudy.git

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容