Spring Cloud Stream 進(jìn)階配置——高可用(二)——死信隊(duì)列

前言

前文 Spring Cloud Stream 進(jìn)階配置——高可用(一)——失敗重試 介紹了 失敗重試 機(jī)制如何保障消息被正確消費(fèi),對(duì)于短暫性故障,消費(fèi)失敗后重試,可以得到有效解決;但是如果是諸如程序問題導(dǎo)致消費(fèi)失敗的情況,短時(shí)間內(nèi)(未修復(fù)bug之前),當(dāng)重試次數(shù)消耗完之后,消息則會(huì)被丟棄。

對(duì)于無關(guān)緊要的消息,丟了也就丟了,但如果是類似賬單這種敏感數(shù)據(jù),一旦丟了,老板就要找你談話了。

針對(duì)上面所述場(chǎng)景,rabbitmq 有對(duì)應(yīng)的方案,即 死信隊(duì)列。

死信隊(duì)列

何為死信

在開始了解死信隊(duì)列之前,我們需要知道什么死信,從字面看就是“死掉了的信息(消息)”,不過這是相對(duì)于隊(duì)列來說的,在消息所在的隊(duì)列看來,沒有意義、沒有價(jià)值的消息,就應(yīng)該丟棄,任其消亡。那么問題來了,隊(duì)列是怎么界定這類消息的?這里,不得不先說明一下哪類消息屬于死信,有如下幾種情況:

  • 消費(fèi)者通過 basic.reject 拒絕確認(rèn)消息;
  • 消費(fèi)者使用 basic.nack 否定確認(rèn)消息,并參數(shù) requeue 設(shè)置為 false
  • 消息在隊(duì)列中停留時(shí)間超過 ttl,即未能被及時(shí)消費(fèi);
  • 消息體過大,超過隊(duì)列所允許的消息體大?。?/li>

對(duì)于前2種情況,都是消費(fèi)者主動(dòng)放棄消息,而后面2種,則因?yàn)殛?duì)列的自我保護(hù)機(jī)制被隊(duì)列無情地丟棄。不過,這幾種情況都有一個(gè)共同點(diǎn),如果再保留這些死信,很大可能會(huì)影響整個(gè)隊(duì)列的正常工作,因?yàn)檫@些都屬于消費(fèi)者不疼,隊(duì)列不愛的消息,所以只好選擇從隊(duì)列踢掉。

死信的歸宿

而死信隊(duì)列則是死信的歸宿,也可以將它比做死信的回收站(下文會(huì)揭秘為什么)。死信隊(duì)列其實(shí)也是一個(gè)普通隊(duì)列,可以被消費(fèi)者訂閱,當(dāng)消息成為死信后,會(huì)被投遞到與原隊(duì)列 “綁定” 的隊(duì)列,該隊(duì)列就是死信隊(duì)列。

死信交換機(jī)

我們都知道,發(fā)布者在發(fā)布消息后,需要經(jīng)過消息交換機(jī),根據(jù)特定的路由,才能被正確投遞到目標(biāo)隊(duì)列。而死信要被投遞到死信隊(duì)列,那肯定還需要一個(gè)消息交換機(jī),該交換機(jī)為 Dead Letters Exchanges(DLX),即 死信交換機(jī)。當(dāng)然,死信交換機(jī)也是一個(gè)普通的消息交換機(jī),可以通過正常的聲明方式去創(chuàng)建。

思考

這里先拋出一個(gè)問題,既然死信隊(duì)列、死信交換機(jī)都是普通的隊(duì)列、消息交換機(jī),在可視化界面怎么去區(qū)分死信隊(duì)列與其他隊(duì)列、死信交換機(jī)與其他交換機(jī)。

聲明死信隊(duì)列

ps: 這里的主題是如何在 SpringCloud Stream 中使用死信隊(duì)列,其他聲明方式(如原生SDK)不在本文討論范圍。

Spring Cloud Stream 聲明死信隊(duì)列非常簡(jiǎn)單,簡(jiǎn)單到只需要一個(gè)配置就能搞定,這里不得不說 Spring BootSpring Cloud 的設(shè)計(jì)思想是真厲害。

“開啟” 死信隊(duì)列的相關(guān)配置為:spring.cloud.stream.bindings.<channelName>.consumer.autoBindDlq,該配置的作用為:是否自動(dòng)聲明死信隊(duì)列(DLQ)并將其綁定到死信交換機(jī)(DLX)。默認(rèn)是false。

上面的 開啟 用了雙引號(hào),為什么呢?配置 autoBindDlq 翻譯一下就能大概猜到原因了,因?yàn)檫@個(gè)開關(guān),當(dāng)為 true 時(shí),開啟的是 自動(dòng)聲明死信隊(duì)列,并將其綁定到死信交換機(jī)。所以,我們也是可以自己手動(dòng)創(chuàng)建的。

