?我的博客:程序員笑笑生,歡迎瀏覽博客!
?上一章 SpringCloud 基礎(chǔ)教程(十二)-Zipkin分布式鏈路追蹤系統(tǒng)搭建當(dāng)中,我們使用Zipkin搭建完整的實時數(shù)據(jù)追蹤系統(tǒng)。本章開始我們將進(jìn)入Spring Cloud的更高階的內(nèi)容部分,首先從消息驅(qū)動Spring Cloud Stream開始。
前言
?消息驅(qū)動,顧明思議,在企業(yè)級應(yīng)用中,消息中間件經(jīng)常用于處理非同步場景、消息通知、應(yīng)用解耦等。常用的有RabbitMq、kafka、Redis等消息隊列等。Spring Cloud Stream是一個構(gòu)建事件消息驅(qū)動的微服務(wù)框架,提供了一個靈活的編程模型。并基于Spring的基礎(chǔ)之上,支持發(fā)布-訂閱模型、消費(fèi)者分組、數(shù)據(jù)分片等功能。
一、Stream 應(yīng)用模型

- Middleware: 消息中間件,如RabbitMq等
- Binder:可以認(rèn)為是適配器,用來將Stream與中間連接起來的,不同的Binder對應(yīng)不同的中間件,需要我們配置
- Application Core:由Stream封裝的消息機(jī)制,很少情況下自定義開發(fā)
- inputs:輸入,可以自定義開發(fā)
- outputs:輸出,可以自定義開發(fā)
接下來快速開始,主要就是針對以上幾個組件進(jìn)行不同的配置。
二、快速開始
?接下來,我們以RabbitMQ為例(消息隊列的環(huán)境搭建整這里不做過多的介紹,本章以Stream為主),新建2個Maven工程,分別當(dāng)做消息生產(chǎn)者(server-receiver)、消息生產(chǎn)者(server-sender),在2個項目中引入Stream依賴和Stream對RabbitMq的依賴,在生產(chǎn)者單獨(dú)的添加web的依賴,為了能夠通過HTTP調(diào)用發(fā)送信息:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
2.1 server-receiver消費(fèi)者
?啟動主類:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* @EnableBinding 表示告訴當(dāng)前應(yīng)用,增加消息通道的監(jiān)聽功能
* 監(jiān)聽Sink類中名為input的輸入通道:
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(ReceiverApplication.class, args);
}
/**
* 監(jiān)聽rabbitmq的消息,具體什么隊列,什么topic,通過配置信息application獲取
*
* @param msg
*/
@StreamListener(Sink.INPUT)
public void reader(String msg) {
System.out.print("receiver {}:" + msg);
}
}
application.yml配置:
spring:
cloud:
stream:
bindings:
input:
destination: mytopic
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
server:
port: 8081
?具體配置詳解,spring.cloud.stream為前綴:
bindings配置:
- input:表示channelName,這里為什么是input,是因為啟動類中@EnableBinding(Sink.class)注解當(dāng)中配置Sink接口,該接口中默認(rèn)定義了channelName的名稱,當(dāng)然我們也可以自己寫Sink接口
- destination:消息中間件的Topic
- binder:當(dāng)前bingding綁定的對應(yīng)的適配器,該實例表示適配rabbitmq,名稱默認(rèn)為defaultRabbit,可以自定義,接著需要配置該名稱對應(yīng)的類型,環(huán)境信息等
binders配置:
- defaultRabbit:binder配置的適配器的名稱,和spring.cloud.stream.bindings.input.binder值一樣
- environment:表示當(dāng)前binder對應(yīng)的配置信息
2.2 生產(chǎn)者server-sender
?SenderApplication啟動類,添加@EnableBinding注解:
import com.microservice.stream.controller.SenderSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @EnableBinding(SenderSource.class) 表示監(jiān)聽Stream通道功能
*
* SenderSource為自定義的通道接口
*
*/
@SpringBootApplication
@EnableBinding(SenderSource.class)
public class SenderApplication {
public static void main(String[] args) {
SpringApplication.run(SenderApplication.class,args);
}
}
?自定義SenderSource接口,參考o(jì)rg.springframework.cloud.stream.messaging.Source,將channel的名稱改成和消費(fèi)者的Sink的channel名稱一樣。
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface SenderSource {
/**
* Name of the output channel.
*/
String OUTPUT = "input";
/**
* @return output channel
*/
@Output(SenderSource.OUTPUT)
MessageChannel output();
}
?編寫控制器,通過HTTP發(fā)送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SenderController {
@Autowired
SenderSource source;
@RequestMapping("/send")
public String sender(String msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return "ok";
}
}
?applicaiton.yml配置,配置和消費(fèi)者的配置一樣
spring:
cloud:
stream:
bindings:
input:
destination: mytopic
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
server:
port: 8081
2.3 啟動接受者和消費(fèi)者,發(fā)送消息
?首先啟動消費(fèi)者,看啟動日志,我們看到程序聲明了一個名稱為:mytopic.anonymous.88A97a5vQ9Ox07GnNBlKYQ的隊列,并且綁定了mytopic 主題,創(chuàng)建了一個連上rabbit的連接:

