Dubbo 服務(wù)導(dǎo)出

1. 前言

image.png

Dubbo服務(wù)啟動過程中伴隨著服務(wù)注冊的過程,也就是服務(wù)導(dǎo)出。本篇文章主要是記錄一下Dubbo的服務(wù)導(dǎo)出過程。Dubbo服務(wù)導(dǎo)出開始于Spring容器發(fā)布刷新事件。注:本篇文章選取的源代碼版本是2.7.6。

2. 源碼分析

DubboBootstrapApplicationListener#onApplicationContextEvent

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

這段代碼的主要邏輯是在Spring容器啟動或者刷新執(zhí)行Dubbo初始化。

  • Spring容器啟動或者刷新發(fā)布ContextRefreshedEvent
  • Dubbo通過監(jiān)聽該事件執(zhí)行DubboBootstrap#start方法

DubboBootstrap#start

    /**
     * Start the bootstrap
     */
    public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 1. export Dubbo Services
            exportServices();

            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export MetadataService
                exportMetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

            referServices();

            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }
    
    private void exportServices() {
        configManager.getServices().forEach(sc -> {
            // TODO, compatible with ServiceConfig.export()
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);

            if (exportAsync) {
                ExecutorService executor = executorRepository.getServiceExporterExecutor();
                Future<?> future = executor.submit(() -> {
                    sc.export();
                });
                asyncExportingFutures.add(future);
            } else {
                sc.export();
                exportedServices.add(sc);
            }
        });
    }

這個方法主要是做一些初始化,我們重點關(guān)注exportServices這個服務(wù)導(dǎo)出的方法。該方法的邏輯也很簡單,主要是根據(jù)開關(guān)選擇同步導(dǎo)出還是異步導(dǎo)出,最終核心方法都指向了ServiceConfig#export

ServiceConfig#export

public synchronized void export() {
        if (!shouldExport()) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        // 檢查配置
        checkAndUpdateSubConfigs();

        //init serviceMetadata
        serviceMetadata.setVersion(version);
        serviceMetadata.setGroup(group);
        serviceMetadata.setDefaultGroup(group);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());

        // 判斷是否需要延遲暴露服務(wù)
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 執(zhí)行服務(wù)導(dǎo)出
            doExport();
        }
        // 服務(wù)導(dǎo)出成功業(yè)務(wù)邏輯處理
        // 分發(fā)服務(wù)成功導(dǎo)出事件
        exported();
    }

主要業(yè)務(wù)邏輯:

  • 檢查配置,將未填寫的配置填充默認(rèn)值
  • 初始化Service的原數(shù)據(jù)
  • 根據(jù)參數(shù)判斷是否需要延遲暴露服務(wù)
  • 執(zhí)行服務(wù)導(dǎo)出
  • 服務(wù)導(dǎo)出后的業(yè)務(wù)邏輯處理,主要是分發(fā)事件

ServiceConfig#doExport

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        // 導(dǎo)出服務(wù)
        doExportUrls();
    }

該方法沒有多少邏輯,主要是判斷是否需要導(dǎo)出服務(wù)。<dubbo:provider>提供了參數(shù)可以取消導(dǎo)出服務(wù)用于本地調(diào)試。

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" export="false"/>

ServiceConfig#doExportUrls

private void doExportUrls() {
        // 將當(dāng)前的service添加到ServiceRepository
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

        // 加載注冊中心鏈接
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        // 遍歷 protocols,并在每個協(xié)議下導(dǎo)出服務(wù)
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // TODO, uncomment this line once service key is unified
            serviceMetadata.setServiceKey(pathKey);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

主要邏輯:

  • 將當(dāng)前的Service信息添加至ServiceRepository,(ApplicationModel保存著服務(wù)提供者和調(diào)用者的基本信息)
  • 加載注冊中心的鏈接(根據(jù)用戶配置將注冊中心的地址轉(zhuǎn)化為URL)
  • 遍歷protocols,并在每個寫一下導(dǎo)出服務(wù)

ServiceConfig#doExportUrlsFor1Protocol

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        

        Map<String, String> map = new HashMap<String, String>();
        // 省略代碼 主要是將ProtocolConfig中的信息添加至Map中,用于構(gòu)造URL
        
        // export service
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        // 構(gòu)造URL
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
        // 省略代碼
        }

