微服務(wù)消息隊列-RabbitMQ的簡單使用

1. 下載RabbitMQ

可以使用docker鏡像:
https://python.iitter.com/other/170428.html

2. 添加依賴

生產(chǎn)者和消費(fèi)者都需要添加以下依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3. 生產(chǎn)者

@Service
@Slf4j
public class Producer {

    @Autowired
    private StreamBridge streamBridge;

    public void send(Object message) {
        log.info("sent: {}", message);
        streamBridge.send("testMessage-out-0", message);
    }

}

4. 消費(fèi)者


@Slf4j
@Service
public class Consumer {

    @Autowired
    private ConsumerService consumerService;

    @Bean
    public Consumer<Object> testMessage() {
        return request -> {
            log.info("received: {}", request);
            consumerService.testConsumer(request);
        };
    }
}

在默認(rèn)情況下,框架會使用消費(fèi)者方法的 method name 作為當(dāng)前消費(fèi)者的標(biāo)識,如果消費(fèi)者標(biāo)識和配置文件中的名稱不一致,那么 Spring 應(yīng)用就不知道該把當(dāng)前的消費(fèi)者綁定到哪一個 Stream 信道上去。

5. 配置文件

  • Binder

如果程序中只使用了單一的中間件,比如只接入了 RabbitMQ,那么可以直接在 spring.rabbitmq 節(jié)點(diǎn)下配置連接串,不需要特別指定 binders 配置。

spring:
  cloud:
    stream:
      # 如果你項目里只對接一個中間件,那么不用定義binders
      # 當(dāng)系統(tǒng)要定義多個不同消息中間件的時候,使用binders定義
      binders:
        my-rabbit:
          type: rabbit # 消息中間件類型
          environment: # 連接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
  • bindings

配置了生產(chǎn)者、消費(fèi)者、binder 和 RabbitMQ 四方的關(guān)聯(lián)關(guān)系。

spring:
  cloud:
    stream:
      bindings:
        # 添加 Producer
        testMessage-out-0:
          destination: testMessage-topic
          content-type: application/json
          binder: my-rabbit
        # Consumer
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
          group: add-coupon-group
          binder: my-rabbit
      function:
        definition: testMessage

如上就可以生成和消費(fèi)消息了。

6. 異常處理

  • 重試

消息重試是一種簡單高效的異?;謴?fù)手段,當(dāng) Consumer 端拋出異常的時候,Stream 會自動執(zhí)行 2 次重試。

spring:
  cloud:
    stream:
      bindings:
        testMessage-in-0:
          destination: testMessage-topic
          content-type: application/json
          # 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大嘗試次數(shù)為1,即不重試
            # 默認(rèn)是做3次嘗試
            max-attempts: 5
            # 兩次重試之間的初始間隔
            backOffInitialInterval: 2000
            # 重試最大間隔
            backOffMaxInterval: 10000
            # 每次重試后,間隔時間乘以的系數(shù)
            backOffMultiplier: 2
            # 如果某個異常你不想重試,寫在這里
            retryableExceptions:
              java.lang.IllegalArgumentException: false

除了本地重試以外,還可以把這個失敗的消息丟回到原始隊列中,做一個 requeue 的操作。

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          # requeue重試
          testMessage-in-0:
            consumer:
              requeue-rejected: true
  • 降級

通過 spring-integration 的注解 @ServiceActivator 做了一個橋接,將指定 Channel 的異常錯誤轉(zhuǎn)到本地方法里。

@ServiceActivator(inputChannel = "testMessage-topic.testMessage-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
    log.info("consumer error: {}", errorMessage);
    // 實現(xiàn)自己的邏輯
}

  • 死信隊列

如果想要保留這條出錯的 Message,可以選擇將它發(fā)送到另一個 Queue 里。這個特殊的 Queue 就叫做死信隊列。

開啟

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

配置

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          testMessage-in-0:
            consumer:
              auto-bind-dlq: true

死信隊列的名稱和第一個隊列幾乎一樣,唯一區(qū)別就是末尾多了一個.dlq,這個 dlq 就是死信隊列的標(biāo)志。

7.延遲消息

下載rabbitmq_delayed_message_exchange插件

https://blog.csdn.net/u010375456/article/details/106323962/

  • 生產(chǎn)者

// 使用延遲消息發(fā)送
public void sendInDelay(Object message) {
    log.info("sent: {}", coupon);
    streamBridge.send("testMessageDelay-out-0"
            MessageBuilder.withPayload(message)
                    .setHeader("x-delay", 10 * 1000)
                    .build());
}
  • 消費(fèi)者

@Bean
public Consumer<Object> testMessageDelay() {
    return request -> {
        log.info("received: {}", request);
        service.consumer(request);
    };
}
  • 配置文件

spring:
  cloud:
    stream:
      bindings:
        # 延遲發(fā)券 - producer
        testMessageDelay-out-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          binder: my-rabbit
        # 延遲發(fā)券 - Consumer
        testMessageDelay-in-0:
          destination: testMessage-delayed-topic
          content-type: application/json
          # 消費(fèi)組,同一個組內(nèi)只能被消費(fèi)一次
          group: testMessage-group
          binder: my-rabbit
          consumer:
            # 如果最大嘗試次數(shù)為1,即不重試
            # 默認(rèn)是做3次嘗試
            max-attempts: 1
      function:
        definition: testMessageDelay
      rabbit:
        bindings:
          testMessageDelay-out-0:
            producer:
              delayed-exchange: true
          testMessageDelay-in-0:
            consumer:
              delayed-exchange: true

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容