Spring Cloud Stream 進階配置——高可用(一)——失敗重試

前言

軟件的高可用一直是軟件建設(shè)的難點,接下來探討一下如何借助 Spring Cloud Stream 讓我們的 rabbitmq 變得更加高可用。

消息消費失敗

消息的消費,說到底其實就是:根據(jù)接收到的消息(攜帶了某種信號)執(zhí)行一系列業(yè)務(wù)邏輯。而執(zhí)行過程中,由于種種異常情況,或多或少都會出現(xiàn)執(zhí)行失敗的情況,那么問題來了,當消息消費失敗后,該怎么處理呢?

對于那種因為突發(fā)的異常情況導(dǎo)致消息消費失敗的,可以簡單的分為:

  • 短暫性異常
  • 持久性異常

短暫性異常比如有:網(wǎng)絡(luò)抖動導(dǎo)致遠程調(diào)用失敗無法繼續(xù)執(zhí)行導(dǎo)致消費失敗,這種短暫性異常一般在短時間內(nèi)就能恢復(fù)正常,所以如果能讓消費失敗后的消息等待一小段時間后重新被投遞并消費,那豈不是能大大減少因為異常導(dǎo)致消費失敗的消息數(shù)量,因為異常恢復(fù)了,消息也就能正常消費了。

持久性異常比如有:某個服務(wù)因為一個未在測試階段發(fā)現(xiàn)的bug導(dǎo)致整個遠程服務(wù)不可用,遠程服務(wù)不可用,消息也就注定消費失敗了,這種情況下,肯定沒辦法短時間內(nèi)就解決并重新部署服務(wù),因此,就算消息被重新投遞多少次,也不可能被正常消費,所以簡單的重復(fù)投遞消費失敗的消息是無法讓消息被正常消費的。這樣反而只會無謂的浪費系統(tǒng)資源,說不定還會因此影響到其他服務(wù)。

失敗重試

上面說到,失敗重試可以解決短暫性導(dǎo)致的消費失敗的情況。那么,Spring Cloud Stream 支不支持呢?答案是肯定的,而且還非常簡單,只需加入幾個配置即可。

首先,配置 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempts 是用來決定:消息最大可以被嘗試消費的次數(shù),包含第一次投遞。舉個例子,假設(shè)為默認值 3,在第一次投遞后,消費失敗了,那么該消息還可以再被重復(fù)投遞2次。如果設(shè)為1,也就代表不重試。另外,該配置的值必須大于0,當配置了 0 或 負數(shù),直接無法啟動成功,并報如下錯誤:

max attempts should be greater than zero

其次,既然有了失敗重試機制,那么肯定得有重試策略,所以還需另外3個參數(shù)的配合,分別為(以下參數(shù)的前綴與maxAttempts 一樣,均為 spring.cloud.stream.bindings.<channelName>.consumer):

  • backOffInitialInterval: 消息消費失敗后重試消費消息的初始化間隔時間。默認1s,即第一次重試消費會在1s后進行
  • backOffMultiplier: 相鄰兩次重試之間的間隔時間的倍數(shù)。默認2,即第二次是第一次間隔時間的2倍,第三次是第二次的2倍
  • backOffMaxInterval: 下一次嘗試重試的最大時間間隔,默認為10000ms,即10s。

那么怎么結(jié)合起來理解呢?舉個例子:假設(shè)這幾個配置均使用默認值,重試第一次1s,第二次2秒,因為默認最大重試次數(shù)為3,所以也就不會進行第三次重試;而如果最大重試次數(shù)配置了大于3的值,比如10,那么第三次4秒,第四次為8秒,而在第五次重試的時候,若沒有最大重試時間間隔的限制,重試時間為 2^4^ = 16,但是因為有了不超過10秒的限制,第五次重試的時間間隔為10秒,而不是剛剛算出的16秒;而接下來剩余的重試次數(shù),其重試時間間隔均為10秒。

示例

以下代碼可在 源碼 查看。

配置

spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit
          consumer:
            maxAttempts: 3 # 當消息消費失敗時,嘗試消費該消息的最大次數(shù)(消息消費失敗后,發(fā)布者會重新投遞)。默認3
            backOffInitialInterval: 1000 # 消息消費失敗后重試消費消息的初始化間隔時間。默認1s,即第一次重試消費會在1s后進行
            backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時間的倍數(shù)。默認2,即第二次是第一次間隔時間的2倍,第三次是第二次的2倍
            backOffMaxInterval: 10000 # 下一次嘗試重試的最大時間間隔,默認為10000ms,即10s。

上面的配置均使用默認配置。

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 設(shè)備 eui
     */
    private String devEui;

    /**
     * 數(shù)據(jù)
     */
    private String data;

    // 省略其他字段
}

測試用例


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
        log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) {
            log.info("消費上行數(shù)據(jù)包消息. model: [{}].", model);
            throw new RuntimeException();
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
        log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) {
            log.info("消費上行數(shù)據(jù)包消息. model: [{}].", model);
            throw new RuntimeException();
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

運行測試用例

使用默認配置

運行測試用例后,你會看到控制臺打印類似如下的日志:


使用默認配置

可以看到,打印的日志與上文分析的一致,第一次消費失敗后,會再重試2次,一共嘗試消費3次,最后一次也失敗后,直接拋出異常,不再繼續(xù)重試。

增加最大重試次數(shù)

配置 maxAttempts = 10,再次啟動測試用例,日志打印如下:

最大嘗試重試次數(shù)為10

可以看到,從第五次重試開始,剩下的重試次數(shù),重試時間間隔均為10s。

如何配置更合適

其實 Spring Cloud Stream 的默認配置基本就夠了,因為如果是因為短暫性異常導(dǎo)致消息消費失敗,重試2次基本就差不多了,重試太多反而可能會導(dǎo)致出現(xiàn)其他問題。

但是考慮到有些短暫性異??赡軣o法在1、2秒內(nèi)恢復(fù)正常,那我們可以稍微增大配置 backOffInitialIntervalbackOffMultiplier 的值,比如:backOffInitialInterval = 5000backOffMultiplier = 5,backOffMaxInterval =60000,這種配置可能就比較適合實時性不高的情況。

總之,我們可以根據(jù)具體業(yè)務(wù)以及生產(chǎn)環(huán)境,調(diào)整這幾個配置的值。

重試次數(shù)用完后消息會去哪?

你可能會好奇,當重試次數(shù)用完后,消息會跑去哪呢?這時如果訪問 Rabbitmq可視化頁面,你會看到:

消息被丟棄了

可以看到,Ready Unacked Total 均為0,也就是說,消息被丟棄了?

事實上,消息確實被丟棄了,但是這樣不好吧,這樣會存在丟失部分消息的隱患,于是不得不引入另一個概念——死信隊列。死信隊列有什么用呢?死信隊列是用來接收因為種種原因?qū)е孪o法正常消費后的消息,當然這里的原因不止消息重試次數(shù)用完后的消息。

因為死信隊列超出本文的范疇,這里就不詳細說明,會在以后的文章詳講。

持久性異常的消費失敗

當異常情況為持久性異常,在異常情況恢復(fù)正常之前,那么無論重試多少次,消息都無法被正常消費,所以只能在重試次數(shù)用完之后,要么丟棄該消息或進入死信隊列。所以重試次數(shù)不能設(shè)置過大,避免浪費系統(tǒng)資源。

推薦閱讀

Spring Cloud Stream 進階配置——高吞吐量(一)——多消費者
Spring Cloud Stream 進階配置——高吞吐量(二)——彈性消費者數(shù)量
Spring Cloud Stream 進階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容