微服務(wù)實(shí)戰(zhàn)(二):落地微服務(wù)架構(gòu)到直銷系統(tǒng)(構(gòu)建消息總線框架接口)
從上一篇文章大家可以看出,實(shí)現(xiàn)一個(gè)自己的消息總線框架是非常重要的內(nèi)容,消息總線可以將界限上下文之間進(jìn)行解耦,也可以為大并發(fā)訪問提供必要的支持。

消息總線的作用:
1.界限上下文解耦:在DDD第一波文章中,當(dāng)更新了訂單信息后,我們通過調(diào)用經(jīng)銷商界限上下文的領(lǐng)域模型和倉(cāng)儲(chǔ),進(jìn)行了經(jīng)銷商信息的更新,這造成了耦合。通過一個(gè)消息總線,可以在訂單界限上下文的WebApi服務(wù)(來源微服務(wù)-生產(chǎn)者)更新了訂單信息后,發(fā)布一個(gè)事件消息到消息總線的某個(gè)隊(duì)列中,經(jīng)銷商界限上下文的WebApi服務(wù)(消費(fèi)者)訂閱這個(gè)事件消息,然后交給自己的Handler進(jìn)行消息處理,更新自己的經(jīng)銷商信息。這樣就實(shí)現(xiàn)了訂單界限上下文與經(jīng)銷商界限上下文解耦。
2.大并發(fā)支持:可以通過消息總線進(jìn)一步提升下單的性能。我們可以將用戶下單的操作直接交給一個(gè)下單命令WebApi接收,下單命令WebApi接收到命令后,直接丟給一個(gè)消息總線的隊(duì)列,然后立即給前端返回下單結(jié)果。這樣用戶就不用等待后續(xù)的復(fù)雜訂單業(yè)務(wù)邏輯,加快速度。后續(xù)訂單的一系列處理交給消息的Handler進(jìn)行后續(xù)的處理與消息的進(jìn)一步投遞。
消息總線設(shè)計(jì)重點(diǎn):
1.定義消息(事件)的接口:所有需要投遞與處理的消息,都從這個(gè)消息接口繼承,因?yàn)樾枰s束消息中必須包含的內(nèi)容,比如消息的ID、消息產(chǎn)生的時(shí)間等。
publicinterface IEvent
? ? {
? ? ? ? Guid Id { get;set; }
? ? ? ? DateTime CreateDate { get;set; }
? ? }
2.定義消息(事件)處理器接口:當(dāng)消息投遞到消息總線隊(duì)列中后,一定有消費(fèi)者WebApi接收并處理這個(gè)消息,具體的處理方法邏輯在訂閱方處理器中實(shí)現(xiàn),這里先需要定義處理器的接口,便于在消息總線框架中使用。
publicinterface IEventHandler
? ? {
? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;
? ? }
從上面代碼可以看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。
3.定義消息(事件)與消息(事件)處理器關(guān)聯(lián)接口:一種類型的消息被投遞后,一定要在訂閱方找到這種消息的處理器進(jìn)行處理,所以一定要定義二者的關(guān)聯(lián)接口,這樣才能將消息與消息處理器對(duì)應(yīng)起來,才能實(shí)現(xiàn)消息被訂閱后的處理。
publicinterface IEventHandlerExecutionContext
? ? {
? ? ? ? voidRegisterEventHandler()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? ? ? boolIsRegisterEventHandler()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;
? ? }
RegisterEventHandler方法就是建立消息與消息處理器的關(guān)聯(lián),這個(gè)方法其實(shí)是在訂閱方使用,訂閱方告訴消息總線,什么樣的消息應(yīng)該交給我的哪個(gè)處理器進(jìn)行處理。
IsRegisterEventHandler方法是判斷消息與處理器之間是否已經(jīng)存在關(guān)聯(lián)。
HandleAsync方法是通過查找到消息對(duì)應(yīng)的處理器后,然后調(diào)用處理器自己的Handle方法進(jìn)行消息的處理.
4.定義消息發(fā)布、訂閱與消息總線接口:消息總線至少要支持兩個(gè)功能,一個(gè)是生產(chǎn)者能夠發(fā)布消息到我的消息總線,另一個(gè)是訂閱方需要能夠從我這個(gè)消息總線訂閱消息。
publicinterface IEventPublisher
? ? {
? ? ? ? voidPublish(TEvent @event)where TEvent : IEvent;
? ? }
從上面代碼可以看出,生產(chǎn)者發(fā)布的消息仍然要從IEvent繼承的類型。
publicinterface IEventSubscriber
? ? {
? ? ? ? voidSubscribe()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? }
上面代碼是訂閱方用于從消息總線訂閱消息,從代碼中可以看出,它的最終的實(shí)現(xiàn)其實(shí)就是建立消息與處理器之間的關(guān)聯(lián)。
publicinterface IEventBus:IEventPublisher,IEventSubscriber
? ? {
? ? }
消息(事件)總線從兩個(gè)接口繼承下來,同時(shí)支持消息的發(fā)布與消息的訂閱。
5.實(shí)現(xiàn)事件基類:上面已經(jīng)訂閱了消息(事件)的接口,這里來實(shí)現(xiàn)事件的基類,其實(shí)就是實(shí)現(xiàn)消息ID與產(chǎn)生的時(shí)間:
publicclass BaseEvent : IEvent
? ? {
? ? ? ? publicGuid Id {get;set; }
? ? ? ? publicDateTime CreateDate {get;set; }
? ? ? ? public BaseEvent()
? ? ? ? {
? ? ? ? ? ? this.Id = Guid.NewGuid();
? ? ? ? ? ? this.CreateDate = DateTime.Now;
? ? ? ? }
? ? }
6.實(shí)現(xiàn)消息總線基類:消息總線底層的依賴可以是各種消息代理產(chǎn)品,比如RabbitMq、Kafaka或第三方云平臺(tái)提供的消息代理產(chǎn)品,通常我們要封裝這些消息代理產(chǎn)品。在封裝之前,我們需要定義頂層的消息總線基類實(shí)現(xiàn),主要的目的是未來依賴于它的具體實(shí)現(xiàn)可替換,另外也將消息與消息處理器的關(guān)聯(lián)接口傳遞進(jìn)來,便于訂閱方使用。
publicabstractclass BaseEventBus : IEventBus
? ? {
? ? ? ? protectedreadonly IEventHandlerExecutionContext eventHandlerExecutionContext;
? ? ? ? protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
? ? ? ? {
? ? ? ? ? ? this.eventHandlerExecutionContext = eventHandlerExecutionContext;
? ? ? ? }
? ? ? ? publicabstractvoidPublish(TEvent @event)
? ? ? ? ? ? where TEvent : IEvent;
? ? ? ? publicabstractvoidSubscribe()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? }
7.實(shí)現(xiàn)消息與處理器關(guān)聯(lián):消息必須與處理器關(guān)聯(lián),訂閱方收到特定類型的消息后,才知道交給哪個(gè)處理器處理。
publicclass EventHandlerExecutionContext : IEventHandlerExecutionContext
? ? {
? ? ? ? privatereadonly IServiceCollection registry;
? ? ? ? privatereadonly IServiceProvider serviceprovider;
? ? ? ? privateDictionary> registrations =newDictionary>();
? ? ? ? publicEventHandlerExecutionContext(IServiceCollection registry,Func
? ? ? ? ? ? IServiceProvider> serviceProviderFactory =null)
? ? ? ? {
? ? ? ? ? ? this.registry = registry;
? ? ? ? ? ? this.serviceprovider =this.registry.BuildServiceProvider();
? ? ? ? }
? ? ? //查找消息關(guān)聯(lián)的處理器,然后調(diào)用處理器的處理方法publicasyncTask HandleAsync(TEvent @event)where TEvent : IEvent
? ? ? ? {
? ? ? ? ? ? vareventtype = @event.GetType();
? ? ? ? ? ? if(registrations.TryGetValue(eventtype,outList handlertypes) && handlertypes.Count >0)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? using(varchildscope =this.serviceprovider.CreateScope())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? foreach(varhandlertypein handlertypes)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? varhandler = Activator.CreateInstance(handlertype)as IEventHandler;
? ? ? ? ? ? ? ? ? ? ? ? await handler.HandleAsync(@event);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? //判斷消息與處理器之間是否有關(guān)聯(lián)publicboolIsRegisterEventHandler()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler
? ? ? ? {
? ? ? ? ? ? if(registrations.TryGetValue(typeof(TEvent),outList handlertypelist))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? returnhandlertypelist !=null&& handlertypelist.Contains(typeof(IEventHandler));
? ? ? ? ? ? }
? ? ? ? ? ? returnfalse;
? ? ? ? }
? ? ? //將消息與處理器關(guān)聯(lián)起來,可以在內(nèi)存中建立關(guān)聯(lián),也可以建立在數(shù)據(jù)庫單獨(dú)表中publicvoidRegisterEventHandler()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler
? ? ? ? {
? ? ? ? ? ? Utils.DictionaryRegister(typeof(TEvent),typeof(TEventHandler), registrations);
? ? ? ? }
? ? }
上面我們基本上就將消息總線的架子搭建起來了,也實(shí)現(xiàn)了基本的功能,下一章我們基于它來實(shí)現(xiàn)RabbitMq的消息總線。
QQ討論群:309287205?
微服務(wù)實(shí)戰(zhàn)視頻請(qǐng)關(guān)注微信公眾號(hào):MSSHCJ
