Dubbo2.7源碼分析-如何發(fā)布服務(wù)

Dubbo的服務(wù)發(fā)布邏輯是比較復(fù)雜的,我還是以Dubbo自帶的示例講解,這樣更方便和容易理解。

Provider配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
 
    <!-- 提供方應(yīng)用信息,用于計(jì)算依賴關(guān)系 -->
    <dubbo:application name="hello-world-app"  />
 
    <!-- 使用multicast廣播注冊(cè)中心暴露服務(wù)地址 -->
    <dubbo:registry address="multicast://224.5.6.7:1234" />
 
    <!-- 用dubbo協(xié)議在20880端口暴露服務(wù) -->
    <dubbo:protocol name="dubbo" port="20880" />
 
    <!-- 聲明需要暴露的服務(wù)接口 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
 
    <!-- 和本地bean一樣實(shí)現(xiàn)服務(wù) -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />
</beans>

ApplicationContext

ClassPathXmlApplicationContext父類AbstractApplicationContext的方法refresh()在實(shí)例化bean之后的最后一步finishRefresh()中,此方法作用是發(fā)布相應(yīng)的事件。

    protected void finishRefresh() {
        //省略LifeCycleProcessor刷新代碼

        // Publish the final event.
        publishEvent(new ContextRefreshedEvent(this));

        // 省略注冊(cè)到 LiveBeansView MBean代碼
    }

可以看到發(fā)布了一個(gè)ContextRefreshedEvent事件。

    protected void publishEvent(Object event, ResolvableType eventType) {
        //省略部分代碼
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
            //省略部分代碼

首先獲取ApplicationEvent事件廣播對(duì)象,然后廣播事件。

ApplicationEvent事件廣播對(duì)象默認(rèn)是SimpleApplicationEventMulticaster,這個(gè)對(duì)象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定義,可以實(shí)現(xiàn)接口ApplicationEventMulticaster,并將bean的名字命名為applicationEventMulticaster。

接下來看看SimpleApplicationEventMulticaster類的multicastEvent方法。

    @Override
    public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
                //事件類型
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
               
               //applicationListener
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
                       //異常執(zhí)行
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        invokeListener(listener, event);
                    }
                });
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

可以看到此方法會(huì)調(diào)用applicationListener的方法,對(duì)于Dubbo而言,就是ServiceBean.

怎么樣獲取到ServiceBean的呢?

ServiceBean實(shí)現(xiàn)了好幾個(gè)接口,其中有兩個(gè)接口ApplicationContextAwareApplicationListener<ContextRefreshedEvent>,其中ApplicationContextAware使ServiceBean具有獲取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有響應(yīng)事件響應(yīng)的能力。dubbo實(shí)現(xiàn)ApplicationContextAware的目的是通過反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不實(shí)現(xiàn)ApplicationContextAware接口,spring也會(huì)將實(shí)現(xiàn)了ApplicationListener接口的bean添加到其listener列表中的,dubbo這樣做估計(jì)是向后兼容。

接著看invokeListener(listener, event);方法

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            //省略異常處理
        }
    }

invokeListener方法內(nèi)部調(diào)用了doInvokeListener方法,而doInvokeListener方法調(diào)用了listener(ServiceBean)的onApplicationEvent方法.

ServiceBean

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

onApplicationEvent方法調(diào)用了export方法,export方法首先判斷是否已經(jīng)發(fā)布了服務(wù),發(fā)布了則直接返回,沒有發(fā)布則會(huì)判斷是否需要延遲發(fā)布,如果需要延遲,則將發(fā)布服務(wù)做為一個(gè)任務(wù)添加到ScheduledThreadPoolExecutor線程池中,如果不延遲,則調(diào)用doExport方法立即發(fā)布服務(wù)。
doExport方法中會(huì)獲取application/registries/monitor/module/protocols,并做一些檢查和屬性填充,然后調(diào)用doExportUrls();發(fā)布服務(wù)。doExportUrls()首先調(diào)用loadRegistries方法得到要注冊(cè)的url,然后發(fā)布相關(guān)Protocol的服務(wù)。

