第7章 Dubbo 服務暴露流程的設計與實現(xiàn)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!-- 應用名,通常與 artifactId 相同即可 -->
    <dubbo:application name="demo-provider"/>
    <!-- 注冊中心 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181" check="false"/>
    <!-- 暴露協(xié)議 -->
    <dubbo:protocol name="dubbo" port="20880"/>
    <!-- spring bean -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
    <!-- 聲明暴露的服務 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>

一、服務暴露簡圖

serviceexport.png
總體流程:(默認配置情況下)
  1. 首先 ServiceConfig 類拿到對外提供服務的實際類 ref(如:DemoServiceImpl);
  2. 然后通過 JavassistProxyFactory 類的 getInvoker 方法將 ref 封裝到一個 AbstractProxyInvoker 實例;
  3. 最后通過 InjvmProtocol(本地暴露)和 DubboProtocol(遠程暴露)將AbstractProxyInvoker 實例轉(zhuǎn)換成 InjvmExporter 和 DubboExporter。

注意

  • provider 端的 Invoker 封裝了對 ref 的調(diào)用邏輯;
  • Exporter 是用于管理 Invoker 的生命周期的。
public interface Invoker<T> extends Node {
    /** get service interface. */
    Class<T> getInterface();
    /** invoke. */
    Result invoke(Invocation invocation) throws RpcException;
}

public interface Exporter<T> {
    /** get invoker. */
    Invoker<T> getInvoker();
    /** unexport. 銷毀 Invoker */
    void unexport();
}
image.png
  • 服務暴露第一步,會使用 ProxyFactory#getInvoker(...) 將 ref 封裝到 AbstractProxyInvoker 實例中,該實例是在 ProxyFactory#getInvoker(...) 中創(chuàng)建的匿名內(nèi)部類,并復寫了 AbstractProxyInvoker#doInvoker 方法
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    // 真實對象 ref, eg. DemoServiceImpl
    private final T proxy;
    // 提供模板方法:1. 調(diào)用子類發(fā)起請求  2. 包裝響應為 RpcResult
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    }
    // 子類覆寫的真正調(diào)用的方法
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // wrapper:通過動態(tài)生成一個真實的服務提供者(DemoServiceImpl)的wrapper類,來避免反射調(diào)用
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 直接調(diào)用wrapper,wrapper底層調(diào)用DemoServiceImpl
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

這樣服務暴露的第一步,我們會得到一個由 JavassistProxyFactory#getInvoker(...) 創(chuàng)建的匿名內(nèi)部類 AbstractProxyInvoker 實例。

image.png
  • 服務暴露的第二步,會使用 Protocol#export(Invoker<T> invoker) 將第一步得到的 AbstractProxyInvoker 實例封裝到 Exporter 中(InvokerExporter 一對一,后續(xù) Exporter 會管理 Invoker 的生命周期,并且作為 Invoker 的門面與外部溝通),我們只看遠程暴露,服務暴露第二步之后,會獲得一個 DubboExporter 實例
public abstract class AbstractExporter<T> implements Exporter<T> {
    // 封裝了 Invoker,用于管理該 Invoker 的生命周期
    private final Invoker<T> invoker;
    @Override
    public Invoker<T> getInvoker() {
        return invoker;
    }
    @Override
    public void unexport() {
        ...
        getInvoker().destroy();
    }
}

public class DubboExporter<T> extends AbstractExporter<T> {
    // serviceKey: group/path:version:port
    private final String key;

    /**
     * 注意:exporterMap 對象實際上持有的是 AbstractProtocol 中的 exporterMap 對象引用
     * key: serviceKey
     * value: 具體的 Exporter 實例,eg. DubboExporter
     */
    private final Map<String, Exporter<?>> exporterMap;

    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker); // 存儲該 DubboExporter 實例管理的 Invoker 實例
        this.key = key;
        this.exporterMap = exporterMap;
    }

    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }
}
image.png
  • RegistryProtocol 實際上是其他具體 Protocol(eg. DubboProtocol)的 AOP 類:
  1. 首先會調(diào)用具體 Protocol#export(...)
  2. 之后獲取 Registry,然后進行注冊,最后監(jiān)聽 override 數(shù)據(jù)。(這一步的三個操作就是 AOP 的目的)
