【Dubbo】服務(wù)引用

在springApplication.xml中配置服務(wù)引用申明服務(wù)接口,我們就可以方便的注入遠(yuǎn)端的服務(wù)代理,通過該代理調(diào)用到provider提供的服務(wù)。
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" />

<dubbo:reference/>解析

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }
    public void init() {
......
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    }
}
#com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse
    private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
        RootBeanDefinition beanDefinition = new RootBeanDefinition();
        beanDefinition.setBeanClass(beanClass);
......
}

代理創(chuàng)建時(shí)機(jī)

解析service標(biāo)簽后spring會自動創(chuàng)建ServiceBean的實(shí)例,在ServiceBean的繼承體系中,可以知道它實(shí)現(xiàn)了FactoryBeanInitializingBean接口

ReferenceBean

消費(fèi)者只有服務(wù)端的接口,如果要調(diào)用遠(yuǎn)程服務(wù)就需要給該接口創(chuàng)建一個動態(tài)代理,而創(chuàng)建代理只能在FactoryBean.getObject時(shí)創(chuàng)建,因?yàn)镮nitializingBean.afterPropertySet會在實(shí)例初始化之后調(diào)用,這時(shí)候?qū)嵗呀?jīng)生成了


getObject調(diào)用棧信息

afterPropertySet.png

創(chuàng)建代理的過程

首先從配置中心拿到provider的地址,然后構(gòu)建成invoker,使用invoker來創(chuàng)建代理

執(zhí)行流程

ReferenceBean.getObject()
  -->ReferenceConfig.get()
    -->init()
      -->createProxy(map)
        -->refprotocol.refer(interfaceClass, urls.get(0))
          -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
          -->extension.refer(arg0, arg1);
            -->ProtocolFilterWrapper.refer
              -->RegistryProtocol.refer
                -->registryFactory.getRegistry(url)//建立zk的連接,和服務(wù)端發(fā)布一樣(省略代碼)
                -->doRefer(cluster, registry, type, url)
                  -->registry.register//創(chuàng)建zk的節(jié)點(diǎn),和服務(wù)端發(fā)布一樣(省略代碼)。節(jié)點(diǎn)名為:dubbo/com.alibaba.dubbo.demo.DemoService/consumers
                  -->registry.subscribe//訂閱zk的節(jié)點(diǎn),和服務(wù)端發(fā)布一樣(省略代碼)。   /dubbo/com.alibaba.dubbo.demo.DemoService/providers, 
                                                                        /dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
                                                                         /dubbo/com.alibaba.dubbo.demo.DemoService/routers]
                    -->notify(url, listener, urls);
                      -->FailbackRegistry.notify
                        -->doNotify(url, listener, urls);
                          -->AbstractRegistry.notify
                            -->saveProperties(url);//把服務(wù)端的注冊url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
                              -->registryCacheExecutor.execute(new SaveProperties(version));//采用線程池來處理
                            -->listener.notify(categoryList)
                              -->RegistryDirectory.notify
                                -->refreshInvoker(invokerUrls)//刷新緩存中的invoker列表
                                  -->destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關(guān)閉未使用的Invoker
                                  -->最終目的:刷新Map<String, Invoker<T>> urlInvokerMap 對象
                                                                                                                           刷新Map<String, List<Invoker<T>>> methodInvokerMap對象
                  -->cluster.join(directory)//加入集群路由
                    -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension("failover");
                      -->MockClusterWrapper.join
                        -->this.cluster.join(directory)
                          -->FailoverCluster.join
                            -->return new FailoverClusterInvoker<T>(directory)
                            -->new MockClusterInvoker
        -->proxyFactory.getProxy(invoker)//創(chuàng)建服務(wù)代理
          -->ProxyFactory$Adpative.getProxy
            -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
              -->StubProxyFactoryWrapper.getProxy
                -->proxyFactory.getProxy(invoker)
                  -->AbstractProxyFactory.getProxy
                    -->getProxy(invoker, interfaces)
                      -->Proxy.getProxy(interfaces)//目前代理對象interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService
                      -->InvokerInvocationHandler// 采用jdk自帶的InvocationHandler,創(chuàng)建InvokerInvocationHandler對象。
            

詳細(xì)步驟

  1. FactoryBean.getObject
    spring將調(diào)用getObject方法返回的對象注冊容器中,其中調(diào)用的get方法由父類ReferenceConfig實(shí)現(xiàn)
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
public Object getObject() throws Exception {
        return get();
    }
}
  1. 解析配置屬性
    將標(biāo)簽的配置屬性解析到map中
