Dubbo源碼分析3之服務(wù)發(fā)布

dubbo-export-mulu.png

1.服務(wù)發(fā)布概述

Dubbo 服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件[dubbo:service --> ServiceBean --> onApplicationEvent(ContextRefreshedEvent event)],在接收到ContextRefreshedEvent 事件后執(zhí)行服務(wù)導(dǎo)出邏輯。整個(gè)邏輯大致可分為三個(gè)部分:

第一部分是前置工作,主要用于檢查參數(shù),組裝 URL;

第二部分是導(dǎo)出服務(wù),包含導(dǎo)出服務(wù)到本地 (JVM),和導(dǎo)出服務(wù)到遠(yuǎn)程兩個(gè)過程;

第三部分是向注冊中心注冊服務(wù),用于服務(wù)發(fā)現(xiàn),包括注冊到zk和訂閱zk。

本文的重點(diǎn)實(shí)在整個(gè)發(fā)布流程,一些細(xì)節(jié)簡單描述省略,比如配置檢查,URL組裝。

2.源碼環(huán)境說明

基于dubbo2.6.4版本,使用官方的dubbo-demo項(xiàng)目,項(xiàng)目結(jié)構(gòu)圖如下:

dubbo-demo.png

修改注冊中心為zookeeper

接口和實(shí)現(xiàn)類代碼:

public interface DemoService {
    String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }
}

3.源碼分析

服務(wù)發(fā)布的入口方法是 ServiceBean 的 onApplicationEvent,如下:

代碼塊 ServiceBean #onApplicationEvent

  @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

3.1 服務(wù)發(fā)布前置工作

3.1.1 概述

前置工作主要包含兩個(gè)部分,分別是配置檢查,以及 URL 裝配。在導(dǎo)出服務(wù)之前,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補(bǔ)充缺省配置。配置檢查完成后,接下來需要根據(jù)這些配置組裝 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作為配置載體,所有的拓展點(diǎn)都是通過 URL 獲取配置。

代碼塊 ServiceConfig#doExport

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    // 檢測 interfaceName 是否合法
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("interface not allow null!");
    }
    // 檢測 provider 是否為空,為空則新建一個(gè),并通過系統(tǒng)變量為其初始化
    checkDefault();

    // 下面幾個(gè) if 語句用于檢測 provider、application 等核心配置類對象是否為空,
    // 若為空,則嘗試從其他配置類對象中獲取相應(yīng)的實(shí)例。
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {...}
        if (monitor == null) {...}
        if (protocols == null) {...}
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {...}
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {...}
    }

    // 檢測 ref 是否為泛化服務(wù)類型
    if (ref instanceof GenericService) {
        // 設(shè)置 interfaceClass 為 GenericService.class
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            // 設(shè)置 generic = "true"
            generic = Boolean.TRUE.toString();
        }
        
    // ref 非 GenericService 類型
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // 對 interfaceClass,以及 <dubbo:method> 標(biāo)簽中的必要字段進(jìn)行檢查
        checkInterfaceAndMethods(interfaceClass, methods);
        // 對 ref 合法性進(jìn)行檢測
        checkRef();
        // 設(shè)置 generic = "false"
        generic = Boolean.FALSE.toString();
    }

    // local 和 stub 在功能應(yīng)該是一致的,用于配置本地存根
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            // 獲取本地存根類
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // 檢測本地存根類是否可賦值給接口類,若不可賦值則會(huì)拋出異常,提醒使用者本地存根類類型不合法
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }

    if (stub != null) {
        // 此處的代碼和上一個(gè) if 分支的代碼基本一致,這里省略
    }

    // 檢測各種對象是否為空,為空則新建,或者拋出異常
    checkApplication();
    checkRegistry();
    checkProtocol();
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }

    // 導(dǎo)出服務(wù)
    doExportUrls();

    // ProviderModel 表示服務(wù)提供者模型,此對象中存儲(chǔ)了與服務(wù)提供者相關(guān)的信息。
    // 比如服務(wù)的配置信息,服務(wù)實(shí)例等。每個(gè)被導(dǎo)出的服務(wù)對應(yīng)一個(gè) ProviderModel。
    // ApplicationModel 持有所有的 ProviderModel。
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