// 具體 Protocol 的 AOP 類
public class RegistryProtocol implements Protocol {
    // 具體協(xié)議
    private Protocol protocol;
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 1. 做具體的 Protocol 的暴露操作
        ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        ...
        // 2. 獲取注冊中心
        Registry registry = getRegistry(originInvoker);
        ...
        // 3. 注冊
        register(registryUrl, registeredProviderUrl);
        // 4. 訂閱 the override data
        ...
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

    private <T> ExporterChangeableWrapper<T> doLocalExport(Invoker<T> originInvoker) {
        ...
        // 調(diào)用具體 Protocol 暴露服務
        protocol.export(invokerDelegete);
        ...
        return exporter;
    }
    ...
}

public abstract class AbstractProtocol implements Protocol {
    /**
     * 該容器是當請求到來時獲取 Exporter 的地方
     * key: serviceKey
     * value: 具體的 Exporter 實例,eg. DubboExporter
     */
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
    ...
}

public class DubboProtocol extends AbstractProtocol {
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        ...
        // 獲取 serviceKey:group/path:version:port
        String key = serviceKey(url);
        // 將 invoker 存儲到 DubboExporter 中,并且讓 DubboExporter 持有 AbstractProtocol#exporterMap 的對象引用(后續(xù) DubboExporter 在做銷毀操作時,會主動從該 exporterMap 中刪除對象,做到邏輯高內(nèi)聚)
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 將 {serviceKey:具體的Exporter} 存儲到 AbstractProtocol#exporterMap 中,當請求到來時,來這里根據(jù) serviceKey 獲取 Exporter
        exporterMap.put(key, exporter);
        ...
        // 開啟 netty 服務
        openServer(url);
        ...
        return exporter;
    }

    // 真正的 netty 請求處理器
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 當請求到來時,找到真正要處理請求的那個Invoker
                Invoker<?> invoker = getInvoker(channel, inv); 
                ...
                // 發(fā)起調(diào)用
                return invoker.invoke(inv);
            }
            throw new RemotingException(...);
        }

        // 接收到請求時的處理邏輯
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                ...
            }
        }
        ...
    };

    /**
     * 當請求到來時,找到真正要處理請求的那個Invoker
     * 1. 首先根據(jù)請求參數(shù) Channel channel, Invocation inv 組裝 serviceKey=group/path:version:port
     * 2. 根據(jù) serviceKey 從 ExporterMap 中獲取 Exporter
     * 3. 從Exporter 中獲取 Invoker
     */
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        /**
         * 1. 根據(jù)請求參數(shù) Channel channel, Invocation inv 組裝 serviceKey=group/path:version:port
         *    port 從 Channel 中獲?。籶ath、version、group 從 Invocation 中獲取
         */
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        ...
        // group/path:version:port
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        /**
         * 2. 從 exporterMap 中獲取 Exporter
         */
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(...);
        /**
         * 3. 從Exporter 中獲取 Invoker
         */
        return exporter.getInvoker();
    }
}

整個流程:當 netty 接收到一個請求時

  1. 首先根據(jù)請求參數(shù) Channel channel, Invocation inv 組裝 serviceKey=group/path:version:port
  2. 根據(jù) serviceKey 從 AbstractProtocol#exporterMap 中獲取 Exporter(DubboExporter 實例,服務暴露第二步做了存儲)
  3. 從 Exporter 中獲取 Invoker(AbstractInvoker 實例,服務暴露第二步做了存儲)
  4. 調(diào)用 Invoker#invoke(...) 調(diào)用對應的 ref.xxxmethod(...)(服務暴露第一步做了初始化)

