Dubbo源碼解析之服務(wù)引入過程

一、簡(jiǎn)介

在 Dubbo 中,我們可以通過兩種方式引用遠(yuǎn)程服務(wù)。第一種是使用服務(wù)直連的方式引用服務(wù),第二種方式是基于注冊(cè)中心進(jìn)行引用。服務(wù)直連的方式僅適合在調(diào)試或測(cè)試服務(wù)的場(chǎng)景下使用,不適合在線上環(huán)境使用。因此,本文我將重點(diǎn)分析通過注冊(cè)中心引用服務(wù)的過程。從注冊(cè)中心中獲取服務(wù)配置只是服務(wù)引用過程中的一環(huán),除此之外,服務(wù)消費(fèi)者還需要經(jīng)歷 Invoker 創(chuàng)建、代理類創(chuàng)建等步驟。這些步驟,將在后續(xù)章節(jié)中一一進(jìn)行分析。

二、服務(wù)引用原理

Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè),第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時(shí)引用服務(wù),第二個(gè)是在 ReferenceBean 對(duì)應(yīng)的服務(wù)被注入到其他類中時(shí)引用。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于,第一個(gè)是餓漢式的,第二個(gè)是懶漢式的。默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)。如果需要使用餓漢式,可通過配置 dubbo:reference 的 init 屬性開啟。下面我們按照 Dubbo 默認(rèn)配置進(jìn)行分析,整個(gè)分析過程從 ReferenceBean 的 getObject 方法開始。當(dāng)我們的服務(wù)被注入到其他類中時(shí),Spring 會(huì)第一時(shí)間調(diào)用 getObject 方法,并由該方法執(zhí)行服務(wù)引用邏輯。按照慣例,在進(jìn)行具體工作之前,需先進(jìn)行配置檢查與收集工作。接著根據(jù)收集到的信息決定服務(wù)用的方式,有三種,第一種是引用本地 (JVM) 服務(wù),第二是通過直連方式引用遠(yuǎn)程服務(wù),第三是通過注冊(cè)中心引用遠(yuǎn)程服務(wù)。不管是哪種引用方式,最后都會(huì)得到一個(gè) Invoker 實(shí)例。如果有多個(gè)注冊(cè)中心,多個(gè)服務(wù)提供者,這個(gè)時(shí)候會(huì)得到一組 Invoker 實(shí)例,此時(shí)需要通過集群管理類 Cluster 將多個(gè) Invoker 合并成一個(gè)實(shí)例。合并后的 Invoker 實(shí)例已經(jīng)具備調(diào)用本地或遠(yuǎn)程服務(wù)的能力了,但并不能將此實(shí)例暴露給用戶使用,這會(huì)對(duì)用戶業(yè)務(wù)代碼造成侵入。此時(shí)框架還需要通過代理工廠類 (ProxyFactory) 為服務(wù)接口生成代理類,并讓代理類去調(diào)用 Invoker 邏輯。避免了 Dubbo 框架代碼對(duì)業(yè)務(wù)代碼的侵入,同時(shí)也讓框架更容易使用。

以上就是服務(wù)引用的大致原理,下面我們深入到代碼中,詳細(xì)分析服務(wù)引用細(xì)節(jié)。

三、源碼分析

服務(wù)引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實(shí)現(xiàn)了這個(gè)方法。實(shí)現(xiàn)代碼如下:

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    // 檢測(cè) ref 是否為空,為空則通過 init 方法創(chuàng)建
    if (ref == null) {
        // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
        init();
    }
    return ref;
}

以上兩個(gè)方法的代碼比較簡(jiǎn)短,并不難理解。這里需要特別說明一下,如果你對(duì) 2.6.4 及以下版本的 getObject 方法進(jìn)行調(diào)試時(shí),會(huì)碰到比較奇怪的的問題。這里假設(shè)你使用 IDEA,且保持了 IDEA 的默認(rèn)配置。當(dāng)你面調(diào)試到 get 方法的if (ref == null)時(shí),你會(huì)發(fā)現(xiàn) ref 不為空,導(dǎo)致你無法進(jìn)入到 init 方法中繼續(xù)調(diào)試。導(dǎo)致這個(gè)現(xiàn)象的原因是 Dubbo 框架本身有一些小問題。該問題已經(jīng)在 pull request #2754 修復(fù)了此問題,并跟隨 2.6.5 版本發(fā)布了。如果你正在學(xué)習(xí) 2.6.4 及以下版本,可通過修改 IDEA 配置規(guī)避這個(gè)問題。首先 IDEA 配置彈窗中搜索 toString,然后取消Enable 'toString' object view勾選。具體如下:

