應(yīng)用范圍為服務(wù)訪問量突然劇增,原因可能有多種外部的調(diào)用或內(nèi)部的一些問題導(dǎo)致消息積壓,對服務(wù)的訪問超過服務(wù)所能處理的最大峰值,導(dǎo)致系統(tǒng)超時負(fù)載從而崩潰。
業(yè)務(wù)場景
舉一些我們平常生活中的消費場景,例如:火車票、機票、門票等,通常來說這些服務(wù)在下單之后,后續(xù)的出票結(jié)果都是異步通知的,如果服務(wù)本身只支持每秒1000訪問量,由于外部服務(wù)的原因突然訪問量增加到每秒2000并發(fā),這個時候服務(wù)接收者因為流量的劇增,超過了自己系統(tǒng)本身所能處理的最大峰值,如果沒有對消息做限流措施,系統(tǒng)在這段時間內(nèi)就會造成不可用,在生產(chǎn)環(huán)境這是一個很嚴(yán)重的問題,實際應(yīng)用場景不止于這些,本文通過RabbitMQ來講解如果對消費端做限流措施。
消費端限流機制
RabbitMQ提供了服務(wù)質(zhì)量保證 (QOS) 功能,對channel(通道)預(yù)先設(shè)置一定的消息數(shù)目,每次發(fā)送的消息條數(shù)都是基于預(yù)先設(shè)置的數(shù)目,如果消費端一旦有未確認(rèn)的消息,這時服務(wù)端將不會再發(fā)送新的消費消息,直到消費端將消息進(jìn)行完全確認(rèn),注意:此時消費端不能設(shè)置自動簽收,否則會無效。
在 RabbitMQ v3.3.0 之后,放寬了限制,除了對channel設(shè)置之外,還可以對每個消費者進(jìn)行設(shè)置。
以下為 Node.js 開發(fā)語言 amqplib 庫對于限流實現(xiàn)提供的接口方法 prefetch
export interface Channel extends events.EventEmitter {
prefetch(count: number, global?: boolean): Promise<Replies.Empty>;
...
}
prefetch 參數(shù)說明:
- number:每次推送給消費端 N 條消息數(shù)目,如果這 N 條消息沒有被ack,生產(chǎn)端將不會再次推送直到這 N 條消息被消費。
- global:在哪個級別上做限制,ture 為 channel 上做限制,false 為消費端上做限制,默認(rèn)為 false。
建立生產(chǎn)端
生產(chǎn)端沒什么變化,和正常聲明一樣,關(guān)于源碼參見rabbitmq-prefetch(Node.js客戶端版Demo)
const amqp = require('amqplib');
async function producer() {
// 1. 創(chuàng)建鏈接對象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 獲取通道
const channel = await connection.createChannel();
// 3. 聲明參數(shù)
const exchangeName = 'qosEx';
const routingKey = 'qos.test001';
const msg = 'Producer:';
// 4. 聲明交換機
await channel.assertExchange(exchangeName, 'topic', { durable: true });
for (let i=0; i<5; i++) {
// 5. 發(fā)送消息
await channel.publish(exchangeName, routingKey, Buffer.from(`${msg} 第${i}條消息`));
}
await channel.close();
}
producer();
建立消費端
const amqp = require('amqplib');
async function consumer() {
// 1. 創(chuàng)建鏈接對象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 獲取通道
const channel = await connection.createChannel();
// 3. 聲明參數(shù)
const exchangeName = 'qosEx';
const queueName = 'qosQueue';
const routingKey = 'qos.#';
// 4. 聲明交換機、對列進(jìn)行綁定
await channel.assertExchange(exchangeName, 'topic', { durable: true });
await channel.assertQueue(queueName);
await channel.bindQueue(queueName, exchangeName, routingKey);
// 5. 限流參數(shù)設(shè)置
await channel.prefetch(1, false);
// 6. 限流,noAck參數(shù)必須設(shè)置為false
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
// channel.ack(msg);
}, { noAck: false });
}
consumer();
- 未確認(rèn)消息情況測試
在 consumer 中我們暫且將 channel.ack(msg) 注釋掉,分別啟動生產(chǎn)者和消費者,看看是什么情況?
如上圖所示,總共5條消息按照預(yù)先設(shè)置的發(fā)送了一條消息,因為我將 channel.ack(msg) 注釋掉了,服務(wù)端在未得到 ack 確認(rèn),將不會在發(fā)送剩下已 Ready 消息。
- 確認(rèn)消息測試
修改 consumer 代碼,打開確認(rèn)消息注釋,重新啟動消費端進(jìn)行測試
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg); // 打開注釋
}, { noAck: false });
如上圖所示,Unacked 為0,消息已全部消費成功。
RabbitMQ限流使用總結(jié)
限流在我們的實際工作中還是很有意義的,在使用上生產(chǎn)端沒有變化,重點在消費端,著重看以下兩點:
- 限流情況 ack 不能設(shè)置自動簽收,修改
{ noAck: false } - 增加限流參數(shù)設(shè)置
channel.prefetch(1, false)
資料
- 個人博客: Node.js技術(shù)棧
- RabbitMQ系列:RabbitMQ高級消息隊列系列文章不斷更新中
- 項目源碼: rabbitmq-prefetch(Node.js客戶端版Demo)
作者:五月君
鏈接:https://www.imooc.com/article/287046
來源:慕課網(wǎng)