Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)——多消費(fèi)者

ps:
1、本文示例使用的消息中間件為 Rabbitmq。
2、示例代碼是以測(cè)試用例的形式給出。
3、使用@ActiveProfiles( active_profile(s) ) 讓指定配置生效。

多消費(fèi)者

Spring Cloud Stream 消費(fèi)消息時(shí),默認(rèn)只啟動(dòng)一個(gè)消費(fèi)者,`spring.cloud.stream.binding
可以簡(jiǎn)單類比為單線程,所以最簡(jiǎn)單的提高消費(fèi)端吞吐量的方式就是增加消費(fèi)者數(shù)量。

消費(fèi)者數(shù)量的配置為:spring.cloud.stream.bindings.<channelName>.consumer,例如:spring.cloud.stream.bindings.input.consumer.concurrency=3。

示例

以下代碼可在 源碼 查看。

配置

spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit
          consumer:
            concurrency: 10 # 初始/最少/空閑時(shí) 消費(fèi)者數(shù)量。默認(rèn)1

上述配置,應(yīng)用啟動(dòng)后會(huì)創(chuàng)建10個(gè)消費(fèi)者??梢钥吹?,spring.cloud.stream.bindings.<channelName>.consumer 的默認(rèn)配置為1,所以若沒有顯式配置,Spring Cloud Stream 初始只會(huì)幫我們創(chuàng)建一個(gè)消費(fèi)者。注意,這里使用的是 初始 而不是 最終,至于為什么,請(qǐng)?jiān)试S我賣個(gè)關(guān)子。

代碼

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 設(shè)備 eui
     */
    private String devEui;

    /**
     * 數(shù)據(jù)
     */
    private String data;

    // 省略其他字段
}
測(cè)試用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("concurrency")
@EnableBinding({ScasConcurrencyTest.MessageSink.class, ScasConcurrencyTest.MessageSource.class})
public class ScasConcurrencyTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    @Test
    public void test() throws InterruptedException {

        for (int i = 0; i < 100000; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) throws InterruptedException {
            Thread.sleep(10);
            log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

運(yùn)行測(cè)試用例

運(yùn)行測(cè)試用例后,訪問 Rabbitmq可視化頁面 可以看到類似下圖的頁面:

10個(gè)消費(fèi)者的吞吐量

若把 spring.cloud.stream.bindings.packetUplinkInput.consumer 改成1,即只啟動(dòng)一個(gè)消費(fèi)者,然后再次啟動(dòng)測(cè)試用例,可以看到:

1個(gè)消費(fèi)者的吞吐量

上圖中,Message rates 展示了消息的發(fā)布(incoming)及消費(fèi)的速率(ack),即本文所說的吞吐量。

結(jié)合上面兩個(gè)測(cè)試用例,在大量消息堆積的情況下,增加消費(fèi)者數(shù)量能大幅度提高消費(fèi)端的吞吐量。消費(fèi)者數(shù)量從 1 增加到 10后,吞吐量提高接近10倍。

各種情景下不同消費(fèi)者數(shù)量的吞吐量

上述的測(cè)試用例,為了模擬真實(shí)環(huán)境并控制消費(fèi)速度,消費(fèi)消息時(shí)會(huì)睡眠 10ms,而接下來的測(cè)試需要放開消費(fèi)速度的控制,所以需要先把 Thread.sleep(10); 注釋掉。

消息發(fā)布速度 >= 消息消費(fèi)速度
1個(gè)消費(fèi)者的吞吐量——消息發(fā)布速度 >> 消息消費(fèi)速度

2個(gè)消費(fèi)者的吞吐量——消息發(fā)布速度 >> 消息消費(fèi)速度

3個(gè)消費(fèi)者的吞吐量——消息發(fā)布速度>消息消費(fèi)速度

4個(gè)消費(fèi)者的吞吐量——消息發(fā)布速度=消息消費(fèi)速度

5個(gè)消費(fèi)者的吞吐量——消息發(fā)布速度=消息消費(fèi)速度

上述所示的圖片,在 消息發(fā)布速度 >> 消息消費(fèi)速度 的情況下,增加消費(fèi)者數(shù)量能大幅度提高消費(fèi)端的吞吐量,一直到 消息發(fā)布速度=消息消費(fèi)速度 吞吐量達(dá)到最大,可以看到,吞吐量的增長,基本呈指數(shù)級(jí)增長。當(dāng)然,理論上肯定沒這么夸張的,會(huì)出現(xiàn)這種情況,很大程度是因?yàn)橄⒌南M(fèi)端和發(fā)布端都在同一臺(tái)機(jī)器上,而且消費(fèi)消息時(shí)基本沒有其他時(shí)間消耗。以后有條件的話,會(huì)把消費(fèi)端和發(fā)布端部署在不同機(jī)器上,再做測(cè)試。

消費(fèi)大量堆積消息
消息發(fā)布最大吞吐量

1個(gè)消費(fèi)者的吞吐量——消費(fèi)大量堆積消息

5個(gè)消費(fèi)者的吞吐量——消費(fèi)大量堆積消息

10個(gè)消費(fèi)者的吞吐量——消費(fèi)大量堆積消息

上述所示的圖片,消費(fèi)端的吞吐量并沒有隨著消費(fèi)者數(shù)量的增加而成倍增長,甚至增加到10的時(shí)候,約等于5個(gè)消費(fèi)者時(shí)的吞吐量。出現(xiàn)這種情況,是因?yàn)殡S著消費(fèi)者數(shù)量的增加,I/O逐漸達(dá)到飽和,即I/O成為了瓶頸,所以增加消費(fèi)者數(shù)量并沒有達(dá)到預(yù)期效果。而之所以I/O會(huì)成為了瓶頸,是因?yàn)镾pring Cloud Stream 默認(rèn)會(huì)創(chuàng)建持久化隊(duì)列,即消息會(huì)除了會(huì)保存在內(nèi)存,還會(huì)序列化到磁盤,而相應(yīng)的,消息被成功消費(fèi)后,也需要從內(nèi)存和磁盤清除。

結(jié)論

無論怎樣,毋庸置疑的是,適當(dāng)增加消費(fèi)者數(shù)量,肯定可以提高消費(fèi)端的吞吐量。重點(diǎn)就在于 適當(dāng) 二字,但是如何做到適當(dāng)呢?好在 Spring Cloud Stream 已經(jīng)幫我們考慮到這種情況了,并給出了解決方案——?jiǎng)討B(tài)增加消費(fèi)者,并且可以控制最大消費(fèi)者數(shù)量。所以前文才會(huì)說:應(yīng)用啟動(dòng)時(shí)創(chuàng)建的消費(fèi)者數(shù)量可能只是暫時(shí)的,并不是最終的數(shù)量。至于如何配置,可參考 Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)

推薦閱讀

Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)——彈性消費(fèi)者數(shù)量
Spring Cloud Stream 進(jìn)階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容