3.1 處理配置

Dubbo 提供了豐富的配置,用于調(diào)整和優(yōu)化框架行為,性能等。Dubbo 在引用或?qū)С龇?wù)時(shí),首先會(huì)對(duì)這些配置進(jìn)行檢查和處理,以保證配置到正確性。配置解析邏輯封裝在 ReferenceConfig 的 init 方法中,下面進(jìn)行分析。

private void init() {
    // 避免重復(fù)初始化
    if (initialized) {
        return;
    }
    initialized = true;
    // 檢測(cè)接口名合法性
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("interface not allow null!");
    }

    // 檢測(cè) consumer 變量是否為空,為空則創(chuàng)建
    checkDefault();
    appendProperties(this);
    if (getGeneric() == null && getConsumer() != null) {
        // 設(shè)置 generic
        setGeneric(getConsumer().getGeneric());
    }

    // 檢測(cè)是否為泛化接口
    if (ProtocolUtils.isGeneric(getGeneric())) {
        interfaceClass = GenericService.class;
    } else {
        try {
            // 加載類
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
    }
    
    // -------------------------------? 分割線1 ?------------------------------

    // 從系統(tǒng)變量中獲取與接口名對(duì)應(yīng)的屬性值
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        // 從系統(tǒng)屬性中獲取解析文件路徑
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            // 從指定位置加載配置文件
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                // 獲取文件絕對(duì)路徑
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                // 從文件中加載配置
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload ..., cause:...");
            } finally {
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            // 獲取與接口名對(duì)應(yīng)的配置
            resolve = properties.getProperty(interfaceName);
        }
    }
    if (resolve != null && resolve.length() > 0) {
        // 將 resolve 賦值給 url
        url = resolve;
    }
    
    // -------------------------------? 分割線2 ?------------------------------
    if (consumer != null) {
        if (application == null) {
            // 從 consumer 中獲取 Application 實(shí)例,下同
            application = consumer.getApplication();
        }
        if (module == null) {
            module = consumer.getModule();
        }
        if (registries == null) {
            registries = consumer.getRegistries();
        }
        if (monitor == null) {
            monitor = consumer.getMonitor();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    
    // 檢測(cè) Application 合法性
    checkApplication();
    // 檢測(cè)本地存根配置合法性
    checkStubAndMock(interfaceClass);
    
    // -------------------------------? 分割線3 ?------------------------------
    
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();

    // 添加 side、協(xié)議版本信息、時(shí)間戳和進(jìn)程號(hào)等信息到 map 中
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }

    // 非泛化服務(wù)
    if (!isGeneric()) {
        // 獲取版本
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        // 獲取接口方法列表,并添加到 map 中
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對(duì)象的字段信息添加到 map 中
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    
    // -------------------------------? 分割線4 ?------------------------------
    
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        // 遍歷 MethodConfig 列表
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            // 檢測(cè) map 是否包含 methodName.retry
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    // 添加重試次數(shù)配置 methodName.retries
                    map.put(method.getName() + ".retries", "0");
                }
            }
 
            // 添加 MethodConfig 中的“屬性”字段到 attributes
            // 比如 onreturn、onthrow、oninvoke 等
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    
    // -------------------------------? 分割線5 ?------------------------------

    // 獲取服務(wù)消費(fèi)者 ip 地址
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property..." );
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    // 存儲(chǔ) attributes 到系統(tǒng)上下文中
    StaticContext.getSystemContext().putAll(attributes);

    // 創(chuàng)建代理類
    ref = createProxy(map);

    // 根據(jù)服務(wù)名,ReferenceConfig,代理類構(gòu)建 ConsumerModel,
    // 并將 ConsumerModel 存入到 ApplicationModel 中
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

上面的代碼很長(zhǎng),做的事情比較多。這里根據(jù)代碼邏輯,對(duì)代碼進(jìn)行了分塊,下面我們一起來看一下。

