相關(guān)資料以及注意事項(xiàng):
- 工程GitHub
- 需要配置本地RabbitMQ環(huán)境,方法見SpringBoot - 消息隊(duì)列RabbitMQ安裝測試
- SpringCloud Stream官方文檔
- SpringCloud Stream官方文檔中文翻譯
- 環(huán)境:SpringBoot 2.0.4.RELEASE + SpringCloud Finchley.SR1
簡介
SpringCloud Stream是SpringCloud對(duì)于消息中間件的進(jìn)一步封裝,它簡化了開發(fā)人員對(duì)于消息隊(duì)列的操作。目前僅支持RabbitMQ與Kafka。

示例代碼
1.引入Maven
根據(jù)你的消息隊(duì)列類型引入spring-cloud-starter-stream-rabbit或者spring-cloud-starter-stream-kafka
2.相關(guān)注釋
-
@input: 設(shè)置輸入信道名稱。不設(shè)置參數(shù),通道名稱就默認(rèn)為方法名。 -
@output:設(shè)置輸出信道名稱。不設(shè)置參數(shù),通道名稱就默認(rèn)為方法名。 -
@StreamListener:設(shè)置監(jiān)聽信道,用于接受來自消息隊(duì)列的消息 -
@SendTo:配合@StreamListener使用,在收到信息后發(fā)送反饋信息 -
@EnableBinding:注解用于綁定一個(gè)或者多個(gè)接口作為參數(shù)
3.預(yù)設(shè)類
-
Sink:stream中接受消息的接口 -
Source:stream中輸出消息的接口 -
Processor:stream中綁定輸入輸出的接口,主要用來講發(fā)布者和訂閱者綁定到一起
4.配置文件參數(shù)
對(duì)于RabbitMQ,destination 對(duì)應(yīng)的是exchange,group對(duì)應(yīng)的是queue(帶有前綴)。對(duì)于kafka,destination 對(duì)應(yīng)的是Topic,group就是對(duì)應(yīng)的消費(fèi)group。對(duì)于一個(gè)應(yīng)用集群,如果不需要重復(fù)消費(fèi)消息,必須定義group,否則不必定義group(比如刷新配置消息)。接收消息通道和發(fā)送消息通道名不可以重復(fù)。需要在后臺(tái)監(jiān)控頁面看到直觀的對(duì)象數(shù)據(jù),需要設(shè)置 content-type: application/json。
cloud:
stream:
bindings:
input: #通道名稱
group: group1
content-type: application/json
destination: exchange
5.程序代碼
一個(gè)controller,分別是傳遞字符串與傳對(duì)象。
一個(gè)receiver,一個(gè)是接受并發(fā)出反饋信息,另一個(gè)為接收反饋。
@RestController
public class SendMessageController {
@Autowired
private Processor processor;
@GetMapping("/sendMessage")
public void process()
{
String message = new StringBuilder().append("now ").append(new Date()).toString();
processor.output().send(MessageBuilder.withPayload(message).build());
}
@GetMapping("/sendObject")
public void sendObject()
{
Person person = new Person();
person.setName("張三");
person.setAge(123);
//process中已設(shè)置input
processor.output().send(MessageBuilder.withPayload(person).build());
}
}
@Component
@EnableBinding(Processor.class)
@Slf4j
public class StreamReceiver {
@StreamListener(Processor.OUTPUT)
@SendTo(Processor.INPUT)
public String process(String message)
{
System.out.println(message);
log.info("process: StreamReceiver:{}",message);
return "我是回執(zhí)";
}
//@StreamListener(Processor.OUTPUT)
//public void process1(Person person)
//{
// System.out.println(person);
// log.info("process1: StreamReceiver:{}",person);
//}
@StreamListener(Processor.INPUT)
public void process2(String message)
{
System.out.println(message);
log.info("process2: StreamReceiver:{}",message);
}
}
好了,那么有關(guān)于SpringCloud Stream就介紹到這里。