四 捋代碼--dubbo源碼之服務(wù)引用

服務(wù)發(fā)布分析完了,下面讓我們開(kāi)始服務(wù)消費(fèi)端。首先看下官方的時(shí)序圖,有個(gè)總體印象


服務(wù)引用時(shí)序圖

根據(jù)上一篇服務(wù)發(fā)布的邏輯,先看我們配置文件的使用

    <dubbo:application name="dubbo-client"/>
    <dubbo:registry id="nina-register" address="127.0.0.1" port="2181" protocol="zookeeper"/>
    <!--故意配置兩個(gè)注冊(cè)中心的情況測(cè)試-->
    <dubbo:registry id="nina-register2" address="127.0.0.1" port="2181" protocol="zookeeper"/>
    <dubbo:reference interface="com.alibaba.dubbo.kai.api.TestApi" id="TestApi" protocol="dubbo" check="true"  version="0.0" timeout="10000"/> 
    <!--測(cè)試通過(guò)generic泛化方式實(shí)現(xiàn)調(diào)用-->
    <dubbo:reference interface="com.alibaba.dubbo.kai.api.HelloApi" id="helloApi2" protocol="dubbo" check="true"   version="0.0" timeout="10000" generic="true"/>

根據(jù)spring自定義配置文件的規(guī)則,我們一樣可以找到入口ReferenceBean

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    public void init() {
      ......
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
      ......
    }

}

先看ReferenceBean的定義

public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

發(fā)現(xiàn)其實(shí)現(xiàn)了 FactoryBean和InitializingBean接口,想必和服務(wù)發(fā)布一樣InitializingBean對(duì)應(yīng)的afterPropertiesSet用來(lái)服務(wù)注入,F(xiàn)actoryBean對(duì)應(yīng)的getObject一定是去實(shí)現(xiàn)代理實(shí)現(xiàn)的。首先大家一看看下我最初到代碼分析流水圖(代碼分析到一種方法),有個(gè)總體印象:


圖大請(qǐng)查看原圖

按圖索驥,我們找到afterPropertiesSet中進(jìn)行參數(shù)設(shè)置

    public void afterPropertiesSet() throws Exception {
        if (getConsumer() == null) {
            Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
            if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
                ConsumerConfig consumerConfig = null;
           .......
        }
        if (getApplication() == null
                && (getConsumer() == null || getConsumer().getApplication() == null)) {
          ........
        }
        if (getModule() == null
                && (getConsumer() == null || getConsumer().getModule() == null)) {
         ........
        }
        if ((getRegistries() == null || getRegistries().size() == 0)
                && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().size() == 0)
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
        .......
        }
        if (getMonitor() == null
                && (getConsumer() == null || getConsumer().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
         ........
        }
        Boolean b = isInit();
        if (b == null && getConsumer() != null) {
            b = getConsumer().isInit();
        }
        if (b != null && b.booleanValue()) {
            getObject();
        }
    }

發(fā)現(xiàn)實(shí)際入口的確在getObject方法,而此時(shí)b又不會(huì)等于true,所以此時(shí)不會(huì)進(jìn)入次方法,則實(shí)際進(jìn)入getObject的一定是通過(guò)FactoryBean的特性。下面開(kāi)始重點(diǎn)分析怎么獲取到的這個(gè)引用對(duì)象

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

進(jìn)去init方法

 private void init() {
    //和服務(wù)端對(duì)應(yīng)賦值
     .....
      // 獲取消費(fèi)者全局配置
      checkDefault();
      appendProperties(this);
      if (getGeneric() == null && getConsumer() != null) {
          setGeneric(getConsumer().getGeneric());
      }
      if (ProtocolUtils.isGeneric(getGeneric())) {
          .....
      }
      String resolve = System.getProperty(interfaceName);
      String resolveFile = null;
      if (resolve == null || resolve.length() == 0) {
        .....
      }
      if (consumer != null) {
          ...
      }
      if (module != null) {
        ...
      }
      checkApplication();
      checkStubAndMock(interfaceClass);
       ....
      //attributes通過(guò)系統(tǒng)context進(jìn)行存儲(chǔ).
      StaticContext.getSystemContext().putAll(attributes);
      //重點(diǎn),去創(chuàng)建引用代理
      ref = createProxy(map);
  }