示例

以下代碼可在 源碼 查看。

配置

spring:
  cloud:
    stream:
      bindings:
        packetUplinkOutput:
          destination: packetUplinkDlxTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkDlxTopic
          content-type: application/json
          group: ${spring.application.name}.dlx
          binder: rabbit
      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              ttl: 20000 # 默認(rèn)不做限制,即無限。消息在隊(duì)列中最大的存活時(shí)間。當(dāng)消息滯留超過ttl時(shí),會(huì)被當(dāng)成消費(fèi)失敗消息,即會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列或丟棄.
              # DLQ相關(guān)
              autoBindDlq: true # 是否自動(dòng)聲明死信隊(duì)列(DLQ)并將其綁定到死信交換機(jī)(DLX)。默認(rèn)是false。

ps: 上文提到有幾種情況,消息會(huì)變成死信,而上面使用的配置是通過設(shè)置隊(duì)列的 ttl,即消息在隊(duì)列中存活的最大時(shí)間為 20s。因?yàn)檫@是制造死信最簡(jiǎn)單粗暴的方法。

代碼

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 設(shè)備 eui
     */
    private String devEui;

    /**
     * 數(shù)據(jù)
     */
    private String data;

    // 省略其他字段
}
測(cè)試用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) throws InterruptedException {
            Thread.sleep(1000);
            log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

運(yùn)行測(cè)試用例

運(yùn)行測(cè)試用例后,訪問 Rabbitmq可視化頁(yè)面 可以看到類似下圖的頁(yè)面:

死信隊(duì)列1

因?yàn)槟繕?biāo)隊(duì)列的消費(fèi)者 1s 才消費(fèi)一條消息,而隊(duì)列的 ttl 只有 20s,所以差不多 20s 后,再刷新頁(yè)面,可以看到:

死信隊(duì)列2

可以看到,隊(duì)列中的待消費(fèi)消息為80條,而我們一共發(fā)布了100條,消費(fèi)力為1條/s,20s后,未消費(fèi)的消息全部進(jìn)入死信隊(duì)列,所以80條對(duì)得上。

驗(yàn)證死信被丟棄

為了驗(yàn)證只有創(chuàng)建死信隊(duì)列并綁定到死信交換機(jī),死信才不會(huì)被丟棄,可以將 autoBindDlq 改成 false,再跑一次,20s 后,看目標(biāo)隊(duì)列是不是沒有消息。不過,需要先把目標(biāo)隊(duì)列刪除,不然會(huì)出現(xiàn)如下錯(cuò)誤:

2019-08-19 17:37:17.545 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:18.552 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:20.557 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:24.566 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:29.575 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
...
刪除隊(duì)列
刪除隊(duì)列
驗(yàn)證結(jié)果

重新跑一次測(cè)試用例,20s后,可以看到:


死信被丟棄后

如果沒有聲明死信隊(duì)列,那么死信一旦產(chǎn)生,就會(huì)直接被丟棄,也找不回來了。

如何使用死信隊(duì)列

死信在進(jìn)入死信隊(duì)列后,如果沒有類似重新消費(fèi)的邏輯,那跟被直接丟棄沒啥區(qū)別,甚至還占用磁盤空間。下面介紹2種體現(xiàn)死信隊(duì)列價(jià)值的操作與實(shí)現(xiàn)。

1. 重新發(fā)布到目標(biāo)隊(duì)列

在每個(gè)隊(duì)列的詳情頁(yè)中,有一個(gè) Move Messages 分欄,如下圖所示:

move messages(未啟動(dòng)插件)

上圖中圈中的提示,是因?yàn)闆]有啟動(dòng) rabbitmq_shovel、 rabbitmq_shovel_management 這2個(gè)插件,啟動(dòng)命令為: rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management。

如果插件已啟動(dòng),看到的界面如下:


move messages(插件已啟動(dòng))

進(jìn)入死信隊(duì)列的詳情頁(yè),將目標(biāo)隊(duì)列 packetUplinkDlxTopic.scas-data-collection.dlx 拷貝到 Move Messages 表單的 Destination queue 輸入框中,然后點(diǎn)擊 Move Messages 按鈕??梢钥吹饺缦拢?br>

move messages to target queue

re-consume dead-letters

死信隊(duì)列的所有消息全部被重新投遞到目標(biāo)隊(duì)列,看到這里,可以確定的是:通過 Move Messages 功能,是可以將死信重新投遞到原隊(duì)列,而且也可以被正常重新消費(fèi)。不過可以預(yù)見的是,再過20s,又有60條消息變成死信。

