Springboot與RabbitMQ上手學(xué)習(xí)之Qos限流(四)

前言

目的主要是學(xué)習(xí)RabbitMQ的Qos限流,大概會簡單介紹學(xué)習(xí)為主:畢竟還是要來演示Springboot整合RabbitMQ注解的方式來使用

一.消費(fèi)端限流

假設(shè)一個(gè)場景,比如,由于之前Rabbitmq服務(wù)器積壓了許多之前未被處理的上萬條消息,當(dāng)我隨便打開其中一個(gè)消費(fèi)者客戶端,會出現(xiàn)這種問題,信息一涌而進(jìn),當(dāng)數(shù)據(jù)量特別大的時(shí)候可能會導(dǎo)致服務(wù)器卡頓或者直接崩潰,于是我們應(yīng)該對消費(fèi)端限流,用于保持的穩(wěn)定。

1.1 basicQos

RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。

自動(dòng)簽收要設(shè)置成false, 建議實(shí)際工作中也設(shè)置成false
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize : 消息大小限制, 一般設(shè)置為0, 消費(fèi)端不做限制
prefetchCount : 會告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息, 即一旦有N個(gè)消息還沒有ack, 則該consumer將block(阻塞), 直到有消息ack
global : true/false 是否將上面設(shè)置應(yīng)用于channel, 簡單來說就是上面的限制是channel級別的還是consumer級別
注意 :
prefetchSize和global這兩項(xiàng), RabbitMQ沒有實(shí)現(xiàn), 暫且不關(guān)注, prefetchCount在autoAck設(shè)置false的情況下生效, 即在自動(dòng)確認(rèn)的情況下這兩個(gè)值是不生效的

1.2 對消費(fèi)端進(jìn)行限流

1)我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動(dòng) ack,將 autoAck 設(shè)置為 false

channel.basicConsume(queueName, false, consumer);

2)我們來設(shè)置具體的限流大小以及數(shù)量。

// 0,15 從0到15限制15條
// 設(shè)置 false 應(yīng)該于consumer級別 
channel.basicQos(0, 15, false);
  1. 在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動(dòng) ack,并且設(shè)置批量處理 ack 回應(yīng)為 true
channel.basicAck(envelope.getDeliveryTag(), true);
2 生產(chǎn)端和消費(fèi)端工程配置

省略配置....可翻之前的Rabbitmq工程配置

3.生產(chǎn)端工程

config配置

3.1 config配置

聲明隊(duì)列,此處使用direct隊(duì)列

@Component
@Slf4j
public class RabbitListenerConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
    @Bean
    @Qualifier("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //開啟mandatory模式(開啟失敗回調(diào))
        rabbitTemplate.setMandatory(true);
        //添加失敗回調(diào)方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
                    message, replyCode, replyText, exchange, routingKey);
        });
        // 添加發(fā)送方確認(rèn)模式方法
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                log.info("correlationData:{}, ack:{}, cause:{}",
                        correlationData.getId(), ack, cause));
        return rabbitTemplate;
    }


    /***聲明 direct 隊(duì)列  一對一***/
    @Bean
    public Exchange directExchange(){
        return new DirectExchange("dircet.exchange.test");
    }

    @Bean
    public Queue directQueue(){
        return new Queue("direct.queue.test");
    }

    @Bean
    public Binding directBinding(){
        return new Binding("direct.queue.test",
                                Binding.DestinationType.QUEUE,
                                "dircet.exchange.test",
                                    "direct.key",null);
    }
}
3.2 dto
@Getter
@Setter
@ToString
public class OrderMessageDTO implements Serializable{
    private Integer orderId;
    private BigDecimal price;
    private Integer productId;
}
3.3 service

public interface DirectService {

public void sendMessage();

//  使用限流Qos
public void sendQosMessage() throws JsonProcessingException;

}

3.4 impl

此處使用了另一種傳遞消息,使用了MessageProperties,將MessageProperties傳遞的對象轉(zhuǎn)換成message

@Slf4j
@Service
public class DirectServiceImpl implements DirectService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    ObjectMapper objectMapper = new ObjectMapper();

    /**
     * 發(fā)送Qos 限流消息
     */
    @Override
    public void sendQosMessage() throws JsonProcessingException {
        /* 使用MessageProperties傳遞的對象轉(zhuǎn)換成message*/
        MessageProperties messageProperties = new MessageProperties();
        OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
        orderMessageDTO.setProductId(100);
        orderMessageDTO.setPrice(new BigDecimal("20"));
        orderMessageDTO.setOrderId(1);
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
        Message message = new Message(messageToSend.getBytes(),messageProperties);
        // 發(fā)送端確認(rèn)是否確認(rèn)消費(fèi)
        CorrelationData correlationData = new CorrelationData();
        // 唯一ID
        correlationData.setId(orderMessageDTO.getOrderId().toString());
        rabbitTemplate.convertAndSend("dircet.exchange.test","direct.queue.test",message,correlationData);
    }
}
3.5 controller
@RestController
@Slf4j
@RequestMapping("/api")
public class SendController {
    @Autowired
    private DirectService directService;

    @GetMapping("/qosDirect")
    public void qosDirect() throws JsonProcessingException {
        for (int i = 0; i < 200; i++) {
            directService.sendQosMessage();
        }
    }
}

4.消費(fèi)端工程

4.1 config配置
@Component
@Slf4j
public class RabbitListenerConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
    
}
4.2 service
public interface DirectReciveService {
    // 處理監(jiān)聽生產(chǎn)端Qos限流
    public void QosDirectRecive(Message message, Channel channel) throws IOException;
}
4.3 impl
    @RabbitListener(
            containerFactory = "rabbitListenerContainerFactory",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "direct.queue.test"),
                            exchange = @Exchange(name = "dircet.exchange.test",
                                    type = ExchangeTypes.DIRECT),
                            key = "direct.queue.test"
                    )
            }
    )
    @Override
    public void QosDirectRecive(@Payload Message message, Channel channel) throws IOException {
        log.info("========directQos接受消息===========");
        channel.basicQos(0,10,false);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                log.info("[x] Received '" + message + "'");

                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        };
        //. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
        channel.basicConsume("direct.queue.test",false,consumer);
    }

5.演示

5.1 訪問 http://localhost:8082/api/qosDirect

5.2 Rabbitmq控制臺


image.png

上圖中發(fā)現(xiàn) Unacked值一直存在 ,每過 5 秒 消費(fèi)一條消息即 Ready 和 Total 都減少 10,而 Unacked的值在這里代表消費(fèi)者正在處理的消息,通過我們的實(shí)驗(yàn)發(fā)現(xiàn)了消費(fèi)者一次性最多處理 10 條消息,達(dá)到了消費(fèi)者限流的預(yù)期功能。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容