此方法中重點(diǎn)是把相關(guān)參數(shù)加載到paramterMap中(服務(wù)端也又類(lèi)似操作),然后進(jìn)入createProxy核心方法

   private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
         ......

        if (isJvmRefer) {
            //同一jvm引用,不是分析重點(diǎn)
           ....
        } else {
            if (url != null && url.length() > 0) { 
          // 用戶(hù)指定URL,指定的URL可能是對(duì)點(diǎn)對(duì)直連地址,也可能是注冊(cè)中心URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { 
          // 通過(guò)注冊(cè)中心配置拼裝URL
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個(gè)registry url
                    }
                }
                if (registryURL != null) { // 有 注冊(cè)中心協(xié)議的URL
                    // 對(duì)有注冊(cè)中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 注冊(cè)中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

         .......
        // 創(chuàng)建服務(wù)代理
        return (T) proxyFactory.getProxy(invoker);
    }

(略過(guò)tempUrl對(duì)應(yīng)的injvm邏輯,此處是同一jvm內(nèi)引用情況)首先檢查用戶(hù)是否在此引用上單獨(dú)設(shè)置對(duì)端(或在多個(gè)注冊(cè)中心情況下指定哪個(gè)url)url,如果配置則直接用此url,否則加載配置文件所有配置的注冊(cè)中心生成url

 List<URL> us = loadRegistries(false);

此方法在服務(wù)發(fā)布已經(jīng)分析過(guò)一次,只是此時(shí)入?yún)閒alse(服務(wù)端為true),區(qū)別在哪呢?

    protected List<URL> loadRegistries(boolean provider) {
                         ......
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                
        }
        return registryList;

大致意思是當(dāng)服務(wù)端且沒(méi)有禁用register或當(dāng)客戶(hù)端且沒(méi)有禁用訂閱時(shí)添加到注冊(cè)u(píng)rl列表中。
loadRegistries方法會(huì)返回所有<dubbo:registry>配置到對(duì)象,我們現(xiàn)在模擬的是兩個(gè)register的情況,但配置一樣,所以會(huì)返回兩個(gè)一樣但url

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.0.0&pid=19028&registry=zookeeper&timestamp=1540603677848

然后進(jìn)行了一段和服務(wù)端類(lèi)似端操作,將剛才生成的參數(shù)map生成string,存入map中,以refer為key(服務(wù)端key為exporter)。

  if (us != null && us.size() > 0) {
                    for (URL u : us) {
                   ....
                     //監(jiān)控相關(guān)省略,有機(jī)會(huì)分享
                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }

獲取到注冊(cè)中心url后,值大概樣子是:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.0.0&pid=20267&refer=application%3Ddubbo-client%26check%3Dtrue%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.kai.api.TestApi%26methods%3Dgo%26pid%3D20267%26protocol%3Ddubbo%26register.ip%3D192.168.199.130%26revision%3D0.0%26side%3Dconsumer%26timeout%3D10000%26timestamp%3D1540622309609%26version%3D0.0&registry=zookeeper&timestamp=1540622309684

下面到重點(diǎn)是什么呢?當(dāng)然應(yīng)該是從注冊(cè)中心中訂閱到對(duì)應(yīng)服務(wù)


            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個(gè)registry url
                    }
                }
                if (registryURL != null) { // 有 注冊(cè)中心協(xié)議的URL
                    // 對(duì)有注冊(cè)中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 注冊(cè)中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }

他先判斷了注冊(cè)中心是否只有一個(gè),如果只有一個(gè)則會(huì)直接發(fā)布賦值invoker,如果又多個(gè),會(huì)生成個(gè)invokerList,將每個(gè)注冊(cè)中心訂閱到到url添加進(jìn)去,最后生成一個(gè)新到clusterInvoker(具體是什么后邊會(huì)講)。
那么我們只需要先分析完單個(gè)注冊(cè)中心發(fā)布的過(guò)程,多個(gè)注冊(cè)中心的發(fā)布過(guò)程就只是在上面又套了一層StaticDirectory而已了。
好了,讓我們看每個(gè)url是怎么發(fā)布的:

  • refprotocol.refer(interfaceClass, url)
    這是服務(wù)發(fā)布生成invoker的重點(diǎn)了,首先refprotocol這個(gè)是什么?
//根據(jù)spi機(jī)制,此處適配protocol為ProtocolAdaptive
    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

根據(jù)我們spi的知識(shí),已經(jīng)找到此類(lèi)

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    .......
    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

      ........
 }