3.1.2 對配置檢查的邏輯進(jìn)行簡單的總結(jié):

  1. 檢測 <dubbo:service> 標(biāo)簽的 interface 屬性合法性,不合法則拋出異常

  2. 檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否為空,若為空,則嘗試從其他配置類對象中獲取相應(yīng)的實(shí)例。

  3. 檢測并處理泛化服務(wù)和普通服務(wù)類

  4. 檢測本地存根配置,并進(jìn)行相應(yīng)的處理

  5. 對 ApplicationConfig、RegistryConfig 等配置類進(jìn)行檢測,為空則嘗試創(chuàng)建,若無法創(chuàng)建則拋出異常

3.2 服務(wù)暴露

下面進(jìn)入doExportUrls();方法:

    private void doExportUrls() {
         // 加載注冊中心鏈接
        List<URL> registryURLs = loadRegistries(true);
        // 遍歷 protocols,并在每個(gè)協(xié)議下導(dǎo)出服務(wù)
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

代碼塊:ServiceConfig#doExportUrlsFor1Protocol

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        /***
        代碼有點(diǎn)長,省略組裝url部分的代碼
        配置檢查完畢后,緊接著要做的事情是根據(jù)配置,以及其他一些信息組裝 URL。
        URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個(gè)模塊之間傳遞。
        ***/
        //...
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
        /***此處組裝的url示例:
        dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.43.174&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8564&qos.port=22222&side=provider&timestamp=1578456375449
        ***/

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        //下面開始要進(jìn)入暴露服務(wù)的代碼了
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //暴露服務(wù)到本地
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    //暴露服務(wù)到遠(yuǎn)程
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式,分別如下:

  • scope = none,不導(dǎo)出服務(wù),注意這里是none字符串
  • scope != remote,導(dǎo)出到本地
  • scope != local,導(dǎo)出到遠(yuǎn)程

我們示例中到這里socpe=null,所以會(huì)同時(shí)暴露服務(wù)到本地和遠(yuǎn)程

3.2.1 暴露服務(wù)到本地

接下來進(jìn)入ServiceConfig#exportLocal(URL url)方法

private void exportLocal(URL url) {
    // 如果 URL 的協(xié)議頭等于 injvm,說明已經(jīng)導(dǎo)出到本地了,無需再次導(dǎo)出
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
            .setProtocol(Constants.LOCAL_PROTOCOL)    // 設(shè)置協(xié)議頭為 injvm
            .setHost(LOCALHOST)
            .setPort(0);
        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
        // 創(chuàng)建 Invoker,并導(dǎo)出服務(wù),這里的 protocol 會(huì)在運(yùn)行時(shí)調(diào)用 InjvmProtocol 的 export 方法
        Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
    }
}

到這里看出服務(wù)暴露的結(jié)果是生成了一個(gè)Exporter對象存起來,關(guān)聯(lián)一個(gè)Invoker對象,這兩個(gè)是什么呢?

介紹Invoker和Exporter

Invoker 是實(shí)體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn),也可能一個(gè)集群實(shí)現(xiàn)。

這是官方描述,看起來還是不清楚具體Invoker是做什么的,有什么用?

這樣說,以開頭的DemoService為例,中有一個(gè)sayHello(String s)方法,這個(gè)方法是給其他地方使用的,有可能是本地也可能是遠(yuǎn)程調(diào)用,通過對應(yīng)的Invoker.invoke()方法就可以調(diào)用了。調(diào)用invoker的結(jié)果就是最終調(diào)用DemoService.sayHello()。

public interface Exporter<T> {
    Invoker<T> getInvoker();
    void unexport();
}