簡單敘述一下獲取url的過程,url通過map組裝參數(shù)和對(duì)應(yīng)的值,參數(shù)有ApplicationConfigRegistryConfig對(duì)象的屬性以及path、dubbo、timestamppid、protocol、registry。

本示例applicationConfig是:

<dubbo:application name="demo-provider" qosPort="22222" id="demo-provider" />

registryURL
registryConfig是:

<dubbo:registry address="multicast://224.5.6.7:1234" id="org.apache.dubbo.config.RegistryConfig" />

最終map組裝結(jié)果是:
url parameters

最后得到registryURL是:

registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=4892&qos.port=22222&registry=multicast&timestamp=1536112339884

然后調(diào)用doExportUrlsFor1Protocol方法發(fā)布服務(wù),此方法開始部分是構(gòu)造發(fā)布的服務(wù)URL,然后再發(fā)布url。

服務(wù)URL
URL包括以下幾部分:服務(wù)端還是客戶端標(biāo)識(shí),Dubbo版本,時(shí)間戳,Pid,服務(wù)的方法名,tokenApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig對(duì)象的相關(guān)屬性等。
例如本示例的url:

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider&timestamp=1536114090787

我們來著重看一下在構(gòu)造URL過程中port的獲取過程。

//protocolConfig是配置的<dubbo:protocol />生成的對(duì)象
//name是protocol的name,本示例為"dubbo"
//map保存了url的鍵值對(duì)
Integer port = this.findConfigedPorts(protocolConfig, name, map);

findConfigedPorts顧名思義是查找配置的port,從哪查呢,先從系統(tǒng)環(huán)境變量查,如果沒找到,再查找名字為name的protocol協(xié)義。

    private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
        Integer portToBind = null;

        // 從環(huán)境變量從查找綁定的port
        String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
        portToBind = parsePort(port);

        // 如果沒有從環(huán)境變量從查到,則從名稱為name的protocol查找
        if (portToBind == null) {
            portToBind = protocolConfig.getPort();
            if (provider != null && (portToBind == null || portToBind == 0)) {
                portToBind = provider.getPort();
            }
           //這一句是關(guān)鍵,示例中name值是"dubbo",所以會(huì)實(shí)例化DubboProtocol,得到默認(rèn)的port:20880
            final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (portToBind == null || portToBind == 0) {
                portToBind = defaultPort;
            }
            if (portToBind == null || portToBind <= 0) {
                portToBind = getRandomPort(name);
                if (portToBind == null || portToBind < 0) {
                    portToBind = getAvailablePort(defaultPort);
                    putRandomPort(name, portToBind);
                }
                logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
            }
        }

        //保存port到map中,以便后面url使用
        map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));

        // 從環(huán)境變量中查找注冊(cè)的port,如果沒有找到,則等于綁定的Port.
        String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
        Integer portToRegistry = parsePort(portToRegistryStr);
        if (portToRegistry == null) {
            portToRegistry = portToBind;
        }
        return portToRegistry;
    }

有人或許有疑問,ServiceConfig在實(shí)例化時(shí),不是已經(jīng)加載過Protocol了嗎?為什么還要使用ExtensionLoader加載一次呢?

final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();

答: ServiceConfig實(shí)例化時(shí),加載的Protocol是自適應(yīng)的Protocol,是動(dòng)態(tài)生成的,類名是Protocol$Adaptive(見Dubbo源碼分析-SPI的應(yīng)用中有分析)。而這里獲取Port時(shí)加載的也是Protocol類,但指名了具體加載的是哪個(gè)Protocol(本示例是名稱為dubbo的Protocol,即DubboProtocol,此類默認(rèn)的端口是20880)。

發(fā)布URL

發(fā)布本地服務(wù)

調(diào)用ServiceConfig類的exportLocal(URL url)發(fā)布本地服務(wù)。

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
              //本地服務(wù)url
               URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            
            
           ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

本示例的本地服務(wù) url是:

injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3008&qos.port=22222&side=provider&timestamp=1536125473655

重點(diǎn)看這一句:

Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

其中涉及到ProxyFactory和Protocol,下面分別來看一看。

ProxyFactory

