eShopOnContainers之領(lǐng)域事件、集成事件、事件總線區(qū)別與關(guān)系

微軟eShopOnContainers

eShopOnContainers是一個(gè)簡(jiǎn)化版的基于.NET Core和Docker等技術(shù)開(kāi)發(fā)的面向微服務(wù)架構(gòu)的參考應(yīng)用
其中不僅包含了很多術(shù)語(yǔ)、設(shè)計(jì)模式、架構(gòu)風(fēng)格,還使用了一系列的常見(jiàn)技術(shù)(RabbitMQ、EventBus、IdentityServer4、Polly、Api Gateway、Redis、CQRS、CAP、CI/CD等),還有一些相關(guān)工具(Docker、K8S等)??梢哉f(shuō)是一份全面的技術(shù)整合實(shí)現(xiàn)的應(yīng)用參考。學(xué)習(xí)它會(huì)對(duì)涉及到的技術(shù)的實(shí)際運(yùn)用有更加清晰的了解。

相對(duì)于eshoponcontainer的其他內(nèi)容,它的領(lǐng)域事件、集成(整合)事件、事件總線之間的協(xié)作關(guān)系還是比較難懂的。接下來(lái)我們來(lái)分析一下。

領(lǐng)域事件(DomainEvent)

使用域事件顯式實(shí)現(xiàn)域中的更改的副作用。 如果使用 DDD 術(shù)語(yǔ)表述,即使用域事件跨多個(gè)聚合顯式實(shí)現(xiàn)副作用。 (可選)為了提高可伸縮性并減小對(duì)數(shù)據(jù)庫(kù)鎖定的影響,可在相同域的聚合之間使用最終一致性。

例如在 eShopOnContainers 應(yīng)用程序中,當(dāng)創(chuàng)建訂單時(shí),用戶會(huì)成為買家,因此 OrderStartedDomainEvent 會(huì)被引發(fā)并在 ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler 中進(jìn)行處理
eshop中,一個(gè)領(lǐng)域事件對(duì)應(yīng)一個(gè)handler,使用mediator的INotificationHandler實(shí)現(xiàn)

域事件和集成事件是相同的:都是對(duì)已發(fā)生事件的通知。但是,它們的實(shí)現(xiàn)必須不同。域事件是推送到域事件調(diào)度程序的消息,可基于IoC 容器或任何其他方法作為內(nèi)存中轉(zhuǎn)存進(jìn)程實(shí)現(xiàn)(就是Mediator)

集成事件的目的是將已提交事務(wù)和更新傳播到其他子系統(tǒng),無(wú)論它們是其他微服務(wù)、綁定上下文,還是外部應(yīng)用程序。(集成事件是跨服務(wù)的,領(lǐng)域事件則不是)

例如:

  1. 當(dāng)用戶發(fā)起訂單時(shí),訂單聚合將發(fā)送 OrderStarted 域事件。 OrderStarted 域事件基于標(biāo)識(shí)微服務(wù)中的原始用戶信息(包含 CreateOrder 命令中提供的信息),由買方聚合處理,以在訂購(gòu)微服務(wù)時(shí)創(chuàng)建買家對(duì)象。
  2. 每個(gè) OrderItem 子實(shí)體可以在項(xiàng)目?jī)r(jià)格高于特定金額,或產(chǎn)品項(xiàng)目金額過(guò)高時(shí),引發(fā)事件。 然后,聚合根可以接收這些事件,并執(zhí)行全局計(jì)算Order的總額。
  3. 系統(tǒng)中,當(dāng)文章類目新增或者刪除時(shí),需要刷新緩存,從而實(shí)現(xiàn)統(tǒng)一,則可以在新增類目或者刪除類目時(shí),出發(fā)刷新?lián)Q緩存的一個(gè)事件:
/// <summary>
/// 添加類目
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<CommandResult> Handle(CreateCategoryCommand request, CancellationToken cancellationToken)
{
    var rep = _uow.GetBaseRepository<Category>();
    var model = rep.GetFirstOrDefault(predicate:x => x.DisplayName == request.Name,disableTracking:true);
    if (model!=null)
    {
        return CommandResult.Fail("名稱重復(fù)");
    }
    var entity = new Category(categoryName: request.Name, displayName: request.Name);
    var res= await rep.InsertAsync(entity);
    if (res.Entity!=null)
    {
        entity.AddCacheChangeDomainEvent(new List<string> { _cacheKeyMgr.PostCategoryListlKey() }); // 發(fā)出事件后,對(duì)應(yīng)的handler會(huì)刷新緩存
        return CommandResult.Success();
    }
    return CommandResult.Fail("添加失敗");
}

集成事件(IntegrationEvent)

基于事件的通信時(shí),當(dāng)值得注意的事件發(fā)生時(shí),微服務(wù)會(huì)發(fā)布事件,例如更新業(yè)務(wù)實(shí)體時(shí)。 其他微服務(wù)訂閱這些事件。 微服務(wù)收到事件時(shí),可以更新其自己的業(yè)務(wù)實(shí)體,這可能會(huì)導(dǎo)致發(fā)布更多事件。這是最終一致性概念的本質(zhì)。 通常通過(guò)使用事件總線實(shí)現(xiàn)來(lái)執(zhí)行此發(fā)布/訂閱系統(tǒng)。最終一致事務(wù)由一系列分布式操作組成。在每個(gè)操作中,微服務(wù)會(huì)更新業(yè)務(wù)實(shí)體,并發(fā)布可觸發(fā)下一個(gè)操作的事件。

