微服務(wù)實(shí)戰(zhàn)(二):落地微服務(wù)架構(gòu)到直銷系統(tǒng)(構(gòu)建消息總線框架接口)

微服務(wù)實(shí)戰(zhàn)(二):落地微服務(wù)架構(gòu)到直銷系統(tǒng)(構(gòu)建消息總線框架接口)

從上一篇文章大家可以看出,實(shí)現(xiàn)一個(gè)自己的消息總線框架是非常重要的內(nèi)容,消息總線可以將界限上下文之間進(jìn)行解耦,也可以為大并發(fā)訪問提供必要的支持。

消息總線的作用:

1.界限上下文解耦:在DDD第一波文章中,當(dāng)更新了訂單信息后,我們通過調(diào)用經(jīng)銷商界限上下文的領(lǐng)域模型和倉(cāng)儲(chǔ),進(jìn)行了經(jīng)銷商信息的更新,這造成了耦合。通過一個(gè)消息總線,可以在訂單界限上下文的WebApi服務(wù)(來源微服務(wù)-生產(chǎn)者)更新了訂單信息后,發(fā)布一個(gè)事件消息到消息總線的某個(gè)隊(duì)列中,經(jīng)銷商界限上下文的WebApi服務(wù)(消費(fèi)者)訂閱這個(gè)事件消息,然后交給自己的Handler進(jìn)行消息處理,更新自己的經(jīng)銷商信息。這樣就實(shí)現(xiàn)了訂單界限上下文與經(jīng)銷商界限上下文解耦。

2.大并發(fā)支持:可以通過消息總線進(jìn)一步提升下單的性能。我們可以將用戶下單的操作直接交給一個(gè)下單命令WebApi接收,下單命令WebApi接收到命令后,直接丟給一個(gè)消息總線的隊(duì)列,然后立即給前端返回下單結(jié)果。這樣用戶就不用等待后續(xù)的復(fù)雜訂單業(yè)務(wù)邏輯,加快速度。后續(xù)訂單的一系列處理交給消息的Handler進(jìn)行后續(xù)的處理與消息的進(jìn)一步投遞。


消息總線設(shè)計(jì)重點(diǎn):

1.定義消息(事件)的接口:所有需要投遞與處理的消息,都從這個(gè)消息接口繼承,因?yàn)樾枰s束消息中必須包含的內(nèi)容,比如消息的ID、消息產(chǎn)生的時(shí)間等。

publicinterface IEvent

? ? {

? ? ? ? Guid Id { get;set; }

? ? ? ? DateTime CreateDate { get;set; }

? ? }

2.定義消息(事件)處理器接口:當(dāng)消息投遞到消息總線隊(duì)列中后,一定有消費(fèi)者WebApi接收并處理這個(gè)消息,具體的處理方法邏輯在訂閱方處理器中實(shí)現(xiàn),這里先需要定義處理器的接口,便于在消息總線框架中使用。

publicinterface IEventHandler

? ? {

? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;

? ? }

從上面代碼可以看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。

3.定義消息(事件)與消息(事件)處理器關(guān)聯(lián)接口:一種類型的消息被投遞后,一定要在訂閱方找到這種消息的處理器進(jìn)行處理,所以一定要定義二者的關(guān)聯(lián)接口,這樣才能將消息與消息處理器對(duì)應(yīng)起來,才能實(shí)現(xiàn)消息被訂閱后的處理。

publicinterface IEventHandlerExecutionContext

? ? {

? ? ? ? voidRegisterEventHandler()where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler;

? ? ? ? boolIsRegisterEventHandler()where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler;

? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;

? ? }

RegisterEventHandler方法就是建立消息與消息處理器的關(guān)聯(lián),這個(gè)方法其實(shí)是在訂閱方使用,訂閱方告訴消息總線,什么樣的消息應(yīng)該交給我的哪個(gè)處理器進(jìn)行處理。

IsRegisterEventHandler方法是判斷消息與處理器之間是否已經(jīng)存在關(guān)聯(lián)。

HandleAsync方法是通過查找到消息對(duì)應(yīng)的處理器后,然后調(diào)用處理器自己的Handle方法進(jìn)行消息的處理.

4.定義消息發(fā)布、訂閱與消息總線接口:消息總線至少要支持兩個(gè)功能,一個(gè)是生產(chǎn)者能夠發(fā)布消息到我的消息總線,另一個(gè)是訂閱方需要能夠從我這個(gè)消息總線訂閱消息。

publicinterface IEventPublisher

? ? {

? ? ? ? voidPublish(TEvent @event)where TEvent : IEvent;

? ? }

從上面代碼可以看出,生產(chǎn)者發(fā)布的消息仍然要從IEvent繼承的類型。

publicinterface IEventSubscriber

? ? {

? ? ? ? voidSubscribe()where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler;

? ? }

上面代碼是訂閱方用于從消息總線訂閱消息,從代碼中可以看出,它的最終的實(shí)現(xiàn)其實(shí)就是建立消息與處理器之間的關(guān)聯(lián)。

publicinterface IEventBus:IEventPublisher,IEventSubscriber

? ? {

? ? }

消息(事件)總線從兩個(gè)接口繼承下來,同時(shí)支持消息的發(fā)布與消息的訂閱。

5.實(shí)現(xiàn)事件基類:上面已經(jīng)訂閱了消息(事件)的接口,這里來實(shí)現(xiàn)事件的基類,其實(shí)就是實(shí)現(xiàn)消息ID與產(chǎn)生的時(shí)間:

