【Spring Cloud Stream】Rabbit Binder介紹

【前置文章】

【官方文檔】

【本文內(nèi)容】
主要是基于官方文檔進(jìn)行的整合。

  • 默認(rèn)生成的exchange為topic類型。
  • 解釋了group的配置,不配即為publish/subscribe模式,配了即為Work Queues模式。
  • 使用已經(jīng)存在的exchange和queue。
  • RabbitMQ Binder的重試機(jī)制,以及如何配置死信隊(duì)列。
  • 消費(fèi)端設(shè)置并發(fā)數(shù)。
  • 消費(fèi)端手動(dòng)應(yīng)答模式。

另外,沒有介紹到Error Channels,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-error-channels


1. Rabbit Binder

image.png

默認(rèn)情況下RabbitMQ Binder創(chuàng)建的是TopicExchange。每個(gè)consumer group中,會(huì)有一個(gè)queue綁定到這個(gè)TopicExchange上。

image.png

對(duì)于匿名的consumers(即沒有g(shù)roup屬性的),會(huì)創(chuàng)建一個(gè)隨機(jī)的queue,程序stop后這個(gè)queue會(huì)自動(dòng)刪除。如下圖,AD的意思是auto-delete=true:
image.png

默認(rèn)情況下routingKey=#,即匹配所有:

image.png

比如發(fā)送消息的時(shí)候,我們按routingKey=abc發(fā)送,程序的consumer端照樣能收到消息:
image.png

收到的消息:
image.png

2. 有無配置group導(dǎo)致的兩種模式

2.1 Publish/Subscribe模式(fanout)

官方的Publish/Subscribe模式:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

Publish/Subscribe模式,如需要更新所有節(jié)點(diǎn)上的基于內(nèi)存的緩存:
image.png

如果用Spring Boot實(shí)現(xiàn),可以參考上述第4個(gè)前置文章(即:【RabbitMQ的那點(diǎn)事】訂閱/發(fā)布模式下,動(dòng)態(tài)新建queue,實(shí)現(xiàn)所有節(jié)點(diǎn)都能收到消息)。

用Spring Cloud Stream實(shí)現(xiàn)的話,只要不聲明group即可(也就是第1章截圖的匿名的consumers)。

2.2 Work Queues模式

官方的Work Queues模式:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

用圖表示,可以看到多個(gè)Consumer監(jiān)聽同一個(gè)Queue,依次拿到消息:
image.png

用Spring Boot寫的話,可以新建DirectExchange,再新建Queue,這時(shí)候的程序可以布署兩個(gè)instance節(jié)點(diǎn),那么在實(shí)際環(huán)境中兩個(gè)節(jié)點(diǎn)就會(huì)依次收到消息。

用Spring Cloud Stream寫的話,只需要加上group屬性即可,比如我們有一個(gè)Consumer:

    @Bean
    public Consumer<String> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

那么在application.properties中定義:

spring.cloud.stream.bindings.log-in-0.destination=log-in-0
spring.cloud.stream.bindings.log-in-0.group=log

此時(shí)自動(dòng)創(chuàng)建的exchange=log-in-0,queue=log-in-0.log,這時(shí)候的queue則為durable的queue,即程序down后queue還是存在,并不會(huì)被刪除。

定義的方式也可按binding name進(jìn)行配置,即先給log-in-0的綁定起個(gè)描述性的name,再根據(jù)該name進(jìn)行定義group,這樣自動(dòng)創(chuàng)建的exchange則會(huì)=mylog,queue=mylog.log

spring.cloud.stream.function.bindings.log-in-0=mylog
spring.cloud.stream.bindings.mylog.group=log

3. 使用已經(jīng)存在的exchange和queue

我們先在RabbitMQ Broker中新建以下關(guān)系:

  • exchange=order.in,queue= order,routingKey=order.in.routing,截圖如下:

    image.png

  • exchange=order.out,queue=order.out.test,routingKey=order.out.routing,截圖如下:

    image.png

我們?cè)诖a是創(chuàng)建了一個(gè)Function,用來接收來自queue=order,處理完成后發(fā)送給exchange=order.out

    @Bean
    public Function<String, String> handleOrder() {
        return order -> {
            System.out.println("Received: " + order);
            return "finished for " + order;
        };
    }