二、服務暴露源碼梯形圖

ServiceBean.onApplicationEvent(ApplicationEvent event)
-->ServiceConfig.export()
  -->doExport()
    -->doExportUrls()
      -->doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
        <!-- 一 本地暴露 -->
        -->exportLocal(url)
          //1.1 將實現(xiàn)類ref封裝成Invoker
          -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
             proxy:DemoServiceImpl實例(即ref實例) 
             type:interface com.alibaba.dubbo.demo.DemoService  
             url:injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3141&side=provider&timestamp=1510021401013
            -->Wrapper.getWrapper(Class DemoServiceImpl)
            -->new AbstractProxyInvoker<T>(proxy, type, url)
          //1.2 將實現(xiàn)類Invoker暴露為Exporter
          -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
             組建invoker鏈,實際上只有最后一個是真正的AbstractProxyInvoker實例,前邊的都是filter。
             invoker:AbstractProxyInvoker實例 
             key:service.filter 
             group:provider
          -->InjvmProtocol.export(Invoker<T> invoker)
             invoker:經(jīng)過filter包裝的invoker
            -->new InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap)
               invoker:經(jīng)過filter包裝的invoker 
               key:com.alibaba.dubbo.demo.DemoService 
               exporterMap:傳入時為空,構造器執(zhí)行后為{"com.alibaba.dubbo.demo.DemoService", 當前的InjvmExporter實例}
          -->List<Exporter<?>> exporters.add(上述的exporter)
        <!-- 二 遠程暴露 -->
        //2.1 將實現(xiàn)類ref封裝成Invoker
        -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
           proxy:DemoServiceImpl實例(即ref實例) 
           type:interface com.alibaba.dubbo.demo.DemoService
          -->Wrapper.getWrapper(Class DemoServiceImpl)
          -->new AbstractProxyInvoker<T>(proxy, type, url)
        -->RegistryProtocol.export(final Invoker<T> originInvoker)
           originInvoker:上述的AbstractProxyInvoker實例
          //2.2 將invoker轉(zhuǎn)化為exporter
          -->doLocalExport(originInvoker)
            -->new InvokerDelegete(Invoker<T> invoker, URL url)
               invoker:原始的AbstractProxyInvoker實例
            -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
               組建invoker鏈,實際上只有最后一個是真正的InvokerDelegete實例,前邊的都是filter
               invoker:InvokerDelegete實例 
               key:service.filter 
               group:provider
            -->DubboProtocol.export(Invoker<T> invoker)
               invoker:經(jīng)過filter包裝的InvokerDelegete實例
              -->new DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap)
                 invoker:經(jīng)過filter包裝的InvokerDelegete實例 
                 key:com.alibaba.dubbo.demo.DemoService:20880 (group/path:version:port)
                 exporterMap:傳入時為空,構造器執(zhí)行后又執(zhí)行了put,為{"com.alibaba.dubbo.demo.DemoService:20880", 當前的DubboExporter實例}
              //2.3 開啟netty服務端監(jiān)聽客戶端請求
              -->openServer(URL url)
                -->createServer(URL url)
                    -->HeaderExchanger.bind(URL url, ExchangeHandler handler)
                       handler:DubboProtocol.requestHandler
                      -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                        -->NettyTransporter.bind(URL url, ChannelHandler listener)
                           listener:上邊的DecodeHandler實例
                          -->new NettyServer(URL url, ChannelHandler handler)
                            -->ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
                               handler:上邊的DecodeHandler實例
                              -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                            -->getChannelCodec(url)//獲取Codec2,這里是DubboCountCodec實例
                            -->doOpen()//開啟netty服務
                      -->new HeaderExchangeServer(Server server)
                         server:上述的NettyServer
                        -->startHeatbeatTimer()
            -->new ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker)
               exporter:上述的DubboExporter實例  
               originInvoker:原始的AbstractProxyInvoker實例
          //2.4 創(chuàng)建Registry:創(chuàng)建zkclient,連接zk
          -->getRegistry(final Invoker<?> originInvoker)
            -->AbstractRegistryFactory.getRegistry(URL url)
              -->ZookeeperRegistryFactory.createRegistry(URL url)
                -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
                  -->ZkclientZookeeperTransporter.connect(URL url)
                    -->new ZkclientZookeeperClient(URL url)
                      -->new ZkClient(url.getBackupAddress())//這里是10.211.55.5:2181
            -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
          //2.5 向注冊中心注冊服務
          -->registry.register(registedProviderUrl)
            -->ZookeeperRegistry.doRegister(URL url)
              -->AbstractZookeeperClient.create(String path, boolean ephemeral)
          //2.6 訂閱override數(shù)據(jù)
          -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
          //2.7 創(chuàng)建新的Exporter實例
          -->new Exporter<T>()//包含了上邊的ExporterChangeableWrapper<T> exporter實例 + ZookeeperRegistry實例