proxyFactory也是通過SPI加載的自適應(yīng)類對(duì)象,類名為ProxyFactory$Adaptive,我們來看一下其class文件反編譯后的源碼。

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements ProxyFactory {
    public ProxyFactory$Adaptive() {
    }

    public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException {
        if (var3 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var5 = var3.getParameter("proxy", "javassist");
            if (var5 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);
                return var6.getInvoker(var1, var2, var3);
            }
        }
    }

    public Object getProxy(Invoker var1, boolean var2) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var3 = var1.getUrl();
            String var4 = var3.getParameter("proxy", "javassist");
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);
                return var5.getProxy(var1, var2);
            }
        }
    }

    public Object getProxy(Invoker var1) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var2 = var1.getUrl();
            String var3 = var2.getParameter("proxy", "javassist");
            if (var3 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);
                return var4.getProxy(var1);
            }
        }
    }
}

其中有三個(gè)方法,兩個(gè)獲取代理,一個(gè)獲取Invoker。我們來看其中的getInvoker方法,默認(rèn)獲取名稱為javassist的ProxyFactory。
由于本地服務(wù)URL中沒有proxy參數(shù),所以會(huì)調(diào)用JavassistProxyFactory的getInvoker(T proxy, Class<T> type, URL url)方法,返回AbstractProxyInvoker的匿名類對(duì)象,此對(duì)象代理了服務(wù)對(duì)象(本示例中為DemoServiceImpl對(duì)象)。

其實(shí)(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");獲取到的并不是JavassistProxyFactory對(duì)象,而是StubProxyFactoryWrapper對(duì)象,為什么呢?我們可以看下ExtensionLoader的getExtension(String name)方法

   public T getExtension(String name) {
       //檢查name是否合法
       if (name == null || name.length() == 0)
           throw new IllegalArgumentException("Extension name == null");
      //如果name等于true,則加載SPI的默認(rèn)插件
     if ("true".equals(name)) {
         return getDefaultExtension();
      }
      //從當(dāng)前插件類的緩存實(shí)例對(duì)象中獲取
      Holder<Object> holder = cachedInstances.get(name);
      if (holder == null) {
         cachedInstances.putIfAbsent(name, new Holder<Object>());
         holder = cachedInstances.get(name);
     }
     Object instance = holder.get();
     if (instance == null) {
       synchronized (holder) {
             instance = holder.get();
             if (instance == null) {
                 //創(chuàng)建插件實(shí)例
                 instance = createExtension(name);
                 holder.set(instance);
             }
         }
     }
     return (T) instance;
 }

   private T createExtension(String name) {
      //從文件目錄中加載插件類
      Class<?> clazz = getExtensionClasses().get(name);
      if (clazz == null) {
         throw findException(name);
    }
   
    //從已加載的所有插件實(shí)例集合中獲取
     try {
         T instance = (T) EXTENSION_INSTANCES.get(clazz);
         if (instance == null) {
             //實(shí)例化插件實(shí)例,并放入集合
             EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
         
        //注入屬性
        injectExtension(instance);

        //插件的包裹類
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
           for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
 }

重點(diǎn)的地方就在于插件的包裹類,StubProxyFactoryWrapper就是JavassistProxyFactory的包裹類,為什么這么說呢,因?yàn)镾tubProxyFactoryWrapper有一個(gè)帶ProxyFactory參數(shù)的構(gòu)造函數(shù)而且實(shí)現(xiàn)了ProxyFactory接口,具體可以看Extension的loadExtensionClasses方法源碼(裝飾者模式)。

Protocol

protocol對(duì)象也是一個(gè)自適應(yīng)插件類,類名為Protocol$Adaptive,在上一篇文章中已有講解。這個(gè)類會(huì)根據(jù)url的協(xié)義取得對(duì)應(yīng)轉(zhuǎn)義的插件類,沒有的話,默認(rèn)為dubbo協(xié)義,本地服務(wù)url協(xié)義為injvm,所以會(huì)加載InjvmProtocol,但是在加載InjvmProtocol并實(shí)例化后,發(fā)現(xiàn)InjvmProtocol還有對(duì)應(yīng)的包裹類即(其實(shí)是所有Protocol的包裹類):ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper類的作用是添加一些過濾器,ProtocolListenerWrapper的作用是添加ExporterListener。InjvmProtocol的export方法僅僅創(chuàng)建一個(gè)InjvmExporter實(shí)例,沒有開啟服務(wù)。

發(fā)布遠(yuǎn)程服務(wù)

如果注冊(cè)u(píng)rl不為空,調(diào)用proxyFactory得到服務(wù)對(duì)象的代理類,然后使用protocol發(fā)布服務(wù)。由于注冊(cè)u(píng)rl的協(xié)義是registry,在使用ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("registry");會(huì)加載RegistryProtocol類并實(shí)例化,而且會(huì)添加其包裹類:ProtocolFilterWrapper和ProtocolListenerWrapper。而在這兩個(gè)包裹類的export方法的首行,都會(huì)對(duì)registry協(xié)義進(jìn)行單獨(dú)處理。

RegistryProtocol
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }

經(jīng)過這兩個(gè)包裹類后,最終會(huì)調(diào)用RegistryProtocol的export方法。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //發(fā)布服務(wù)
       //originInvoker中包含了代理服務(wù)對(duì)象的代理類
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        //注冊(cè)相關(guān)代碼省略

        //訂閱相關(guān)代碼省略
    }

       private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
       
         //key為發(fā)布的服務(wù)url
        String key = getCacheKey(originInvoker);
        //從map緩存中獲取
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
      //double check
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

