Spring Cloud Stream初探

最近有一段時間沒有學(xué)習(xí)Spring相關(guān)的內(nèi)容了,上次和一個朋友聊天說到了Spring Cloud Stream,聽他說的是這是一個消息隊列的技術(shù).之前我了解到的是有Spring Cloud Bus這樣一個技術(shù),但是因為精力有限肯定不能什么都去學(xué)習(xí),所以今天簡單了解了一下Spring Cloud BusSpring Cloud Stream.
文檔地址:Spring Cloud

一、簡介

Spring Cloud Bus官方的簡介:

Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. AMQP and Kafka broker implementations are included with the project. Alternatively, any Spring Cloud Stream binder found on the classpath will work out of the box as a transport.

Spring Cloud Stream官方簡介:

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

根據(jù)上面的是簡介,我們大概能了解到Spring Cloud BusSpring Cloud Stream的區(qū)別:
Spring Cloud Bus定位于通過輕量級的消息代理來鏈接分布式系統(tǒng)中的節(jié)點,其主要用來廣播狀態(tài)(配置)或者其他管理變更。其實現(xiàn)了AMQPkafka。并且它還可以使用Spring Cloud Stream的消息綁定器.感覺Spring Cloud Bus更偏向于用來廣播配置方面的變更,而非業(yè)務(wù)方面的數(shù)據(jù)。
Spring Cloud Stream則是一個通過共有的消息系統(tǒng)構(gòu)建高擴(kuò)展性的,以事件驅(qū)動的微服務(wù)的框架。這和Spring Cloud Bus定位是不一樣的。
之前的項目中已經(jīng)多次整合了消息隊列KafkaRocketmq等等,感興趣的可以查看相關(guān)的文章。但是目前穩(wěn)定看到的Spring Cloud Stream官方支持的只有RabbitMQ、KafkaAmazon Kinesis。下面開始搭建項目。

二、創(chuàng)建項目

本次項目Spring Boot版本選擇的是:2.3.8.RELEASE;而Spring Cloud版本是:Hoxton.SR9Kafka版本是:2.5.1。在測試中需要注意相關(guān)版本問題。
按照需要我創(chuàng)建兩個項目,一個消息生產(chǎn)者,一個消息消費者,因為只是測試Spring Cloud Stream,所以我項目依賴比較簡單,其中生產(chǎn)者額外添加了數(shù)據(jù)庫的相關(guān)依賴,而消費者只有Spring Cloud Stream,為了方便這里只放出生產(chǎn)者項目的pom.xml,如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.kafka</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>producer</name>
    <description>spring cloud stream use kafka</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR9</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

接下來就是配置文件,先看消息生產(chǎn)者項目(下文統(tǒng)一簡稱為生產(chǎn)者):

server.port=8080

## JPA配置
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQL10Dialect
## 指定列名,不配置指定列名不生效
spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.properties.hibernate.temp.use_jdbc_metadata_defaults=false
## 數(shù)據(jù)庫配置
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/pgsql?useSSL=false&characterEncoding=utf8

## kafka基礎(chǔ)配置
spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.kafka.binder.health-timeout=60
## 全局配置
#spring.cloud.stream.kafka.default.producer.<property>=<value>

## channel 名稱為自定義的out_channel
spring.cloud.stream.bindings.out_channel.binder=kafka
spring.cloud.stream.bindings.out_channel.content-type=application/json
spring.cloud.stream.bindings.out_channel.destination=topic_one
spring.cloud.stream.bindings.out_channel.group=group_one
spring.cloud.stream.bindings.out_channel.producer.auto-startup=true
spring.cloud.stream.bindings.out_channel.producer.partition-count=1

上述的配置文件中關(guān)于數(shù)據(jù)庫和JPA的配置可以忽略。相關(guān)的配置可以在官方文檔找到,而且講解也比較詳細(xì),我們只是簡單分析下上面的配置,主要就是自定義消息發(fā)送的管道名稱out_channel,其對應(yīng)的topic、group還有就是發(fā)生消息的內(nèi)容類型,默認(rèn)就是application/json,支持的其他類型還有text/plain、application/xml、text/xml等。
接下來就是編寫相關(guān)的代碼,首先我們需要綁定消息通道的接口,用來綁定我們的輸入流和輸出流,代碼如下:

public interface CustomChannel {

    /**
     * 輸出channel 名稱
     */
    String OUTPUT = "out_channel";

    /**
     * 輸入channel 名稱
     */
    String INPUT = "in_channel";

    @Output(value = CustomChannel.OUTPUT)
    MessageChannel output();

    @Input(value = CustomChannel.INPUT)
    SubscribableChannel input();
}

上面的@Output@Input分別表示的輸出管道和輸入管道,這兩個注解標(biāo),而它們的名稱就是由框架創(chuàng)建的bean的名稱,也就是配置文件我們配置的輸出管道和輸入管道的名稱。對于生產(chǎn)者我們只需要配置輸出管道即可,接下來我們創(chuàng)建一個消息生產(chǎn)者

@Component
public class MessageProducer {

