RabbitMQ之Qos prefetch

實(shí)際使用RabbitMQ過程中,如果完全不配置QoS,這樣Rabbit會(huì)盡可能快速地
發(fā)送隊(duì)列中的所有消息到client端。因?yàn)閏onsumer在本地緩存所有的message,
從而極有可能導(dǎo)致OOM或者導(dǎo)致服務(wù)器內(nèi)存不足影響其它進(jìn)程的正常運(yùn)行。所以我們
需要通過設(shè)置Qos的prefetch count來控制consumer的流量。同時(shí)設(shè)置得當(dāng)也會(huì)提高consumer的吞吐量。

prefetch與消息投遞

prefetch允許為每個(gè)consumer指定最大的unacked messages數(shù)目。簡(jiǎn)單來說就是用來指定一個(gè)consumer一次可以從Rabbit中獲取多少條message并緩存在client中(RabbitMQ提供的各種語言的client library)。一旦緩沖區(qū)滿了,Rabbit將會(huì)停止投遞新的message到該consumer中直到它發(fā)出ack。

假設(shè)prefetch值設(shè)為10,共有兩個(gè)consumer。意味著每個(gè)consumer每次會(huì)從queue中預(yù)抓取 10 條消息到本地緩存著等待消費(fèi)。同時(shí)該channel的unacked數(shù)變?yōu)?0。而Rabbit投遞的順序是,先為consumer1投遞滿10個(gè)message,再往consumer2投遞10個(gè)message。如果這時(shí)有新message需要投遞,先判斷channel的unacked數(shù)是否等于20,如果是則不會(huì)將消息投遞到consumer中,message繼續(xù)呆在queue中。之后其中consumer對(duì)一條消息進(jìn)行ack,unacked此時(shí)等于19,Rabbit就判斷哪個(gè)consumer的unacked少于10,就投遞到哪個(gè)consumer中。

總的來說,consumer負(fù)責(zé)不斷處理消息,不斷ack,然后只要unacked數(shù)少于prefetch * consumer數(shù)目,broker就不斷將消息投遞過去。

如何設(shè)置

官方提供的java client可以通過channel來設(shè)置:

channel = connection.createChannel();
channel.basicQos(prefetch);

spring-amqp的話可通過配置文件來配置

<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
    <rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>

這里需要注意的是,spring-amqp中的prefetch默認(rèn)值是250。

1.0 的spring-amqp默認(rèn)值是1; 2.0版本的為250。具體可看官方文檔
https://docs.spring.io/spring-amqp/docs/2.2.9.RELEASE/reference/html/#async-consumer

客戶端源碼剖析

官方Java客戶端提供了DefaultConsumer和QueueingConsumer兩種類來從queue中獲取消息。 其中QueueingConsumer內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue,此隊(duì)列就是用來緩存從queue獲取的message。當(dāng)調(diào)用 channel.basicConsume后,broker就會(huì)不斷往consumer投遞message,直到prefetch條。

初始化的時(shí)候,如果不指定BlockingQueue的長(zhǎng)度,默認(rèn)值會(huì)設(shè)為Integer.MAX_VALUE,所以這就解釋了文章開頭所說的如果不設(shè)置Qos的話為什么會(huì)有可能導(dǎo)致OOM,因?yàn)榇藭r(shí)BlockingQueue會(huì)不斷膨脹,消耗內(nèi)存。所以設(shè)置了prefetch后,建議BlockingQueue的長(zhǎng)度(capacity)也初始化為prefetch。

另外需要注意的是,在調(diào)用channel.basicConsume之后,consumer是通過異步方式來抓取message的,通過debug可以發(fā)現(xiàn)BlockingQueue的size是在異步地不斷增長(zhǎng)直到prefetch。而客戶端代碼可以通過consumer.nextDelivery()或consumer.nextDelivery(long timeout)方法來獲取message,其對(duì)應(yīng)的就是BlockingQueue的take()和poll(long timeout)方法。

再來看看spring-amqp的comsumer,大致也一樣。核心類BlockingQueueConsumer

public class BlockingQueueConsumer {
    
    private final BlockingQueue<Delivery> queue;

    //some code
    ...

public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean exclusive, String... queues) {

        //... some code

        this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
    }


BlockingQueueConsumer的構(gòu)造函數(shù)清楚說明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小。

吞吐量、延遲

prefetch并不是說設(shè)置得越大越好。過大可能導(dǎo)致consumer處理不過來,一直在本地緩存的BlockingQueue里呆太久,這樣消息在客戶端的延遲就大大增加;而對(duì)于多個(gè)consumer的情況,則會(huì)分配不均勻,導(dǎo)致有些consumer一直在忙,有些則非常空閑。

然而設(shè)置的過小,又會(huì)令到consumer不能充分工作,因?yàn)槲覀兛傁胨?00%的時(shí)間都是處于繁忙狀態(tài),而這時(shí)可能會(huì)在處理完一條消息后,BlockingQueue為空,因?yàn)樾碌南⑦€未來得及到達(dá),所以consumer就處于空閑狀態(tài)了。

prefetch應(yīng)該設(shè)置多大,具體可參考這篇文章

Some queuing theory: throughput, latency and bandwidth

里面詳細(xì)論述吞吐量與prefetch之間的關(guān)系。prefetch的設(shè)置與以下幾點(diǎn)有關(guān):

  1. 客戶端服務(wù)端之間網(wǎng)絡(luò)傳輸時(shí)間
  2. consumer消耗一條消息所執(zhí)行的業(yè)務(wù)邏輯的耗時(shí)
  3. 網(wǎng)絡(luò)狀況

【完】

如有紕漏,歡迎指出

參考資料:

RabbitMQ QOS vs. Competing Consumers

RabbitMQ消費(fèi)者的幾個(gè)參數(shù)

rabbitmq——prefetch count

AMQP: acknowledgement and prefetching

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,545評(píng)論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,512評(píng)論 2 34
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,234評(píng)論 3 51
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,599評(píng)論 51 787
  • 為了一些初學(xué)習(xí)者更好理解我就從簡(jiǎn)單的解釋一下Rabbitmq的原理吧?,首先你可以這樣想RabbitMq就是一個(gè)隊(duì)...
    螃蟹和駱駝先生Yvan閱讀 7,554評(píng)論 6 4

友情鏈接更多精彩內(nèi)容