Dubbo Rest 服務(wù)引用流程

開篇

  • 最近在公眾號看到一篇記一次dubbo服務(wù)發(fā)現(xiàn)導(dǎo)致的OOM的文章,這篇文章的核心是Rest協(xié)議在服務(wù)引用過程中由于異常導(dǎo)致內(nèi)存OOM,借助這篇文章順帶梳理下Rest協(xié)議的服務(wù)引用過程。

  • 整個(gè)Rest協(xié)議的引用過程按照 RegistryDirectory#notify => RegistryDirectory#refreshInvoker => RegistryDirectory#toInvokers => RestProtocol#refer => RestProtocol#doRefer的流程執(zhí)行。


RegistryDirectory#notify

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        // 分解URLS,這里只關(guān)注invokerUrls即provider對應(yīng)的Url地址
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // 根據(jù)invokerUrls轉(zhuǎn)為Invoker對象
        refreshInvoker(invokerUrls);
    }
}
  • RegistryDirectory#notify核心操作包括兩步:分類Url和轉(zhuǎn)換Url。
  • 分類Url是指根據(jù)Url的類別屬性分為不同類別,這里關(guān)心invokerUrls。
  • 轉(zhuǎn)換Url是指對invokerUrls進(jìn)行轉(zhuǎn)換操作,通過refreshInvoker(invokerUrls)實(shí)現(xiàn)。


RegistryDirectory#refreshInvoker

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // toInvokers(invokerUrls)執(zhí)行將url轉(zhuǎn)為invoker對象
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
}
  • RegistryDirectory#refreshInvoker核心通過toInvokers()轉(zhuǎn)換invoker對象。


RegistryDirectory#toInvokers

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        // newUrlInvokerMap的key為provider的URL,value為對應(yīng)的invoker對象
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
           
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        // protocol.refer()創(chuàng)建invoker對象
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                }
                // invoker不為null保存到newUrlInvokerMap當(dāng)中
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
}
  • RegistryDirectory#toInvokers內(nèi)部針對providerUrl執(zhí)行refer()生成invoker對象。
  • RegistryDirectory#toInvokers內(nèi)部針對成功生成invoker對象按照Url和invoker的kv形式進(jìn)行存儲。
  • 如果protocol.refer()生成invoker異常,導(dǎo)致newUrlInvokerMap永遠(yuǎn)不存在Url對應(yīng)的invoker,那么該Url對應(yīng)的invoker會被重復(fù)創(chuàng)建,但是因?yàn)槭∮肋h(yuǎn)不會建成功。
  • RegistryDirectory#toInvokers內(nèi)部針對newUrlInvokerMap中不存在的URL才會重新生成invoker。


RestProtocol#refer

public abstract class AbstractProxyProtocol extends AbstractProtocol {

    public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
        // 執(zhí)行 doRefer(type, url)
        final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
        // 生成AbstractInvoker對應(yīng)的invoker對象
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = target.invoke(invocation);
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
        invokers.add(invoker);
        return invoker;
    }
}
  • RestProtocol#refer()中會調(diào)用proxyFactory.getInvoker(doRefer())方法生成target對象,調(diào)用RestProtocol#doRefer的方法。
  • 通過AbstractInvoker類二次封裝target返回invoker對象。


RestProtocol#doRefer

public class RestProtocol extends AbstractProxyProtocol {

    private final List<ResteasyClient> clients = Collections.synchronizedList(new LinkedList<ResteasyClient>());

    protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
        if (connectionMonitor == null) {
            connectionMonitor = new ConnectionMonitor();
        }

        // TODO more configs to add
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        // 20 is the default maxTotal of current PoolingClientConnectionManager
        connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
        connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));

        connectionMonitor.addConnectionManager(connectionManager);
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT))
                .setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
                .build();

        SocketConfig socketConfig = SocketConfig.custom()
                .setSoKeepAlive(true)
                .setTcpNoDelay(true)
                .build();

        CloseableHttpClient httpClient = HttpClientBuilder.create()
                .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                    @Override
                    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                        HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
                        while (it.hasNext()) {
                            HeaderElement he = it.nextElement();
                            String param = he.getName();
                            String value = he.getValue();
                            if (value != null && param.equalsIgnoreCase("timeout")) {
                                return Long.parseLong(value) * 1000;
                            }
                        }
                        // TODO constant
                        return 30 * 1000;
                    }
                })
                .setDefaultRequestConfig(requestConfig)
                .setDefaultSocketConfig(socketConfig)
                .build();

        ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);

        // 創(chuàng)建client并添加到clients
        ResteasyClient client = new ResteasyClientBuilder().httpEngine(engine).build();
        clients.add(client);

        client.register(RpcContextFilter.class);
        for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(url.getParameter(Constants.EXTENSION_KEY, ""))) {
            if (!StringUtils.isEmpty(clazz)) {
                try {
                    client.register(Thread.currentThread().getContextClassLoader().loadClass(clazz.trim()));
                } catch (ClassNotFoundException e) {
                    throw new RpcException("Error loading JAX-RS extension class: " + clazz.trim(), e);
                }
            }
        }

        // TODO protocol
        ResteasyWebTarget target = client.target("http://" + url.getHost() + ":" + url.getPort() + "/" + getContextPath(url));
        return target.proxy(serviceType);
    }
}
  • RestProtocol對象在Dubbo的上下文中只存在一個(gè)實(shí)例,所以類中的clients保存了所有的Rest的client對象。
  • RestProtocol#doRefer方法內(nèi)部會創(chuàng)建ResteasyClient的client對象。
  • RestProtocol#doRefer方法內(nèi)部通過client.target()方法返回target對象。
  • 如果創(chuàng)建ResteasyClient對象成功但是創(chuàng)建ResteasyWebTarget失敗,那么client依然會增加,但是外層的invoker卻因?yàn)楫惓?dǎo)致無法創(chuàng)建成功。
  • 上述的異常導(dǎo)致了provider的Url在每次創(chuàng)建invoker對象都會失敗進(jìn)而造成每次該Url重新發(fā)布就回走一次創(chuàng)建invoker的過程,最終結(jié)果client不停增加而OOM。


參考文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容