SpringCloud 進(jìn)階: 消息驅(qū)動(入門)Spring Cloud Stream【Greenwich.SR3】

?我的博客:程序員笑笑生,歡迎瀏覽博客!

?上一章 SpringCloud 基礎(chǔ)教程(十二)-Zipkin分布式鏈路追蹤系統(tǒng)搭建當(dāng)中,我們使用Zipkin搭建完整的實時數(shù)據(jù)追蹤系統(tǒng)。本章開始我們將進(jìn)入Spring Cloud的更高階的內(nèi)容部分,首先從消息驅(qū)動Spring Cloud Stream開始。

前言

?消息驅(qū)動,顧明思議,在企業(yè)級應(yīng)用中,消息中間件經(jīng)常用于處理非同步場景、消息通知、應(yīng)用解耦等。常用的有RabbitMq、kafka、Redis等消息隊列等。Spring Cloud Stream是一個構(gòu)建事件消息驅(qū)動的微服務(wù)框架,提供了一個靈活的編程模型。并基于Spring的基礎(chǔ)之上,支持發(fā)布-訂閱模型、消費(fèi)者分組、數(shù)據(jù)分片等功能。

一、Stream 應(yīng)用模型

file
  • Middleware: 消息中間件,如RabbitMq等
  • Binder:可以認(rèn)為是適配器,用來將Stream與中間連接起來的,不同的Binder對應(yīng)不同的中間件,需要我們配置
  • Application Core:由Stream封裝的消息機(jī)制,很少情況下自定義開發(fā)
  • inputs:輸入,可以自定義開發(fā)
  • outputs:輸出,可以自定義開發(fā)

接下來快速開始,主要就是針對以上幾個組件進(jìn)行不同的配置。

二、快速開始

?接下來,我們以RabbitMQ為例(消息隊列的環(huán)境搭建整這里不做過多的介紹,本章以Stream為主),新建2個Maven工程,分別當(dāng)做消息生產(chǎn)者(server-receiver)、消息生產(chǎn)者(server-sender),在2個項目中引入Stream依賴和Stream對RabbitMq的依賴,在生產(chǎn)者單獨(dú)的添加web的依賴,為了能夠通過HTTP調(diào)用發(fā)送信息:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
       <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
</dependency>

2.1 server-receiver消費(fèi)者

?啟動主類:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
 * @EnableBinding 表示告訴當(dāng)前應(yīng)用,增加消息通道的監(jiān)聽功能
 * 監(jiān)聽Sink類中名為input的輸入通道:
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }
    /**
     * 監(jiān)聽rabbitmq的消息,具體什么隊列,什么topic,通過配置信息application獲取
     *
     * @param msg
     */
    @StreamListener(Sink.INPUT)
    public void reader(String msg) {
        System.out.print("receiver {}:" + msg);
    }
}

application.yml配置:

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081

?具體配置詳解,spring.cloud.stream為前綴:

bindings配置:

  • input:表示channelName,這里為什么是input,是因為啟動類中@EnableBinding(Sink.class)注解當(dāng)中配置Sink接口,該接口中默認(rèn)定義了channelName的名稱,當(dāng)然我們也可以自己寫Sink接口
  • destination:消息中間件的Topic
  • binder:當(dāng)前bingding綁定的對應(yīng)的適配器,該實例表示適配rabbitmq,名稱默認(rèn)為defaultRabbit,可以自定義,接著需要配置該名稱對應(yīng)的類型,環(huán)境信息等

binders配置:

  • defaultRabbit:binder配置的適配器的名稱,和spring.cloud.stream.bindings.input.binder值一樣
  • environment:表示當(dāng)前binder對應(yīng)的配置信息

2.2 生產(chǎn)者server-sender

?SenderApplication啟動類,添加@EnableBinding注解:

import com.microservice.stream.controller.SenderSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * @EnableBinding(SenderSource.class) 表示監(jiān)聽Stream通道功能
 * 
 * SenderSource為自定義的通道接口
 * 
 */
@SpringBootApplication
@EnableBinding(SenderSource.class)
public class SenderApplication {
   
    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class,args);

    }
}

?自定義SenderSource接口,參考o(jì)rg.springframework.cloud.stream.messaging.Source,將channel的名稱改成和消費(fèi)者的Sink的channel名稱一樣。

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SenderSource {
    /**
     * Name of the output channel.
     */
    String OUTPUT = "input";
    /**
     * @return output channel
     */
    @Output(SenderSource.OUTPUT)
    MessageChannel output();
}

?編寫控制器,通過HTTP發(fā)送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SenderController {
    @Autowired
    SenderSource source;
    
    @RequestMapping("/send")
    public String sender(String msg) {
        source.output().send(MessageBuilder.withPayload(msg).build());
        return "ok";
    }

}

?applicaiton.yml配置,配置和消費(fèi)者的配置一樣

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081                     