適配邏輯和服務(wù)端一樣,主要還是看url中的protocol,回頭看下url發(fā)現(xiàn)此時(shí)協(xié)議為registry,所以最終會(huì)進(jìn)入RegistryProtocol的refer方法中,

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //還原最初的注冊(cè)中心協(xié)議頭zookeeper
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //從工廠(chǎng)中獲取registry對(duì)象,此處根據(jù)spi機(jī)制肯定獲取到ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*" 分組邏輯,暫時(shí)不考慮,不影響主流程
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        //核心引用邏輯
        return doRefer(cluster, registry, type, url);
    }

我們看到此時(shí)獲取到了ZookeeperRegistry,再調(diào)用了doRefer(cluster, registry, type, url)方法

  private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  //  創(chuàng)建RegistryDirectory對(duì)象,此為注冊(cè)中心目錄對(duì)象,重點(diǎn)對(duì)象
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
 // 獲取REFER_KEY的所有屬性
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
 //生成自身的消費(fèi)端urlconsumer://192.168.199.130/com.alibaba.dubbo.kai.api.TestApi?application=dubbo-client&check=true&dubbo=2.0.0&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=20496&protocol=dubbo&revision=0.0&side=consumer&timeout=10000&timestamp=1540623417681&version=0.0
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
//注冊(cè)自己本身端訂閱地址 ,供別人可見(jiàn)          registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
  //核心邏輯,訂閱providers、configurators、routers三個(gè)目錄下信息,生成一個(gè)復(fù)合的invoker
 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
    //cluster進(jìn)行join聯(lián)合,根據(jù)cluster適配選擇不同的集群調(diào)用策略
        return cluster.join(directory);
    }

這里提出了引用端重要的一個(gè)對(duì)象RegistryDirectory,顧名思義是注冊(cè)中心的一個(gè)文件目錄,里邊應(yīng)該包含了當(dāng)前注冊(cè)中心的所有內(nèi)容。方法中先是創(chuàng)建了ci對(duì)象,然后將當(dāng)前消費(fèi)者自身注冊(cè)到注冊(cè)中心,以便別人能看到自己。然后重點(diǎn)是通過(guò) directory.subscribe方法去訂閱providers、configurators、routers三個(gè)文件的目錄,看下此方法:

 public void subscribe(URL url) {
        setConsumerUrl(url);
        registry.subscribe(url, this);
    }

落到了 registry.subscribe(url, this)方法上,此方法我們是不是很眼熟?對(duì)了,分析服務(wù)發(fā)布時(shí),也曾分析過(guò)此段代碼,看下當(dāng)時(shí)那張大圖端這塊


服務(wù)端訂閱邏輯

大致意思是,服務(wù)端訂閱了configurtors目錄,此目錄內(nèi)發(fā)生改變時(shí),zookeeper回調(diào)監(jiān)聽(tīng)器端通知方法notify(),然后判斷新端url和舊的url是否一致,不一致則重新發(fā)布服務(wù)。那消費(fèi)端呢?


消費(fèi)端訂閱

我們看到添加監(jiān)聽(tīng)邏輯和服務(wù)端一樣最終都會(huì)落到zookeeperRegistry的doSubscribe(final URL url, final NotifyListener listener) 的方法上(具體跳轉(zhuǎn)流程,可看服務(wù)發(fā)布)
protected void doSubscribe(final URL url, final NotifyListener listener) {
           .............
.                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
            .........
    }

此處和服務(wù)端不同端是服務(wù)端 toCategoriesPath(url)返回只有
/dubbo/com.alibaba.dubbo.kai.api.TestApi/configurators,而消費(fèi)端有/dubbo/com.alibaba.dubbo.kai.api.TestApi/configurators、/dubbo/com.alibaba.dubbo.kai.api.TestApi/routers和/dubbo/com.alibaba.dubbo.kai.api.TestApi/providers三個(gè)目錄。
創(chuàng)建完訂閱目錄,消費(fèi)端一樣會(huì)調(diào)用監(jiān)聽(tīng)器端notify方法,而此時(shí)端listener為RegistryDirectory

   public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators
        if (configuratorUrls != null && configuratorUrls.size() > 0) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers
        if (routerUrls != null && routerUrls.size() > 0) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // 合并override參數(shù),更該配置時(shí)刷新invoker
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && localConfigurators.size() > 0) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers
        refreshInvoker(invokerUrls);
    }