#com.alibaba.dubbo.config.ReferenceConfig#get
    public synchronized T get() {
        if (destroyed){
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
private void init() {
//將配置屬性解析到map中
......
//創(chuàng)建代理類
        ref = createProxy(map);
}
  1. 從注冊中心獲取provider服務(wù)的地址生成invoker對象并創(chuàng)建代理類
private T createProxy(Map<String, String> map) {
if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
if (urls.size() == 1) {
                //根據(jù)接口獲取遠(yuǎn)端服務(wù)提供者的invoker對象
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個registry url
                    }
                }
                if (registryURL != null) { // 有 注冊中心協(xié)議的URL
                    // 對有注冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 注冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
// 創(chuàng)建服務(wù)代理
        return (T) proxyFactory.getProxy(invoker);
}
  1. 生成invoker
    invoker = refprotocol.refer(interfaceClass, url);
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
      ......
        //獲取registry對象 獲取之后會做緩存key=zookeeper://192.168.99.100:2181/com.alibaba.dubbo.registry.RegistryService
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        ......
        return doRefer(cluster, registry, type, url);
    }

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        //directory內(nèi)部包含一個registry,同時(shí)實(shí)現(xiàn)了NotifyListener接口訂閱完成后會回其方法notify方法來刷新invoker列表
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }
#com.alibaba.dubbo.registry.integration.RegistryDirectory#subscribe
    public void subscribe(URL url) {
        setConsumerUrl(url);
        registry.subscribe(url, this);
    }

在調(diào)用directory.subscribe進(jìn)行訂閱的時(shí)候傳入的listener參數(shù)是RegistryDirectory類的對象,它實(shí)現(xiàn)了NotifyListener接口,在訂閱完成之后會回調(diào)其notify方法,在這個方法中會從urls解析注冊中心的配置,然后重新刷新invoker,而且每次訂閱的節(jié)點(diǎn)屬性變更都會回調(diào)這個方法

public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category) 
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // providers
        //刷新invoker
        refreshInvoker(invokerUrls);
    }
private void refreshInvoker(List<URL> invokerUrls){
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // 禁止訪問
            this.methodInvokerMap = null; // 置空列表
            destroyAllInvokers(); // 關(guān)閉所有Invoker
        } else {
        ......
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉(zhuǎn)成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
            // state change
            //如果計(jì)算錯誤,則不進(jìn)行處理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                return ;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            //刷新invoker
            this.urlInvokerMap = newUrlInvokerMap;
......
    }
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
...
}

最終通過DubboProtocol. refer創(chuàng)建Invoker并加入緩存中

 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker. 引用遠(yuǎn)端服務(wù)
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
  1. invoker包裝
    cluster.join(directory);
    包裝invoker并返回自己的invoker對象,外部調(diào)用invoker.invoke方法的時(shí)候會從directory中獲取invoker列表,用于實(shí)現(xiàn)重試快速失敗操作;這里使用了MockClusterWrapper、FailoverCluster兩種包裝


    image.png
public class MockClusterWrapper implements Cluster {
    private Cluster cluster;
    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}
  1. 創(chuàng)建代理
    proxyFactory.getProxy(invoker)創(chuàng)建動態(tài)代理的過程就是根據(jù)需要引用的interface使用Javassist進(jìn)行字節(jié)碼操作生成一個代理類,而且其構(gòu)造方法的參數(shù)為InvocationHandler,最后將這個代理類加入spring容器。這樣當(dāng)我們調(diào)用代理類方法的時(shí)候會回調(diào)InvocationHandler.invoke->invoker.invoke方法來,調(diào)用invoker遠(yuǎn)端的實(shí)現(xiàn)。
public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        //(T) Proxy.getProxy(interfaces)動態(tài)拼裝成接口的代理類,并使用Javassist編譯
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}
public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
      ......
        //調(diào)用遠(yuǎn)端服務(wù)
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

總結(jié)

  • Directory:目錄服務(wù)
  • StaticDirectory:靜態(tài)目錄服務(wù),他的Invoker是固定的。
  • RegistryDirectory:注冊目錄服務(wù),他的Invoker集合數(shù)據(jù)來源于zk注冊中心的,他實(shí)現(xiàn)了NotifyListener接口,并且實(shí)現(xiàn)回調(diào)notify(List<URL> urls), 整個過程有一個重要的map變量,methodInvokerMap(它是數(shù)據(jù)的來源;同時(shí)也是notify的重要操作對象,重點(diǎn)是寫操作。)


    服務(wù)引用的流程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 上一篇我們介紹了暴露服務(wù),這一篇我們來說引用服務(wù)。首先我們看下應(yīng)用層引用服務(wù)所做的 在使用的地方,聲明了一個Ref...
    數(shù)齊閱讀 3,323評論 0 3
  • 0 準(zhǔn)備 安裝注冊中心:Zookeeper、Dubbox自帶的dubbo-registry-simple;安裝Du...
    七寸知架構(gòu)閱讀 14,106評論 0 88
  • 上周基本都在研究代碼,所以沒有及時(shí)更新服務(wù)的啟動過程,大概看了一遍之后現(xiàn)在開始更新一下服務(wù)的引用過程。 服務(wù)的引用...
    此魚不得水閱讀 1,385評論 0 0
  • dubbo服務(wù)引用初始化過程就是創(chuàng)建服務(wù)動態(tài)代理的過程,與服務(wù)發(fā)布一樣,同樣借助bean初始化完成動態(tài)代理的創(chuàng)建。...
    mikewt閱讀 1,094評論 0 2
  • 今天去了 南京博物館 明孝陵 中山陵 美齡宮 顏真卿碑書 紅樓夢館 充實(shí) 滿足 快樂。
    Leo子熙閱讀 121評論 0 0

友情鏈接更多精彩內(nèi)容