Spring Cloud入門(mén)教程系列:
- Spring Cloud入門(mén)教程(一):服務(wù)治理(Eureka)
- Spring Cloud入門(mén)教程(二):客戶端負(fù)載均衡(Ribbon)
- Spring Cloud入門(mén)教程(三):聲明式服務(wù)調(diào)用(Feign)
- Spring Cloud入門(mén)教程(四):微服務(wù)容錯(cuò)保護(hù)(Hystrix)
- Spring Cloud入門(mén)教程(五):API服務(wù)網(wǎng)關(guān)(Zuul) 上
- Spring Cloud入門(mén)教程(六):API服務(wù)網(wǎng)關(guān)(Zuul) 下
- Spring Cloud入門(mén)教程(七):分布式鏈路跟蹤(Sleuth)
- Spring Cloud入門(mén)教程(八):統(tǒng)一配置中心(Config)
- Spring Cloud入門(mén)教程(九):基于消息驅(qū)動(dòng)開(kāi)發(fā)(Stream)
本人和同事撰寫(xiě)的《Spring Cloud微服務(wù)架構(gòu)開(kāi)發(fā)實(shí)戰(zhàn)》一書(shū)也在京東、當(dāng)當(dāng)?shù)葧?shū)店上架,大家可以點(diǎn)擊這里前往購(gòu)買(mǎi),多謝大家支持和捧場(chǎng)!
在我們開(kāi)始講Spring Cloud Bus之前來(lái)看另外一個(gè)IT術(shù)語(yǔ):ESB(Enterprise Service Bus)。ESB在維基百科中是這樣描述的:
企業(yè)服務(wù)總線(Enterprise Service Bus,ESB)的概念是從服務(wù)導(dǎo)向架構(gòu)(Service Oriented Architecture, SOA)發(fā)展而來(lái)。SOA描述了一種IT基礎(chǔ)設(shè)施的應(yīng)用集成模型;其中的軟構(gòu)件集是以一種定義清晰的層次化結(jié)構(gòu)來(lái)相互耦合。一個(gè)ESB是一個(gè)預(yù)先組裝的SOA實(shí)現(xiàn),它包含了實(shí)現(xiàn)SOA分層目標(biāo)所必需的基礎(chǔ)功能部件。
在企業(yè)計(jì)算領(lǐng)域,企業(yè)服務(wù)總線是指由中間件基礎(chǔ)設(shè)施產(chǎn)品技術(shù)實(shí)現(xiàn)的、 通過(guò)事件驅(qū)動(dòng)和基于XML消息引擎,為更復(fù)雜的面向服務(wù)的架構(gòu)提供的軟件架構(gòu)的構(gòu)造物。企業(yè)服務(wù)總線通常在企業(yè)消息系統(tǒng)上提供一個(gè)抽象層,使得集成架構(gòu)師能夠不用編碼而是利用消息的價(jià)值完成集成工作。
企業(yè)服務(wù)總線提供可靠消息傳輸,服務(wù)接入,協(xié)議轉(zhuǎn)換,數(shù)據(jù)格式轉(zhuǎn)換,基于內(nèi)容的路由等功能,屏蔽了服務(wù)的物理位置,協(xié)議和數(shù)據(jù)格式。
其中,最重要的一句就是:企業(yè)服務(wù)總線通常在企業(yè)消息系統(tǒng)上提供一個(gè)抽象層,使得集成架構(gòu)師能夠不用編碼而是利用消息的價(jià)值完成集成工作。 通俗一點(diǎn)來(lái)講就是企業(yè)服務(wù)總線是架構(gòu)在消息中間件之上的另外一個(gè)抽象層,使得我們可以不用關(guān)心消息相關(guān)的處理就可以完成業(yè)務(wù)邏輯的處理。
到這里你是不是有點(diǎn)突然明白Spring Cloud Bus 和 Spring Cloud Stream之間的關(guān)系了,剛開(kāi)始接觸這兩個(gè)組件時(shí),大部分都會(huì)迷惑到底這兩者有什么區(qū)別?它們又有什么聯(lián)系?Stream通過(guò)對(duì)消息中間件進(jìn)行抽象封裝,提供一個(gè)統(tǒng)一的接口供我們發(fā)送和監(jiān)聽(tīng)消息,而B(niǎo)us則是在Stream基礎(chǔ)之上再次進(jìn)行抽象封裝,使得我們可以在不用理解消息發(fā)送、監(jiān)聽(tīng)等概念的基礎(chǔ)上使用消息來(lái)完成業(yè)務(wù)邏輯的處理。
那么Spring Cloud Bus是如何為我們實(shí)現(xiàn)的呢?一句話概括就是事件機(jī)制。
1. Spring的事件機(jī)制
在Spring框架中有一個(gè)事件機(jī)制,該機(jī)制是一個(gè)觀察者模式的實(shí)現(xiàn)。觀察者模式建立一種對(duì)象與對(duì)象之間的依賴(lài)關(guān)系,當(dāng)一個(gè)對(duì)象(稱(chēng)之為:觀察目標(biāo))發(fā)生改變時(shí)將自動(dòng)通知其它對(duì)象(稱(chēng)之為:觀察者),這些觀察者將做出相應(yīng)的反應(yīng)。一個(gè)觀察目標(biāo)可以對(duì)應(yīng)多個(gè)觀察者,而且這些觀察者之間沒(méi)有相互聯(lián)系,可以根據(jù)需要增加和刪除觀察者,使得系統(tǒng)更易于擴(kuò)展。通過(guò)Spring事件機(jī)制可以達(dá)到如下目的:
- 應(yīng)用模塊之間的解耦;
- 對(duì)同一種事件可以根據(jù)需要定義多種處理方式;
- 對(duì)主線應(yīng)用不干擾,是一個(gè)極佳的開(kāi)閉原則(OCP)實(shí)踐。
當(dāng)我們?cè)趹?yīng)用中引入事件機(jī)制時(shí)需要借助Spring中以下接口或抽象類(lèi):
- ApplicationEventPublisher: 這是一個(gè)接口,用來(lái)發(fā)布一個(gè)事件;
- ApplicationEvent: 這是一個(gè)抽象類(lèi),用來(lái)定義一個(gè)事件;
- ApplicationListener<E extends ApplicationEvent>: 這是一個(gè)接口,實(shí)現(xiàn)事件的監(jiān)聽(tīng)。
其中Spring應(yīng)用的上下文ApplicationContext默認(rèn)是實(shí)現(xiàn)了ApplicationEventPublisher接口,因此在發(fā)布事件時(shí)我們可以直接使用ApplicationContext.publishEvent()方法來(lái)發(fā)送。
一個(gè)典型的Spring事件發(fā)送與監(jiān)聽(tīng)代碼如下。
1.1 定義事件
比如,我們定義一個(gè)用戶事件:
/**
* 用戶事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class UserEvent extends ApplicationEvent {
/** 消息類(lèi)型:更新用戶,值為: {@value} */
public static final String ET_UPDATE = "update";
// ========================================================================
// fields =================================================================
private String action;
private User user;
// ========================================================================
// constructor ============================================================
public UserEvent(User user) {
super(user);
this.user = user;
}
public UserEvent(User user, String action) {
super(user);
this.action = action;
this.user = user;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("user", this.getUser()).toString();
}
// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}
}
1.2 定義監(jiān)聽(tīng)
我們定義一個(gè)用戶事件監(jiān)聽(tīng)器,當(dāng)用戶變更時(shí)做相應(yīng)處理:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 用戶事件監(jiān)聽(tīng)
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onApplicationEvent(UserEvent userEvent) {
this.logger.debug("收到用戶事件:{} ", userEvent);
// TODO: 實(shí)現(xiàn)具體的業(yè)務(wù)處理
}
}
用戶事件監(jiān)聽(tīng)比較簡(jiǎn)單,只需要實(shí)現(xiàn)ApplicationListener接口,進(jìn)行相應(yīng)處理即可。
1.3 發(fā)送消息
發(fā)送消息比較簡(jiǎn)單,我們也可以直接在Event中實(shí)現(xiàn),比如我們將上面UserEvent更改為如下:
/**
* 用戶事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class UserEvent extends ApplicationEvent {
// 省略了之前的代碼
/**
* 發(fā)布事件
*/
public void fire() {
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
logger.debug("發(fā)布事件:{}", this);
context.publishEvent(this);
}else{
logger.warn("無(wú)法獲取到當(dāng)前Spring上下文信息,不能夠發(fā)布事件");
}
}
}
那么我們就可以在需要的地方通過(guò)下面的代碼來(lái)發(fā)布事件了:
new UserEvent(user, UserEvent.ET_UPDATE).fire();
2. Spring Cloud Bus機(jī)制
我們上面了解了Spring的事件機(jī)制,那么Spring Cloud Bus又是如何將事件機(jī)制和Stream結(jié)合在一起的呢?總起來(lái)說(shuō)機(jī)制如下:
- 在需要發(fā)布或者監(jiān)聽(tīng)事件的應(yīng)用中增加
@RemoteApplicationEventScan注解,通過(guò)該注解就可以啟動(dòng)Stream中所說(shuō)的消息通道的綁定; - 對(duì)于事件發(fā)布,則需要繼承
ApplicationEvent的擴(kuò)展類(lèi) --RemoteApplicationEvent,當(dāng)通過(guò)ApplicationContext.publishEvent()發(fā)布此種類(lèi)型的事件時(shí),Spring Cloud Bus就會(huì)對(duì)所要發(fā)布的事件進(jìn)行包裝,形成一個(gè)我們所熟知的消息,然后通過(guò)默認(rèn)的springCloudBus消息通道發(fā)送到消息中間件; - 對(duì)于事件監(jiān)聽(tīng)者則不需要進(jìn)行任何變更,仍舊按照上面的方式就可以實(shí)現(xiàn)消息的監(jiān)聽(tīng)。但,需要注意的一點(diǎn)就是在消費(fèi)的微服務(wù)工程中也必須定義第2步所定義的事件,并且需要保障全類(lèi)名一致(如果不一致,則需要做一點(diǎn)工作)。
嗯,就是這么簡(jiǎn)單。通過(guò)Bus我們就可以像編寫(xiě)單體架構(gòu)應(yīng)用一樣進(jìn)行開(kāi)發(fā),而不需要關(guān)系什么消息中間件、主題、消息、通道呀等等一大堆概念。
你也行在懷疑,是不是這么簡(jiǎn)單呀。那好,讓我們來(lái)看看是不是很容易就可以實(shí)現(xiàn)Stream中示例。
3. 重構(gòu)Spring Cloud Stream中的示例
3.1 重構(gòu)商品微服務(wù)
3.1.1 增加對(duì)Bus的依賴(lài)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
3.1.2 構(gòu)建商品事件
我們將原來(lái)商品配置變更所發(fā)送的消息更改為一個(gè)事件,代碼如下:
package io.twostepsfromjava.cloud.bus;
import com.google.common.base.MoreObjects;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 商品事件
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class ProductEvent extends RemoteApplicationEvent {
/** 消息類(lèi)型:更新商品,值為: {@value} */
public static final String ET_UPDATE = "update";
/** 消息類(lèi)型:刪除商品,值為: {@value} */
public static final String ET_DELETE = "delete";
// ========================================================================
// fields =================================================================
private String action;
private String itemCode;
// ========================================================================
// constructor ============================================================
public ProductEvent() {
super();
}
public ProductEvent(Object source, String originService, String destinationService, String action, String itemCode) {
super(source, originService, destinationService);
this.action = action;
this.itemCode = itemCode;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("itemCode", this.getItemCode()).toString();
}
// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getItemCode() {
return itemCode;
}
public void setItemCode(String itemCode) {
this.itemCode = itemCode;
}
}
這里和之前事件構(gòu)建函數(shù)不同的是:在構(gòu)建一個(gè)事件時(shí)需要指定originService和destinationService。對(duì)于事件發(fā)布者來(lái)說(shuō)originService就是自己,而destinationService則是指將事件發(fā)布到那些微服務(wù)實(shí)例。destinationService配置的格式為:{serviceId}:{appContextId},在配置時(shí)serviceId和appContextId可以使用通配符,如果這兩個(gè)變量都使用通配符的話(*:**),則事件將發(fā)布到所有的微服務(wù)實(shí)例。如只省略appContextId,則事件只會(huì)發(fā)布給指定微服務(wù)的所有實(shí)例,如:userservice:**,則只會(huì)將事件發(fā)布給userservice微服務(wù)。
3.1.3 實(shí)現(xiàn)事件發(fā)布
我們將商品微服務(wù)中商品變更中的代碼修改為如下:
package io.twostepsfromjava.cloud.product.service;
import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import io.twostepsfromjava.cloud.product.util.ApplicationContextHolder;
import io.twostepsfromjava.cloud.product.util.RemoteApplicationEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 商品服務(wù)
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Service
public class ProductService {
protected Logger logger = LoggerFactory.getLogger(ProductService.class);
private List<ProductDto> productList;
@Autowired
public ProductService() {
this.productList = this.buildProducts();
}
// 省略了不相干的代碼
/**
* 保存或更新商品信息
* @param productDto
* @return
*/
public ProductDto save(ProductDto productDto) {
// TODO: 實(shí)現(xiàn)商品保存處理
for (ProductDto sourceProductDto : this.productList) {
if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
sourceProductDto.setName(sourceProductDto.getName() + "-new");
sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
productDto = sourceProductDto;
break;
}
}
// 發(fā)送商品消息
// this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());
// 發(fā)布商品變更消息
this.fireEvent(ProductEvent.ET_UPDATE, productDto);
return productDto;
}
// 這里已不再使用該方法
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("發(fā)送商品消息:{} ", productMsg);
// 發(fā)送消息
// this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}
protected void fireEvent(String eventAction, ProductDto productDto) {
ProductEvent productEvent = new ProductEvent(productDto,
ApplicationContextHolder.getApplicationContext().getId(), "*:**",
eventAction, productDto.getItemCode());
// 發(fā)布事件
RemoteApplicationEventPublisher.publishEvent(productEvent);
}
}
其中RemoteApplicationEventPublisher的源碼如下:
package io.twostepsfromjava.cloud.product.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationContext;
/**
* 遠(yuǎn)程事件發(fā)布者
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class RemoteApplicationEventPublisher {
protected static Logger logger = LoggerFactory.getLogger(RemoteApplicationEventPublisher.class);
/**
* 發(fā)布一個(gè)事件
* @param event
*/
public static void publishEvent(RemoteApplicationEvent event){
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
context.publishEvent(event);
logger.debug("已發(fā)布事件:{}", event);
}else{
logger.warn("無(wú)法獲取到當(dāng)前Spring上下文信息,不能夠發(fā)布事件");
}
}
}
3.1.4 開(kāi)啟遠(yuǎn)程消息掃描
最后,修改微服務(wù)啟動(dòng)類(lèi),添加@RemoteApplicationEventScan注解:
package io.twostepsfromjava.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* TwoStepsFromJava Cloud -- ProductDto Service 服務(wù)器
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@EnableDiscoveryClient
@RemoteApplicationEventScan
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
注意: 這里再次聲明,遠(yuǎn)程事件必須定義在
@RemoteApplicationEventScan注解所注解類(lèi)的子包中,否則無(wú)法實(shí)現(xiàn)遠(yuǎn)程事件發(fā)布。
到這里我們的商品微服務(wù)重構(gòu)就完成了。下面接著對(duì)Mall-Web微服務(wù)進(jìn)行修改。
3.2 重構(gòu)Mall-Web微服務(wù)
3.2.1 增加對(duì)Bus依賴(lài)
和商品微服務(wù)一樣,就不重復(fù)了。
3.2.2 拷貝ProductEvent到本項(xiàng)目
呃,這個(gè)就不描述了。
3.2.3 實(shí)現(xiàn)事件監(jiān)聽(tīng)處理
這個(gè)代碼非常簡(jiǎn)單,不多說(shuō),具體如下:
package io.twostepsfromjava.cloud.web.mall.service;
import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* 遠(yuǎn)程事件監(jiān)聽(tīng)
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Component
public class ProductEventListener implements ApplicationListener<ProductEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected ProductService productService;
@Override
public void onApplicationEvent(ProductEvent productEvent) {
if (ProductEvent.ET_UPDATE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Web微服務(wù)收到商品變更事件,商品貨號(hào): {}", productEvent.getItemCode());
// 重新獲取該商品信息
ProductDto productDto = this.productService.loadByItemCode(productEvent.getItemCode());
if (null != productDto)
this.logger.debug("重新獲取到的商品信息為:{}", productDto);
else
this.logger.debug("貨號(hào)為:{} 的商品不存在", productEvent.getItemCode());
} else if (ProductEvent.ET_DELETE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Web微服務(wù)收到商品刪除事件,所要?jiǎng)h除商品貨號(hào)為: {}", productEvent.getItemCode());
} else {
this.logger.debug("Web微服務(wù)收到未知商品事件: {}", productEvent);
}
}
}
3.2.3 開(kāi)啟遠(yuǎn)程消息掃描
和商品微服務(wù)一樣,不論是事件的發(fā)布還是事件的監(jiān)聽(tīng)都需要開(kāi)啟遠(yuǎn)程消息掃描。直接在微服務(wù)引導(dǎo)類(lèi)中增加@RemoteApplicationEventScan注解即可。
3.3 測(cè)試
我們的重構(gòu)到此就全部完成了,下面依次分別啟動(dòng):
- Kafka服務(wù)器;
- 服務(wù)治理服務(wù)器: Service-discovery;
- 商品微服務(wù): Product-Service;
- Mall-Web微服務(wù)。
然后,使用Postman訪問(wèn)原來(lái)的消息測(cè)試端點(diǎn): http://localhost:2100/products/item-2。在商品微服務(wù)的控制臺(tái),可以看到類(lèi)似下面輸出:

