前言
目的主要是學(xué)習(xí)RabbitMQ的Qos限流,大概會簡單介紹學(xué)習(xí)為主:畢竟還是要來演示Springboot整合RabbitMQ注解的方式來使用
一.消費(fèi)端限流
假設(shè)一個(gè)場景,比如,由于之前Rabbitmq服務(wù)器積壓了許多之前未被處理的上萬條消息,當(dāng)我隨便打開其中一個(gè)消費(fèi)者客戶端,會出現(xiàn)這種問題,信息一涌而進(jìn),當(dāng)數(shù)據(jù)量特別大的時(shí)候可能會導(dǎo)致服務(wù)器卡頓或者直接崩潰,于是我們應(yīng)該對消費(fèi)端限流,用于保持的穩(wěn)定。
1.1 basicQos
RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
自動(dòng)簽收要設(shè)置成false, 建議實(shí)際工作中也設(shè)置成false
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize : 消息大小限制, 一般設(shè)置為0, 消費(fèi)端不做限制
prefetchCount : 會告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息, 即一旦有N個(gè)消息還沒有ack, 則該consumer將block(阻塞), 直到有消息ack
global : true/false 是否將上面設(shè)置應(yīng)用于channel, 簡單來說就是上面的限制是channel級別的還是consumer級別
注意 :
prefetchSize和global這兩項(xiàng), RabbitMQ沒有實(shí)現(xiàn), 暫且不關(guān)注, prefetchCount在autoAck設(shè)置false的情況下生效, 即在自動(dòng)確認(rèn)的情況下這兩個(gè)值是不生效的
1.2 對消費(fèi)端進(jìn)行限流
1)我們既然要使用消費(fèi)端限流,我們需要關(guān)閉自動(dòng) ack,將 autoAck 設(shè)置為 false
channel.basicConsume(queueName, false, consumer);
2)我們來設(shè)置具體的限流大小以及數(shù)量。
// 0,15 從0到15限制15條
// 設(shè)置 false 應(yīng)該于consumer級別
channel.basicQos(0, 15, false);
- 在消費(fèi)者的 handleDelivery 消費(fèi)方法中手動(dòng) ack,并且設(shè)置批量處理 ack 回應(yīng)為 true
channel.basicAck(envelope.getDeliveryTag(), true);
2 生產(chǎn)端和消費(fèi)端工程配置
省略配置....可翻之前的Rabbitmq工程配置
3.生產(chǎn)端工程
config配置
3.1 config配置
聲明隊(duì)列,此處使用direct隊(duì)列
@Component
@Slf4j
public class RabbitListenerConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
connectionFactory.createConnection();
return connectionFactory;
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//開啟mandatory模式(開啟失敗回調(diào))
rabbitTemplate.setMandatory(true);
//添加失敗回調(diào)方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
});
// 添加發(fā)送方確認(rèn)模式方法
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
log.info("correlationData:{}, ack:{}, cause:{}",
correlationData.getId(), ack, cause));
return rabbitTemplate;
}
/***聲明 direct 隊(duì)列 一對一***/
@Bean
public Exchange directExchange(){
return new DirectExchange("dircet.exchange.test");
}
@Bean
public Queue directQueue(){
return new Queue("direct.queue.test");
}
@Bean
public Binding directBinding(){
return new Binding("direct.queue.test",
Binding.DestinationType.QUEUE,
"dircet.exchange.test",
"direct.key",null);
}
}
3.2 dto
@Getter
@Setter
@ToString
public class OrderMessageDTO implements Serializable{
private Integer orderId;
private BigDecimal price;
private Integer productId;
}
3.3 service
public interface DirectService {
public void sendMessage();
// 使用限流Qos
public void sendQosMessage() throws JsonProcessingException;
}
3.4 impl
此處使用了另一種傳遞消息,使用了MessageProperties,將MessageProperties傳遞的對象轉(zhuǎn)換成message
@Slf4j
@Service
public class DirectServiceImpl implements DirectService {
@Autowired
private RabbitTemplate rabbitTemplate;
ObjectMapper objectMapper = new ObjectMapper();
/**
* 發(fā)送Qos 限流消息
*/
@Override
public void sendQosMessage() throws JsonProcessingException {
/* 使用MessageProperties傳遞的對象轉(zhuǎn)換成message*/
MessageProperties messageProperties = new MessageProperties();
OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
orderMessageDTO.setProductId(100);
orderMessageDTO.setPrice(new BigDecimal("20"));
orderMessageDTO.setOrderId(1);
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
Message message = new Message(messageToSend.getBytes(),messageProperties);
// 發(fā)送端確認(rèn)是否確認(rèn)消費(fèi)
CorrelationData correlationData = new CorrelationData();
// 唯一ID
correlationData.setId(orderMessageDTO.getOrderId().toString());
rabbitTemplate.convertAndSend("dircet.exchange.test","direct.queue.test",message,correlationData);
}
}
3.5 controller
@RestController
@Slf4j
@RequestMapping("/api")
public class SendController {
@Autowired
private DirectService directService;
@GetMapping("/qosDirect")
public void qosDirect() throws JsonProcessingException {
for (int i = 0; i < 200; i++) {
directService.sendQosMessage();
}
}
}
4.消費(fèi)端工程
4.1 config配置
@Component
@Slf4j
public class RabbitListenerConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
connectionFactory.createConnection();
return connectionFactory;
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
4.2 service
public interface DirectReciveService {
// 處理監(jiān)聽生產(chǎn)端Qos限流
public void QosDirectRecive(Message message, Channel channel) throws IOException;
}
4.3 impl
@RabbitListener(
containerFactory = "rabbitListenerContainerFactory",
bindings = {
@QueueBinding(
value = @Queue(name = "direct.queue.test"),
exchange = @Exchange(name = "dircet.exchange.test",
type = ExchangeTypes.DIRECT),
key = "direct.queue.test"
)
}
)
@Override
public void QosDirectRecive(@Payload Message message, Channel channel) throws IOException {
log.info("========directQos接受消息===========");
channel.basicQos(0,10,false);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
log.info("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume("direct.queue.test",false,consumer);
}
5.演示
5.1 訪問 http://localhost:8082/api/qosDirect
5.2 Rabbitmq控制臺

上圖中發(fā)現(xiàn) Unacked值一直存在 ,每過 5 秒 消費(fèi)一條消息即 Ready 和 Total 都減少 10,而 Unacked的值在這里代表消費(fèi)者正在處理的消息,通過我們的實(shí)驗(yàn)發(fā)現(xiàn)了消費(fèi)者一次性最多處理 10 條消息,達(dá)到了消費(fèi)者限流的預(yù)期功能。