那么,在application.yaml中配置如下:

spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn
spring.cloud.stream.bindings.orderIn.destination=order.in
spring.cloud.stream.bindings.orderIn.group=order
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindingRoutingKey=order.in.routing

spring.cloud.stream.function.bindings.handleOrder-out-0=orderOut
spring.cloud.stream.bindings.orderOut.destination=order.out
spring.cloud.stream.rabbit.bindings.orderOut.producer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.routingKeyExpression='order.out.routing'

【解釋】
首先是通用配置,以spring.cloud.stream.bindings開頭:

  • spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn:這個(gè)在第1篇前置文章#4.3有講到,為了配置方便,我們特定加了bindings的命名。
  • <通用配置>.<bindingsName>.destination表示exchnge的名字。是Spring Cloud Stream的通用配置。
  • <通用配置>.<bindingsName>.group,在#2.2有解釋。

其次是RabbitMQ的專有配置,以spring.cloud.stream.rabbit.bindings開頭,consumer端所有的配置,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties

  • <專有配置>.<bindingsName>.consumer.bindQueue:表示是否要?jiǎng)?chuàng)建queue并綁定到相應(yīng)的exchange上。默認(rèn)為true,即會(huì)創(chuàng)建并綁定。false表示不會(huì)新建。
  • <專有配置>.<bindingsName>.consumer.declareExchange:表示是否按destination的值創(chuàng)建exchange(默認(rèn)為true,并與destination同名)。false表示不會(huì)新建。
  • <專有配置>.<bindingsName>.consumer.queueNameGroupOnly:默認(rèn)為false。當(dāng)值為true表示在consumer端的queue的名字等于我們定義的group的名字,即當(dāng)我們想要使用已經(jīng)存在的queue時(shí),我們可以設(shè)置這個(gè)值為true,并把group的名字定義為queue的名字。
  • <專有配置>.<bindingsName>.consumer.bindingRoutingKey:默認(rèn)為#(即all match,匹配所有的字符)。自定義的時(shí)候可以設(shè)置多個(gè)值,想要多個(gè)值時(shí)可配合bindingRoutingKeyDelimiter使用。

在Producer端的配置,可以參考官方文檔:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-prod-props

  • <專有配置>.<bindingsName>.producer.routingKeyExpression:值得注意的是這個(gè)配置,它是SpEL表達(dá)式,對(duì)于已經(jīng)確定的routingKey,需要用單引號(hào)標(biāo)記,以表示是字符串,如我們的例子中的'order.out.routing'。

4. RabbitMQ Binder的重試機(jī)制

當(dāng)開啟了重試機(jī)制,如果有消息被退回,那么將阻塞消費(fèi)端(會(huì)一直重復(fù)消費(fèi)該消息),通常情況下,可以使用Dead Letter Queue(死信隊(duì)列)來解決這個(gè)問題。

#disable binder retries
spring.cloud.stream.bindings.orderIn.consumer.max-attempts=5
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dlq-ttl=50000
spring.cloud.stream.rabbit.bindings.orderIn.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dead-letter-queue-name=myDlq

【解釋】

  • max-attempts表示重試次數(shù),默認(rèn)為3。如果要關(guān)閉重試,可以將value置為1。如果值>1,在遇到錯(cuò)誤時(shí),會(huì)先嘗試重試,然后才是將信息放到Dead Letter Queue中。以下是max-attempts=5時(shí)的日志:
    image.png

    關(guān)于max-attempts,每重試一次,會(huì)在message的header中deliveryAttempt的值+1,比如第一次:
    image.png

    第二次重試時(shí):
    image.png
  • dlq-ttl表示queue中的message的time-to-live的時(shí)間,即過期時(shí)間,這里的意思是生成的死信隊(duì)列myDlx中的消息過期時(shí)間為5秒。
  • auto-bind-dlq=true表示會(huì)創(chuàng)建一個(gè)Dead Letter Queue并進(jìn)行綁定,默認(rèn)情況下生成的死信隊(duì)列名為{destination}.dlq,我們上述的例子,默認(rèn)的DLQ為order.dlq。也可通過dead-letter-queue-name進(jìn)行重新命名,即我們的myDlq。
  • 如果沒有特別定義,那么默認(rèn)生成的Dead Letter Exchange名字則為DLX。也可通過dead-letter-exchange進(jìn)行定義。另外默認(rèn)生成的DLX類型為direct,可通過dead-letter-exchange-type配置成其它類型如fanout或topic。
  • 也可定義dead-letter-routing-key,即DLX和DLQ進(jìn)行綁定時(shí)的routingKey,默認(rèn)為{destination},我們的case中為order。截圖如下:
    image.png

