介紹Spring Cloud Stream與RabbitMQ集成

Spring Cloud Stream是一個(gè)建立在Spring Boot和Spring Integration之上的框架,有助于創(chuàng)建事件驅(qū)動(dòng)或消息驅(qū)動(dòng)的微服務(wù)。在本文中,我們將通過(guò)一些簡(jiǎn)單的例子來(lái)介紹Spring Cloud Stream的概念和構(gòu)造。

1 Maven依賴(lài)

在開(kāi)始之前,我們需要添加Spring Cloud Stream與RabbitMQ消息中間件的依賴(lài)。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

同時(shí)為支持Junit單元測(cè)試,在pom.xml文件中添加

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>

2 主要概念

微服務(wù)架構(gòu)遵循“智能端點(diǎn)和啞管道”的原則。端點(diǎn)之間的通信由消息中間件(如RabbitMQ或Apache Kafka)驅(qū)動(dòng)。服務(wù)通過(guò)這些端點(diǎn)或信道發(fā)布事件來(lái)進(jìn)行通信。

讓我們通過(guò)下面這個(gè)構(gòu)建消息驅(qū)動(dòng)服務(wù)的基本范例,來(lái)看看Spring Cloud Stream框架的一些主要概念。

2.1 服務(wù)類(lèi)

通過(guò)Spring Cloud Stream建立一個(gè)簡(jiǎn)單的應(yīng)用,從Input通道監(jiān)聽(tīng)消息然后返回應(yīng)答到Output通道。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

注解@EnableBinding聲明了這個(gè)應(yīng)用程序綁定了2個(gè)通道:INPUT和OUTPUT。這2個(gè)通道是在接口Processor中定義的(Spring Cloud Stream默認(rèn)設(shè)置)。所有通道都是配置在一個(gè)具體的消息中間件或綁定器中。

讓我們來(lái)看下這些概念的定義:

  • Bindings — 聲明輸入和輸出通道的接口集合。
  • Binder — 消息中間件的實(shí)現(xiàn),如Kafka或RabbitMQ
  • Channel — 表示消息中間件和應(yīng)用程序之間的通信管道
  • StreamListeners — bean中的消息處理方法,在中間件的MessageConverter特定事件中進(jìn)行對(duì)象序列化/反序列化之后,將在信道上的消息上自動(dòng)調(diào)用消息處理方法。
  • Message Schemas — 用于消息的序列化和反序列化,這些模式可以靜態(tài)讀取或者動(dòng)態(tài)加載,支持對(duì)象類(lèi)型的演變。

將消息發(fā)布到指定目的地是由發(fā)布訂閱消息模式傳遞。發(fā)布者將消息分類(lèi)為主題,每個(gè)主題由名稱(chēng)標(biāo)識(shí)。訂閱方對(duì)一個(gè)或多個(gè)主題表示興趣。中間件過(guò)濾消息,將感興趣的主題傳遞給訂閱服務(wù)器。訂閱方可以分組,消費(fèi)者組是由組ID標(biāo)識(shí)的一組訂戶(hù)或消費(fèi)者,其中從主題或主題的分區(qū)中的消息以負(fù)載均衡的方式遞送。

2.2 測(cè)試類(lèi)

測(cè)試類(lèi)是一個(gè)綁定器的實(shí)現(xiàn),允許與通道交互和檢查消息。讓我們向上面的enrichLogMessage 服務(wù)發(fā)送一條消息,并檢查響應(yīng)中是否包含文本“[ 1 ]:”:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationIntegrationTest {
    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input().send(MessageBuilder.withPayload(new LogMessage("This is my message")).build());

        Object payload = messageCollector.forChannel(pipe.output()).poll().getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

2.3 RabbitMQ配置

我們需要在工程src/main/resources目錄下的application.yml文件里增加RabbitMQ綁定器的配置。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /

input綁定使用名為queue.log.messages的消息交換機(jī),output綁定使用名為queue.pretty.log.messages的消息交換機(jī)。所有的綁定都使用名為local_rabbit的綁定器。
請(qǐng)注意,我們不需要預(yù)先創(chuàng)建RabbitmQ交換機(jī)或隊(duì)列。運(yùn)行應(yīng)用程序時(shí),兩個(gè)交換機(jī)都會(huì)自動(dòng)創(chuàng)建。

3 自定義通道

在上面的例子里,我們使用Spring Cloud提供的Processor接口,這個(gè)接口有一個(gè)input通道和一個(gè)output通道。

如果我們想創(chuàng)建一些不同,比如說(shuō)一個(gè)input通道和兩個(gè)output通道,可以新建一個(gè)自定義處理器。

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

3.1 服務(wù)類(lèi)

Spring將為我們提供這個(gè)接口的實(shí)現(xiàn)。通道的名稱(chēng)可以通過(guò)使用注解來(lái)設(shè)定,比如@Output(“myOutput”)。如果沒(méi)有設(shè)置的話(huà),Spring將使用方法名來(lái)作為通道名稱(chēng)。因此這里有三個(gè)通道:myInput, myOutput, anotherOutput。

現(xiàn)在我們可以增加一些路由規(guī)則,如果接收到的值小于10則走一個(gè)output通道;如果接收到的值大于等于10則走另一個(gè)output通道。

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

3.2 測(cè)試類(lèi)

發(fā)送不同的消息,判斷返回值是否是通過(guò)不同的通道獲得。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
@DirtiesContext
public class MultipleOutputsServiceApplicationIntegrationTest {
    @Autowired
    private MyProcessor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseIsInAOutput() {
        whenSendMessage(1);
        thenPayloadInChannelIs(pipe.anOutput(), 1);
    }

    @Test
    public void whenSendMessage_thenResponseIsInAnotherOutput() {
        whenSendMessage(11);
        thenPayloadInChannelIs(pipe.anotherOutput(), 11);
    }

    private void whenSendMessage(Integer val) {
        pipe.myInput().send(MessageBuilder.withPayload(val).build());
    }

    private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
        Object payload = messageCollector.forChannel(channel).poll().getPayload();
        assertEquals(expectedValue, payload);
    }
}

4 根據(jù)條件分派

使用@StreamListener 注釋?zhuān)覀冞€可以使用自定義的SpEL表達(dá)式來(lái)過(guò)濾用戶(hù)期望的消息。下面這個(gè)例子,我們使用條件調(diào)度將消息路由到不同的輸出。

@Autowired
private MyProcessor processor;
 
@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}
 
@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

5 總結(jié)

在本教程中,我們介紹了Spring Cloud Stream的主要概念,并展示了如何通過(guò)RabbitMQ上的一些簡(jiǎn)單示例來(lái)使用它。工程代碼可參看https://download.csdn.net/download/peterwanghao/10412121

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容