通過Exporter可以獲取到Invoker,把緩存起來,后面需要調(diào)用的時(shí)候就可以獲取inoker調(diào)用對應(yīng)的本地或者遠(yuǎn)程方法了。先這么理解就可以了,Invoker具體如何來的就先不分析了

接下來繼續(xù)看這段代碼:

Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

此處protocol為生產(chǎn)的動(dòng)態(tài)代理類Protocol$Adaptive如下:

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        //執(zhí)行到此處的時(shí)候extName=Injvm
        
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

protocol.export執(zhí)行的時(shí)候先獲取Protocol的擴(kuò)展實(shí)例,在這里是InjvmProtocol,然后調(diào)用InjvmProtocol#export方法(如下)返回了一個(gè)InjvmExporter。

InjvmProtocol#export

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

到這里服務(wù)本地暴露就分析完了。

3.2.2 暴露服務(wù)到遠(yuǎn)程

然后回到ServiceConfig#doExportUrlsFor1Protocol中的這行代碼 Exporter<?> exporter = protocol.export(wrapperInvoker);

這里的wrapperInvoker信息如下:

interface com.alibaba.dubbo.demo.DemoService -> registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.43.174%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16888%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1578470846696&pid=16888&qos.port=22222&registry=zookeeper&timestamp=1578470846603

protocol.export在執(zhí)行的時(shí)候會(huì)根據(jù)protocol擴(kuò)展名獲取具體的實(shí)現(xiàn):

Protocol$Adaptive#export 方法中部分代碼(這個(gè)類在上面以及貼過了)

 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
//在這里extension就是RegistryProtocol了
extension.refer(arg0, arg1);

RegistryProtocol #export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 導(dǎo)出服務(wù)
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // 獲取注冊中心 URL,以 zookeeper 注冊中心為例,得到的示例 URL 如下:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);

    // 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
    
    // 獲取已注冊的服務(wù)提供者 URL,比如:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // 獲取 register 參數(shù)
    boolean register = registeredProviderUrl.getParameter("register", true);

    // 向服務(wù)提供者與消費(fèi)者注冊表中注冊服務(wù)提供者
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // 根據(jù) register 的值決定是否注冊服務(wù)
    if (register) {
        // 向注冊中心注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // 獲取訂閱 URL,比如:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    // 創(chuàng)建監(jiān)聽器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // 創(chuàng)建并返回 DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

上面代碼看起來比較復(fù)雜,主要做如下一些操作:

  1. 調(diào)用 doLocalExport 導(dǎo)出服務(wù)
  2. 向注冊中心注冊服務(wù)
  3. 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
  4. 創(chuàng)建并返回 DestroyableExporter

下面先來分析 doLocalExport 方法的邏輯,如下:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    // 訪問緩存
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // 創(chuàng)建 Invoker 為委托類對象
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);              
                // 寫緩存
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

接下來,我們把重點(diǎn)放在 Protocol 的 export 方法上。假設(shè)運(yùn)行時(shí)協(xié)議為 dubbo,此處的 protocol 變量會(huì)在運(yùn)行時(shí)加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法。所以,接下來我們目光轉(zhuǎn)移到 DubboProtocol 的 export 方法上,相關(guān)分析如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // 獲取服務(wù)標(biāo)識(shí),理解成服務(wù)坐標(biāo)也行。由服務(wù)組名,服務(wù)名,服務(wù)版本號(hào)以及端口組成。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // 創(chuàng)建 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 將 <key, exporter> 鍵值對放入緩存中
    exporterMap.put(key, exporter);

    // 本地存根相關(guān)代碼
    //本地存根是一個(gè)代理對象,一般用于在真正調(diào)用服務(wù)前做一些參數(shù)見檢查之類的
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            // 省略日志打印代碼
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 啟動(dòng)服務(wù)器
    openServer(url);
    // 優(yōu)化序列化
    optimizeSerialization(url);
    return exporter;
}

