RabbitMQ 消費(fèi)端的限流策略

假設(shè)一個(gè)場(chǎng)景,由于我們的消費(fèi)端突然全部不可用了,導(dǎo)致 rabbitMQ 服務(wù)器上有上萬(wàn)條未處理的消息,這時(shí)候如果沒(méi)做任何現(xiàn)在,隨便開啟一個(gè)消費(fèi)端客戶端,就會(huì)導(dǎo)致巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶端無(wú)法同時(shí)處理這么多的數(shù)據(jù),就會(huì)導(dǎo)致消費(fèi)端變得巨卡,有可能直接崩潰不可用了。所以在實(shí)際生產(chǎn)中,限流保護(hù)是很重要的。

rabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于 consume 或者 channel 設(shè)置 QOS 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。關(guān)鍵代碼就是在聲明消費(fèi)者代碼里面的

void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
  1. prefetchSize:0

  2. prefetchCount:會(huì)告訴 RabbitMQ 不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack

  3. global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是 channel 級(jí)別的還是 consumer 級(jí)別

備注:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒(méi)有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。

代碼演示:

代碼地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項(xiàng)目下

生產(chǎn)端代碼基本沒(méi)變化,改了 exchange 和 routingKey 而已

public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Hello RabbitMQ limit Message";
        for(int i = 0; i < 5; i ++){
            log.info("生產(chǎn)端發(fā)送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
    }
}

消費(fèi)端代碼需要修改一下 autoAck 設(shè)置為 false **
增加 ** channel.basicQos(0, 1, false);

完整的消費(fèi)端代碼如下

/**
 * 使用自定義消費(fèi)者
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    public static final String EXCHANGE_NAME = "test_qos_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_TYPE = "qos.#";
    public static final String ROUTING_KEY = "qos.save";
    public static final String QUEUE_NAME = "test_qos_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C onnection
        Connection connection = connectionFactory.newConnection();
        //3 通過(guò)Connection創(chuàng)建一個(gè)新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
        /**
         * prefetchSize:0
         prefetchCount:會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,限速N個(gè)
            即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack 回來(lái),你再發(fā)送 N 個(gè)過(guò)來(lái)
         global:true\false 是否將上面設(shè)置應(yīng)用于channel級(jí)別,false是consumer級(jí)別
         prefetchSize 和global這兩項(xiàng),rabbitmq沒(méi)有實(shí)現(xiàn),暫且不研究
         */
        channel.basicQos(0, 1, false);

        //使用自定義消費(fèi)者
        //1 限流方式  第一件事就是 autoAck設(shè)置為 false
      //使用自定義消費(fèi)者
        channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
        log.info("消費(fèi)端啟動(dòng)成功");
    }
}

自定義消費(fèi)者

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
     private Channel channel;
     
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,  //消費(fèi)者標(biāo)簽
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手動(dòng)ACK回去
       //channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

然后啟動(dòng)消費(fèi)端,上管控臺(tái)查看 test_qos_exchange 和 test_qos_queue 是否生成了


image

確認(rèn) test_qos_exchange 上綁定了 test_qos_queue

image

啟動(dòng)生產(chǎn)端發(fā)送 5 條消息

image

發(fā)現(xiàn)消費(fèi)端只打印了一條消息

image

從管控臺(tái)上也看到總共 5 條消息,有 4 條等待著,一條消費(fèi)了但是沒(méi)有 ack 回去


image

修改自定義消費(fèi)者里面的代碼,如下所示

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
     private Channel channel;
     
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,  //消費(fèi)者標(biāo)簽
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手動(dòng)ACK回去
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

重啟消費(fèi)端,看到消費(fèi)端就按照一條一條消費(fèi),并且 ACK 回去了

image
image

如上所示就是簡(jiǎn)單的RabbitMQ消費(fèi)端的限流策略

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 網(wǎng)上rabbitmq的學(xué)習(xí)日志非常豐富,官網(wǎng)文檔也很完美,這里主要記錄學(xué)習(xí)和部署過(guò)程中的一些記錄。會(huì)按以下菜單進(jìn)行...
    恐龍打醬油閱讀 2,656評(píng)論 0 4
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評(píng)論 2 11
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒(méi)有及時(shí)更新。 術(shù)語(yǔ)對(duì)...
    joyenlee閱讀 7,806評(píng)論 0 3
  • RabbitMQ 簡(jiǎn)介 MQ 消息隊(duì)列,上承生產(chǎn)者,下接消費(fèi)者。從生產(chǎn)者側(cè)獲取消息,然后將消息轉(zhuǎn)發(fā)給消費(fèi)者。由此可...
    2205閱讀 3,658評(píng)論 1 11
  • 2017年12月13日(十月廿六) 星期三 晴轉(zhuǎn)陰 晚上雪 今天是國(guó)家公祭日,第四年了,每年的這一天在南京...
    紅葉與藍(lán)鯨的美好相遇閱讀 400評(píng)論 0 0

友情鏈接更多精彩內(nèi)容