前邊是對(duì)configurator和router的合并,更新overrideDirectoryUrl,此時(shí)初次引用過(guò)程configurator和router都沒(méi)空,直接調(diào)用refreshInvoker(invokerUrls)

    /**
     * 根據(jù)invokerURL列表轉(zhuǎn)換為invoker列表。轉(zhuǎn)換規(guī)則如下:
     * 1.如果url已經(jīng)被轉(zhuǎn)換為invoker,則不在重新引用,直接從緩存中獲取,注意如果url中任何一個(gè)參數(shù)變更也會(huì)重新引用
     * 2.如果傳入的invoker列表不為空,則表示最新的invoker列表
     * 3.如果傳入的invokerUrl列表是空,則表示只是下發(fā)的override規(guī)則或route規(guī)則,需要重新交叉對(duì)比,決定是否需要重新引用。
     *
     * @param invokerUrls 傳入的參數(shù)不能為null
     */
    private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // 禁止訪(fǎng)問(wèn)
            this.methodInvokerMap = null; // 置空列表
            destroyAllInvokers(); // 關(guān)閉所有Invoker
        } else {
            this.forbidden = false; // 允許訪(fǎng)問(wèn)
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便于交叉對(duì)比
            }
            if (invokerUrls.size() == 0) {
                return;
            }
            // 將URL列表轉(zhuǎn)成Invoker列表
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
            // 換方法名映射Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
            // state change
            //如果計(jì)算錯(cuò)誤,則不進(jìn)行處理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 關(guān)閉未使用的Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

我們看到主流程重點(diǎn)在

      // 將URL列表轉(zhuǎn)成Invoker列表
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
            // 換方法名映射Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 

這兩個(gè)方法,第一個(gè)將url生成對(duì)應(yīng)都invoker,第二將生成都invoker改成對(duì)應(yīng)目的方法名對(duì)應(yīng)都invoker,看下toInvokers(invokerUrls)生成邏輯

   private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.size() == 0) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            //如果reference端配置了protocol,則只選擇匹配的protocol
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            //對(duì)于空目錄,沒(méi)有配置都直接跳過(guò)
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            //合并consumer和provider兩面都url生成新都url
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // URL參數(shù)是排序的
            if (keys.contains(key)) { // 重復(fù)URL
                continue;
            }
            keys.add(key);
            // 緩存key為沒(méi)有合并消費(fèi)端參數(shù)的URL,不管消費(fèi)端如何合并參數(shù),如果服務(wù)端URL發(fā)生變化,則重新refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // 緩存中沒(méi)有,重新refer
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        //重點(diǎn)生成invoker邏輯
                        //url為合并后地址dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=dubbo-client&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=9629&protocol=dubbo&register.ip=172.18.166.201&remote.timestamp=1540454917247&revision=0.0&server=netty4&side=consumer&timeout=10000&timestamp=1540456251008&version=0.0
                        //providerUrl為服務(wù)端地址dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=kai-nina-server&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=9346&revision=0.0&server=netty4&side=provider&timestamp=1540454917247&version=0.0
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // 將新的引用放入緩存
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

還是先做了url合并檢查邏輯(畢竟修改和新增都是通過(guò)此方法),當(dāng)緩存中沒(méi)有此invoker時(shí)通過(guò)消費(fèi)端的invokerDelegete委派器生成新的invoker,看其入?yún)⒃俅苏{(diào)用了protocol.refer(serviceType, url)方法進(jìn)行發(fā)布,此時(shí)的url是什么呢?

dubbo://192.168.199.130:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=dubbo-client&check=false&client=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=21873&protocol=dubbo&register.ip=192.168.199.130&remote.timestamp=1540603631243&revision=0.0&server=netty4&side=consumer&timeout=10000&timestamp=1540632189708&version=0.0

因?yàn)樯线?mergeUrl(providerUrl)方法,已將服務(wù)端url進(jìn)行合并,改為dubbo協(xié)議頭地址,此時(shí)發(fā)布將會(huì)調(diào)用DubboProtocol的refer方法(是不是和服務(wù)發(fā)布的邏輯類(lèi)似)

   public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