服務暴露的實機:默認為spring發(fā)布上下文刷新事件時。

默認情況下,整個服務暴露會發(fā)生本地暴露遠程暴露兩次暴露。

2.1 本地暴露

1. 使用 JavassistProxyFactory 將實現(xiàn)類 ref(DemoServiceImpl)封裝成AbstractProxyInvoker實例
          1.1 將實現(xiàn)類ref封裝成Invoker
          -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
             proxy:DemoServiceImpl實例(即ref實例) 
             type:interface com.alibaba.dubbo.demo.DemoService  
             url:injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3141&side=provider&timestamp=1510021401013
            -->Wrapper.getWrapper(Class DemoServiceImpl)
            -->new AbstractProxyInvoker<T>(proxy, type, url)

看一下核心代碼:

public class JavassistProxyFactory extends AbstractProxyFactory {
    ......
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 創(chuàng)建 wrapper 實例
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 創(chuàng)建 Invoker 實例
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // Invoker 調(diào)用 wrapper,wrapper 調(diào)用 proxy(即 ref,eg. DemoServiceImpl)
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

注意:在 JavassistProxyFactory#getInvoker 中,首先根據(jù) ref 生成一個 Wrapper,之后將 wrapper 封裝在 AbstractProxyInvoker 類中??匆幌逻@個 Wrapper 類的核心代碼(com.alibaba.dubbo.common.bytecode.Wrapper0,該類是通過 javassist 生成的)。

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    ...
    /**
     * @param object 實現(xiàn)類ref,eg. DemoServiceImpl
     * @param string 方法名稱
     * @param arrclass 參數(shù)類型
     * @param arrobject 參數(shù)值
     * @return 調(diào)用返回值
     * @throws java.lang.reflect.InvocationTargetException
     */
    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            demoService = (DemoService)object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        } catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
    ...
}

Dubbo 通過“硬編碼”的方式(實際上是 Javassist 動態(tài)生成)生成一個 wrapper 類,在 wrapper 類內(nèi)部又調(diào)用了具體實現(xiàn)類的具體方法,通過這樣的方式避免了反射調(diào)用。

2. 使用 ProtocolFilterWrapper 將 AbstractProxyInvoker 實例進行 Filter 鏈的包裝,之后使用 InjvmProtocol 暴露為 InjvmExporter 實例
          //1.2 將實現(xiàn)類Invoker暴露為Exporter
          -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
             組建invoker鏈,實際上只有最后一個是真正的AbstractProxyInvoker實例,前邊的都是filter。
             invoker:AbstractProxyInvoker實例 
             key:service.filter 
             group:provider
          -->InjvmProtocol.export(Invoker<T> invoker)
             invoker:經(jīng)過filter包裝的invoker
            -->new InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap)
               invoker:經(jīng)過filter包裝的invoker 
               key:com.alibaba.dubbo.demo.DemoService 
               exporterMap:傳入時為空,構造器執(zhí)行后為{"com.alibaba.dubbo.demo.DemoService", 當前的JvmExporter實例}
          -->List<Exporter<?>> exporters.add(上述的exporter)