publicclass BaseEvent : IEvent

? ? {

? ? ? ? publicGuid Id {get;set; }

? ? ? ? publicDateTime CreateDate {get;set; }

? ? ? ? public BaseEvent()

? ? ? ? {

? ? ? ? ? ? this.Id = Guid.NewGuid();

? ? ? ? ? ? this.CreateDate = DateTime.Now;

? ? ? ? }

? ? }

6.實(shí)現(xiàn)消息總線基類:消息總線底層的依賴可以是各種消息代理產(chǎn)品,比如RabbitMq、Kafaka或第三方云平臺(tái)提供的消息代理產(chǎn)品,通常我們要封裝這些消息代理產(chǎn)品。在封裝之前,我們需要定義頂層的消息總線基類實(shí)現(xiàn),主要的目的是未來依賴于它的具體實(shí)現(xiàn)可替換,另外也將消息與消息處理器的關(guān)聯(lián)接口傳遞進(jìn)來,便于訂閱方使用。

publicabstractclass BaseEventBus : IEventBus

? ? {

? ? ? ? protectedreadonly IEventHandlerExecutionContext eventHandlerExecutionContext;

? ? ? ? protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)

? ? ? ? {

? ? ? ? ? ? this.eventHandlerExecutionContext = eventHandlerExecutionContext;

? ? ? ? }

? ? ? ? publicabstractvoidPublish(TEvent @event)

? ? ? ? ? ? where TEvent : IEvent;

? ? ? ? publicabstractvoidSubscribe()

? ? ? ? ? ? where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler;

? ? }

7.實(shí)現(xiàn)消息與處理器關(guān)聯(lián):消息必須與處理器關(guān)聯(lián),訂閱方收到特定類型的消息后,才知道交給哪個(gè)處理器處理。

publicclass EventHandlerExecutionContext : IEventHandlerExecutionContext

? ? {

? ? ? ? privatereadonly IServiceCollection registry;

? ? ? ? privatereadonly IServiceProvider serviceprovider;

? ? ? ? privateDictionary> registrations =newDictionary>();

? ? ? ? publicEventHandlerExecutionContext(IServiceCollection registry,Func

? ? ? ? ? ? IServiceProvider> serviceProviderFactory =null)

? ? ? ? {

? ? ? ? ? ? this.registry = registry;

? ? ? ? ? ? this.serviceprovider =this.registry.BuildServiceProvider();

? ? ? ? }

? ? ? //查找消息關(guān)聯(lián)的處理器,然后調(diào)用處理器的處理方法publicasyncTask HandleAsync(TEvent @event)where TEvent : IEvent

? ? ? ? {

? ? ? ? ? ? vareventtype = @event.GetType();

? ? ? ? ? ? if(registrations.TryGetValue(eventtype,outList handlertypes) && handlertypes.Count >0)

? ? ? ? ? ? {

? ? ? ? ? ? ? ? using(varchildscope =this.serviceprovider.CreateScope())

? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? foreach(varhandlertypein handlertypes)

? ? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? ? ? varhandler = Activator.CreateInstance(handlertype)as IEventHandler;

? ? ? ? ? ? ? ? ? ? ? ? await handler.HandleAsync(@event);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? //判斷消息與處理器之間是否有關(guān)聯(lián)publicboolIsRegisterEventHandler()

? ? ? ? ? ? where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler

? ? ? ? {

? ? ? ? ? ? if(registrations.TryGetValue(typeof(TEvent),outList handlertypelist))

? ? ? ? ? ? {

? ? ? ? ? ? ? ? returnhandlertypelist !=null&& handlertypelist.Contains(typeof(IEventHandler));

? ? ? ? ? ? }

? ? ? ? ? ? returnfalse;

? ? ? ? }


? ? ? //將消息與處理器關(guān)聯(lián)起來,可以在內(nèi)存中建立關(guān)聯(lián),也可以建立在數(shù)據(jù)庫單獨(dú)表中publicvoidRegisterEventHandler()

? ? ? ? ? ? where TEvent : IEvent

? ? ? ? ? ? where TEventHandler : IEventHandler

? ? ? ? {

? ? ? ? ? ? Utils.DictionaryRegister(typeof(TEvent),typeof(TEventHandler), registrations);

? ? ? ? }

? ? }

上面我們基本上就將消息總線的架子搭建起來了,也實(shí)現(xiàn)了基本的功能,下一章我們基于它來實(shí)現(xiàn)RabbitMq的消息總線。

QQ討論群:309287205?

微服務(wù)實(shí)戰(zhàn)視頻請(qǐng)關(guān)注微信公眾號(hào):MSSHCJ

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,551評(píng)論 19 139
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,295評(píng)論 2 89
  • 一、apache環(huán)境安裝 正常安裝軟件步驟,到下圖時(shí)需要注意。訪問http://localhost/顯示 It w...
    殘燈古夢(mèng)閱讀 171評(píng)論 0 1
  • 半夜醒來,似乎再難入睡。雙11,8本書158.6元。漸已適應(yīng)居家的生活,或者前綴“被”。 “曾夢(mèng)想仗劍走天涯,看一...
    希冀SKY閱讀 170評(píng)論 0 0
  • 攝于2018.4.25 地點(diǎn)南寧萬芊民宿 微博@烘焙攝影肥仔雲(yún) 前期準(zhǔn)備 當(dāng)時(shí)打算在南寧玩兩天,去試試海底撈和烏布...
    肥仔雲(yún)閱讀 631評(píng)論 0 3

友情鏈接更多精彩內(nèi)容