生成了DubboInvoker對(duì)象,想必這就是最終返回的invoker對(duì)象,我們注意到其入?yún)⒂袀€(gè)getClients(url)

    private ExchangeClient[] getClients(URL url) {
        //是否共享連接
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置,則共享連接,否則每服務(wù)每連接
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
        //對(duì)服務(wù)端鏈接端封裝
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

看到這個(gè)方法感覺(jué)終于看到曙光了,又是一個(gè)Exchange,顯然是在對(duì)應(yīng)著服務(wù)端端socket服務(wù),生成端客戶(hù)端。此處有個(gè)connections參數(shù),表示dubbo可以配置客戶(hù)端是否所有invoker用同一條netty通道,我們沒(méi)配置端化默認(rèn)就是共享,進(jìn)入getSharedClient(url)方法

   private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }
        synchronized (key.intern()) {
            ExchangeClient exchangeClient = initClient(url);
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            return client;
        }
    }

發(fā)現(xiàn)dubbo是用了個(gè)緩存來(lái)保證唯一,緩存有則移除原有,然后重新生成一個(gè),最后還是通過(guò)initClient(url)生成一個(gè)新的客戶(hù)端

    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        //默認(rèn)開(kāi)啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO存在嚴(yán)重性能問(wèn)題,暫時(shí)不允許使用
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            //設(shè)置連接應(yīng)該是lazy的 
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

初始化client通過(guò) client = Exchangers.connect(url, requestHandler)方法進(jìn)行,Exchangers類(lèi)就是我們上一篇看到的發(fā)布類(lèi),此處服務(wù)引用一樣用的這個(gè)類(lèi)(肯定都是一一對(duì)應(yīng)的)。再看看這個(gè)類(lèi)

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

一樣還是調(diào)用getExchanger(url)獲取Exhanger對(duì)象,不重復(fù)解釋?zhuān)@里獲取到到HeaderExchanger

   public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

生成了一個(gè)HeaderExchangeClient,內(nèi)部一樣包裝了一個(gè)Transporters進(jìn)行管道連接

 public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
       if (url == null) {
           throw new IllegalArgumentException("url == null");
       }
       ChannelHandler handler;
       if (handlers == null || handlers.length == 0) {
           handler = new ChannelHandlerAdapter();
       } else if (handlers.length == 1) {
           handler = handlers[0];
       } else {
           handler = new ChannelHandlerDispatcher(handlers);
       }
       return getTransporter().connect(url, handler);
   }

這里包裝了一個(gè)ChannelHandlerDispatcher作為委派器,里邊不過(guò)時(shí)當(dāng)handler有多個(gè)handler循環(huán)進(jìn)行處理,最后一樣getTransporter()肯定獲得當(dāng)還是和服務(wù)對(duì)應(yīng)的NettyTransporter

 public Client connect(URL url, ChannelHandler listener) throws RemotingException {
       return new NettyClient(url, listener);
   }

誒,找到了,終于看到了熟悉的NettyClient,進(jìn)去看一眼


    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())//
                        .addLast("encoder", adapter.getEncoder())//
                        .addLast("handler", nettyClientHandler);
            }
        });
    }

    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);

            if (ret && future.isSuccess()) {
                Channel newChannel = future.channel();
                try {
                    // 關(guān)閉舊的連接
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.cause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        } finally {
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }

一樣的套路,構(gòu)造方法調(diào)用父類(lèi),然后兩個(gè)do方法進(jìn)行實(shí)際操作,父類(lèi)方法肯定還是個(gè)模版模式,調(diào)用實(shí)際的do方法,不過(guò)調(diào)用之前和服務(wù)端一樣都是有一個(gè)包裝的操作wrapChannelHandler(url, handler)

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

和服務(wù)端一樣了,最后都生成了
new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)))
這個(gè)和服務(wù)端做功能的一一對(duì)應(yīng)。
兩個(gè)do方法就簡(jiǎn)單了doOpen常規(guī)的Netty客戶(hù)端代碼,doConnect進(jìn)行服務(wù)鏈接獲取通道進(jìn)行緩存。而其處理消息的核心handler還是一樣是通過(guò)DubboProtocol傳過(guò)來(lái)的requestHandler

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要處理高版本調(diào)用低版本的問(wèn)題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

    .........
    };

