有些場(chǎng)景下,我們可能想做一些自動(dòng)發(fā)現(xiàn)的一些功能,調(diào)用指定接口的所有g(shù)roup實(shí)現(xiàn),并將所有結(jié)果追加輸出。
如:一種基于Dubbo訂閱模式,我們提供了一個(gè)訂閱者接口。附上一段偽代碼(全文提供的都是偽代碼,主要提供一種設(shè)計(jì)思路):
public interface Subscriber{
/**
** 獲取當(dāng)前訂閱者信息,比如訂閱的事件類型,訂閱的服務(wù)id等
**/
List<SubsriberInfo> getSubsriberInfos();
/**
** 發(fā)送消息
**/
void sendMessage(Message message);
}
使用Dubbo的merge方式,組合查詢出所有的訂閱者信息
public class SubsribeService{
@DubboReference(group="*",merger="true")
private Subscriber subsriber;
public List<SubsriberInfo> getSubsriberInfos(){
// 注意不要加緩存,要每次獲取,因?yàn)樽?cè)的服務(wù)可能有變化,Dubbo會(huì)自動(dòng)獲取所有注冊(cè)的服務(wù)并合并結(jié)果集
return subsriber.getSubsriberInfos();
}
}
有時(shí)候我們可能希望返回的每一個(gè)SubsriberInfo對(duì)象中,能帶上group字段,但如果讓服務(wù)發(fā)布方去set這個(gè)字段感覺(jué)會(huì)很另類,這時(shí)可以借助dubbo的filter機(jī)制實(shí)現(xiàn)。
為了讓Filter能夠統(tǒng)一攔截,我們?yōu)镾ubsriberInfo提取出了一個(gè)基接口IProviderEntity,用于提供setGroup方法。當(dāng)發(fā)現(xiàn)dubbp返回值實(shí)現(xiàn)了這個(gè)接口,就為結(jié)果集setGroup。
public class SetGroupFilter implements Filter{
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Result result = invoker.invoke(invocation);
if(result instanceof AsyncRpcResult){
return (Result) Proxy.newProxyInstance(Result.class.getClassLoader(), new Class[]{Result.class}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
if("get".equals(method.getName())){
Result getResultVal = (Result) method.invoke(result, args);
Object retVal = getResultVal.getValue();
setProviderId(retVal,invoker);
return getResultVal;
}else if("whenCompleteWithContext".equals(method.getName())){
method.invoke(result,args);
return proxy;
}
return method.invoke(result,args);
}
});
}else{
Object retVal = result.getValue();
setGroup(retVal,invoker);
return result;
}
}
private void setGroup(Object retVal,Invoker invoker){
if(retVal instanceof IProviderEntity){
((IProviderEntity) retVal).setGroup(StringUtils.parseQueryString(invoker.getUrl().toString()).get("group"));
}else if(retVal instanceof Collection){
Iterator iterator = ((Collection) retVal).iterator();
while(iterator.hasNext()){
Object next = iterator.next();
setGroup(next, invoker);
}
}
}
}
我們獲取到了所有Group注冊(cè)的訂閱者之后,我們可能就需要調(diào)用指定的Group,來(lái)實(shí)現(xiàn)發(fā)送消息。
如:group1系統(tǒng)訂閱了訂單變更通知。
此時(shí)就要調(diào)用group1的訂閱者實(shí)現(xiàn),來(lái)執(zhí)行發(fā)送消息。
那么如何動(dòng)態(tài)的去調(diào)用group呢?Dubbo為我們提供了api的方式,可通過(guò)這種方式拿到指定group的代理接口實(shí)現(xiàn)。再附上一段偽代碼,如執(zhí)行g(shù)roup1。
ReferenceConfig referenceConfig = new ReferenceConfig();
referenceConfig.setInterface(Subscriber.class);
referenceConfig.setGroup("group1");
Subscriber subscriber = referenceConfig.get();
subscriber.sendMessage(message);
當(dāng)然了,這種設(shè)計(jì)架構(gòu)也有著他致命的缺陷,那就是如果被訂閱者的服務(wù)某一時(shí)刻掛掉了,那么此時(shí)對(duì)應(yīng)的服務(wù)可能就會(huì)丟消息。因此,此設(shè)計(jì)的架構(gòu)適用于實(shí)時(shí)消息訂閱的一種情況,只需要實(shí)時(shí)消息,對(duì)歷史的消息沒(méi)有什么價(jià)值的場(chǎng)景,如多節(jié)點(diǎn)的Websocket事件推送。
至此,基于dubbo的發(fā)布訂閱模式完成,此文僅僅闡述了一個(gè)設(shè)計(jì)思路,在具體應(yīng)用中可能還需要結(jié)合實(shí)際的業(yè)務(wù)場(chǎng)景做一些修改。