Spring Cloud入門(mén)教程(十):消息總線(Bus)

Spring Cloud入門(mén)教程系列:

本人和同事撰寫(xiě)的《Spring Cloud微服務(wù)架構(gòu)開(kāi)發(fā)實(shí)戰(zhàn)》一書(shū)也在京東、當(dāng)當(dāng)?shù)葧?shū)店上架,大家可以點(diǎn)擊這里前往購(gòu)買(mǎi),多謝大家支持和捧場(chǎng)!


在我們開(kāi)始講Spring Cloud Bus之前來(lái)看另外一個(gè)IT術(shù)語(yǔ):ESB(Enterprise Service Bus)。ESB在維基百科中是這樣描述的:

企業(yè)服務(wù)總線(Enterprise Service Bus,ESB)的概念是從服務(wù)導(dǎo)向架構(gòu)(Service Oriented Architecture, SOA)發(fā)展而來(lái)。SOA描述了一種IT基礎(chǔ)設(shè)施的應(yīng)用集成模型;其中的軟構(gòu)件集是以一種定義清晰的層次化結(jié)構(gòu)來(lái)相互耦合。一個(gè)ESB是一個(gè)預(yù)先組裝的SOA實(shí)現(xiàn),它包含了實(shí)現(xiàn)SOA分層目標(biāo)所必需的基礎(chǔ)功能部件。

在企業(yè)計(jì)算領(lǐng)域,企業(yè)服務(wù)總線是指由中間件基礎(chǔ)設(shè)施產(chǎn)品技術(shù)實(shí)現(xiàn)的、 通過(guò)事件驅(qū)動(dòng)和基于XML消息引擎,為更復(fù)雜的面向服務(wù)的架構(gòu)提供的軟件架構(gòu)的構(gòu)造物。企業(yè)服務(wù)總線通常在企業(yè)消息系統(tǒng)上提供一個(gè)抽象層,使得集成架構(gòu)師能夠不用編碼而是利用消息的價(jià)值完成集成工作。

企業(yè)服務(wù)總線提供可靠消息傳輸,服務(wù)接入,協(xié)議轉(zhuǎn)換,數(shù)據(jù)格式轉(zhuǎn)換,基于內(nèi)容的路由等功能,屏蔽了服務(wù)的物理位置,協(xié)議和數(shù)據(jù)格式。

其中,最重要的一句就是:企業(yè)服務(wù)總線通常在企業(yè)消息系統(tǒng)上提供一個(gè)抽象層,使得集成架構(gòu)師能夠不用編碼而是利用消息的價(jià)值完成集成工作。 通俗一點(diǎn)來(lái)講就是企業(yè)服務(wù)總線是架構(gòu)在消息中間件之上的另外一個(gè)抽象層,使得我們可以不用關(guān)心消息相關(guān)的處理就可以完成業(yè)務(wù)邏輯的處理。

到這里你是不是有點(diǎn)突然明白Spring Cloud Bus 和 Spring Cloud Stream之間的關(guān)系了,剛開(kāi)始接觸這兩個(gè)組件時(shí),大部分都會(huì)迷惑到底這兩者有什么區(qū)別?它們又有什么聯(lián)系?Stream通過(guò)對(duì)消息中間件進(jìn)行抽象封裝,提供一個(gè)統(tǒng)一的接口供我們發(fā)送和監(jiān)聽(tīng)消息,而B(niǎo)us則是在Stream基礎(chǔ)之上再次進(jìn)行抽象封裝,使得我們可以在不用理解消息發(fā)送、監(jiān)聽(tīng)等概念的基礎(chǔ)上使用消息來(lái)完成業(yè)務(wù)邏輯的處理。

那么Spring Cloud Bus是如何為我們實(shí)現(xiàn)的呢?一句話概括就是事件機(jī)制。

1. Spring的事件機(jī)制