重點(diǎn)關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,下面分析 openServer 方法。

private void openServer(URL url) {
    // 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key,用于標(biāo)識(shí)當(dāng)前的服務(wù)器實(shí)例
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        // 訪問緩存
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // 創(chuàng)建服務(wù)器實(shí)例
            serverMap.put(key, createServer(url));
        } else {
            // 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
            //在同一臺(tái)機(jī)器上(單網(wǎng)卡),同一個(gè)端口上僅允許啟動(dòng)一個(gè)服務(wù)器實(shí)例。若某個(gè)端口上已有服務(wù)器實(shí)例,此時(shí)則調(diào)用 reset 方法重置服務(wù)器的一些配置。
            server.reset(url);
        }
    }
}

接下來分析服務(wù)器實(shí)例的創(chuàng)建過程,如下:

private ExchangeServer createServer(URL url) {
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
    // 添加心跳檢測配置到 url 中
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 獲取 server 參數(shù),默認(rèn)為 netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    // 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    // 添加編碼解碼器參數(shù)
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 創(chuàng)建 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server...");
    }
                                   
    // 獲取 client 參數(shù),可指定 netty,mina
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 獲取所有的 Transporter 實(shí)現(xiàn)類名稱集合,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 檢測當(dāng)前 Dubbo 所支持的 Transporter 實(shí)現(xiàn)類名稱列表中,
        // 是否包含 client 所表示的 Transporter,若不包含,則拋出異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type...");
        }
    }
    return server;
}

繼續(xù)看創(chuàng)建服務(wù)器的部分:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger,默認(rèn)為 HeaderExchanger。
    // 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
    return getExchanger(url).bind(url, handler);
}

下面看一下 HeaderExchanger 的 bind 方法。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // 創(chuàng)建 HeaderExchangeServer 實(shí)例,該方法包含了多個(gè)邏輯,分別如下:
    //   1. new HeaderExchangeHandler(handler)
    //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
    //   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素?cái)?shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法
    return getTransporter().bind(url, handler);
}

如上,getTransporter() 方法獲取的 Transporter 是在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建的,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類。TransporterAdaptive 會(huì)在運(yùn)行時(shí)根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認(rèn)為 NettyTransporter。下面我們繼續(xù)跟下去,這次分析的是 NettyTransporter 的 bind 方法。

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyServer
    return new NettyServer(url, listener);
}
public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // 調(diào)用父類構(gòu)造方法
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    //doOpen()..
    //doClose()..
    //...
}

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // 調(diào)用父類構(gòu)造方法,這里就不用跟進(jìn)去了,沒什么復(fù)雜邏輯
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        // 獲取 ip 和端口
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // 設(shè)置 ip 為 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 獲取最大可接受連接數(shù)
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            // 調(diào)用模板方法 doOpen 啟動(dòng)服務(wù)器
            doOpen();
        } catch (Throwable t) {
            throw new RemotingException("Failed to bind ");
        }

        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
    
    protected abstract void doOpen() throws Throwable;

    protected abstract void doClose() throws Throwable;
}

我們重點(diǎn)關(guān)注 doOpen 抽象方法,該方法需要子類實(shí)現(xiàn)

NettyServer#doOpen

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // 創(chuàng)建 boss 和 worker 線程池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    
    // 創(chuàng)建 ServerBootstrap
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setOption("child.tcpNoDelay", true);
    // 設(shè)置 PipelineFactory
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // 綁定到指定的 ip 和端口上
    channel = bootstrap.bind(getBindAddress());
}

看到這段代碼用過netty的同學(xué)應(yīng)該很熟悉了,其啟動(dòng)netty服務(wù)端。到這里服務(wù)暴露到遠(yuǎn)程就分析完了。

