假設(shè)一個(gè)場(chǎng)景,由于我們的消費(fèi)端突然全部不可用了,導(dǎo)致 rabbitMQ 服務(wù)器上有上萬(wàn)條未處理的消息,這時(shí)候如果沒(méi)做任何現(xiàn)在,隨便開啟一個(gè)消費(fèi)端客戶端,就會(huì)導(dǎo)致巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶端無(wú)法同時(shí)處理這么多的數(shù)據(jù),就會(huì)導(dǎo)致消費(fèi)端變得巨卡,有可能直接崩潰不可用了。所以在實(shí)際生產(chǎn)中,限流保護(hù)是很重要的。
rabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于 consume 或者 channel 設(shè)置 QOS 的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。關(guān)鍵代碼就是在聲明消費(fèi)者代碼里面的
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
prefetchSize:0
prefetchCount:會(huì)告訴 RabbitMQ 不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack
global:true、false 是否將上面設(shè)置應(yīng)用于 channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是 channel 級(jí)別的還是 consumer 級(jí)別
備注:prefetchSize 和 global 這兩項(xiàng),rabbitmq 沒(méi)有實(shí)現(xiàn),暫且不研究。特別注意一點(diǎn),prefetchCount 在 no_ask=false 的情況下才生效,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。
代碼演示:
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項(xiàng)目下
生產(chǎn)端代碼基本沒(méi)變化,改了 exchange 和 routingKey 而已
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ limit Message";
for(int i = 0; i < 5; i ++){
log.info("生產(chǎn)端發(fā)送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
消費(fèi)端代碼需要修改一下 autoAck 設(shè)置為 false **
增加 ** channel.basicQos(0, 1, false);
完整的消費(fèi)端代碼如下
/**
* 使用自定義消費(fèi)者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "qos.#";
public static final String ROUTING_KEY = "qos.save";
public static final String QUEUE_NAME = "test_qos_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 通過(guò)Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
/**
* prefetchSize:0
prefetchCount:會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,限速N個(gè)
即一旦有 N 個(gè)消息還沒(méi)有 ack,則該 consumer 將 block 掉,直到有消息 ack 回來(lái),你再發(fā)送 N 個(gè)過(guò)來(lái)
global:true\false 是否將上面設(shè)置應(yīng)用于channel級(jí)別,false是consumer級(jí)別
prefetchSize 和global這兩項(xiàng),rabbitmq沒(méi)有實(shí)現(xiàn),暫且不研究
*/
channel.basicQos(0, 1, false);
//使用自定義消費(fèi)者
//1 限流方式 第一件事就是 autoAck設(shè)置為 false
//使用自定義消費(fèi)者
channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
log.info("消費(fèi)端啟動(dòng)成功");
}
}
自定義消費(fèi)者
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消費(fèi)者標(biāo)簽
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//一定要手動(dòng)ACK回去
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
然后啟動(dòng)消費(fèi)端,上管控臺(tái)查看 test_qos_exchange 和 test_qos_queue 是否生成了
確認(rèn) test_qos_exchange 上綁定了 test_qos_queue
啟動(dòng)生產(chǎn)端發(fā)送 5 條消息
發(fā)現(xiàn)消費(fèi)端只打印了一條消息
從管控臺(tái)上也看到總共 5 條消息,有 4 條等待著,一條消費(fèi)了但是沒(méi)有 ack 回去
修改自定義消費(fèi)者里面的代碼,如下所示
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消費(fèi)者標(biāo)簽
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//一定要手動(dòng)ACK回去
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
重啟消費(fèi)端,看到消費(fèi)端就按照一條一條消費(fèi),并且 ACK 回去了
如上所示就是簡(jiǎn)單的RabbitMQ消費(fèi)端的限流策略