ps. 上述的DLQ和DLX默認(rèn)的命名可以再通過配置prefix來加上前綴,如我們定義了prefix=apple,那么如果我們沒有自己定義命名,默認(rèn)的Queue則為apple.{destination}.dlq,默認(rèn)的Exchange則為apple.DLX。

生成的DLX和DLQ如下:

order是原本的queue,myDlq是本章節(jié)配置生成的DLQ。
image.png

DLX如下:
image.png

【測試】
想要測試消息到達(dá)死信隊(duì)列,那么可以在正常消費(fèi)端的代碼中拋出異常:AmqpRejectAndDontRequeueException。這個(gè)異常的名字已經(jīng)很直白了,即否定應(yīng)答(basic.reject)以及requeue=false。

還有一種方式即先設(shè)置requeue-rejected值等于false(默認(rèn)即為false),然后拋出任意錯(cuò)誤即可。

spring.cloud.stream.rabbit.bindings.orderIn.consumer.requeue-rejected=false

比如我們定義Function如下,即收到消息等于字符串error,則報(bào)異常。

    @Bean
    public Function<String, String> handleOrder() {
        return order -> {
            log.info("handleOrder Received: " + order);

            if (order.equals("error")) {
                throw new AmqpRejectAndDontRequeueException("error");
            }
            return "finished for " + order;
        };
    }

同時(shí)我們給DLQ也設(shè)置了監(jiān)聽:

    @RabbitListener(queues = {"myDlq"})
    public void deadLetterListener(String message) {
        log.info("Received Dead Letter message: {}", message);
    }

運(yùn)行程序,在RabbitMQ Console中發(fā)送消息=error:
image.png

查看日志,首先是5次retry嘗試,然后由于消費(fèi)端拋出異常AmqpRejectAndDontRequeueException,消息轉(zhuǎn)而通過DLX發(fā)送到了我們的死信隊(duì)列中,即myDlq:

image.png

5. 消費(fèi)端設(shè)置并發(fā)數(shù)

在消費(fèi)端設(shè)置并發(fā)數(shù),這個(gè)并不是RabbitMQ的參數(shù),而是Spring Cloud Stream的通用參數(shù),詳見:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_consumer_properties

比如我們?cè)O(shè)置了并發(fā)數(shù)為5:

spring.cloud.stream.bindings.orderIn.consumer.concurrency=5

那么消費(fèi)端則會(huì)開5個(gè)線程來消費(fèi)消息:
image.png

默認(rèn)并發(fā)數(shù)為1,即:
image.png

6. 消費(fèi)端手動(dòng)應(yīng)答模式

可通過設(shè)置acknowledge-mode,默認(rèn)為AUTO。

設(shè)置手動(dòng)應(yīng)答模式:

spring.cloud.stream.rabbit.bindings.orderIn.consumer.acknowledge-mode=MANUAL

消費(fèi)端代碼,代碼還是通過channel和delivertyTag進(jìn)行應(yīng)答,和Spring Boot的代碼差不多。

@Bean
    public Function<Message<String>, Message<String>> handleOrder() {
        return message -> {
            log.info("handleOrder Received: " + message.getPayload());

            Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

            try {
                if (message.getPayload().equals("error")) {
                    channel.basicNack(deliveryTag, false, false);
                } else {
                    channel.basicAck(deliveryTag, false);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            return new GenericMessage<>("finished for " + message, message.getHeaders());
        };
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容