微軟eShopOnContainers
eShopOnContainers是一個(gè)簡(jiǎn)化版的基于.NET Core和Docker等技術(shù)開(kāi)發(fā)的面向微服務(wù)架構(gòu)的參考應(yīng)用
其中不僅包含了很多術(shù)語(yǔ)、設(shè)計(jì)模式、架構(gòu)風(fēng)格,還使用了一系列的常見(jiàn)技術(shù)(RabbitMQ、EventBus、IdentityServer4、Polly、Api Gateway、Redis、CQRS、CAP、CI/CD等),還有一些相關(guān)工具(Docker、K8S等)??梢哉f(shuō)是一份全面的技術(shù)整合實(shí)現(xiàn)的應(yīng)用參考。學(xué)習(xí)它會(huì)對(duì)涉及到的技術(shù)的實(shí)際運(yùn)用有更加清晰的了解。
相對(duì)于eshoponcontainer的其他內(nèi)容,它的領(lǐng)域事件、集成(整合)事件、事件總線之間的協(xié)作關(guān)系還是比較難懂的。接下來(lái)我們來(lái)分析一下。
領(lǐng)域事件(DomainEvent)
使用域事件顯式實(shí)現(xiàn)域中的更改的副作用。 如果使用 DDD 術(shù)語(yǔ)表述,即使用域事件跨多個(gè)聚合顯式實(shí)現(xiàn)副作用。 (可選)為了提高可伸縮性并減小對(duì)數(shù)據(jù)庫(kù)鎖定的影響,可在相同域的聚合之間使用最終一致性。
例如在 eShopOnContainers 應(yīng)用程序中,當(dāng)創(chuàng)建訂單時(shí),用戶會(huì)成為買家,因此 OrderStartedDomainEvent 會(huì)被引發(fā)并在 ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler 中進(jìn)行處理
eshop中,一個(gè)領(lǐng)域事件對(duì)應(yīng)一個(gè)handler,使用mediator的INotificationHandler實(shí)現(xiàn)
域事件和集成事件是相同的:都是對(duì)已發(fā)生事件的通知。但是,它們的實(shí)現(xiàn)必須不同。域事件是推送到域事件調(diào)度程序的消息,可基于IoC 容器或任何其他方法作為內(nèi)存中轉(zhuǎn)存進(jìn)程實(shí)現(xiàn)(就是Mediator)
集成事件的目的是將已提交事務(wù)和更新傳播到其他子系統(tǒng),無(wú)論它們是其他微服務(wù)、綁定上下文,還是外部應(yīng)用程序。(集成事件是跨服務(wù)的,領(lǐng)域事件則不是)
例如:
- 當(dāng)用戶發(fā)起訂單時(shí),訂單聚合將發(fā)送 OrderStarted 域事件。 OrderStarted 域事件基于標(biāo)識(shí)微服務(wù)中的原始用戶信息(包含 CreateOrder 命令中提供的信息),由買方聚合處理,以在訂購(gòu)微服務(wù)時(shí)創(chuàng)建買家對(duì)象。
- 每個(gè) OrderItem 子實(shí)體可以在項(xiàng)目?jī)r(jià)格高于特定金額,或產(chǎn)品項(xiàng)目金額過(guò)高時(shí),引發(fā)事件。 然后,聚合根可以接收這些事件,并執(zhí)行全局計(jì)算Order的總額。
- 系統(tǒng)中,當(dāng)文章類目新增或者刪除時(shí),需要刷新緩存,從而實(shí)現(xiàn)統(tǒng)一,則可以在新增類目或者刪除類目時(shí),出發(fā)刷新?lián)Q緩存的一個(gè)事件:
/// <summary>
/// 添加類目
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<CommandResult> Handle(CreateCategoryCommand request, CancellationToken cancellationToken)
{
var rep = _uow.GetBaseRepository<Category>();
var model = rep.GetFirstOrDefault(predicate:x => x.DisplayName == request.Name,disableTracking:true);
if (model!=null)
{
return CommandResult.Fail("名稱重復(fù)");
}
var entity = new Category(categoryName: request.Name, displayName: request.Name);
var res= await rep.InsertAsync(entity);
if (res.Entity!=null)
{
entity.AddCacheChangeDomainEvent(new List<string> { _cacheKeyMgr.PostCategoryListlKey() }); // 發(fā)出事件后,對(duì)應(yīng)的handler會(huì)刷新緩存
return CommandResult.Success();
}
return CommandResult.Fail("添加失敗");
}
集成事件(IntegrationEvent)
基于事件的通信時(shí),當(dāng)值得注意的事件發(fā)生時(shí),微服務(wù)會(huì)發(fā)布事件,例如更新業(yè)務(wù)實(shí)體時(shí)。 其他微服務(wù)訂閱這些事件。 微服務(wù)收到事件時(shí),可以更新其自己的業(yè)務(wù)實(shí)體,這可能會(huì)導(dǎo)致發(fā)布更多事件。這是最終一致性概念的本質(zhì)。 通常通過(guò)使用事件總線實(shí)現(xiàn)來(lái)執(zhí)行此發(fā)布/訂閱系統(tǒng)。最終一致事務(wù)由一系列分布式操作組成。在每個(gè)操作中,微服務(wù)會(huì)更新業(yè)務(wù)實(shí)體,并發(fā)布可觸發(fā)下一個(gè)操作的事件。
集成事件用于跨多個(gè)微服務(wù)或外部系統(tǒng)保持域狀態(tài)同步。這可通過(guò)在微服務(wù)外發(fā)布集成事件完成。 將事件發(fā)布到多個(gè)接收方微服務(wù)時(shí),每個(gè)接收方微服務(wù)中的相應(yīng)事件處理程序會(huì)處理該事件。(消息隊(duì)列的生產(chǎn)者和消費(fèi)者)
集成事件是單個(gè)應(yīng)用程序級(jí)別的,不建議跨應(yīng)用使用同一個(gè)集成事件,這將導(dǎo)致事件來(lái)源混亂(微服務(wù)必須獨(dú)立)
事件總線(EventBus)
事件總線可實(shí)現(xiàn)發(fā)布/訂閱式通信,無(wú)需組件之間相互顯式識(shí)別,
微服務(wù) A 發(fā)布到事件總線,這會(huì)分發(fā)到訂閱微服務(wù) B 和 C,發(fā)布服務(wù)器無(wú)需知道訂閱服務(wù)器。:

如何實(shí)現(xiàn)發(fā)布服務(wù)器和訂閱服務(wù)器之間的匿名? 一個(gè)簡(jiǎn)單方法是讓中轉(zhuǎn)站處理所有通信。事件總線是一個(gè)這樣的中轉(zhuǎn)站。
事件總線通常由兩部分組成:
- 抽象或接口。
- 一個(gè)或多個(gè)實(shí)現(xiàn)。(RabbitMQ\Azure.ServiceBus\kafka等)
接口的功能很簡(jiǎn)單,就是只有發(fā)布(發(fā)布本系統(tǒng)的集成事件)和訂閱(訂閱其余子系統(tǒng)的事件),發(fā)布和訂過(guò)程中自身無(wú)需知道時(shí)那些個(gè)子系統(tǒng)進(jìn)行了參與。
領(lǐng)域事件、集成事件、事件總線的協(xié)作(以eshop中的實(shí)現(xiàn)為例)
三種事件相互作用,最終是為了解決整個(gè)微服務(wù)系統(tǒng)的最終一致性,微服務(wù)A自身數(shù)據(jù)發(fā)生了變化,那個(gè)這個(gè)變化所引起的一系列反應(yīng)有可能導(dǎo)致整個(gè)系統(tǒng)產(chǎn)生個(gè)各種不同結(jié)果。為了確保結(jié)果與期望一致,就需要實(shí)現(xiàn)一致性,還有就是分布式系統(tǒng)當(dāng)中的CAP原則。
eShopOnContainers中,領(lǐng)域事件激發(fā)時(shí),對(duì)應(yīng)的handler將此次事件的信息保存到集成事件的日志表中,保存這個(gè)操作,使用到了對(duì)應(yīng)發(fā)生事件的領(lǐng)域?qū)嶓w所在的上下文的事務(wù)對(duì)象,以保證內(nèi)部強(qiáng)一致性(在領(lǐng)域事件的handler中,通過(guò)獲取實(shí)體db上下文的事務(wù)對(duì)象,將事件保存到日志記錄表中)
具體實(shí)現(xiàn):
// 此處OrderCancelledDomainEvent被激發(fā)時(shí)的處理程序 OrderCancelledDomainEventHandler.cs
public async Task Handle(OrderCancelledDomainEvent orderCancelledDomainEvent, CancellationToken cancellationToken)
{
_logger.CreateLogger<OrderCancelledDomainEvent>()
.LogTrace("Order with Id: {OrderId} has been successfully updated to status {Status} ({Id})",
orderCancelledDomainEvent.Order.Id, nameof(OrderStatus.Cancelled), OrderStatus.Cancelled.Id);
var order = await _orderRepository.GetAsync(orderCancelledDomainEvent.Order.Id);
var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());
var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
// 通過(guò)集成事件服務(wù)來(lái)保存此次領(lǐng)域事件的信息
await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent);
}
// 繼承事件服務(wù)OrderingIntegrationEventService.cs
public async Task AddAndSaveEventAsync(IntegrationEvent evt)
{
_logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})",
evt.Id, evt);
// _orderingContext.GetCurrentTransaction() 獲取實(shí)體發(fā)出域事件時(shí)當(dāng)前上下問(wèn)的事務(wù)對(duì)象
// 由于獲取上下文事務(wù)對(duì)象的存在,導(dǎo)致eshop中使用了TransactionBehaviour。確保了當(dāng)前事務(wù)對(duì)象的存在。
await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
}
然后再TransactionBehaviour中,提交事務(wù),實(shí)體的更改和對(duì)應(yīng)域事件都會(huì)被記錄到數(shù)據(jù)庫(kù)中。然后通過(guò)集成事件服務(wù),根據(jù)次事務(wù)id查詢出需要激發(fā)的領(lǐng)域事件數(shù)據(jù),然后遍歷操作:
- 標(biāo)記為處理中
- 通過(guò)事件總線(消息隊(duì)列或者其他組件進(jìn)行發(fā)布)
- 標(biāo)記為處理完成、遇到異常標(biāo)記為失敗(此處可增加policy的策略重試功能,但是會(huì)影響本次操作的響應(yīng)時(shí)間),由后臺(tái)任務(wù)定時(shí)重試這些失敗的事件。確保最終一致性。但是消費(fèi)者端的一致性無(wú)法保證(需要其他策略機(jī)行處理)。