最重要的是這一句:

exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

其中protocol也是Protocol$Adaptive對(duì)象,而invokerDelegete的URL是服務(wù)的url.

本示例中為:

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8468&qos.port=22222&side=provider&timestamp=1536138127517

DubboProtocol

Protocol$Adaptive在解析URL的時(shí)得到dubbo,所以會(huì)加載DubboProtocol并實(shí)例化(DubboProtocol實(shí)際在前面獲取默認(rèn)接口時(shí)已經(jīng)實(shí)例化并緩存起來了,此處取的是緩存的實(shí)例),并調(diào)用了DubboProtocol的export方法(與上面一樣,在得到DubboProtocol實(shí)例后,仍然會(huì)在外面包裹一下)。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 服務(wù)名:本例中為org.apache.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
       //exporter 控制服務(wù)打開與關(guān)閉
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //省略發(fā)布子服務(wù)的相關(guān)代碼
       
       //打開服務(wù)
        openServer(url);
       //優(yōu)化序列化處理
        optimizeSerialization(url);
        return exporter;
    }

經(jīng)過層層探索,曲折迂回,終于到openServer了,進(jìn)去看看。

    private void openServer(URL url) {
        // 服務(wù)ip:端口號(hào)
        String key = url.getAddress();
        
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // 服務(wù)支持重置
                server.reset(url);
            }
        }
    }

可以看到其中有一個(gè)重要方法createServer(url)。

    private ExchangeServer createServer(URL url) {
        // 當(dāng)服務(wù)關(guān)閉時(shí),默認(rèn)啟動(dòng)發(fā)送只讀事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // 默認(rèn)啟動(dòng)心跳
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        //str默認(rèn)為netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        
        //添加編解碼器
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            //啟動(dòng)服務(wù),并傳入請(qǐng)求處理器
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
       
         //判斷客戶端使用的是網(wǎng)絡(luò)傳輸層框架是否支持服務(wù)端的網(wǎng)絡(luò)傳輸層。
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
Exchangers

進(jìn)入Exchangers.bind方法一探究竟。

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        //如果編碼碼器沒有,則添加參數(shù)exchange
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

getExchanger(url)默認(rèn)得到的是HeaderExchanger,可通過exchanger參數(shù)配置。
到HeaderExchanger中看看bind方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
Transporter

看下Transporters的bind方法。

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

通過getTransporter方法獲取一個(gè)自適應(yīng)的Transporter,類名為Transporter$Adaptive,我們來看一下其源碼:

package org.apache.dubbo.remoting;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

public class Transporter$Adaptive implements Transporter {
    public Transporter$Adaptive() {
    }

    public Client connect(URL var1, ChannelHandler var2) throws RemotingException {
        if (var1 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var1.getParameter("client", var1.getParameter("transporter", "netty"));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([client, transporter])");
            } else {
                Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
                return var5.connect(var1, var2);
            }
        }
    }

    public Server bind(URL var1, ChannelHandler var2) throws RemotingException {
        if (var1 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var1.getParameter("server", var1.getParameter("transporter", "netty"));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([server, transporter])");
            } else {
                Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
                return var5.bind(var1, var2);
            }
        }
    }
}