集成事件用于跨多個(gè)微服務(wù)或外部系統(tǒng)保持域狀態(tài)同步。這可通過(guò)在微服務(wù)外發(fā)布集成事件完成。 將事件發(fā)布到多個(gè)接收方微服務(wù)時(shí),每個(gè)接收方微服務(wù)中的相應(yīng)事件處理程序會(huì)處理該事件。(消息隊(duì)列的生產(chǎn)者和消費(fèi)者)

集成事件是單個(gè)應(yīng)用程序級(jí)別的,不建議跨應(yīng)用使用同一個(gè)集成事件,這將導(dǎo)致事件來(lái)源混亂(微服務(wù)必須獨(dú)立)

事件總線(EventBus)

事件總線可實(shí)現(xiàn)發(fā)布/訂閱式通信,無(wú)需組件之間相互顯式識(shí)別,
微服務(wù) A 發(fā)布到事件總線,這會(huì)分發(fā)到訂閱微服務(wù) B 和 C,發(fā)布服務(wù)器無(wú)需知道訂閱服務(wù)器。:


image

如何實(shí)現(xiàn)發(fā)布服務(wù)器和訂閱服務(wù)器之間的匿名? 一個(gè)簡(jiǎn)單方法是讓中轉(zhuǎn)站處理所有通信。事件總線是一個(gè)這樣的中轉(zhuǎn)站。

事件總線通常由兩部分組成:

  1. 抽象或接口。
  2. 一個(gè)或多個(gè)實(shí)現(xiàn)。(RabbitMQ\Azure.ServiceBus\kafka等)

接口的功能很簡(jiǎn)單,就是只有發(fā)布(發(fā)布本系統(tǒng)的集成事件)和訂閱(訂閱其余子系統(tǒng)的事件),發(fā)布和訂過(guò)程中自身無(wú)需知道時(shí)那些個(gè)子系統(tǒng)進(jìn)行了參與。

領(lǐng)域事件、集成事件、事件總線的協(xié)作(以eshop中的實(shí)現(xiàn)為例)

三種事件相互作用,最終是為了解決整個(gè)微服務(wù)系統(tǒng)的最終一致性,微服務(wù)A自身數(shù)據(jù)發(fā)生了變化,那個(gè)這個(gè)變化所引起的一系列反應(yīng)有可能導(dǎo)致整個(gè)系統(tǒng)產(chǎn)生個(gè)各種不同結(jié)果。為了確保結(jié)果與期望一致,就需要實(shí)現(xiàn)一致性,還有就是分布式系統(tǒng)當(dāng)中的CAP原則。

eShopOnContainers中,領(lǐng)域事件激發(fā)時(shí),對(duì)應(yīng)的handler將此次事件的信息保存到集成事件的日志表中,保存這個(gè)操作,使用到了對(duì)應(yīng)發(fā)生事件的領(lǐng)域?qū)嶓w所在的上下文的事務(wù)對(duì)象,以保證內(nèi)部強(qiáng)一致性(在領(lǐng)域事件的handler中,通過(guò)獲取實(shí)體db上下文的事務(wù)對(duì)象,將事件保存到日志記錄表中)
具體實(shí)現(xiàn):

// 此處OrderCancelledDomainEvent被激發(fā)時(shí)的處理程序 OrderCancelledDomainEventHandler.cs
public async Task Handle(OrderCancelledDomainEvent orderCancelledDomainEvent, CancellationToken cancellationToken)
        {
            _logger.CreateLogger<OrderCancelledDomainEvent>()
                .LogTrace("Order with Id: {OrderId} has been successfully updated to status {Status} ({Id})",
                    orderCancelledDomainEvent.Order.Id, nameof(OrderStatus.Cancelled), OrderStatus.Cancelled.Id);

            var order = await _orderRepository.GetAsync(orderCancelledDomainEvent.Order.Id);
            var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());

            var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
            // 通過(guò)集成事件服務(wù)來(lái)保存此次領(lǐng)域事件的信息
            await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent);
        }
        
// 繼承事件服務(wù)OrderingIntegrationEventService.cs
public async Task AddAndSaveEventAsync(IntegrationEvent evt)
{
    _logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})",
evt.Id, evt);
    // _orderingContext.GetCurrentTransaction() 獲取實(shí)體發(fā)出域事件時(shí)當(dāng)前上下問(wèn)的事務(wù)對(duì)象
    // 由于獲取上下文事務(wù)對(duì)象的存在,導(dǎo)致eshop中使用了TransactionBehaviour。確保了當(dāng)前事務(wù)對(duì)象的存在。
    await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
}

然后再TransactionBehaviour中,提交事務(wù),實(shí)體的更改和對(duì)應(yīng)域事件都會(huì)被記錄到數(shù)據(jù)庫(kù)中。然后通過(guò)集成事件服務(wù),根據(jù)次事務(wù)id查詢出需要激發(fā)的領(lǐng)域事件數(shù)據(jù),然后遍歷操作:

  1. 標(biāo)記為處理中
  2. 通過(guò)事件總線(消息隊(duì)列或者其他組件進(jìn)行發(fā)布)
  3. 標(biāo)記為處理完成、遇到異常標(biāo)記為失敗(此處可增加policy的策略重試功能,但是會(huì)影響本次操作的響應(yīng)時(shí)間),由后臺(tái)任務(wù)定時(shí)重試這些失敗的事件。確保最終一致性。但是消費(fèi)者端的一致性無(wú)法保證(需要其他策略機(jī)行處理)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容