該方法主要是用于構(gòu)造協(xié)議的URL,主要邏是將一些信息及配置對象字段放在Map中,Map中的內(nèi)容將傳遞給URL對象。(上面代碼中為快速理解整體流程,省略了具體的參數(shù)獲取源碼)

URL對象內(nèi)容大致如下

dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=127.0.0.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=3983&qos.port=22222&release=&side=provider&timestamp=1630134753068

構(gòu)造好URL對象,接下來開始服務(wù)導(dǎo)出相關(guān)代碼

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                    // 不存在注冊中心則僅導(dǎo)出本地服務(wù)
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);

}

這段代碼的主要邏輯如下:

  • 獲取scope參數(shù)
  • scope參數(shù)如果為none則不進(jìn)行導(dǎo)出
  • scope != remote 導(dǎo)出到本地
  • scopre != local 導(dǎo)出到遠(yuǎn)程

接下來看一下導(dǎo)出服務(wù)到本地的方法

private void exportLocal(URL url) {
        // 構(gòu)建URL,協(xié)議頭為injvm
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        // 創(chuàng)建Invoker 并調(diào)用InjvmProtocol.export方法
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

主要邏輯:

  • 構(gòu)造injvm協(xié)議頭的URL
  • 創(chuàng)建 Invoker并調(diào)用InjvmProtocol.export方法

接下來看一下InjvmProtocol.export方法

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 創(chuàng)建InjvmExporter
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

這邊邏輯較為簡單,創(chuàng)建InjvmExporter,并將該Exporter添加至Map中,key為服務(wù)名

接下來看一下服務(wù)導(dǎo)出到遠(yuǎn)程的方法,這邊我們直接看RegistryProtocol#export方法

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 獲取注冊中心的URL 示例如下
        // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F127.0.0.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D127.0.0.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26metadata-type%3Dremote%26methods%3DsayHello%2CsayHelloAsync%26pid%3D4292%26qos.port%3D22222%26release%3D%26side%3Dprovider%26timestamp%3D1630136961051&metadata-type=remote&pid=4292&qos.port=22222&timestamp=1630136958853
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        // 獲取服務(wù)提供者的URL
        // dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=127.0.0.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=4292&qos.port=22222&release=&side=provider&timestamp=1630136961051
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        // 獲取訂閱URL
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        // 創(chuàng)建監(jiān)聽器
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        // 導(dǎo)出服務(wù)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish
        // 判斷是否要注冊服務(wù)
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 服務(wù)注冊
            register(registryUrl, registeredProviderUrl);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

主要業(yè)務(wù)邏輯:

  • 獲取注冊中心URL
  • 獲取服務(wù)提供者的URL
  • 獲取訂閱URL
  • 創(chuàng)建訂閱監(jiān)聽器
  • 導(dǎo)出服務(wù)
  • 判斷是否需要服務(wù)注冊,如果需要則進(jìn)行服務(wù)注冊

該方法中主要有兩個導(dǎo)出服務(wù)(org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport)和注冊服務(wù)(org.apache.dubbo.registry.integration.RegistryProtocol#register)

先看一下導(dǎo)出服務(wù)org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        // 創(chuàng)建一個Exporter對象
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // 通過Dubbo協(xié)議進(jìn)行導(dǎo)出得到一個exporter對象
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 獲取URL
        URL url = invoker.getUrl();

        // export service. key=org.apache.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
        // 創(chuàng)建exporter存入緩存中
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
        // 創(chuàng)建監(jiān)聽服務(wù)器 默認(rèn)是NettyServer
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

主要邏輯:

  • 獲取URL創(chuàng)建Exporter對象
  • 將export對象存入緩存
  • 首次導(dǎo)出創(chuàng)建監(jiān)聽服務(wù)器

接下來看一下服務(wù)怎么注冊到注冊中心上(Zookeeper)

org.apache.dubbo.registry.integration.RegistryProtocol#register

public void register(URL registryUrl, URL registeredProviderUrl) {
        // 獲取Registry實例
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);

        ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
        model.addStatedUrl(new ProviderModel.RegisterStatedURL(
                registeredProviderUrl,
                registryUrl,
                true
        ));
    }

最終會調(diào)用到ZookeeperRegistry的doRegister方法,在Zookeeper中創(chuàng)建節(jié)點。

@Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

最終我們連上Zookeeper查看

image.png

3. 總結(jié)

最終用一張圖來過一下Dubbo服務(wù)導(dǎo)出的流程圖

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容