可以看到Transporter$Adaptive通過判斷URL中是否有transporter參數(shù),如果沒有,就默認(rèn)為netty。

示例中服務(wù)的URL為

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider&timestamp=1536114090787

其中沒有transporter參數(shù),所以就使用netty。然后dubbo就去查找netty對(duì)應(yīng)的是哪個(gè)Transporter,結(jié)果找到是NettyTransporter。

package org.apache.dubbo.remoting.transport.netty4;

//省略導(dǎo)入部分

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

NettyTransporter很簡單,只有兩個(gè)方法,一個(gè)用于開啟服務(wù),一個(gè)用于連接服務(wù)。到這里已經(jīng)明白了Dubbo是如何發(fā)布一個(gè)服務(wù)的。

我們?cè)龠M(jìn)一步看下NettyServer的構(gòu)造函數(shù)

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

可以看出其調(diào)用父類的構(gòu)造函數(shù),并傳入url和handler的包裹類。handler的包裹類有哪些呢,進(jìn)去看一看。

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    
     protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }

注意到有一個(gè)接口Dispatcher,其自適應(yīng)插件類是AllDispatcher,AllDispatcher的dispatch方法返回AllChannelHandler實(shí)例(此實(shí)例會(huì)將所有請(qǐng)求做為任務(wù)放入線程池中處理),在此實(shí)例基礎(chǔ)上又包裹了HeartbeatHandlerMultiMessageHandler
NettyServer會(huì)將MultiMessageHandler層層往上傳到其父類AbstractPeer。

我們來回憶一下正向流程

從ServiceConfig發(fā)布registryURL開始(見doExportUrlsFor1Protocol方法)
1.ServiceConfig生成服務(wù)實(shí)例的代理工廠類JavassistProxyFactory(ProxyFactory SPI默認(rèn)代理工廠類)并包裹到DelegateProviderMetaDataInvoker(此類記錄代理工廠類和服務(wù)信息ServiceBean(<dubbo:service />標(biāo)簽對(duì)應(yīng)的類))
2.由于registryURL的protocol協(xié)義是registry,所以會(huì)加載RegistryProtocol(Protocol類的外面都包裹了ProtocolFilterWrapper和ProtocolListenerWrapper,下面不再特殊說明),并傳入上一步的invoker。
3.RegistryProtocol又找到DubboProtocol,也會(huì)帶上Invoker(此時(shí)的Invoker包含上一次的Invoker并帶有服務(wù)地址(dubbo://IP:端口/服務(wù)接口全稱?參數(shù)=xxx))。

所以requestHandler又會(huì)調(diào)用正向傳過來的Invoker,經(jīng)過ProtocolFilterWrapper和ProtocolListenerWrapper,最終調(diào)用到服務(wù)實(shí)現(xiàn)類相應(yīng)的方法。

最后以一張圖總結(jié):

標(biāo)識(shí)為SPI的類,是可以動(dòng)態(tài)加載的。圖片看不清楚的話,請(qǐng)查看原圖。

再簡單說下接收到請(qǐng)求后的處理流程:NettyServer接收到請(qǐng)求后,交給NettyServerHandler處理,NettyServerHandler轉(zhuǎn)交給NettyServer的父類AbstractPeer處理,AbstractPeer又交給MultiMessageHandler處理,這樣就開始了handler鏈的處理,handler的終點(diǎn)是HeaderExchangerHandler,HeaderExchangerHandler調(diào)用DubboProtocol傳過來的成員變量requestHandler調(diào)用相應(yīng)的服務(wù)類方法,然后得到結(jié)果,調(diào)用NettyServerHandler傳過來的NettyChannel發(fā)送結(jié)果到Client。

用力不如用心!用心寫好每一篇文章!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容