Spring Cloud Stream 進(jìn)階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)

ps:
1、本文示例使用的消息中間件為 Rabbitmq。
2、示例代碼是以測試用例的形式給出。
3、使用@ActiveProfiles( active_profile(s) ) 讓指定配置生效。

前言

前面兩篇文章 Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)、Spring Cloud Stream 進(jìn)階配置——高吞吐量(二),第一篇是通過增加消費者數(shù)量進(jìn)而提高消費端的吞吐量,但配置的消費者數(shù)量是固定,配置的過少,吞吐量提升的有限,而過多會造成系統(tǒng)資源浪費;于是就有了第二篇,通過配置最大消費者數(shù)量,讓消費端有了動態(tài)增加/銷毀消費者的能力,另外最大消費者數(shù)量同時也是一個閾值,動態(tài)增加的消費者數(shù)量無法超過該閾值,也就避免創(chuàng)建過多消費者(線程)占用過多系統(tǒng)資源影響到其他程序。

俗話說,凡事都是有兩面的。比如,消費者數(shù)量多了之后,就會衍生另一個問題——如何更均衡地分發(fā)消息給不同消費者?

下面先來介紹一下 Rabbitmq 的分發(fā)策略。

輪詢分發(fā)

輪詢分發(fā) Rabbitmq 默認(rèn)的分發(fā)策略。顧名思義,有多個消費者,當(dāng)消息來了,會一條一條按順序平均分發(fā)給消費者,而且來多少分發(fā)多少,并不管你消費能力如何。

可以看到輪詢分發(fā)策略,思路很簡單,容易實現(xiàn),但是有很多弊端。

第一,消息是平均分發(fā)給所有消費者??吹竭@里,你可能會說,這沒毛病啊,平均分發(fā)消息,讓所有消費者平攤消費消息。表面上看,是這樣沒錯,但是可能會出現(xiàn)這樣的場景:假設(shè)有3個消費者A, B, C,其中消費者A由于各種原因?qū)е孪M力下降,但還是分配了與其他2個消費者一樣的待消費消息,于是當(dāng)其他2個消費者把消息都消費完了,消費者A還堆積了好多消息。

另一個弊端,來多少消息分發(fā)多少,這種機(jī)制,當(dāng)生產(chǎn)者大量發(fā)布消息而消費者又消費力低下時,消費者會大量堆積消息,造成系統(tǒng)資源(特別是堆內(nèi)存)被大量占用。

由上可以看出,輪詢分發(fā)策略雖然簡單,但存在很大的隱患,因此 Spring Cloud Stream 默認(rèn)不支持這種分發(fā)策略,而使用另一種——公平分發(fā)策略。

公平分發(fā)

一般情況下,不同消費者之間,消費能力(消費消息的速度)都是不一樣的,有快有慢,為了提高吞吐量,那么就應(yīng)該消費快的,分擔(dān)多一點,反之,量力而行。于是 Spring Cloud Stream 提供了一個配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. prefetch,默認(rèn)值為 1,prefetch預(yù)取 的意思,那么該配置可以理解為:消費者每次從隊列獲取的消息數(shù)量。

使用 prefetch 有什么好處呢?消費者每次只從隊列獲取一定數(shù)量的消息,當(dāng)所有消息消費完了,再接著從隊列獲取相同數(shù)量的消息。

這樣一來,消費快的消費者,向隊列獲取消息的頻率就高,反之,頻率就低,因此,單位時間內(nèi),消費力強(qiáng)的消費者消費的消息就多,而不會出現(xiàn)無論消費力強(qiáng)弱,卻需要消費相同數(shù)量的消息。由木桶效應(yīng)我們可以知道,若是使用輪詢分發(fā)策略,消費一定數(shù)量的消息,全部消費完所花費的時間肯定取決于消費力最弱的消費者所花的時間。但是,公平分發(fā)策略不會出現(xiàn)這種情況,最壞的情況也只有 prefetch 對應(yīng)數(shù)量的消息被阻塞在消費力異常的消費者上,而其他消息會被其他消費者消費。

另外,配置了 prefetch ,也不會出現(xiàn)像輪詢分發(fā)策略那樣,消費者消費力低下時,消費者會大量堆積消息的隱患。

看到這里,應(yīng)該就可以明白,Spring Cloud Stream 為什么不支持輪詢分發(fā)策略了吧。

批量獲取消息

上面提到,prefetch 的默認(rèn)值是 1,也就是說消費者一次只會向隊列取回一條消息進(jìn)行消費。每一次獲取消息會消耗一定的時間,而一個來回又只取回一條消息,這妥妥讓人感覺有很大的提升空間啊。試下一下,如果你在搬磚,從樓下把磚搬到樓上,一次只搬一塊,這是不是讓人感覺閑得蛋疼。

所以,如果配置 prefetch 的值為10,這就憑空減少9個來回,不說消耗的時間會減少為原來總時間的1/10,但消耗的時間變少是可以預(yù)見的。接下來我們通過代碼驗證一下。

示例

以下代碼可在 源碼 查看。

配置

spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit
          consumer:
            concurrency: 1 # 初始/最少/空閑時 消費者數(shù)量。默認(rèn)1

      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              prefetch: 1 # 限制consumer在消費消息時,一次能同時獲取的消息數(shù)量,默認(rèn):1。
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 設(shè)備 eui
     */
    private String devEui;

    /**
     * 數(shù)據(jù)
     */
    private String data;

    // 省略其他字段
}