這里首先使用 Dubbo SPI 機制,獲取到了相應的 filter,之后使用這些 filter 對 AbstractProxyInvoker 實例進行鏈式包裝。最后通過 InjvmProtocol 將包裝后的 AbstractProxyInvoker 實例暴露為 InjvmExporter 實例。(實際上,Exporter 是用于管理 Invoker 的生命周期的)

最終生成的InjvmExporter實例包含如下屬性:

key = "com.alibaba.dubbo.demo.DemoService"
exporterMap: { "com.alibaba.dubbo.demo.DemoService" -> 當前的InjvmExporter實例 }
Invoker<T> invoker = 被filter進行遞歸包裹后的Invoker

到這里,整個本地暴露就完成了??偨Y一下 整體流程

  1. 首先根據(jù) ref 生成一個 wrapper 類,之后將該 wrapper 類封裝到 AbstractProxyInvoker 實例中;
  2. 使用符合條件的 filter 對 AbstractProxyInvoker 進行鏈式包裝,最后將包裝后的 AbstractProxyInvoker 實例封裝到 InjvmExporter 實例中。

2.2 遠程暴露

1. 使用 JavassistProxyFactory 將實現(xiàn)類 ref(DemoServiceImpl)封裝成 AbstractProxyInvoker 實例

與本地暴露類似,不再贅述。

        //2.1 將實現(xiàn)類ref封裝成Invoker
        -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
           proxy:DemoServiceImpl實例(即ref實例) 
           type:interface com.alibaba.dubbo.demo.DemoService
          -->Wrapper.getWrapper(Class DemoServiceImpl)
          -->new AbstractProxyInvoker<T>(proxy, type, url)
2. 使用 ProtocolFilterWrapper 將 AbstractProxyInvoker 實例進行 Filter 鏈的包裝,之后使用 DubboProtocol 暴露為 DubboExporter 實例
            -->new InvokerDelegete(Invoker<T> invoker, URL url)
               invoker:原始的AbstractProxyInvoker實例
            -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
               組建invoker鏈,實際上只有最后一個是真正的InvokerDelegete實例,前邊的都是filter
               invoker:InvokerDelegete實例 
               key:service.filter 
               group:provider
            -->DubboProtocol.export(Invoker<T> invoker)
               invoker:經(jīng)過filter包裝的InvokerDelegete實例
              -->new DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap)
                 invoker:經(jīng)過filter包裝的InvokerDelegete實例 
                 key:com.alibaba.dubbo.demo.DemoService:20880 (group/servicename:version:port)
                 exporterMap:傳入時為空,構造器執(zhí)行后又執(zhí)行了put,為{"com.alibaba.dubbo.demo.DemoService:20880", 當前的DubboExporter實例}

與本地暴露類似,最終得到的DubboExporter實例包含如下屬性:

key:com.alibaba.dubbo.demo.DemoService:20880
invoker:“InvokerDelegete的filter對象”(InvokerDelegete只是對AbstractProxyInvoker實例的簡單封裝)
exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 當前的DubboExporter實例 }

實際上key的完整值為:group/path:version:port(如果都配置了的話)。

注意:最后在 DubboProtocol 的屬性 Map<String, Exporter<?>> exporterMap 中也會存儲 { key -> 當前的 DubboExporter 實例 },將來 Nettyhandler 會根據(jù) consumer 的請求參數(shù)組建 key,然后從這個 exporterMap 中尋找 Exporter 實例,然后從 Exporter 實例中獲取相關的被 filter 鏈包裝的 AbstractProxyInvoker 實例,進行調(diào)用。