在Spring框架中有一個(gè)事件機(jī)制,該機(jī)制是一個(gè)觀察者模式的實(shí)現(xiàn)。觀察者模式建立一種對(duì)象與對(duì)象之間的依賴(lài)關(guān)系,當(dāng)一個(gè)對(duì)象(稱(chēng)之為:觀察目標(biāo))發(fā)生改變時(shí)將自動(dòng)通知其它對(duì)象(稱(chēng)之為:觀察者),這些觀察者將做出相應(yīng)的反應(yīng)。一個(gè)觀察目標(biāo)可以對(duì)應(yīng)多個(gè)觀察者,而且這些觀察者之間沒(méi)有相互聯(lián)系,可以根據(jù)需要增加和刪除觀察者,使得系統(tǒng)更易于擴(kuò)展。通過(guò)Spring事件機(jī)制可以達(dá)到如下目的:

  • 應(yīng)用模塊之間的解耦;
  • 對(duì)同一種事件可以根據(jù)需要定義多種處理方式;
  • 對(duì)主線應(yīng)用不干擾,是一個(gè)極佳的開(kāi)閉原則(OCP)實(shí)踐。

當(dāng)我們?cè)趹?yīng)用中引入事件機(jī)制時(shí)需要借助Spring中以下接口或抽象類(lèi):

  • ApplicationEventPublisher: 這是一個(gè)接口,用來(lái)發(fā)布一個(gè)事件;
  • ApplicationEvent: 這是一個(gè)抽象類(lèi),用來(lái)定義一個(gè)事件;
  • ApplicationListener<E extends ApplicationEvent>: 這是一個(gè)接口,實(shí)現(xiàn)事件的監(jiān)聽(tīng)。

其中Spring應(yīng)用的上下文ApplicationContext默認(rèn)是實(shí)現(xiàn)了ApplicationEventPublisher接口,因此在發(fā)布事件時(shí)我們可以直接使用ApplicationContext.publishEvent()方法來(lái)發(fā)送。

一個(gè)典型的Spring事件發(fā)送與監(jiān)聽(tīng)代碼如下。

1.1 定義事件

比如,我們定義一個(gè)用戶事件:

/**
 * 用戶事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class UserEvent extends ApplicationEvent {
    /** 消息類(lèi)型:更新用戶,值為: {@value} */
    public static final String ET_UPDATE = "update";

    // ========================================================================
    // fields =================================================================
    private String action;
    private User user;

    // ========================================================================
    // constructor ============================================================
    public UserEvent(User user) {
        super(user);
        this.user = user;
    }

    public UserEvent(User user, String action) {
        super(user);
        this.action = action;
        this.user = user;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("action", this.getAction())
                .add("user", this.getUser()).toString();
    }

    // ==================================================================
    // setter/getter ====================================================
    public String getAction() {
        return action;
    }
    public void setAction(String action) {
        this.action = action;
    }

    public User getUser() {
        return user;
    }
    public void setUser(User user) {
        this.user = user;
    }
}

1.2 定義監(jiān)聽(tīng)

我們定義一個(gè)用戶事件監(jiān)聽(tīng)器,當(dāng)用戶變更時(shí)做相應(yīng)處理:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * 用戶事件監(jiān)聽(tīng)
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Override
    public void onApplicationEvent(UserEvent userEvent) {
        this.logger.debug("收到用戶事件:{} ", userEvent);
        // TODO: 實(shí)現(xiàn)具體的業(yè)務(wù)處理
    }
}

用戶事件監(jiān)聽(tīng)比較簡(jiǎn)單,只需要實(shí)現(xiàn)ApplicationListener接口,進(jìn)行相應(yīng)處理即可。

1.3 發(fā)送消息

發(fā)送消息比較簡(jiǎn)單,我們也可以直接在Event中實(shí)現(xiàn),比如我們將上面UserEvent更改為如下:

/**
 * 用戶事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class UserEvent extends ApplicationEvent {
     // 省略了之前的代碼
    /**
     * 發(fā)布事件
     */
    public void fire() {
        ApplicationContext context = ApplicationContextHolder.getApplicationContext();
        if(null != context) {
            logger.debug("發(fā)布事件:{}", this);
            context.publishEvent(this);            
        }else{
            logger.warn("無(wú)法獲取到當(dāng)前Spring上下文信息,不能夠發(fā)布事件");
        }
    }
}

那么我們就可以在需要的地方通過(guò)下面的代碼來(lái)發(fā)布事件了:

new UserEvent(user, UserEvent.ET_UPDATE).fire();

2. Spring Cloud Bus機(jī)制

我們上面了解了Spring的事件機(jī)制,那么Spring Cloud Bus又是如何將事件機(jī)制和Stream結(jié)合在一起的呢?總起來(lái)說(shuō)機(jī)制如下:

  1. 在需要發(fā)布或者監(jiān)聽(tīng)事件的應(yīng)用中增加@RemoteApplicationEventScan注解,通過(guò)該注解就可以啟動(dòng)Stream中所說(shuō)的消息通道的綁定;
  2. 對(duì)于事件發(fā)布,則需要繼承ApplicationEvent的擴(kuò)展類(lèi) -- RemoteApplicationEvent,當(dāng)通過(guò)ApplicationContext.publishEvent()發(fā)布此種類(lèi)型的事件時(shí),Spring Cloud Bus就會(huì)對(duì)所要發(fā)布的事件進(jìn)行包裝,形成一個(gè)我們所熟知的消息,然后通過(guò)默認(rèn)的springCloudBus消息通道發(fā)送到消息中間件;
  3. 對(duì)于事件監(jiān)聽(tīng)者則不需要進(jìn)行任何變更,仍舊按照上面的方式就可以實(shí)現(xiàn)消息的監(jiān)聽(tīng)。但,需要注意的一點(diǎn)就是在消費(fèi)的微服務(wù)工程中也必須定義第2步所定義的事件,并且需要保障全類(lèi)名一致(如果不一致,則需要做一點(diǎn)工作)。

嗯,就是這么簡(jiǎn)單。通過(guò)Bus我們就可以像編寫(xiě)單體架構(gòu)應(yīng)用一樣進(jìn)行開(kāi)發(fā),而不需要關(guān)系什么消息中間件、主題、消息、通道呀等等一大堆概念。

你也行在懷疑,是不是這么簡(jiǎn)單呀。那好,讓我們來(lái)看看是不是很容易就可以實(shí)現(xiàn)Stream中示例。

3. 重構(gòu)Spring Cloud Stream中的示例

3.1 重構(gòu)商品微服務(wù)

3.1.1 增加對(duì)Bus的依賴(lài)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

3.1.2 構(gòu)建商品事件

我們將原來(lái)商品配置變更所發(fā)送的消息更改為一個(gè)事件,代碼如下:

package io.twostepsfromjava.cloud.bus;


import com.google.common.base.MoreObjects;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;

/**
 * 商品事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class ProductEvent extends RemoteApplicationEvent {
    /** 消息類(lèi)型:更新商品,值為: {@value} */
    public static final String ET_UPDATE = "update";
    /** 消息類(lèi)型:刪除商品,值為: {@value} */
    public static final String ET_DELETE = "delete";

    // ========================================================================
    // fields =================================================================
    private String action;
    private String itemCode;

    // ========================================================================
    // constructor ============================================================
    public ProductEvent() {
        super();
    }

    public ProductEvent(Object source, String originService, String destinationService, String action, String itemCode) {
        super(source, originService, destinationService);
        this.action = action;
        this.itemCode = itemCode;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("action", this.getAction())
                .add("itemCode", this.getItemCode()).toString();
    }

    // ==================================================================
    // setter/getter ====================================================
    public String getAction() {
        return action;
    }
    public void setAction(String action) {
        this.action = action;
    }

    public String getItemCode() {
        return itemCode;
    }
    public void setItemCode(String itemCode) {
        this.itemCode = itemCode;
    }
}

這里和之前事件構(gòu)建函數(shù)不同的是:在構(gòu)建一個(gè)事件時(shí)需要指定originServicedestinationService。對(duì)于事件發(fā)布者來(lái)說(shuō)originService就是自己,而destinationService則是指將事件發(fā)布到那些微服務(wù)實(shí)例。destinationService配置的格式為:{serviceId}:{appContextId},在配置時(shí)serviceIdappContextId可以使用通配符,如果這兩個(gè)變量都使用通配符的話(*:**),則事件將發(fā)布到所有的微服務(wù)實(shí)例。如只省略appContextId,則事件只會(huì)發(fā)布給指定微服務(wù)的所有實(shí)例,如:userservice:**,則只會(huì)將事件發(fā)布給userservice微服務(wù)。

