Dubbo訂閱者設(shè)計(jì)模式

有些場(chǎng)景下,我們可能想做一些自動(dòng)發(fā)現(xiàn)的一些功能,調(diào)用指定接口的所有g(shù)roup實(shí)現(xiàn),并將所有結(jié)果追加輸出。

如:一種基于Dubbo訂閱模式,我們提供了一個(gè)訂閱者接口。附上一段偽代碼(全文提供的都是偽代碼,主要提供一種設(shè)計(jì)思路):

public interface Subscriber{
          /**
          ** 獲取當(dāng)前訂閱者信息,比如訂閱的事件類型,訂閱的服務(wù)id等
          **/
         List<SubsriberInfo>   getSubsriberInfos();
        
         /**
          **  發(fā)送消息
          **/
         void sendMessage(Message message);
}

使用Dubbo的merge方式,組合查詢出所有的訂閱者信息

public class SubsribeService{
          @DubboReference(group="*",merger="true")
          private Subscriber subsriber;

          public List<SubsriberInfo> getSubsriberInfos(){
                 // 注意不要加緩存,要每次獲取,因?yàn)樽?cè)的服務(wù)可能有變化,Dubbo會(huì)自動(dòng)獲取所有注冊(cè)的服務(wù)并合并結(jié)果集
                  return subsriber.getSubsriberInfos();
          }
}

有時(shí)候我們可能希望返回的每一個(gè)SubsriberInfo對(duì)象中,能帶上group字段,但如果讓服務(wù)發(fā)布方去set這個(gè)字段感覺(jué)會(huì)很另類,這時(shí)可以借助dubbo的filter機(jī)制實(shí)現(xiàn)。

為了讓Filter能夠統(tǒng)一攔截,我們?yōu)镾ubsriberInfo提取出了一個(gè)基接口IProviderEntity,用于提供setGroup方法。當(dāng)發(fā)現(xiàn)dubbp返回值實(shí)現(xiàn)了這個(gè)接口,就為結(jié)果集setGroup。

public class SetGroupFilter implements Filter{
      @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Result result = invoker.invoke(invocation);
        if(result instanceof AsyncRpcResult){
            return (Result) Proxy.newProxyInstance(Result.class.getClassLoader(), new Class[]{Result.class}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (Object.class.equals(method.getDeclaringClass())) {
                        return method.invoke(this, args);
                    }
                    if("get".equals(method.getName())){
                        Result getResultVal = (Result) method.invoke(result, args);
                        Object retVal = getResultVal.getValue();
                        setProviderId(retVal,invoker);
                        return getResultVal;
                    }else if("whenCompleteWithContext".equals(method.getName())){
                        method.invoke(result,args);
                        return proxy;
                    }
                    return method.invoke(result,args);
                }
            });
        }else{
            Object retVal = result.getValue();
            setGroup(retVal,invoker);
            return result;
        }
    }

    private void setGroup(Object retVal,Invoker invoker){
        if(retVal instanceof IProviderEntity){
            ((IProviderEntity) retVal).setGroup(StringUtils.parseQueryString(invoker.getUrl().toString()).get("group"));
        }else if(retVal instanceof Collection){
            Iterator iterator = ((Collection) retVal).iterator();
            while(iterator.hasNext()){
                Object next = iterator.next();
                setGroup(next, invoker);
            }
        }
    }
}

我們獲取到了所有Group注冊(cè)的訂閱者之后,我們可能就需要調(diào)用指定的Group,來(lái)實(shí)現(xiàn)發(fā)送消息。
如:group1系統(tǒng)訂閱了訂單變更通知。

此時(shí)就要調(diào)用group1的訂閱者實(shí)現(xiàn),來(lái)執(zhí)行發(fā)送消息。
那么如何動(dòng)態(tài)的去調(diào)用group呢?Dubbo為我們提供了api的方式,可通過(guò)這種方式拿到指定group的代理接口實(shí)現(xiàn)。再附上一段偽代碼,如執(zhí)行g(shù)roup1。

ReferenceConfig referenceConfig = new ReferenceConfig();
referenceConfig.setInterface(Subscriber.class);
referenceConfig.setGroup("group1");
Subscriber  subscriber = referenceConfig.get();
subscriber.sendMessage(message);

當(dāng)然了,這種設(shè)計(jì)架構(gòu)也有著他致命的缺陷,那就是如果被訂閱者的服務(wù)某一時(shí)刻掛掉了,那么此時(shí)對(duì)應(yīng)的服務(wù)可能就會(huì)丟消息。因此,此設(shè)計(jì)的架構(gòu)適用于實(shí)時(shí)消息訂閱的一種情況,只需要實(shí)時(shí)消息,對(duì)歷史的消息沒(méi)有什么價(jià)值的場(chǎng)景,如多節(jié)點(diǎn)的Websocket事件推送。

至此,基于dubbo的發(fā)布訂閱模式完成,此文僅僅闡述了一個(gè)設(shè)計(jì)思路,在具體應(yīng)用中可能還需要結(jié)合實(shí)際的業(yè)務(wù)場(chǎng)景做一些修改。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容