3. 開啟 Netty 服務端監(jiān)聽客戶端請求
              //2.3 開啟netty服務端監(jiān)聽客戶端請求
              -->openServer(URL url)
                -->createServer(URL url)
                    -->HeaderExchanger.bind(URL url, ExchangeHandler handler)
                       handler:DubboProtocol.requestHandler
                      -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                        -->NettyTransporter.bind(URL url, ChannelHandler listener)
                           listener:上邊的DecodeHandler實例
                          -->new NettyServer(URL url, ChannelHandler handler)
                            -->ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
                               handler:上邊的DecodeHandler實例
                              -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                            -->getChannelCodec(url)//獲取Codec2,這里是DubboCountCodec實例
                            -->doOpen()//開啟netty服務
                      -->new HeaderExchangeServer(Server server)
                         server:上述的NettyServer
                        -->startHeatbeatTimer()

這里首先構建并創(chuàng)建各種 handler,層層包裝 DubboProtocol.requestHandler 實例(該實例就是 Netty 最終用來處理業(yè)務邏輯的那個 handler)。然后啟動 Netty 服務端。最后啟動心跳。

這里說一下 心跳機制

  • provider:Dubbo 的心跳默認是在 60s 內(nèi)如果沒有接收到消息,就會發(fā)送心跳消息,如果連著3次(180s)沒有收到心跳響應,provider 會關閉 channel。(用于回收資源,會執(zhí)行 Netty 的 channel 的優(yōu)雅關閉操作)
  • consumer:Dubbo 的心跳默認是在 60s 內(nèi)如果沒有接收到消息,就會發(fā)送心跳消息,如果連著3次(180s)沒有收到心跳響應,consumer會進行重連。
4. 創(chuàng)建 Registry:創(chuàng)建zkclient,連接zk
          //2.4 創(chuàng)建Registry:創(chuàng)建zkclient,連接zk
          -->getRegistry(final Invoker<?> originInvoker)
            -->AbstractRegistryFactory.getRegistry(URL url)
              -->ZookeeperRegistryFactory.createRegistry(URL url)
                -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
                  -->ZkclientZookeeperTransporter.connect(URL url)
                    -->new ZkclientZookeeperClient(URL url)
                      -->new ZkClient(url.getBackupAddress())//這里是10.211.55.5:2181
            -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
5. 向注冊中心注冊服務
          //2.5 向注冊中心注冊服務
          -->registry.register(registedProviderUrl)
            -->ZookeeperRegistry.doRegister(URL url)
              -->AbstractZookeeperClient.create(String path, boolean ephemeral)

注冊完成之后,就會在 zk 上創(chuàng)建節(jié)點(url 解碼后如下):

/dubbo
- /com.alibaba.dubbo.demo.DemoService
-- /providers
--- /dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=3508&side=provider&timestamp=1510023456461
6.訂閱 override 數(shù)據(jù)
          //2.6 訂閱 override 數(shù)據(jù)
          -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)

override 數(shù)據(jù)是用于做配置覆蓋的(改變的是 /dubbo/servicekey/configurators 節(jié)點),實現(xiàn) “熱配置” 功能。

到這里,遠程暴露也就完成了。

流程總結

  1. 首先根據(jù) ref 生成一個 wrapper 類,之后將該 wrapper 類封裝到 AbstractProxyInvoker 實例中;
  2. 使用符合條件的 filter 對 AbstractProxyInvoker 進行鏈式包裝,最后將包裝后的 AbstractProxyInvoker 實例封裝到 DubboExporter 實例(DubboExporter 用于管理 AbstractProxyInvoker 實例的生命周期)中;
  3. 開啟 Netty 服務端監(jiān)聽客戶端請求;
  4. 創(chuàng)建 zkclient,連接 zk;
  5. 向注冊中心注冊服務;
  6. 訂閱 override 數(shù)據(jù)。
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容