3.1.3 實(shí)現(xiàn)事件發(fā)布

我們將商品微服務(wù)中商品變更中的代碼修改為如下:

package io.twostepsfromjava.cloud.product.service;

import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import io.twostepsfromjava.cloud.product.util.ApplicationContextHolder;
import io.twostepsfromjava.cloud.product.util.RemoteApplicationEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;


/**
 * 商品服務(wù)
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Service
public class ProductService {
    protected Logger logger = LoggerFactory.getLogger(ProductService.class);

    private List<ProductDto> productList;
    
    @Autowired
    public ProductService() {
        this.productList = this.buildProducts();
    }
    
     // 省略了不相干的代碼
    
    /**
     * 保存或更新商品信息
     * @param productDto
     * @return
     */
    public ProductDto save(ProductDto productDto) {
        // TODO: 實(shí)現(xiàn)商品保存處理
        for (ProductDto sourceProductDto : this.productList) {
            if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
                sourceProductDto.setName(sourceProductDto.getName() + "-new");
                sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
                productDto = sourceProductDto;
                break;
            }
        }

        // 發(fā)送商品消息
        // this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());
        // 發(fā)布商品變更消息
        this.fireEvent(ProductEvent.ET_UPDATE, productDto);

        return productDto;
    }
     
     // 這里已不再使用該方法
    protected void sendMsg(String msgAction, String itemCode) {
        ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
        this.logger.debug("發(fā)送商品消息:{} ", productMsg);

        // 發(fā)送消息
        // this.source.output().send(MessageBuilder.withPayload(productMsg).build());
    }

    protected void fireEvent(String eventAction, ProductDto productDto) {
        ProductEvent productEvent = new ProductEvent(productDto,
                ApplicationContextHolder.getApplicationContext().getId(), "*:**",
                eventAction, productDto.getItemCode());

        // 發(fā)布事件
        RemoteApplicationEventPublisher.publishEvent(productEvent);
    }
}

其中RemoteApplicationEventPublisher的源碼如下:

package io.twostepsfromjava.cloud.product.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationContext;


/**
 * 遠(yuǎn)程事件發(fā)布者
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class RemoteApplicationEventPublisher {
    protected static Logger logger = LoggerFactory.getLogger(RemoteApplicationEventPublisher.class);

    /**
     * 發(fā)布一個(gè)事件
     * @param event
     */
    public static void publishEvent(RemoteApplicationEvent event){
        ApplicationContext context = ApplicationContextHolder.getApplicationContext();
        if(null != context) {
            context.publishEvent(event);
            logger.debug("已發(fā)布事件:{}", event);
        }else{
            logger.warn("無(wú)法獲取到當(dāng)前Spring上下文信息,不能夠發(fā)布事件");
        }
    }
}

3.1.4 開(kāi)啟遠(yuǎn)程消息掃描

最后,修改微服務(wù)啟動(dòng)類(lèi),添加@RemoteApplicationEventScan注解:

package io.twostepsfromjava.cloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * TwoStepsFromJava Cloud -- ProductDto Service 服務(wù)器
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@EnableDiscoveryClient
@RemoteApplicationEventScan
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

注意: 這里再次聲明,遠(yuǎn)程事件必須定義在@RemoteApplicationEventScan注解所注解類(lèi)的子包中,否則無(wú)法實(shí)現(xiàn)遠(yuǎn)程事件發(fā)布。

到這里我們的商品微服務(wù)重構(gòu)就完成了。下面接著對(duì)Mall-Web微服務(wù)進(jìn)行修改。

3.2 重構(gòu)Mall-Web微服務(wù)

3.2.1 增加對(duì)Bus依賴(lài)

和商品微服務(wù)一樣,就不重復(fù)了。

3.2.2 拷貝ProductEvent到本項(xiàng)目

呃,這個(gè)就不描述了。

3.2.3 實(shí)現(xiàn)事件監(jiān)聽(tīng)處理

這個(gè)代碼非常簡(jiǎn)單,不多說(shuō),具體如下:

package io.twostepsfromjava.cloud.web.mall.service;

import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;


/**
 * 遠(yuǎn)程事件監(jiān)聽(tīng)
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Component
public class ProductEventListener implements ApplicationListener<ProductEvent> {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    protected ProductService productService;

    @Override
    public void onApplicationEvent(ProductEvent productEvent) {
        if (ProductEvent.ET_UPDATE.equalsIgnoreCase(productEvent.getAction())) {
            this.logger.debug("Web微服務(wù)收到商品變更事件,商品貨號(hào): {}", productEvent.getItemCode());
            // 重新獲取該商品信息
            ProductDto productDto = this.productService.loadByItemCode(productEvent.getItemCode());
            if (null != productDto)
                this.logger.debug("重新獲取到的商品信息為:{}", productDto);
            else
                this.logger.debug("貨號(hào)為:{} 的商品不存在", productEvent.getItemCode());
        } else if (ProductEvent.ET_DELETE.equalsIgnoreCase(productEvent.getAction())) {
            this.logger.debug("Web微服務(wù)收到商品刪除事件,所要?jiǎng)h除商品貨號(hào)為: {}", productEvent.getItemCode());
        } else {
            this.logger.debug("Web微服務(wù)收到未知商品事件: {}", productEvent);
        }
    }
}

3.2.3 開(kāi)啟遠(yuǎn)程消息掃描

和商品微服務(wù)一樣,不論是事件的發(fā)布還是事件的監(jiān)聽(tīng)都需要開(kāi)啟遠(yuǎn)程消息掃描。直接在微服務(wù)引導(dǎo)類(lèi)中增加@RemoteApplicationEventScan注解即可。

3.3 測(cè)試

我們的重構(gòu)到此就全部完成了,下面依次分別啟動(dòng):

  1. Kafka服務(wù)器;
  2. 服務(wù)治理服務(wù)器: Service-discovery;
  3. 商品微服務(wù): Product-Service;
  4. Mall-Web微服務(wù)。

然后,使用Postman訪問(wèn)原來(lái)的消息測(cè)試端點(diǎn): http://localhost:2100/products/item-2。在商品微服務(wù)的控制臺(tái),可以看到類(lèi)似下面輸出:

商品微服務(wù)控制臺(tái)輸出

從輸出日志中可以看到商品事件已經(jīng)發(fā)布出去。如果這個(gè)時(shí)候我們查看Mall-Web微服務(wù)的控制臺(tái),可以看到下圖的輸出:

Mall-Web微服務(wù)控制臺(tái)輸出

從日志輸出中可以看到Mall-Web微服務(wù)已經(jīng)能夠正確接收到商品變更事件,并進(jìn)行相應(yīng)的處理。

3.4 小結(jié)

從重構(gòu)后的代碼來(lái)說(shuō)的確使用Bus會(huì)更容易理解,也更容易上手。這對(duì)于當(dāng)使用場(chǎng)合比較簡(jiǎn)單會(huì)非常好,比如:廣播。典型的應(yīng)用就是Config中的配置刷新,當(dāng)在項(xiàng)目中同時(shí)引入了Config和Bus時(shí),就可以通過(guò)/bus/refresh端點(diǎn)實(shí)現(xiàn)配置更改的廣播,從而讓相應(yīng)的微服務(wù)重新加載配置數(shù)據(jù)。

當(dāng)然,Bus簡(jiǎn)便性的另外一層含義就是不夠靈活,因此具體是在項(xiàng)目中使用Bug還是直接使用Stream就看你的需要了,總起來(lái)一句就是:夠用就好。

你可以到這里下載本篇的代碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái)...
    Chandler_玨瑜閱讀 6,764評(píng)論 2 39
  • (git上的源碼:https://gitee.com/rain7564/spring_microservices_...
    sprainkle閱讀 9,510評(píng)論 8 17
  • 本文參考了:http://blog.didispace.com/springcloud7/http://blog....
    WeiminSun閱讀 7,399評(píng)論 0 23
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都能連接上...
    二月_春風(fēng)閱讀 10,620評(píng)論 0 14

友情鏈接更多精彩內(nèi)容