Dubbo Series: Service Export (2021-01-16)

Background

There are already plenty of broad, comprehensive articles online about Dubbo service export, so here we focus on the details.

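Questions

What does the export process actually do?

Which comes first: starting the server or connecting to the registry?

How does the registry find out that a service has gone offline?

Export

We start from the org.apache.dubbo.config.ServiceConfig#doExportUrls() method.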

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    // Multi-protocol export is supported, i.e. several <dubbo:protocol> elements can be configured:
    // <dubbo:protocol name="dubbo" port="20880"/>
    // <dubbo:protocol name="rest" port="20881"/>
    // This way, say, a PHP client and a Dubbo client can both be served at the same time.
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

Step into doExportUrlsFor1Protocol(). This method is well worth a look: it is not so different from the code we write ourselves, in that it is far too long and its loops and conditionals are deeply nested.

//org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) { // no protocol configured, default to dubbo
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);
    // put all configuration into the URL as key=value parameters
    appendRuntimeParameters(map);
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    if (ProtocolUtils.isGeneric(generic)) { // generic service
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else { // regular service: record the revision and the method names
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // token: Dubbo supports token validation; only calls carrying the correct token succeed
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }

    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url); // export to the local JVM first (analyzed below)
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // Multiple registries are supported too: for example, a service can be exported
                // inside its own cluster and also to a shared platform for other business lines
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    // load the monitor configuration
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }

                    // proxy mode used to invoke the concrete bean; defaults to javassist
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }
                    // build the invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // export the service
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            // store the published metadata
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
}
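
The scope branching at the end of this method is easy to lose in the nesting. Below is a minimal sketch of just that decision, with the constant values inlined. ExportScopeSketch and plan are hypothetical names used for illustration only; they are not Dubbo APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class ExportScopeSketch {
    // Assumed to mirror the SCOPE_NONE / SCOPE_LOCAL / SCOPE_REMOTE constants used in the listing above.
    static final String SCOPE_NONE = "none";
    static final String SCOPE_LOCAL = "local";
    static final String SCOPE_REMOTE = "remote";

    /** Returns which exports the branching performs for a given scope value (may be null). */
    public static List<String> plan(String scope) {
        List<String> steps = new ArrayList<>();
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                steps.add("injvm");   // corresponds to exportLocal(url)
            }
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                steps.add("remote");  // corresponds to protocol.export(wrapperInvoker)
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println("unset  -> " + plan(null));
        System.out.println("local  -> " + plan("local"));
        System.out.println("remote -> " + plan("remote"));
        System.out.println("none   -> " + plan("none"));
    }
}
```

With no scope configured, both branches run, which is why a provider is normally exported to injvm and to the remote protocol.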

Local export: exportLocal(url)

//org.apache.dubbo.config.ServiceConfig#exportLocal
private void exportLocal(URL url) {
    URL local = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL) // manually set the protocol to injvm so the matching Protocol is selected below
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    Exporter<?> exporter = protocol.export(
            PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
    exporters.add(exporter);
}

static Protocol protocol = ExtensionLoader
        .getExtensionLoader(Protocol.class).getAdaptiveExtension();

static ProxyFactory PROXY_FACTORY = ExtensionLoader
        .getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

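The static field protocol is the adaptive extension of the Protocol interface: a call to protocol.export(Invoker<T> invoker) is routed to an implementation class according to the invoker passed in, specifically the protocol of its URL. The invoker passed in here is PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local). The static field PROXY_FACTORY is likewise an adaptive extension, this time of ProxyFactory. As the interface below shows, the adaptation is declared at method level and keyed on the proxy URL parameter. We have not set a custom proxy attribute, so the default implementation applies: javassist=JavassistProxyFactory (ignoring the various wrappers here).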

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({"proxy"})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
}
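
To make the "pick the implementation from a URL parameter, otherwise use the @SPI default" idea concrete, here is a hand-written sketch. In Dubbo the adaptive class is generated by ExtensionLoader; the Map-based registry, the select method, and the use of a plain Map in place of URL are all assumptions made for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveSketch {
    /** Simplified stand-in for an extension interface. */
    public interface ProxyFactory {
        String name();
    }

    // Hypothetical registry standing in for the extension loader's name -> implementation mapping.
    static final Map<String, ProxyFactory> EXTENSIONS = new HashMap<>();
    // Plays the role of the value in @SPI("javassist").
    static final String DEFAULT_NAME = "javassist";

    static {
        EXTENSIONS.put("javassist", () -> "JavassistProxyFactory");
        EXTENSIONS.put("jdk", () -> "JdkProxyFactory");
    }

    /** Picks an extension by the "proxy" parameter, falling back to the default name. */
    public static ProxyFactory select(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("proxy", DEFAULT_NAME);
        ProxyFactory factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(select(params).name());   // no proxy parameter: default is used
        params.put("proxy", "jdk");
        System.out.println(select(params).name());   // explicit proxy=jdk
    }
}
```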

Step into the getInvoker implementation of JavassistProxyFactory.

//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // Here proxy is our real implementation, HelloServiceImpl@xxx.
    // If the object passed in is itself a proxy class (its class name contains '$'),
    // only the interface is used: type = HelloService.
    Class cls = proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type;
    // Wrap HelloServiceImpl in a Wrapper class; the Wrapper is generated with javassist, the default.
    final Wrapper wrapper = Wrapper.getWrapper(cls);
    // Return an anonymous inner class instance whose doInvoke method captures the wrapper.
    // AbstractProxyInvoker implements Invoker.
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

The anonymous class above may feel a little abstract, so let's implement it as a named class to make it more concrete.

// Equivalent rewrite of org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker.
// MyProxyInvoker is our own illustrative class, not part of Dubbo.
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new MyProxyInvoker<>(proxy, type, url, wrapper);
}

public class MyProxyInvoker<T> extends AbstractProxyInvoker<T> {
    private final Wrapper wrapper;

    public MyProxyInvoker(T proxy, Class<T> type, URL url, Wrapper wrapper) {
        super(proxy, type, url);
        this.wrapper = wrapper;
    }

    @Override
    protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
        // delegate to the generated Wrapper, which dispatches on the method name
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
}

This has the same effect: JavassistProxyFactory#getInvoker() now returns a MyProxyInvoker object, and we will use that name for it in the rest of the analysis.
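
What wrapper.invokeMethod does can be approximated without Dubbo on the classpath. The sketch below swaps the javassist-generated Wrapper for plain reflection, so it illustrates the dispatch idea rather than Dubbo's implementation. SimpleInvoker, HelloService, and HelloServiceImpl are hypothetical stand-ins.

```java
import java.lang.reflect.Method;

public class ReflectiveInvokerSketch {
    public interface HelloService {
        String hello(String name);
    }

    public static class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello " + name;
        }
    }

    /** Stand-in for MyProxyInvoker: holds the target and dispatches by method name. */
    public static class SimpleInvoker<T> {
        private final T target;
        private final Class<T> type;

        public SimpleInvoker(T target, Class<T> type) {
            this.target = target;
            this.type = type;
        }

        public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            try {
                // Dubbo's Wrapper performs this lookup in generated code; reflection is used here instead.
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        }
    }

    public static void main(String[] args) {
        SimpleInvoker<HelloService> invoker =
                new SimpleInvoker<>(new HelloServiceImpl(), HelloService.class);
        Object result = invoker.invoke("hello", new Class<?>[]{String.class}, new Object[]{"dubbo"});
        System.out.println(result);
    }
}
```

The point to take away is that the invoker only needs a method name, parameter types, and arguments, which is exactly what arrives over the wire in an RPC request.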

Back in Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));, the expression has effectively become Exporter<?> exporter = protocol.export(MyProxyInvoker), where the URL held by MyProxyInvoker is local:

URL local = URLBuilder.from(url)
        .setProtocol("injvm")
        .setHost(LOCALHOST_VALUE)
        .setPort(0)
        .build();

So the implementation class behind protocol.export() is InjvmProtocol.

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

This method returns an InjvmExporter (in practice wrapped in a ListenerExporterWrapper). The exporter is placed in the map under its service key, and ServiceConfig finally calls exporters.add(exporter). That completes the local JVM export.
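
The three-argument constructor call above suggests that the exporter registers itself in exporterMap. Here is a minimal sketch of that pattern, assuming registration happens in the constructor and removal in unexport(). LocalExporter is a hypothetical stand-in, not the Dubbo class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InjvmExporterSketch {
    /** Stand-in for InjvmExporter: registers itself on construction, removes itself on unexport. */
    public static class LocalExporter {
        private final Object invoker;
        private final String key;
        private final Map<String, LocalExporter> exporterMap;

        public LocalExporter(Object invoker, String key, Map<String, LocalExporter> exporterMap) {
            this.invoker = invoker;
            this.key = key;
            this.exporterMap = exporterMap;
            exporterMap.put(key, this); // assumption: the exporter publishes itself under the service key
        }

        public Object getInvoker() {
            return invoker;
        }

        public void unexport() {
            exporterMap.remove(key);
        }
    }

    public static void main(String[] args) {
        Map<String, LocalExporter> exporterMap = new ConcurrentHashMap<>();
        LocalExporter exporter = new LocalExporter("MyProxyInvoker", "com.example.HelloService", exporterMap);
        System.out.println("exported: " + exporterMap.containsKey("com.example.HelloService"));
        exporter.unexport();
        System.out.println("after unexport: " + exporterMap.containsKey("com.example.HelloService"));
    }
}
```

A consumer in the same JVM can then find the invoker by service key without any network hop.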

Remote export

Now let's see how remote export differs.

// As with local export, this returns a MyProxyInvoker instance
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// Unlike local export, the MyProxyInvoker instance is wrapped in a DelegateProviderMetaDataInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Same call as in local export; the difference is that the protocol of wrapperInvoker's URL is not injvm
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

Let's debug and check what the protocol of wrapperInvoker's URL is.

The protocol is registry, so the flow enters RegistryProtocol#export (again through the Wrapper classes). Stepping in with the debugger, we find that this method does a great deal; for now we only analyze the service export part.

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // get the provider URL to be exported to the registry
    URL providerUrl = getProviderUrl(originInvoker);

    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    // export the service (analyzed below)
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    //...
    return new DestroyableExporter<>(exporter);
}

Exporting the service: doLocalExport()

//org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    // build a unique key for the service being exported, to avoid exporting it twice
    String key = getCacheKey(originInvoker);
    // wrap the invoker once more, then export it
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        // providerUrl is dubbo://xxx
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // protocol.export passes through the various Wrappers and ends up in DubboProtocol#export
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
private String getCacheKey(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
    return key;
}
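
The combination of getCacheKey and computeIfAbsent is what keeps the same service from being exported twice. The sketch below reproduces that idea with plain collections: the key ignores the dynamic and enabled parameters, and the export action runs only on the first call for a given key. LocalExportCacheSketch, the string "exporters", and the key format are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalExportCacheSketch {
    private final Map<String, String> bounds = new ConcurrentHashMap<>();
    private final AtomicInteger exports = new AtomicInteger();

    /** Builds a key from the address and parameters, ignoring "dynamic" and "enabled". */
    public static String cacheKey(String address, Map<String, String> parameters) {
        Map<String, String> sorted = new TreeMap<>(parameters); // sorted copy, so the key is stable
        sorted.remove("dynamic");
        sorted.remove("enabled");
        return address + "?" + sorted;
    }

    /** Exports at most once per cache key; later calls return the cached exporter. */
    public String doLocalExport(String address, Map<String, String> parameters) {
        return bounds.computeIfAbsent(cacheKey(address, parameters), key -> {
            exports.incrementAndGet(); // stands in for protocol.export(invokerDelegate)
            return "exporter-for-" + key;
        });
    }

    public int exportCount() {
        return exports.get();
    }

    public static void main(String[] args) {
        LocalExportCacheSketch registry = new LocalExportCacheSketch();
        Map<String, String> first = new HashMap<>();
        first.put("interface", "HelloService");
        first.put("dynamic", "true");
        Map<String, String> second = new HashMap<>();
        second.put("interface", "HelloService");
        second.put("dynamic", "false");
        registry.doLocalExport("dubbo://127.0.0.1:20880", first);
        registry.doLocalExport("dubbo://127.0.0.1:20880", second);
        System.out.println("exports performed: " + registry.exportCount());
    }
}
```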

Here originInvoker is DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)), and invokerDelegate wraps it once more as InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))). Continuing to debug, we arrive at ProtocolFilterWrapper#export.

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            // ... each iteration wraps `last` in a new Invoker that calls filters.get(i)
        }
    }
    return new CallbackRegistrationInvoker<>(last, filters);
}

buildInvokerChain() associates the InvokerDelegate with a chain of Filters and returns the result wrapped in a CallbackRegistrationInvoker. Continuing to debug, we finally reach DubboProtocol#export(), where the invoker is CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)))).
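
The loop body elided in buildInvokerChain() follows a common pattern: iterate the filters from last to first so that the first filter ends up outermost. A self-contained sketch of that pattern follows, with simplified Invoker and Filter interfaces that are assumptions for illustration and not the Dubbo signatures.

```java
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    public interface Invoker {
        String invoke(String request);
    }

    public interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps the invoker so that filters.get(0) runs first and the original invoker runs last. */
    public static Invoker buildInvokerChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** A filter that makes the nesting visible by wrapping the downstream result in its tag. */
    public static Filter tagging(String tag) {
        return (next, request) -> tag + "(" + next.invoke(request) + ")";
    }

    public static void main(String[] args) {
        Invoker target = request -> "echo:" + request;
        Invoker chain = buildInvokerChain(target, Arrays.asList(tagging("a"), tagging("b")));
        System.out.println(chain.invoke("x"));
    }
}
```

Iterating backwards is what makes the configured filter order match the execution order on the way in.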

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // build the service key, e.g. com.poizon.study.api.service.HelloService:20880; it does not depend on the method
    String key = serviceKey(url);
    // wrap the CallbackRegistrationInvoker in a DubboExporter and store it in the map;
    // this map is important: it is where incoming calls later look up the service
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // start the server, i.e. have Netty listen on port 20880
    openServer(url);
    // set up serialization optimization; the default serialization is hessian2
    optimizeSerialization(url);
    return exporter;
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
private void openServer(URL url) {
    // ... createServer() creates the server
    serverMap.put(key, createServer(url));
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
private ExchangeServer createServer(URL url) {

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    return server;
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//org....remoting.Transporters#bind(URL, ChannelHandler...)
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {

    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // the netty4 implementation is selected by default
    return getTransporter().bind(url, handler);
}

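// Note: this is the consumer-side counterpart (connect). During export the call goes through NettyTransporter#bind, which creates a NettyServer.
//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect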
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}
//org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    try {
        doOpen();
        // ...
//org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);

    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        @Override
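        protected void initChannel(Channel ch) throws Exception {
            // ...

Following the chain to the end, we reach the familiar Netty bootstrap. Many familiar settings show up here, such as the IdleStateHandler heartbeat implementation mentioned in the first article, the default heartbeat interval from UrlUtils.getHeartbeat(getUrl()), and Netty's custom handler nettyClientHandler. Note that the last few listings show the consumer-side path (NettyTransporter#connect, NettyClient#doOpen). During export, getTransporter().bind(url, handler) goes through NettyTransporter#bind and creates a NettyServer, whose doOpen() sets up a ServerBootstrap in the same style; on the provider side it is NettyServerHandler that receives the requests sent by Dubbo consumers.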
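
Going back to DubboProtocol#export for a moment: the exporter is stored under key = serviceKey(url), and that key is what an incoming request is matched against. The sketch below shows the format the comment in the listing implies. The optional group and version parts and the "0.0.0" rule are assumptions about Dubbo's format, and ServiceKeySketch is a hypothetical helper.

```java
public class ServiceKeySketch {
    /** Builds "[group/]path[:version]:port"; a null, empty, or "0.0.0" version is omitted. */
    public static String serviceKey(int port, String path, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(path);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group or version: matches the key shown in the DubboProtocol#export comment.
        System.out.println(serviceKey(20880, "com.poizon.study.api.service.HelloService", null, null));
        // With group and version (values chosen for illustration).
        System.out.println(serviceKey(20880, "com.poizon.study.api.service.HelloService", "1.0.0", "demo"));
    }
}
```

Because the key contains no method name, one exporter serves every method of the interface, which matches the remark that the key does not depend on the method.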

Summary

To sum up, we followed a single thread all the way down to the socket being opened. Along the way the DubboExporter was put into the map, and the result was wrapped layer by layer into DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)))))))). The nesting is deep, but each Wrapper class exists to add one small piece of functionality; we will pick a few of them to analyze later.

Later articles will analyze the registry, the Wrappers, and related features.