上面涉及到protocol,exchange,transport這幾個(gè)概念,回顧一下:

  • protocol 遠(yuǎn)程調(diào)用層:封裝 RPC 調(diào)用,以 Invocation, Result 為中心,擴(kuò)展接口為 Protocol, Invoker, Exporter
  • exchange 信息交換層:封裝請求響應(yīng)模式,同步轉(zhuǎn)異步,以 Request, Response 為中心,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codec

3.3 服務(wù)注冊

回到RegistryProtocol#export 方法上

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // ${導(dǎo)出服務(wù)}
    // 省略其他代碼
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        // 注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 訂閱 override 數(shù)據(jù)
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    // 省略部分代碼
}

RegistryProtocol 的 export 方法包含了服務(wù)導(dǎo)出,注冊,以及數(shù)據(jù)訂閱等邏輯。其中服務(wù)導(dǎo)出邏輯上一節(jié)已經(jīng)分析過了,本節(jié)將分析服務(wù)注冊邏輯,相關(guān)代碼如下:

public void register(URL registryUrl, URL registedProviderUrl) {
    // 獲取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注冊服務(wù)
    registry.register(registedProviderUrl);
}

register 方法包含兩步操作,第一步是獲取注冊中心實(shí)例,第二步是向注冊中心注冊服務(wù)。

3.3.1 創(chuàng)建注冊中心

文章開頭已經(jīng)說了,本文使用的注冊中心是 Zookeeper

AbstractRegistryFactory #getRegistry

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    LOCK.lock();
    try {
        // 訪問緩存
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }  
        // 緩存未命中,創(chuàng)建 Registry 實(shí)例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry...");
        }
        // 寫入緩存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        LOCK.unlock();
    }
}

protected abstract Registry createRegistry(URL url);

如上,getRegistry 方法先訪問緩存,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry,然后寫入緩存。這里的 createRegistry 是一個(gè)模板方法,由具體的子類實(shí)現(xiàn)。

ZookeeperRegistryFactory #AbstractRegistryFactory

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    // zookeeperTransporter 由 SPI 在運(yùn)行時(shí)注入,類型為 ZookeeperTransporter$Adaptive
    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        // 創(chuàng)建 ZookeeperRegistry
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    
    // 獲取組名,默認(rèn)為 dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        // group = "/" + group
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
    //在2.5.x版本默認(rèn)的是ZkclientZookeeperClient,
    //在2.6.4默認(rèn)的CuratorZookeeperClient
    //在2.7.x版本已經(jīng)移除Zkclient,若要使用需要自己擴(kuò)展
    zkClient = zookeeperTransporter.connect(url);
    // 添加狀態(tài)監(jiān)聽器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

在上面的代碼代碼中,我們重點(diǎn)關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用,這個(gè)方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊中心的創(chuàng)建過程就結(jié)束了。接下來,再來分析一下 Zookeeper 客戶端的創(chuàng)建過程。

前面說過,這里的 zookeeperTransporter 類型為自適應(yīng)拓展類,因此 connect 方法會(huì)在被調(diào)用時(shí)決定加載什么類型的 ZookeeperTransporter 拓展,默認(rèn)為 CuratorZookeeperTransporter。下面我們到 CuratorZookeeperTransporter 中看一看。