ps: 揭秘一下上文的埋點(diǎn)——死信隊(duì)列比作死信的回收站。其實(shí),看到這里,大家應(yīng)該大致能理解這句話了,消息在變成死信,這在隊(duì)列看來,就是我不要這些消息了,可以把它們丟了,所以就進(jìn)入死信隊(duì)列這一回收站,而在特定時(shí)機(jī),比如機(jī)器、環(huán)境穩(wěn)定了,又可以重新發(fā)布到原來的隊(duì)列,即對(duì)應(yīng)回收站的恢復(fù)文件功能。所以將死信隊(duì)列比作死信的回收站,在這種情況下還是可以理解的。

2. 定義死信隊(duì)列的消費(fèi)邏輯

上文提到,死信隊(duì)列其實(shí)是一個(gè)普通的隊(duì)列,那么我們直接訂閱該死信隊(duì)列是不是就可以正常消費(fèi)死信了?答案是肯定的。接下來,使用 spring-rabbitmq 的注解 @RabbitListener 定義死信隊(duì)列的處理邏輯,代碼如下(直接追加在測(cè)試用例類即可):

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {
    // 省略其他代碼

    /**
     * 原隊(duì)列名稱
     */
    private static final String ORIGINAL_QUEUE = "packetUplinkDlxTopic.scas-data-collection.dlx";
    /**
     * 死信隊(duì)列名稱. 由于沒有自定義, 所以根據(jù) spring cloud stream 死信隊(duì)列名稱生成規(guī)則, 在原隊(duì)列名稱后追加 '.dlq'.
     */
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    /**
     * 死信隊(duì)列交換機(jī). 默認(rèn)為: {@link RabbitCommonProperties#DEAD_LETTER_EXCHANGE}, 值為 "DLX".
     */
    private static final String DLX = RabbitCommonProperties.DEAD_LETTER_EXCHANGE;
    /**
     * 死信交換機(jī)將死信路由到死信隊(duì)列的 routing-key. 由于沒有自定義, 所以根據(jù) spring cloud stream 死信隊(duì)列名稱生成規(guī)則,
     * routing-key為原隊(duì)列的名稱.
     */
    private static final String routingKey = "packetUplinkDlxTopic.scas-data-collection";

    /**
     * 死信隊(duì)列的處理邏輯
     * @param failedMessage
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(DLQ)
                    , exchange = @Exchange(DLX)
                    , key = routingKey
            ),
            concurrency = "1-5"
    )
    public void handleDlq(Message failedMessage) throws InterruptedException {
        Thread.sleep(10);
        log.info("進(jìn)入 [上行數(shù)據(jù)包隊(duì)列] 的死信隊(duì)列. 完整消息: {};", failedMessage);
        log.info("body: {}", (PacketModel) JSON.parseObject(failedMessage.getBody(), PacketModel.class));
    }
    
}

代碼中各個(gè)變量都說的很清楚,這里就不贅述了,直接重新啟動(dòng)測(cè)試用例(可以考慮先把死信隊(duì)列刪掉,因?yàn)槔镞呥€有之前遺留的死信),20s 后,可以看到控制臺(tái)打印如下:


消費(fèi)死信

消費(fèi)完成后,2個(gè)隊(duì)列中也都沒有堆積的消息,如下:


死信消被費(fèi)后

當(dāng)然,上面示例中,只是加死信打印出來,而實(shí)戰(zhàn)中,則需要根據(jù)具體業(yè)務(wù)自定義死信處理邏輯,比如,發(fā)送郵件、序列化到數(shù)據(jù)庫(kù)等。

這2種方案的區(qū)別
  • 第一種需要手動(dòng)人工去操作;而第二種是全自動(dòng)的,只要有死信,就能立即被消費(fèi);
  • 基于上面一點(diǎn),可以引出開發(fā)成本上的區(qū)別。第一種基本不用額外的編程;而第二種則需要定義對(duì)應(yīng)死信隊(duì)列的監(jiān)聽器,才能自定義消費(fèi)邏輯;
  • 再基于上面一點(diǎn),可以引出功能、擴(kuò)展性上的區(qū)別。第一種基本沒有其他擴(kuò)展能力;而第二種,因?yàn)槭盏降乃佬畔Ⅲw,不僅包含了原消息,還攜帶了成為死信的原因,比如上面的例子,在日志打印的完整消息中,可以看到 x-first-death-reason=expired,即原因是消息過期了,那我們則可以根據(jù)不同的原因再結(jié)合具體業(yè)務(wù),定制處理邏輯;

死信隊(duì)列其他配置

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              # DLQ相關(guān)
              autoBindDlq: true # 是否自動(dòng)聲明死信隊(duì)列(DLQ)并將其綁定到死信交換機(jī)(DLX)。默認(rèn)是false。
              deadLetterQueueName: 'packetUplinkDlxTopic.scas-data-collection.dlx.dlq' # 默認(rèn)prefix + destination + group + .dlq。DLQ的名稱。
              deadLetterExchange: 'DLX' # 默認(rèn)prefix + DLX。DLX的名稱
              deadLetterRoutingKey: 'packetUplinkDlxTopic.scas-data-collection.dlx' # 默認(rèn)destination + group
              dlqExpires: 30000 # 隊(duì)列所有 customer 下線, 且在過期時(shí)間段內(nèi) queue 沒有被重新聲明, 多久之后隊(duì)列會(huì)被銷毀, 注意, 不管隊(duì)列內(nèi)有沒有消息. 默認(rèn)不設(shè)置.
              dlqLazy: false # 是否聲明為惰性隊(duì)列(Lazy Queue).默認(rèn)false
              dlqMaxLength: 100000 # 隊(duì)列中消息數(shù)量的最大限制. 默認(rèn)不限制
              dlqMaxLengthBytes: 100000000 # 隊(duì)列所有消息總字節(jié)的最大限制. 默認(rèn)不限制
              dlqMaxPriority: 255 # 隊(duì)列的消息可以設(shè)置的最大優(yōu)先級(jí). 默認(rèn)不設(shè)置
              dlqTtl: 1000000 # 隊(duì)列的消息的過期時(shí)間. 默認(rèn)不限制
              republishToDlq: true # 默認(rèn)false。當(dāng)為true時(shí),死信隊(duì)列接收到的消息的headers會(huì)更加豐富,多了異常信息和堆棧跟蹤。
              republishDeliveryMode: DeliveryMode.PERSISTENT # 默認(rèn)DeliveryMode.PERSISTENT(持久化)。當(dāng)republishToDlq為true時(shí),轉(zhuǎn)發(fā)的消息的delivery mode

ps:
如果需要驗(yàn)證 republishToDlq 配置的作用,可運(yùn)行測(cè)試用例類 `ScasRepublishToDlqTest,既可看到結(jié)果,控制臺(tái)打印結(jié)果類似如下:

死信包含異常棧

總結(jié)

消息被隊(duì)列丟棄后,會(huì)變成死信,如果隊(duì)列不聲明死信隊(duì)列,那么這些消息將被永久丟棄,而如果聲明死信隊(duì)列,則死信會(huì)進(jìn)入死信,死信可以被重新投遞回原隊(duì)列,也可以采用訂閱死信隊(duì)列的方式自定義處理邏輯,因?yàn)樗佬抨?duì)列其實(shí)也是一個(gè)普通隊(duì)列。又因?yàn)樗佬抨?duì)列是一個(gè)普通隊(duì)列,消費(fèi)過程中肯定也會(huì)產(chǎn)生死信,那么死信隊(duì)列產(chǎn)生的死信,有該何去何從?所以有了死信隊(duì)列的死信隊(duì)列,后續(xù)文章繼續(xù)說明。

擴(kuò)展

1. 鼠標(biāo)懸停標(biāo)簽查看隊(duì)列的header

隊(duì)列headers

2. 如何看出隊(duì)列是否聲明了死信隊(duì)列

dlq標(biāo)志

當(dāng)隊(duì)列聲明了死信隊(duì)列,會(huì)有上圖圈中的2個(gè)標(biāo)簽。
DLX: 代表死信會(huì)被投遞到的死信交換機(jī)。懸停該標(biāo)簽,可以看到 x-dead-letter-exchange: DLX,其中 DLX 就是交換機(jī)名稱;
DLK: 代表死信被投遞到死信交換機(jī)后,會(huì)根據(jù)什么路由準(zhǔn)確投遞到死信隊(duì)列;懸停該標(biāo)簽,可以看到 x-dead-letter-routing-key: packetUplinkDlxTopic.scas-data-collection.dlx;

3. 建議隊(duì)列盡可能聲明死信隊(duì)列

死信隊(duì)列是個(gè)好東西,當(dāng)隊(duì)列聲明了死信隊(duì)列,可以很大程度上避免消息丟失的情況,所以建議隊(duì)列都添加 autoBindDlq 配置。可以使用全局默認(rèn)配置:spring.cloud.stream.default.consumer.autoBindDlq: true,這樣所有隊(duì)列都會(huì)應(yīng)用該配置。

推薦閱讀

Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)——多消費(fèi)者
Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)——彈性消費(fèi)者數(shù)量
Spring Cloud Stream 進(jìn)階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)
Spring Cloud Stream 進(jìn)階配置——高可用(一)——失敗重試

相關(guān)鏈接

Spring Cloud Stream消費(fèi)失敗后的處理策略(三):使用DLQ隊(duì)列(RabbitMQ)

完!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容