首先是方法開始到分割線1之間的代碼。這段代碼主要用于檢測(cè) ConsumerConfig 實(shí)例是否存在,如不存在則創(chuàng)建一個(gè)新的實(shí)例,然后通過系統(tǒng)變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接著是檢測(cè)泛化配置,并根據(jù)配置設(shè)置 interfaceClass 的值。接著來看分割線1到分割線2之間的邏輯。這段邏輯用于從系統(tǒng)屬性或配置文件中加載與接口名相對(duì)應(yīng)的配置,并將解析結(jié)果賦值給 url 字段。url 字段的作用一般是用于點(diǎn)對(duì)點(diǎn)調(diào)用。繼續(xù)向下看,分割線2和分割線3之間的代碼用于檢測(cè)幾個(gè)核心配置類是否為空,為空則嘗試從其他配置類中獲取。分割線3與分割線4之間的代碼主要用于收集各種配置,并將配置存儲(chǔ)到 map 中。分割線4和分割線5之間的代碼用于處理 MethodConfig 實(shí)例。該實(shí)例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。分割線5到方法結(jié)尾的代碼主要用于解析服務(wù)消費(fèi)者 ip,以及調(diào)用 createProxy 創(chuàng)建代理對(duì)象。關(guān)于該方法的詳細(xì)分析,將會(huì)在接下來的章節(jié)中展開。

3.2 引用服務(wù)

本節(jié)我們要從 createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創(chuàng)建代理對(duì)象的。但實(shí)際上并非如此,該方法還會(huì)調(diào)用其他方法構(gòu)建以及合并 Invoker 實(shí)例。具體細(xì)節(jié)如下。

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
        // url 配置被指定,則不做本地引用
        if (url != null && url.length() > 0) {
            isJvmRefer = false;
        // 根據(jù) url 的協(xié)議、scope 以及 injvm 等參數(shù)檢測(cè)是否需要本地引用
        // 比如如果用戶顯式配置了 scope=local,此時(shí) isInjvmRefer 返回 true
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        // 獲取 injvm 配置值
        isJvmRefer = isInjvm().booleanValue();
    }

    // 本地引用
    if (isJvmRefer) {
        // 生成本地引用 URL,協(xié)議為 injvm
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        // 調(diào)用 refer 方法構(gòu)建 InjvmInvoker 實(shí)例
        invoker = refprotocol.refer(interfaceClass, url);
        
    // 遠(yuǎn)程引用
    } else {
        // url 不為空,表明用戶可能想進(jìn)行點(diǎn)對(duì)點(diǎn)調(diào)用
        if (url != null && url.length() > 0) {
            // 當(dāng)需要配置多個(gè) url 時(shí),可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        // 設(shè)置接口全限定名為 url 路徑
                        url = url.setPath(interfaceName);
                    }
                    
                    // 檢測(cè) url 協(xié)議是否為 registry,若是,表明用戶想使用指定的注冊(cè)中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性),
                        // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本,group,時(shí)間戳等
                        // 最后將合并后的配置設(shè)置為 url 查詢字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // 加載注冊(cè)中心 url
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }

            // 未配置注冊(cè)中心,拋出異常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference...");
            }
        }

        // 單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連,下同)
        if (urls.size() == 1) {
            // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
            
        // 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會(huì)在運(yùn)行時(shí)
                // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 如果注冊(cè)中心鏈接不為空,則將使用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else {
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true;
    }
    
    // invoker 可用性檢查
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("No provider available for the service...");
    }

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

上面代碼很多,不過邏輯比較清晰。首先根據(jù)配置檢查是否為本地調(diào)用,若是,則調(diào)用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實(shí)例。若不是,則讀取直連配置項(xiàng),或注冊(cè)中心 url,并將讀取到的 url 存儲(chǔ)到 urls 中。然后根據(jù) urls 元素?cái)?shù)量進(jìn)行后續(xù)操作。若 urls 元素?cái)?shù)量為1,則直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口。若 urls 元素?cái)?shù)量大于1,即存在多個(gè)注冊(cè)中心或服務(wù)直連 url,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個(gè) Invoker,最后調(diào)用 ProxyFactory 生成代理類。Invoker 的構(gòu)建過程以及代理類的過程比較重要,因此接下來將分兩小節(jié)對(duì)這兩個(gè)過程進(jìn)行分析。