所以消費(fèi)端和客戶(hù)端對(duì)消息處理的handler一致,收消息一定最后調(diào)用這個(gè)reply方法,那發(fā)消息呢?
還記得NettyClient的這段代碼嗎?

         Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;

這里邊有個(gè)NettyChannel每次有廢棄通道時(shí),為什么會(huì)調(diào)用這個(gè)方法呢?原來(lái)dubbo中的netty通道都緩存在此類(lèi)中

final class NettyChannel extends AbstractChannel {

    private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;

channelMap緩存了所有正在使用都channel,并包裝成dubbo內(nèi)部都NettyChannel,在此類(lèi)中對(duì)netty對(duì)channel進(jìn)行了些包裝,主要處理狀態(tài)檢查等等操作。當(dāng)需要發(fā)送消息時(shí),也是從NettyChannel對(duì)靜態(tài)方法getOrAddChannel中獲取到通道進(jìn)行發(fā)送的。

好了,到這里client終于連接上服務(wù)端了?;氐狡瘘c(diǎn),生成了
ExchangeClient exchangeClient = initClient(url)
這個(gè)client對(duì)象,會(huì)被client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap)包裝一層,和服務(wù)端的ExporterChangeableWrapper類(lèi)型,一個(gè)可變的引用包裝。完成后,生成我們的DubboInvoker(還記得吧,可以結(jié)合大圖看),看眼大圖


dubboProtocol生成客戶(hù)端

回到上文的RegistryDirectory訂閱后的notify邏輯


訂閱通知邏輯

通過(guò)此refreshInvoker方法將所有服務(wù)端url轉(zhuǎn)換為invoker對(duì)象,

//轉(zhuǎn)換起點(diǎn)在這
 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);

此時(shí)訂閱的主流程就完成了,當(dāng)然還有一寫(xiě)廢棄以前就invoker等等操作,都是為了更新時(shí)使用的。
流程回到RegistryProtocol的refer中

 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);

directory已經(jīng)包含了此接口對(duì)應(yīng)的所有invoker對(duì)象,又引入了另外一個(gè)概念cluster。
顧名思義,cluster是集群的意思,必然是dubbo對(duì)集群處理的不同實(shí)現(xiàn),看看這個(gè)cluster對(duì)象

    private Cluster cluster;

是個(gè)成員變量,那么一定是通過(guò)spi依賴(lài)注入來(lái)的

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

看到字節(jié)碼生成的動(dòng)態(tài)類(lèi)默認(rèn)用的是failover擴(kuò)展名,我們找到對(duì)應(yīng)的類(lèi)是public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}

}
調(diào)用其join方法,會(huì)返回一個(gè)對(duì)應(yīng)其策略的FailoverClusterInvoker。
那么知識(shí)點(diǎn)來(lái)了:

  • Failover Cluster(默認(rèn))
    失敗自動(dòng)切換,當(dāng)出現(xiàn)失敗,重試其它服務(wù)器 。通常用于讀操作,但重試會(huì)帶來(lái)更長(zhǎng)延遲??赏ㄟ^(guò) retries="2" 來(lái)設(shè)置重試次數(shù)(不含第一次)。

    重試次數(shù)配置如下:

<dubbo:service retries="2" />
<dubbo:reference retries="2" />
<dubbo:reference>
    <dubbo:method name="sayHello" retries="2" />
