[TOC]
Spring Cloud Stream簡介
Spring Cloud Stream是什么:
Spring Cloud Stream是Spring Cloud的一個子項目,是一個能讓我們更加方便操作MQ的框架,其目的用于構(gòu)建與消息中間件連接的高度可伸縮的消息事件驅(qū)動的微服務(wù)
簡單來說Spring Cloud Stream就是一個簡化了MQ操作的框架,其架構(gòu)圖如下:

- 圖片來自官方文檔,從圖中可以看到應(yīng)用通過input和output與Binder進(jìn)行交互,而Binder是一個讓我們的微服務(wù)與MQ集成的組件。圖中的Middleware即是消息中間件,目前支持Kafka、RabbitMQ以及RocketMQ
Spring Cloud Stream編程模型:

- 圖片來自官方文檔,微服務(wù)(Application)集成了Stream后,Stream的Destination Binder會創(chuàng)建兩個Binding,左邊的Binding連接著RabbitMQ,右邊的Binding連接著Kafka。左邊的Binding從RabbitMQ消費消息,然后經(jīng)過圖中代碼的處理后,把處理結(jié)果通過右邊的Binding投遞到Kafka。簡單來說,就是這個微服務(wù)消費了RabbitMQ里的消息并對其進(jìn)行處理,最后將處理的結(jié)果投遞到Kafka中。Input和Output是消息相對與微服務(wù)的走向,input表示微服務(wù)接收消息,output表示微服務(wù)投遞消息或發(fā)送消息
關(guān)于圖中的概念:
- Destination Binder(目標(biāo)綁定器):與消息中間件通信的組件,用于實現(xiàn)消息的消費和投遞
- Destination Bindings(目標(biāo)綁定):Binding是連接應(yīng)用程序跟消息中間件的橋梁,用于消息的消費和生產(chǎn),由binder創(chuàng)建
使用Spring Cloud Stream
現(xiàn)在有一個微服務(wù)項目:content-center,該微服務(wù)作為生產(chǎn)者,我們來為這個微服務(wù)集成Spring Cloud Stream,第一步添加stream依賴:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
- Tips:該項目的Spring Cloud版本為:Greenwich.SR1;Spring Cloud Alibaba版本為:2.1.0.RELEASE
第二步,在啟動類上添加@EnableBinding注解,如下:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
...
第三步,在配置文件中,添加與stream相關(guān)的配置項:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# 生產(chǎn)者為output
output:
# 用于指定topic
destination: stream-test-topic
完成以上步驟后,項目就已經(jīng)集成了Spring Cloud Stream,現(xiàn)在我們來使用Spring Cloud Stream編寫生產(chǎn)者,具體代碼如下:
package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 生產(chǎn)者
*
* @author 01
* @date 2019-08-10
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final Source source;
@GetMapping("/test-stream")
public String testStream(){
Message<String> message = MessageBuilder
.withPayload("消息體")
.build();
source.output()
.send(message);
return "send message success!";
}
}
啟動項目,測試該接口是否能成功執(zhí)行:

然后為另一個作為消費者的微服務(wù)項目:user-center,集成Spring Cloud Stream,由于依賴配置是一樣的,這里就不進(jìn)行重復(fù)了,但是配置和注解里的類需要更改一下。首先是配置如下:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# 消費者為input
input:
# 用于指定topic
destination: stream-test-topic
# rocketmq必須配置group,否則啟動會報錯
# 如果使用的是其他MQ,則不是必須配置的
group: binder-group
啟動類的注解如下:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
...
完成集成后,使用Spring Cloud Stream編寫消費者,具體代碼如下:
package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
/**
* 消費者
*
* @author 01
* @date 2019-08-10
**/
@Slf4j
@Service
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info("通過stream收到了消息,messageBody = {}", messageBody);
}
}
完成代碼的編寫后啟動項目,由于先前我們已經(jīng)通過生產(chǎn)者往RocketMQ投遞了消息,所以此時控制臺會輸出接收到的消息,如下:

Spring Cloud Stream自定義接口
通過以上小節(jié)的學(xué)習(xí),我們已經(jīng)了解了Spring Cloud Stream的基本使用。從以上示例可以得知,input用于綁定一個topic消費消息,output則反之,用于綁定一個topic投遞消息。
但在實際的項目中,可能會有多個topic,甚至在極端場景下,不同的topic可能使用不同的MQ實現(xiàn),而stream默認(rèn)提供的input和output都只能綁定一個topic,所以這個時候就需要用到stream的自定義接口來實現(xiàn)多個“input”和“output”綁定不同的topic了。
在以上小節(jié)的示例中可以得知,生產(chǎn)者發(fā)送消息時使用的是Source接口里的output方法,而消費者發(fā)送消息時使用的是Sink接口里的input方法,并且都需要配置到啟動類的@EnableBinding注解里。所以實際上我們需要自定義接口的源碼與這兩個接口的源碼幾乎一致,只是名稱有所不同而已,使用上也只是將Source和Sink改為自定義的接口即可。
接下來簡單演示一下如何自定義接口并使用,我們基于上一小節(jié)的例子進(jìn)行改造。首先是生產(chǎn)者,定義一個用于發(fā)送消息的接口,具體代碼如下:
package com.zj.node.contentcenter.rocketmq;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 自定義發(fā)送消息接口,與stream默認(rèn)提供的Source源碼是類似的
*
* @author 01
* @date 2019-08-10
**/
public interface MySource {
/**
* Name of the output channel.
*/
String MY_OUTPUT = "my-output";
/**
* @return output channel
*/
@Output(MY_OUTPUT)
MessageChannel output();
}
然后在啟動類的@EnableBinding中,添加這個接口:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding({Source.class, MySource.class})
...
在配置文件中添加如下配置:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# 生產(chǎn)者為output
output:
# 用于指定topic
destination: stream-test-topic
# 自定義的”output“,這里的名稱需要與MySource接口里的MY_OUTPUT相對應(yīng)
my-output:
# 綁定不同的topic
destination: stream-my-topic
修改生產(chǎn)者的代碼如下即可:
package com.zj.node.contentcenter.controller.content;
import com.zj.node.contentcenter.rocketmq.MySource;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 生產(chǎn)者
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final MySource mySource;
@GetMapping("/test-stream")
public String testStream(){
Message<String> message = MessageBuilder
.withPayload("消息體")
.build();
mySource.output()
.send(message);
return "send message success!";
}
}
然后啟動項目訪問該接口,測試消息是否能正常發(fā)送:

改造完生產(chǎn)者后接著改造消費者,首先定義一個用于消費消息的接口,具體代碼如下:
package com.zj.node.usercenter.rocketmq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 自定義消費消息接口,與stream默認(rèn)提供的Sink源碼是類似的
*
* @author 01
* @date 2019-08-10
**/
public interface MySink {
/**
* Input channel name.
*/
String MY_INPUT = "my-input";
/**
* @return input channel.
*/
@Input(MY_INPUT)
SubscribableChannel input();
}
同樣需要在啟動類的@EnableBinding中,添加這個接口:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding({Sink.class, MySink.class})
...
在配置文件中添加如下配置:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# 消費者為input
input:
# 用于指定topic
destination: stream-test-topic
# rocketmq必須配置group,否則啟動會報錯
# 如果使用的是其他MQ,則不是必須配置的
group: binder-group
# 自定義的”input“,這里的名稱需要與MySink接口里的MY_INPUT相對應(yīng)
my-input:
# 綁定不同的topic
destination: stream-my-topic
group: my-group
修改消費者的代碼如下:
package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
/**
* 消費者
*
* @author 01
* @date 2019-08-10
**/
@Slf4j
@Service
public class TestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void receive(String messageBody) {
log.info("自定義接口 - 通過stream收到了消息,messageBody = {}", messageBody);
}
}
啟動項目,由于先前我們已經(jīng)通過生產(chǎn)者往RocketMQ投遞了消息,所以此時控制臺會輸出接收到的消息,如下:

Spring Cloud Stream的監(jiān)控
我們都知道Spring Boot Actuator組件用于暴露監(jiān)控端點,很多監(jiān)控工具都需要依賴該組件的監(jiān)控端點實現(xiàn)監(jiān)控。而項目集成了Stream及Actuator后也會暴露相應(yīng)的監(jiān)控端點,首先需要在項目里集成Actuator,添加依賴如下:
<!-- actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
在配置文件中添加如下配置:
management:
endpoints:
web:
exposure:
# 暴露所有監(jiān)控端點
include: '*'
endpoint:
health:
# 顯示健康檢測詳情
show-details: always
訪問http://127.0.0.1:{項目端口}/actuator可以獲取所有暴露出來的監(jiān)控端點,Stream的相關(guān)監(jiān)控端點也在其列,如下圖:

/actuator/bindings端點可以用于查看bindings相關(guān)信息:

/actuator/channels端點用于查看channels的相關(guān)信息,而“input”和“output”就是所謂的channel,可以認(rèn)為這些channel是topic的抽象:

在/actuator/health端點中可以查看binder及RocketMQ的狀態(tài),主要是用于查看MQ的連接情況,如果連接不上其status則為DOWN:

Spring Cloud Stream + RocketMQ實現(xiàn)事務(wù)消息
先前在Spring Cloud Alibaba RocketMQ - 構(gòu)建異步通信的微服務(wù)一文的末尾中,我們介紹了RocketMQ的事務(wù)消息并且也演示了如何編碼實現(xiàn)。在本文學(xué)習(xí)了Spring Cloud Stream之后,我們來結(jié)合Stream對之前實現(xiàn)事務(wù)消息的代碼進(jìn)行重構(gòu)。
首先修改配置文件如下:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
output:
producer:
# 開啟事務(wù)消息,這樣通過output這個channel發(fā)送的消息都是半消息
transactional: true
# 生產(chǎn)者所在的事務(wù)組名稱
group: tx-test-producer-group
bindings:
# 生產(chǎn)者為output
output:
# 用于指定topic
destination: stream-test-topic
然后重構(gòu)TestProducerService,具體代碼如下:
package com.zj.node.contentcenter.service.test;
import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* @author 01
* @date 2019-08-08
**/
@Service
@RequiredArgsConstructor
public class TestProducerService {
private final NoticeMapper noticeMapper;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
private final Source source;
public String testSendMsg(Notice notice) {
// 生成事務(wù)id
String transactionId = UUID.randomUUID().toString();
// 通過stream發(fā)送消息,這里實際發(fā)送的就是半消息
source.output().send(
MessageBuilder.withPayload("消息體")
// header是消息的頭部分,可以用作傳參
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("notice_id", notice.getId())
// 對象需要轉(zhuǎn)換成json,否則默認(rèn)是調(diào)用對象的toString方法轉(zhuǎn)換為字符串
.setHeader("notice", JSON.toJSONString(notice))
.build()
);
return "send message success";
}
@Transactional(rollbackFor = Exception.class)
public void updateNotice(Integer noticeId, Notice notice) {
Notice newNotice = new Notice();
newNotice.setId(noticeId);
newNotice.setContent(notice.getContent());
noticeMapper.updateByPrimaryKeySelective(newNotice);
}
@Transactional(rollbackFor = Exception.class)
public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
updateNotice(noticeId, notice);
// 寫入事務(wù)日志
rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("updateNotice")
.build()
);
}
}
最后是重構(gòu)TestTransactionListener,具體代碼如下:
package com.zj.node.contentcenter.rocketmq;
import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
* 本地事務(wù)監(jiān)聽器
*
* @author 01
* @date 2019-08-08
**/
@Slf4j
@RequiredArgsConstructor
// 這里的txProducerGroup需要與配置文件里配置的一致
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {
private final TestProducerService service;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 用于執(zhí)行本地事務(wù)的方法
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("執(zhí)行本地事務(wù)方法. 事務(wù)id: {}", transactionId);
Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
// 由于從header里獲取的對象是json格式所以需要進(jìn)行轉(zhuǎn)換
Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class);
try {
// 執(zhí)行帶有事務(wù)注解的方法
service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId);
// 正常執(zhí)行向MQ Server發(fā)送commit消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事務(wù)方法發(fā)生異常,消息將被回滾", e);
// 發(fā)生異常向MQ Server發(fā)送rollback消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 用于回查本地事務(wù)的執(zhí)行結(jié)果
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.warn("回查本地事務(wù)狀態(tài). 事務(wù)id: {}", transactionId);
// 按事務(wù)id查詢?nèi)罩緮?shù)據(jù)
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.build()
);
// 如果能按事務(wù)id查詢出來數(shù)據(jù)表示本地事務(wù)執(zhí)行成功,沒有數(shù)據(jù)則表示本地事務(wù)執(zhí)行失敗
if (transactionLog == null) {
log.warn("本地事務(wù)執(zhí)行失敗,事務(wù)日志不存在,消息將被回滾. 事務(wù)id: {}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}