    private CustomChannel source;

    public MessageProducer(CustomChannel source) {
        this.source = source;
    }

    public CustomChannel getSource() {
        return source;
    }
}

這個代碼其實沒什么特別的意義,不創(chuàng)建也可以,因為消息最終的發(fā)送還是由綁定的輸出管道發(fā)送的。
編寫一個測試的Controller用來發(fā)送消息。

@RestController
@RequestMapping("/kafka")
public class SendMessageController {

    private TestService testService;

    public SendMessageController(TestService testService) {
        this.testService = testService;
    }

    @PostMapping("/send/message")
    public ResponseEntity<Boolean> sendMessage() {
        return ResponseEntity.ok(testService.sendMessage());
    }
}

具體的邏輯代碼,如下:

@Service
public class TestServiceImpl implements TestService {

    private MessageProducer messageProducer;

    private UserRepository userRepository;

    public TestServiceImpl(MessageProducer messageProducer, UserRepository userRepository) {
        this.messageProducer = messageProducer;
        this.userRepository = userRepository;
    }

    @Override
    public Boolean sendMessage() {
        List<UserEntity> userEntityList = userRepository.findAll();
        MessageBuilder<List<UserEntity>> messageBuilder = MessageBuilder.withPayload(userEntityList);
        boolean success = messageProducer.getSource().output().send(messageBuilder.build());
        return success;
    }
}

上面的代碼中我從數(shù)據(jù)庫查詢了一些數(shù)據(jù),然后由消息生產(chǎn)者,準(zhǔn)確的說是綁定的消息管道進(jìn)行發(fā)送,這里只是簡單的發(fā)送了一個消息體。
還有一個重要的事情沒有做,就是開啟消息輸出管道和輸入管道和broker的綁定關(guān)系。這點一定一定別忘了,所以在啟動類上添加@EnableBinding(CustomChannel.class)。這個注解是支持多個綁定關(guān)系的,如果你自定義了多個輸出管道和輸入管道都可以添加上。
這里基本上生產(chǎn)者的相關(guān)代碼已經(jīng)完成了。接下來我們開始消費者項目(下文統(tǒng)稱消費者)。
消費者主要就是綁定輸入管道,項目配置文件如下:

server.port=9090

spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092

spring.cloud.stream.bindings.in_channel.binder=kafka
spring.cloud.stream.bindings.in_channel.destination=topic_one
#spring.cloud.stream.bindings.input.group=group_one
spring.cloud.stream.bindings.in_channel.content-type=application/json

配置非常簡單,就是指定了Kafka以及輸入管道的topic和消息類型,其實這兩點只需要和生產(chǎn)者的配置保持一致即可。
消費者我們則需要創(chuàng)建消息的監(jiān)聽器來訂閱輸入管道的消息,代碼如下:

@Slf4j
@EnableBinding(CustomChannel.class)
public class TestListener {

    @StreamListener(target = CustomChannel.INPUT)
    public void consume(Message<String> message) {
        String body = message.getPayload();
        log.info(">>>> message={} <<<<",body);
    }
}

將生產(chǎn)者項目中綁定消息通道的接口復(fù)制一份到消費者,同樣將其和broker進(jìn)行綁定。這里需要注意一點,@EnableBinding不能直接添加到啟動類上(除非你在啟動類內(nèi)添加監(jiān)聽器),而應(yīng)該添加到具體的消息監(jiān)聽器所在的類上。@StreamListener表明這個方法是一個輸入通道的消息的監(jiān)聽方法,也就是真正的消費消息的方法(方法名稱無所謂),該注解的名稱就是輸入管道的名稱。
接下來我們就測試以下消息的發(fā)送和接收,分別啟動生產(chǎn)者和消費者。并調(diào)用測試發(fā)送消息的接口,成功收到了生產(chǎn)者發(fā)送的消息列表。當(dāng)然有的人會問,你在生產(chǎn)者里面使用了消息的泛型List<UserEntity>,而在消費者里面消息的泛型則為String,我的感覺這里因為發(fā)送消息的內(nèi)容是json,因此可以直接使用String,當(dāng)然和生產(chǎn)者一樣使用相應(yīng)的泛型也可以,并不會影響消息的接收。

三、總結(jié)

因為上面只是做了一個簡單的例子,要說有什么很大的收貨確實是沒有,Spring Cloud Stream就是在KafkaRabbitMQ進(jìn)行了進(jìn)一步的抽象,它不關(guān)心你具體的使用那個類型的消息隊列,只需要在配置中具體指定使用的類型即可,相當(dāng)于做了一個統(tǒng)一性的標(biāo)準(zhǔn)化的接口,不需要額外的去配置具體的消息的監(jiān)聽器等等。但是就本次學(xué)習(xí)的quick start來講,個人感覺有點雞肋,當(dāng)然這個見仁見智,大家有興趣都可以交流討論。因為時間關(guān)系本次學(xué)習(xí)就到這里,如果有什么疑問歡迎大家交流、討論,最后還是希望大家能多多關(guān)注我的VX個人號超超學(xué)堂,非常感謝。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容