</dubbo:reference>
  • Failfast Cluster
    快速失敗,只發(fā)起一次調(diào)用,失敗立即報(bào)錯(cuò)。通常用于非冪等性的寫(xiě)操作,比如新增記錄
  • Failsafe Cluster
    失敗安全,出現(xiàn)異常時(shí),直接忽略。通常用于寫(xiě)入審計(jì)日志等操作。
  • Failback Cluster
    失敗自動(dòng)恢復(fù),后臺(tái)記錄失敗請(qǐng)求,定時(shí)重發(fā)。通常用于消息通知操作。
  • Forking Cluster
    并行調(diào)用多個(gè)服務(wù)器,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源??赏ㄟ^(guò) forks="2" 來(lái)設(shè)置最大并行數(shù)。
  • Broadcast Cluster
    廣播調(diào)用所有提供者,逐個(gè)調(diào)用,任意一臺(tái)報(bào)錯(cuò)則報(bào)錯(cuò) [2]。通常用于通知所有提供者更新緩存或日志等本地資源信息
    dubbo一共主要又這六種集群處理機(jī)制(沒(méi)有算mock,和多注冊(cè)中心的AvailableCluster操作),每個(gè)都實(shí)現(xiàn)了一種調(diào)用,通過(guò)父類(lèi)的AbstractClusterInvoker的select和doSelect進(jìn)行負(fù)載均衡算法,然后通過(guò)自己實(shí)現(xiàn)doInvoke方法,每個(gè)ClusterInvoker進(jìn)行不同的容錯(cuò)處理機(jī)制,欲知詳情,請(qǐng)看下章調(diào)用過(guò)程講解。
    生成了clusterInvoker,基本的引用創(chuàng)建invoker過(guò)程就基本完了。
    回到ReferenceConfig的 createProxy(Map<String, String> map)方法中,生成完invoker,如果是多注冊(cè)中心,會(huì)進(jìn)行一次AvailableCluster包裝
  if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個(gè)registry url
                    }
                }
                if (registryURL != null) { // 有 注冊(cè)中心協(xié)議的URL
                    // 對(duì)有注冊(cè)中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 注冊(cè)中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }

這邏輯就大同小異了,總之最后獲得了clusterinvoker。
再往下

 return (T) proxyFactory.getProxy(invoker);

通過(guò)proxyFactory生成對(duì)應(yīng)invoker的代理類(lèi),看下實(shí)現(xiàn),最后都回到

  public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

生成了一個(gè)動(dòng)態(tài)代理,當(dāng)然此處proxyFactory又兩個(gè)實(shí)現(xiàn)一個(gè)JdkProxyFactory和JavassistProxyFactory。前者使用jdk的動(dòng)態(tài)代碼實(shí)現(xiàn),后者則通過(guò)javasssist的方法先手寫(xiě)一個(gè)的class文件,然后去創(chuàng)建對(duì)象

   public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
     .......
        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();

            for (int i = 0; i < ics.length; i++) {
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg))
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc))
                        continue;
                    worked.add(desc);

                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();

                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++)
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                    if (!Void.TYPE.equals(rt))
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if (pkg == null)
                pkg = PACKAGE_NAME;

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
  ..........
        return proxy;
    }

此處會(huì)生成一個(gè)通過(guò)javassist生成一個(gè)代理類(lèi)繼承Proxy且實(shí)現(xiàn)你的接口,最終調(diào)用時(shí)通過(guò)定義的InvokerInvocationHandler進(jìn)行實(shí)際調(diào)用(和jdk動(dòng)態(tài)代理實(shí)際機(jī)制一樣)。我們看一眼InvokerInvocationHandler類(lèi)

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

內(nèi)容很少,最主要的就是最后通過(guò)剛才生成invoker進(jìn)行最終調(diào)用。
好了,到這里最終的代理對(duì)象終于生成完了,讓我們總體看下類(lèi)圖


dubbo引用模型

(上圖中的HeaderExchangeChannel是在HeaderExchangeClient的一個(gè)屬性,NettyClient將被包裝在此channel中,而DubboProtocol$requestHandler并沒(méi)有在此圖最下面展示,因?yàn)橄M(fèi)端接收消息并沒(méi)有用到,下章會(huì)細(xì)講)
總結(jié)下,dubbo最終生成代理類(lèi),把生成的invoker集合以目錄類(lèi)Directory的形式存入代理類(lèi)的handler中,并通過(guò)cluster層包裝來(lái)實(shí)現(xiàn)集群調(diào)用策略。不過(guò)我們分析過(guò)程中少了一部分 DubboProtocol調(diào)用時(shí)肯定會(huì)經(jīng)過(guò)對(duì)應(yīng)的wrapper類(lèi),此時(shí)會(huì)將invoker進(jìn)行層層包裝形成圖中的filtersInvoker,此處和服務(wù)端一樣,不再贅述。
好了到此,服務(wù)引用分析完了,要想看怎么調(diào)用的,請(qǐng)看下節(jié)吧!

下一篇 ??? dubbo調(diào)用源碼之消費(fèi)端
首頁(yè) ??? dubbo源碼欣賞簡(jiǎn)介

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容