3.2.1 創(chuàng)建 Invoker

Invoker 是 Dubbo 的核心模型,代表一個(gè)可執(zhí)行體。在服務(wù)提供方,Invoker 用于調(diào)用服務(wù)提供類。在服務(wù)消費(fèi)方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 是由 Protocol 實(shí)現(xiàn)類構(gòu)建而來。Protocol 實(shí)現(xiàn)類有很多,本節(jié)會(huì)分析最常用的兩個(gè),分別是 RegistryProtocol 和 DubboProtocol,其他的大家自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。如下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 創(chuàng)建 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

上面方法看起來比較簡(jiǎn)單,不過這里有一個(gè)調(diào)用需要我們注意一下,即 getClients。這個(gè)方法用于獲取客戶端實(shí)例,實(shí)例類型為 ExchangeClient。ExchangeClient 實(shí)際上并不具備通信能力,它需要基于更底層的客戶端實(shí)例進(jìn)行通信。比如 NettyClient、MinaClient 等,默認(rèn)情況下,Dubbo 使用 NettyClient 進(jìn)行通信。接下來,我們簡(jiǎn)單看一下 getClients 方法的邏輯。

private ExchangeClient[] getClients(URL url) {
    // 是否共享連接
    boolean service_share_connect = false;
    // 獲取連接數(shù),默認(rèn)為0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections,則共享連接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例,默認(rèn)情況下,使用共享客戶端實(shí)例。getSharedClient 方法中也會(huì)調(diào)用 initClient 方法,因此下面我們一起看一下這兩個(gè)方法。

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 獲取帶有“引用計(jì)數(shù)”功能的 ExchangeClient
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // 增加引用計(jì)數(shù)
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        if (referenceClientMap.containsKey(key)) {
            return referenceClientMap.get(key);
        }

        // 創(chuàng)建 ExchangeClient 客戶端
        ExchangeClient exchangeClient = initClient(url);
        // 將 ExchangeClient 實(shí)例傳給 ReferenceCountExchangeClient,這里使用了裝飾模式
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        locks.remove(key);
        return client;
    }
}

上面方法先訪問緩存,若緩存未命中,則通過 initClient 方法創(chuàng)建新的 ExchangeClient 實(shí)例,并將該實(shí)例傳給 ReferenceCountExchangeClient 構(gòu)造方法創(chuàng)建一個(gè)帶有引用計(jì)數(shù)功能的 ExchangeClient 實(shí)例。ReferenceCountExchangeClient 內(nèi)部實(shí)現(xiàn)比較簡(jiǎn)單,就不分析了。下面我們?cè)賮砜匆幌?initClient 方法的代碼。

private ExchangeClient initClient(URL url) {

    // 獲取客戶端類型,默認(rèn)為 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加編解碼和心跳包參數(shù)到 url 中
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 檢測(cè)客戶端類型是否存在,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: ...");
    }

    ExchangeClient client;
    try {
        // 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 創(chuàng)建懶加載 ExchangeClient 實(shí)例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 創(chuàng)建普通 ExchangeClient 實(shí)例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}

initClient 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。然后檢測(cè)用戶配置的客戶端類型是否存在,不存在則拋出異常。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復(fù)雜,該類會(huì)在 request 方法被調(diào)用時(shí)通過 Exchangers 的 connect 方法創(chuàng)建 ExchangeClient 客戶端,該類的代碼本節(jié)就不分析了。下面我們分析一下 Exchangers 的 connect 方法。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

如上,getExchanger 會(huì)通過 SPI 加載 HeaderExchangeClient 實(shí)例,這個(gè)方法比較簡(jiǎn)單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實(shí)現(xiàn)。

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這里包含了多個(gè)調(diào)用,分別如下:
    // 1. 創(chuàng)建 HeaderExchangeHandler 對(duì)象
    // 2. 創(chuàng)建 DecodeHandler 對(duì)象
    // 3. 通過 Transporters 構(gòu)建 Client 實(shí)例
    // 4. 創(chuàng)建 HeaderExchangeClient 對(duì)象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

這里的調(diào)用比較多,我們這里重點(diǎn)看一下 Transporters 的 connect 方法。如下:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數(shù)量大于1,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實(shí)例
    return getTransporter().connect(url, handler);
}