我們看看rabbit的web頁面隊列列表中就有了新增了一個隊列,并且綁定了mytopic主題:

然后再啟動生產(chǎn)者server-sender,在啟動日志中我們也看到了應(yīng)用創(chuàng)建了到對應(yīng)的消息隊列的連接:

接下來我們通過HTTP發(fā)送信息:http://localhost:8081/send/?msg=test,在服務(wù)消費(fèi)者的日志中,監(jiān)聽到了對應(yīng)的消息:

通過以上的簡單的實例,我們體驗了Spring Cloud Stream在提供消息驅(qū)動服務(wù)方面非常的方便。
三、代碼分析
3.1 @EnableBinding注解
?@EnableBinding表示告訴應(yīng)用增加了通道監(jiān)聽功能,可以是一個或者多個,可以傳入Sink和Source類,Sink和Souce可以自定義
3.2 Sink和Soure
?我們首先看看Sink類和Source類,
Sink
/**
* Bindable interface with one input channel.
*/
public interface Sink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
SubscribableChannel input();
}
Source
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
* @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();
}
?Sink和Source一樣是一個接口類型, INPUT、OUTPUT表示channel名稱,@Input、@Output表示注入?yún)?shù)為對應(yīng)的channel名稱,在我們的上文中我們自定義了SenderSource類型的接口,為了和Sink的channel名稱一樣。
- Sink:單一的輸入通道
- Source:單一的輸出通道
?Spring會為每一個標(biāo)注了@Output,@Input的管道接口生成一個實現(xiàn)類
3.3 Spring-messaging的抽象
?在Sink和source的接口中,我們注意到了MessageChannel、SubscribableChannel類,在spring框架中,spring-message模塊對消息處理的抽象類:
| 對象 | 說明 |
|---|---|
| Message | 消息 |
| MessageHandler | 處理消息的單元 |
| MessageChannel | 發(fā)送/接受傳輸消息的信道,單項的 |
| SubscribableChannel | 繼承MessageChannel,傳送消息到所有的訂閱者 |
| ExecutorSubscribableChannel | 繼承SubscribableChannel,異步線程池傳輸消息 |
四、配置文件
??配置文件的格式如下,<>表示我們可以自定義:
spring:
cloud:
stream:
bindings:
<channel-name>: #channel名稱
destination: mytopic # 發(fā)布-訂閱模型的消息主題topic
binder: defaultRabbit #binder(適配器的名稱)
binders:
<binder-name>: # 根據(jù)binder配置一樣的名稱
type: rabbit # 中間件的類型
environment: # 中間件實例的環(huán)境
spring:
rabbitmq:
host: localhost
port: 5672
?我們可以定義多個binding,分別為binding綁定相同或不同的Binder
總結(jié)
?本章我們初步的介紹了Spring Cloud Stream,通過對Steam的應(yīng)用模型一節(jié)通過消息生產(chǎn)者和消費(fèi)者模型實現(xiàn)了簡單的發(fā)布-訂閱的模型,對Stream有了一些了解,Steram的功能遠(yuǎn)不止于此。在后期的介紹中,我還將繼續(xù)深入的介紹Stream更多的內(nèi)容。



SpringCloud基礎(chǔ)教程(一)-微服務(wù)與SpringCloud
SpringCloud基礎(chǔ)教程(二)-服務(wù)發(fā)現(xiàn) Eureka
SpringCloud基礎(chǔ)教程(三)-Eureka進(jìn)階
SpringCloud 基礎(chǔ)教程(四)-配置中心入門
SpringCloud基礎(chǔ)教程(五)-配置中心熱生效和高可用
SpringCloud 基礎(chǔ)教程(六)-負(fù)載均衡Ribbon
SpringCloud 基礎(chǔ)教程(七)-Feign聲明式服務(wù)調(diào)用
SpringCloud 基礎(chǔ)教程(八)-Hystrix熔斷器(上)
SpringCloud 基礎(chǔ)教程(九)-Hystrix服務(wù)監(jiān)控(下)
SpringCloud 基礎(chǔ)教程(十)-Zull服務(wù)網(wǎng)關(guān)
SpringCloud 基礎(chǔ)教程(十一)- Sleuth 調(diào)用鏈追蹤簡介
SpringCloud 基礎(chǔ)教程(十二)-Zipkin 分布式鏈路追蹤系統(tǒng)搭建
SpringCloud 進(jìn)階: 消息驅(qū)動(入門) Spring Cloud Stream【Greenwich.SR3】
更多精彩內(nèi)容,請期待...
本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!