public ZookeeperClient connect(URL url) {
    // 創(chuàng)建 CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {

    private final CuratorFramework client;
    
    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            // 創(chuàng)建 CuratorFramework 構(gòu)造器
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(5000);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            // 構(gòu)建 CuratorFramework 實(shí)例
            client = builder.build();
            // 添加監(jiān)聽器
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            
            // 啟動(dòng)客戶端
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

再順便看下ZkclientZookeeperClient

public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {

    private final ZkClientWrapper client;

    private volatile KeeperState state = KeeperState.SyncConnected;

    public ZkclientZookeeperClient(URL url) {
        super(url);
        client = new ZkClientWrapper(url.getBackupAddress(), 30000);
        client.addListener(new IZkStateListener() {
            @Override
            public void handleStateChanged(KeeperState state) throws Exception {
                ZkclientZookeeperClient.this.state = state;
                if (state == KeeperState.Disconnected) {
                    stateChanged(StateListener.DISCONNECTED);
                } else if (state == KeeperState.SyncConnected) {
                    stateChanged(StateListener.CONNECTED);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start();
    }

}

過程類似,都是創(chuàng)建客戶端,然后增加一個(gè)監(jiān)聽器。

到這里注冊中心實(shí)例創(chuàng)建好了,接下來要做的事情是向注冊中心注冊服務(wù)。

3.3.2 服務(wù)注冊

以 Zookeeper 為例,所謂的服務(wù)注冊,本質(zhì)上是將服務(wù)配置數(shù)據(jù)寫入到 Zookeeper 的某個(gè)路徑的節(jié)點(diǎn)下。

Zookeeper 可視化客戶端 ZooInspector 查看節(jié)點(diǎn)數(shù)據(jù)如下:

dubbo-zk.png

圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個(gè)服務(wù)對應(yīng)的配置信息(存儲(chǔ)在 URL 中)最終被注冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節(jié)點(diǎn)下。

附一張dubbo注冊到zookeper的節(jié)點(diǎn)層次說明圖:

image

像注冊中心注冊的代碼在RegistryProtocol#register(registryUrl, registeredProviderUrl)

public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

FailbackRegistry#register(URL url)

public void register(URL url) {
    //需要注冊的url:  dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16024&side=provider&timestamp=1578478503772
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 模板方法,由子類實(shí)現(xiàn)
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 獲取 check 參數(shù),若 check = true 將會(huì)直接拋出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register");
        } else {
            logger.error("Failed to register");
        }

        // 記錄注冊失敗的鏈接
        failedRegistered.add(url);
    }
}

protected abstract void doRegister(URL url);

doRegister 方法是一個(gè)模板方法,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進(jìn)行分析。如下:

protected void doRegister(URL url) {
    try {
        // 通過 Zookeeper 客戶端創(chuàng)建節(jié)點(diǎn),節(jié)點(diǎn)路徑由 toUrlPath 方法生成,路徑格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register...");
    }
}

如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點(diǎn)。節(jié)點(diǎn)路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:

public void create(String path, boolean ephemeral) {
    //path:
  ///dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772
    if (!ephemeral) {
        // 如果要?jiǎng)?chuàng)建的節(jié)點(diǎn)類型非臨時(shí)節(jié)點(diǎn),那么這里要檢測節(jié)點(diǎn)是否存在
        if (checkExists(path)) {
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 遞歸創(chuàng)建上一級(jí)路徑
        create(path.substring(0, i), false);
    }
    
    // 根據(jù) ephemeral 的值創(chuàng)建臨時(shí)或持久節(jié)點(diǎn)
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
    }
}

經(jīng)過這段代碼會(huì)創(chuàng)建這些節(jié)點(diǎn):

持久節(jié)點(diǎn) /dubbo

持久節(jié)點(diǎn) /com.alibaba.dubbo.demo.DemoService

持久節(jié)點(diǎn) /providers

臨時(shí)節(jié)點(diǎn)

/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772

對于上面的樹型結(jié)構(gòu)數(shù)據(jù)

3.3.2 訂閱override 數(shù)據(jù)

又得回到RegistryProtocol#export方法,再貼一次

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 導(dǎo)出服務(wù)
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // 獲取注冊中心 URL,以 zookeeper 注冊中心為例,得到的示例 URL 如下:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);

    // 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
    
    // 獲取已注冊的服務(wù)提供者 URL,比如:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // 獲取 register 參數(shù)
    boolean register = registeredProviderUrl.getParameter("register", true);

    // 向服務(wù)提供者與消費(fèi)者注冊表中注冊服務(wù)提供者
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // 根據(jù) register 的值決定是否注冊服務(wù)
    if (register) {
        // 向注冊中心注冊服務(wù)
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // 獲取訂閱 URL,比如:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    //表示訂閱的是服務(wù)提供者provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService的configurators節(jié)點(diǎn)的信息
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    // 創(chuàng)建監(jiān)聽器
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // 創(chuàng)建并返回 DestroyableExporter
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

關(guān)注: registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

FailbackRegistry#subscribe

public void subscribe(URL url, NotifyListener listener) {
    //url示例
    //provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=17976&side=provider&timestamp=1578479464018
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && !urls.isEmpty()) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

關(guān)注doSubscribe(url, listener);方法

ZookeeperRegistry#doSubscribe(url, listener)

protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    //省略...
            } else {
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    //toCategoriesPath(url)解析出要訂閱的節(jié)點(diǎn)路徑
                    //path:/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                    
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //添加監(jiān)聽器,如果有變化調(diào)用notify(url, listener, urls)
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //創(chuàng)建持久節(jié)點(diǎn) /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                       //toUrlsWithEmpty(url, path, children) 這個(gè)方法url的協(xié)議頭由provider替換為了empty       
                       //獲得provider中,和consumer匹配的url數(shù)組
                        //若不存在則創(chuàng)建 empty://的url返回,可以處理類似服務(wù)提供者為空的情況
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                         //此時(shí)url為:empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1384&side=provider&timestamp=1578532572533
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

notify(url, listener, urls)方法會(huì)調(diào)用AbstractRegistry# notify(URL url, NotifyListener listener, List<URL> urls)方法,如下:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            /***
            表示服務(wù)提供者存入本地緩存文件key=com.alibaba.dubbo.demo.DemoService
            value=
            provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
           ***/
            saveProperties(url);
            //調(diào)用RegistryProtocol中的OverrideListener#notify(List<URL> urls)方法
            listener.notify(categoryList);
        }
    }

OverrideListener#notify(List<URL> urls)方法

@Override
        public synchronized void notify(List<URL> urls) {
            //urls這里只有一條數(shù)據(jù)empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
            logger.debug("original override urls: " + urls);
            //subscribeUrl
            //provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider&timestamp=1578536147384
            //獲取匹配的url
            List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
            logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
            // No matching results
            if (matchedUrls.isEmpty()) {
                return;
            }
//提取出變化的配置
            List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);

            final Invoker<?> invoker;
            if (originInvoker instanceof InvokerDelegete) {
                invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
            } else {
                invoker = originInvoker;
            }
            //The origin invoker
            URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<?> exporter = bounds.get(key);
            if (exporter == null) {
                logger.warn(new IllegalStateException("error state, exporter should not be null"));
                return;
            }
            //The current, may have been merged many times
            URL currentUrl = exporter.getInvoker().getUrl();
            //Merged with this configuration
            //根據(jù)變化的配置信息組裝新的url
            URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
            if (!currentUrl.equals(newUrl)) {
                //如果新的url和原來的不一樣,則重新導(dǎo)出服務(wù)
                RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
                logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
            }
        }

到這里訂閱override數(shù)據(jù)的部分也分析完了

3.3.4 小結(jié)

到這里服務(wù)注冊的過程分析完了,分為兩個(gè)部分:先創(chuàng)建注冊中心實(shí)例,之后再通過注冊中心實(shí)例注冊服務(wù),然后訂閱配置信息變化。

4.總結(jié)

服務(wù)發(fā)布整個(gè)流程講完了,總結(jié)下主要由以下一個(gè)步驟:

  1. 前置工作:檢查參數(shù)組裝URl
  2. 暴露服務(wù)到本地
  3. 暴露服務(wù)到遠(yuǎn)程
  4. 啟動(dòng)netty暴露服務(wù)
  5. 創(chuàng)建連接zk注冊中心
  6. 服務(wù)注冊到zk
  7. 到zk訂閱override數(shù)據(jù)

原文地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容