如上,getTransporter 方法返回的是自適應(yīng)拓展類,該類會(huì)在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect 方法。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyClient 對(duì)象
    return new NettyClient(url, listener);
}

到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的 API 構(gòu)建 Netty 客戶端了,大家有興趣自己看看。到這里,關(guān)于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 獲取注冊(cè)中心實(shí)例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 將 url 查詢字符串轉(zhuǎn)為 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // 獲取 group 配置
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // 通過 SPI 加載 MergeableCluster 實(shí)例,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    
    // 調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
    return doRefer(cluster, registry, type, url);
}

上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊(cè)中心實(shí)例。然后獲取 group 配置,根據(jù) group 配置決定 doRefer 第一個(gè)參數(shù)的類型。這里的重點(diǎn)是 doRefer 方法,如下:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 創(chuàng)建 RegistryDirectory 實(shí)例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設(shè)置注冊(cè)中心和協(xié)議
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服務(wù)消費(fèi)者鏈接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

    // 注冊(cè)服務(wù)消費(fèi)者,在 consumers 目錄下新節(jié)點(diǎn)
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }

    // 訂閱 providers、configurators、routers 等節(jié)點(diǎn)數(shù)據(jù)
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一個(gè)注冊(cè)中心可能有多個(gè)服務(wù)提供者,因此這里需要將多個(gè)服務(wù)提供者合并為一個(gè)
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

如上,doRefer 方法創(chuàng)建一個(gè) RegistryDirectory 實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊(cè)中心進(jìn)行注冊(cè)。注冊(cè)完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker。關(guān)于 RegistryDirectory 和 Cluster,本文不打算進(jìn)行分析,相關(guān)分析將會(huì)在隨后的文章中展開。

3.2.2 創(chuàng)建代理

Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務(wù)接口生成代理對(duì)象。有了代理對(duì)象,即可進(jìn)行遠(yuǎn)程調(diào)用。代理對(duì)象生成的入口方法為 ProxyFactory 的 getProxy,接下來進(jìn)行分析。

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    // 調(diào)用重載方法
    return getProxy(invoker, false);
}

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Class<?>[] interfaces = null;
    // 獲取接口列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        // 切分接口列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            // 設(shè)置服務(wù)接口類和 EchoService.class 到 interfaces 中
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                // 加載接口類
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }

    // 為 http 和 hessian 協(xié)議提供泛化調(diào)用支持,參考 pull request #1827
    if (!invoker.getInterface().equals(GenericService.class) && generic) {
        int len = interfaces.length;
        Class<?>[] temp = interfaces;
        // 創(chuàng)建新的 interfaces 數(shù)組
        interfaces = new Class<?>[len + 1];
        System.arraycopy(temp, 0, interfaces, 0, len);
        // 設(shè)置 GenericService.class 到數(shù)組中
        interfaces[len] = GenericService.class;
    }

    // 調(diào)用重載方法
    return getProxy(invoker, interfaces);
}

public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

如上,上面大段代碼都是用來獲取 interfaces 數(shù)組的,我們繼續(xù)往下看。getProxy(Invoker, Class<?>[]) 這個(gè)方法是一個(gè)抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實(shí)現(xiàn)代碼。

