RabbitMQ學(xué)習(xí)(五)消費端削峰限流

1.MQ的作用

1)解耦:在項目啟動之初是很難預(yù)測未來會遇到什么困難的,消息中間件在處理過程中插入了一個隱含的,基于數(shù)據(jù)的接口層,兩邊都實現(xiàn)這個接口,這樣就允許獨立的修改或者擴(kuò)展兩邊的處理過程,只要兩邊遵守相同的接口約束即可。
2)冗余(存儲):在某些情況下處理數(shù)據(jù)的過程中會失敗,消息中間件允許把數(shù)據(jù)持久化知道他們完全被處理
擴(kuò)展性:消息中間件解耦了應(yīng)用的過程,所以提供消息入隊和處理的效率是很容易的,只需要增加處理流程就可以了。
3)削峰:在訪問量劇增的情況下,但是應(yīng)用仍然需要發(fā)揮作用,但是這樣的突發(fā)流量并不常見。而使用消息中間件采用隊列的形式可以減少突發(fā)訪問壓力,不會因為突發(fā)的超時負(fù)荷要求而崩潰
4)可恢復(fù)性:當(dāng)系統(tǒng)一部分組件失效時,不會影響到整個系統(tǒng)。消息中間件降低了進(jìn)程間的耦合性,當(dāng)一個處理消息的進(jìn)程掛掉后,加入消息中間件的消息仍然可以在系統(tǒng)恢復(fù)后重新處理
5)順序保證:在大多數(shù)場景下,處理數(shù)據(jù)的順序也很重要,大部分消息中間件支持一定的順序性
6)緩沖:消息中間件通過一個緩沖層來幫助任務(wù)最高效率的執(zhí)行
7)異步通信:通過把把消息發(fā)送給消息中間件,消息中間件并不立即處。

本文只討論削峰填谷的應(yīng)用場景:

舉個業(yè)務(wù)場景的栗子,秒殺業(yè)務(wù):
上游發(fā)起下單操作
下游完成秒殺業(yè)務(wù)邏輯(庫存檢查,庫存凍結(jié),余額檢查,余額凍結(jié),訂單生成,余額扣減,庫存扣減,生成流水,余額解凍,庫存解凍)
上游下單業(yè)務(wù)簡單,每秒發(fā)起了10000個請求,下游秒殺業(yè)務(wù)復(fù)雜,每秒只能處理2000個請求,很有可能上游不限速的下單,導(dǎo)致下游系統(tǒng)被壓垮,引發(fā)雪崩。
為了避免雪崩,常見的優(yōu)化方案有兩種:
1)業(yè)務(wù)上游隊列緩沖,限速發(fā)送
2)業(yè)務(wù)下游隊列緩沖,限速執(zhí)行

本文只討論下游隊列,就是消費端的限速執(zhí)行

rabbitmq提供了一種服務(wù)質(zhì)量保障功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息未被確認(rèn),不進(jìn)行消費新的消息。
使用 basicqos方法:
在消費端進(jìn)行使用。 0 1 false
prefetSize:0
prefetCount:這個值一般在設(shè)置為非自動ack的情況下生效,一般大小為1
global: true是channel級別, false是消費者級別
注意:我們要使用非自動ack
消費者代碼:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck設(shè)置為 false
        
        channel.basicQos(0,3,false);
        channel.basicConsume(queueName,false,new MyConsumer(channel));
    }
}

自定義消費者代碼:

package com.bfxy.rabbitmq.api.limit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
        
    }


}

生產(chǎn)者代碼:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");


        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

調(diào)試步驟:
1)啟動消費者類,效果如圖:

消費者啟動mq交換機信息.JPG

消費者啟動mq隊列信息.JPG

2)在自定義消費者類中注釋掉channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類,mq管控臺信息
管控臺信息.JPG

可以看到1個待確認(rèn)的,4個準(zhǔn)備好的消息,
3)放開代碼channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類,mq管控臺信息
管控臺信息.JPG

總結(jié):消費者消費成功一個消息后,需要設(shè)置成手動確認(rèn),當(dāng)返回確認(rèn)成功后,再去消費下一個消息,這樣可以實現(xiàn)消費端的削峰限流,不至于讓消費端服務(wù)崩潰。
到這里是不是以為結(jié)束了呢,其實還有一個知識點,就是消費端對沒有消費成功的消息,可以不進(jìn)行確認(rèn),讓其重回隊列,再次消費,與上面的代碼相比,只需修改自定義的消費者,設(shè)置如果滿足我們自己設(shè)置的條件就認(rèn)為是沒有消費成功,讓其重回隊列,這個時候broker端會再此發(fā)出這條消息。
修改如下:
重回隊列.JPG

啟動生產(chǎn)者和消費者,消費者控制臺信息如下:
重復(fù)消費未確認(rèn)的消息.JPG

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概念:微服務(wù)就是一些可獨立運行、可協(xié)同工作的小的服務(wù)。微服務(wù)是現(xiàn)在特別流行的服務(wù),微服務(wù)的字面意思是大家都很好理解...
    程序員技術(shù)圈閱讀 3,457評論 2 47
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,634評論 19 139
  • IM系統(tǒng)的MQ消息中間件選型:Kafka還是RabbitMQ? 1、前言 在IM這種講究高并發(fā)、高消息吞吐的互聯(lián)網(wǎng)...
    匆匆歲月閱讀 3,255評論 1 111
  • 秋水 紅葉與天邊 ☆田秀 一聲聲雁叫轉(zhuǎn)上紅葉放飛的長空 (我正枕靠水邊的夢鄉(xiāng)) 拉開黃昏...
    興安居士閱讀 299評論 0 3
  • 轉(zhuǎn)折點 當(dāng)每個人跟隨神的心意時,往往會面臨一個信仰的轉(zhuǎn)折點,有的人,雖有感動,卻沒有跟隨下去。中途放棄了,這樣就錯...
    玉初辰閱讀 894評論 0 51

友情鏈接更多精彩內(nèi)容