從輸出日志中可以看到商品事件已經(jīng)發(fā)布出去。如果這個(gè)時(shí)候我們查看Mall-Web微服務(wù)的控制臺(tái),可以看到下圖的輸出:

從日志輸出中可以看到Mall-Web微服務(wù)已經(jīng)能夠正確接收到商品變更事件,并進(jìn)行相應(yīng)的處理。
3.4 小結(jié)
從重構(gòu)后的代碼來(lái)說(shuō)的確使用Bus會(huì)更容易理解,也更容易上手。這對(duì)于當(dāng)使用場(chǎng)合比較簡(jiǎn)單會(huì)非常好,比如:廣播。典型的應(yīng)用就是Config中的配置刷新,當(dāng)在項(xiàng)目中同時(shí)引入了Config和Bus時(shí),就可以通過(guò)/bus/refresh端點(diǎn)實(shí)現(xiàn)配置更改的廣播,從而讓相應(yīng)的微服務(wù)重新加載配置數(shù)據(jù)。
當(dāng)然,Bus簡(jiǎn)便性的另外一層含義就是不夠靈活,因此具體是在項(xiàng)目中使用Bug還是直接使用Stream就看你的需要了,總起來(lái)一句就是:夠用就好。
你可以到這里下載本篇的代碼。