測試用例

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding({ScasPrefetchTest.MessageSink.class, ScasPrefetchTest.MessageSource.class})
@ActiveProfiles("prefetch")
public class ScasPrefetchTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 500000; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);
    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) throws InterruptedException {
            log.info("消費上行數(shù)據(jù)包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

運行測試用例

1. prefetch = 1

可以看到,上面的配置均使用 Spring Cloud Stream 的默認(rèn)配置,運行測試用例后,訪問 Rabbitmq可視化頁面 可以看到類似下圖的頁面:

消息發(fā)布和消費同時存在的吞吐量

消費堆積的消息時的吞吐量

若點擊隊列列表中的 packetUplinkTopic.scas-data-collection 可以看到如下該隊列的更詳細(xì)的信息:
prefetch = 1

2. prefetch = 3

prefetch 配置為3,再次啟動測試用例,可以看到結(jié)果測試如下:

消息發(fā)布和消費同時存在的吞吐量

消費堆積的消息時的吞吐量

查看隊列 packetUplinkTopic.scas-data-collection的詳細(xì)信息,可以可以看到:
prefetch = 3

可以看到,當(dāng) prefetch = 3 的時候,不管是哪種情況,吞吐量都有相應(yīng)的提升。再看看當(dāng) prefetch = 5 的情況。

3. prefetch = 5

prefetch 配置為5,再次啟動測試用例,可以看到結(jié)果測試如下:

發(fā)布速率與消費速率基本相同

查看隊列 packetUplinkTopic.scas-data-collection的詳細(xì)信息,可以可以看到:
prefetch = 5

可以看到,發(fā)布速率與消費速率基本相同,在這種情況下,吞吐量是最大的。

Consumer utilisation

有沒有發(fā)現(xiàn),上面每一種情景下的最后一張圖片都圈出一個指標(biāo):Consumer utilisation,翻譯過來為:消費者的利用率。點擊后面的問號,會彈出下面一個提示:

Consumer utilisation

大意就是,當(dāng)該指標(biāo)小于100的時候,可以通過以下方式使消息投遞得更快,也就是消費得更快,吞吐量更高,具體的方法有:

  • 增加消費者數(shù)量
  • 提高消費者消費單個消息的速度
  • 讓消費者有更大的 prefetch

現(xiàn)在回去再觀察那3張圖,可以看見隨著 prefetch 值的增大,該指標(biāo)也得到相應(yīng)的提高。

但是,到了 prefetch = 5 的時候,發(fā)布速率與消費速率已經(jīng)基本相同了,再這種情況下,再增加 prefetch 值,提升的吞吐量就極為有限了。再結(jié)合第一條方法(因為第二條在這里無法再優(yōu)化了),我們可以嘗試增加消費者數(shù)量,看能不能讓該指標(biāo)變得更接近100。

consumer = 3 & prefetch = 3

配置 consumer = 3 & prefetch = 3,可以看到,想比consumer = 1 & prefetch = 3,指標(biāo) Consumer utilisation 得到大幅度提高。

image.png

consumer = 3 & prefetch = 5

再配置 consumer = 3 & prefetch = 5,可以看到,指標(biāo) Consumer utilisation 的值已經(jīng)很接近100了。

image.png

更大的prefetch
consumer = 3 & prefetch = 7

consumer = 3 & prefetch = 10

可以看到,當(dāng) prefetch = 5 時,指標(biāo) Consumer utilisation 很接近100了,如果再繼續(xù)增大 prefetch 的值,指標(biāo) Consumer utilisation 并沒有很大的提升。

consumer = 1 & prefetch = 5 & maxConcurrency = 5

最后再結(jié)合配置 maxConcurrency,可以看到指標(biāo) Consumer utilisation 隨著消費者數(shù)量動態(tài)增加也在逐漸增大,最后達(dá)到 100%

consumer = 2 & prefetch = 5

consumer = 3 & prefetch = 5

consumer = 5 & prefetch = 5

小結(jié)

所以,經(jīng)過上面的一系列測試后,prefetch 的值也并不是越大就越好,而prefetch = 5、maxConcurrency = 5 應(yīng)該就是相對合適的配置。

結(jié)論

prefetch 可以用于配置消費者每次從隊列預(yù)取的消息數(shù)量,當(dāng)配置大于1的數(shù)值后,可以減少從隊列獲取的消息的次數(shù),從而減少獲取相同數(shù)量消息的總耗時,這樣也就達(dá)到提高消費端吞吐量的目的。

另外,本文還提到一個指標(biāo)——消費者利用率(Consumer utilisation),可以用于衡量消費端的消費能力,最大值為100,數(shù)值越大,消費能力越強(qiáng),相應(yīng)的吞吐量也就越高。同時還介紹了當(dāng)該指標(biāo)低于100時,提升該指標(biāo)的幾種途徑:1、增加消費者數(shù)量、2、提升消費消息的速度、3、增大消費者的 prefetch 值。其中第2點具有業(yè)務(wù)相關(guān)性,這里就不細(xì)說,而其他2點則可以通過配置輕松實現(xiàn),分別對應(yīng)配置 concurrencyprefetch,而再配合 maxConcurrency,則可以動態(tài)控制消費者數(shù)量,減少不必要的資源占用。

所以 concurrencymaxConcurrencyprefetch 配合一起使用的話,可以大幅提高消費端的吞吐量,起到意想不到的效果。當(dāng)然,是建立在合理配置這幾個參數(shù)的情況下。

推薦閱讀

Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)——多消費者
Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)——彈性消費者數(shù)量

相關(guān)鏈接

https://blog.csdn.net/yhl_jxy/article/details/85322696
https://www.kancloud.cn/longxuan/rabbitmq-arron/117513
http://yuanwhy.com/2016/09/10/rabbitmq-concurrency-prefetch/

完?。?!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容