public  T getProxy(Invoker invoker, Class<?>[] interfaces) {
// 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對(duì)象,并將該對(duì)象傳給 newInstance 生成 Proxy 實(shí)例。InvokerInvocationHandler 實(shí)現(xiàn)自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調(diào)用。該類邏輯比較簡(jiǎn)單,這里就不分析了。下面我們重點(diǎn)關(guān)注一下 Proxy 的 getProxy 方法,如下。

public static Proxy getProxy(Class<?>... ics) {
    // 調(diào)用重載方法
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    // 遍歷接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 檢測(cè)類型是否為接口
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");

        Class<?> tmp = null;
        try {
            // 重新加載接口類
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }

        // 檢測(cè)接口是否相同,這里 tmp 有可能為空
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

        // 拼接接口全限定名,分隔符為 ;
        sb.append(itf).append(';');
    }

    // 使用拼接后的接口名作為 key
    String key = sb.toString();

    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            ProxyCacheMap.put(cl, cache);
        }
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 從緩存中獲取 Reference<Proxy> 實(shí)例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }

            // 并發(fā)控制,保證只有一個(gè)線程可以進(jìn)行后續(xù)操作
            if (value == PendingGenerationMarker) {
                try {
                    // 其他線程在此處進(jìn)行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置標(biāo)志位到緩存中,并跳出 while 循環(huán)進(jìn)行后續(xù)操作
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 創(chuàng)建 ClassGenerator 對(duì)象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();

        for (int i = 0; i < ics.length; i++) {
            // 檢測(cè)接口訪問級(jí)別是否為 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 獲取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        // 非 public 級(jí)別的接口必須在同一個(gè)包下,否者拋出異常
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                }
            }
            
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍歷接口方法
            for (Method method : ics[i].getMethods()) {
                // 獲取方法描述,可理解為方法簽名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中,則忽略。考慮這種情況,
                // A 接口和 B 接口中包含一個(gè)完全相同的方法
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                int ix = methods.size();
                // 獲取方法返回值類型
                Class<?> rt = method.getReturnType();
                // 獲取參數(shù)列表
                Class<?>[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++)
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                // 生成 InvokerHandler 接口的 invoker 方法調(diào)用語句,如下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");

                // 返回值不為 void
                if (!Void.TYPE.equals(rt))
                    // 生成返回語句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");

                methods.add(method);
                // 添加方法名、訪問控制符、參數(shù)列表、方法代碼等信息到 ClassGenerator 中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null)
            pkg = PACKAGE_NAME;

        // 構(gòu)建接口代理類名稱:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");

        // 為接口代理類添加帶有 InvocationHandler 參數(shù)的構(gòu)造方法,比如:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
        // 為接口代理類添加默認(rèn)構(gòu)造方法
        ccp.addDefaultConstructor();
        
        // 生成接口代理類
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 構(gòu)建 Proxy 子類名稱,比如 Proxy1,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 為 Proxy 的抽象方法 newInstance 生成實(shí)現(xiàn)代碼,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
        //     return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        // 生成 Proxy 實(shí)現(xiàn)類
        Class<?> pc = ccm.toClass();
        // 通過反射創(chuàng)建 Proxy 實(shí)例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        if (ccp != null)
            // 釋放資源
            ccp.release();
        if (ccm != null)
            ccm.release();
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                // 寫緩存
                cache.put(key, new WeakReference<Proxy>(proxy));
            // 喚醒其他等待線程
            cache.notifyAll();
        }
    }
    return proxy;
}

上面代碼比較復(fù)雜,我們寫了大量的注釋。大家在閱讀這段代碼時(shí),要搞清楚 ccp 和 ccm 的用途,不然會(huì)被搞暈。ccp 用于為服務(wù)接口生成代理類,比如我們有一個(gè) DemoService 接口,這個(gè)接口代理類就是由 ccp 生成的。ccm 則是用于為 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實(shí)現(xiàn) Proxy 類的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個(gè)接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

好了,到這里代理類生成邏輯就分析完了。整個(gè)過程比較復(fù)雜,大家需要耐心看一下。

四、總結(jié)

本篇文章對(duì)服務(wù)引用的過程進(jìn)行了較為詳盡的分析,還有一些邏輯暫時(shí)沒有分析到,比如 Directory、Cluster。這些接口及實(shí)現(xiàn)類功能比較獨(dú)立,后續(xù)會(huì)單獨(dú)成文進(jìn)行分析。暫時(shí)我們可以先把這些類看成黑盒,只要知道這些類的用途即可。關(guān)于服務(wù)引用過程就分析到這里。

讀者福利:

分享免費(fèi)學(xué)習(xí)資料

針對(duì)于還會(huì)準(zhǔn)備免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)
為什么某些人會(huì)一直比你優(yōu)秀,是因?yàn)樗旧砭秃軆?yōu)秀還一直在持續(xù)努力變得更優(yōu)秀,而你是不是還在滿足于現(xiàn)狀內(nèi)心在竊喜!希望讀到這的您能點(diǎn)個(gè)小贊和關(guān)注下我,以后還會(huì)更新技術(shù)干貨,謝謝您的支持!

資料領(lǐng)取方式:加入粉絲群963944895,私信管理員即可免費(fèi)領(lǐng)取

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容