2.3 啟動接受者和消費(fèi)者,發(fā)送消息

?首先啟動消費(fèi)者,看啟動日志,我們看到程序聲明了一個名稱為:mytopic.anonymous.88A97a5vQ9Ox07GnNBlKYQ的隊列,并且綁定了mytopic 主題,創(chuàng)建了一個連上rabbit的連接:

file

我們看看rabbit的web頁面隊列列表中就有了新增了一個隊列,并且綁定了mytopic主題:

file

然后再啟動生產(chǎn)者server-sender,在啟動日志中我們也看到了應(yīng)用創(chuàng)建了到對應(yīng)的消息隊列的連接:

file

接下來我們通過HTTP發(fā)送信息:http://localhost:8081/send/?msg=test,在服務(wù)消費(fèi)者的日志中,監(jiān)聽到了對應(yīng)的消息:

file

通過以上的簡單的實例,我們體驗了Spring Cloud Stream在提供消息驅(qū)動服務(wù)方面非常的方便。

三、代碼分析

3.1 @EnableBinding注解

?@EnableBinding表示告訴應(yīng)用增加了通道監(jiān)聽功能,可以是一個或者多個,可以傳入Sink和Source類,Sink和Souce可以自定義

3.2 Sink和Soure

?我們首先看看Sink類和Source類,

Sink

/**
 * Bindable interface with one input channel.
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();

}

Source

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();

}

?Sink和Source一樣是一個接口類型, INPUT、OUTPUT表示channel名稱,@Input、@Output表示注入?yún)?shù)為對應(yīng)的channel名稱,在我們的上文中我們自定義了SenderSource類型的接口,為了和Sink的channel名稱一樣。

  • Sink:單一的輸入通道
  • Source:單一的輸出通道

?Spring會為每一個標(biāo)注了@Output,@Input的管道接口生成一個實現(xiàn)類

3.3 Spring-messaging的抽象

?在Sink和source的接口中,我們注意到了MessageChannel、SubscribableChannel類,在spring框架中,spring-message模塊對消息處理的抽象類:

對象 說明
Message 消息
MessageHandler 處理消息的單元
MessageChannel 發(fā)送/接受傳輸消息的信道,單項的
SubscribableChannel 繼承MessageChannel,傳送消息到所有的訂閱者
ExecutorSubscribableChannel 繼承SubscribableChannel,異步線程池傳輸消息

四、配置文件

??配置文件的格式如下,<>表示我們可以自定義:

spring:
  cloud:
    stream:
      bindings:
         <channel-name>:   #channel名稱
            destination: mytopic  # 發(fā)布-訂閱模型的消息主題topic
            binder: defaultRabbit  #binder(適配器的名稱)
      binders:
         <binder-name>:  # 根據(jù)binder配置一樣的名稱
             type: rabbit  # 中間件的類型
             environment:  # 中間件實例的環(huán)境
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

?我們可以定義多個binding,分別為binding綁定相同或不同的Binder

總結(jié)

?本章我們初步的介紹了Spring Cloud Stream,通過對Steam的應(yīng)用模型一節(jié)通過消息生產(chǎn)者和消費(fèi)者模型實現(xiàn)了簡單的發(fā)布-訂閱的模型,對Stream有了一些了解,Steram的功能遠(yuǎn)不止于此。在后期的介紹中,我還將繼續(xù)深入的介紹Stream更多的內(nèi)容。

file
file
file

SpringCloud基礎(chǔ)教程(一)-微服務(wù)與SpringCloud

SpringCloud基礎(chǔ)教程(二)-服務(wù)發(fā)現(xiàn) Eureka

SpringCloud基礎(chǔ)教程(三)-Eureka進(jìn)階

SpringCloud 基礎(chǔ)教程(四)-配置中心入門

SpringCloud基礎(chǔ)教程(五)-配置中心熱生效和高可用

SpringCloud 基礎(chǔ)教程(六)-負(fù)載均衡Ribbon

SpringCloud 基礎(chǔ)教程(七)-Feign聲明式服務(wù)調(diào)用

SpringCloud 基礎(chǔ)教程(八)-Hystrix熔斷器(上)

SpringCloud 基礎(chǔ)教程(九)-Hystrix服務(wù)監(jiān)控(下)

SpringCloud 基礎(chǔ)教程(十)-Zull服務(wù)網(wǎng)關(guān)

SpringCloud 基礎(chǔ)教程(十一)- Sleuth 調(diào)用鏈追蹤簡介

SpringCloud 基礎(chǔ)教程(十二)-Zipkin 分布式鏈路追蹤系統(tǒng)搭建

SpringCloud 進(jìn)階: 消息驅(qū)動(入門) Spring Cloud Stream【Greenwich.SR3】

更多精彩內(nèi)容,請期待...

本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容