開篇
最近在公眾號看到一篇記一次dubbo服務(wù)發(fā)現(xiàn)導(dǎo)致的OOM的文章,這篇文章的核心是Rest協(xié)議在服務(wù)引用過程中由于異常導(dǎo)致內(nèi)存OOM,借助這篇文章順帶梳理下Rest協(xié)議的服務(wù)引用過程。
整個(gè)Rest協(xié)議的引用過程按照 RegistryDirectory#notify => RegistryDirectory#refreshInvoker => RegistryDirectory#toInvokers => RestProtocol#refer => RestProtocol#doRefer的流程執(zhí)行。
RegistryDirectory#notify
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
// 分解URLS,這里只關(guān)注invokerUrls即provider對應(yīng)的Url地址
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// 根據(jù)invokerUrls轉(zhuǎn)為Invoker對象
refreshInvoker(invokerUrls);
}
}
- RegistryDirectory#notify核心操作包括兩步:分類Url和轉(zhuǎn)換Url。
- 分類Url是指根據(jù)Url的類別屬性分為不同類別,這里關(guān)心invokerUrls。
- 轉(zhuǎn)換Url是指對invokerUrls進(jìn)行轉(zhuǎn)換操作,通過refreshInvoker(invokerUrls)實(shí)現(xiàn)。
RegistryDirectory#refreshInvoker
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// toInvokers(invokerUrls)執(zhí)行將url轉(zhuǎn)為invoker對象
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
}
- RegistryDirectory#refreshInvoker核心通過toInvokers()轉(zhuǎn)換invoker對象。
RegistryDirectory#toInvokers
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
// newUrlInvokerMap的key為provider的URL,value為對應(yīng)的invoker對象
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// protocol.refer()創(chuàng)建invoker對象
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
// invoker不為null保存到newUrlInvokerMap當(dāng)中
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
}
- RegistryDirectory#toInvokers內(nèi)部針對providerUrl執(zhí)行refer()生成invoker對象。
- RegistryDirectory#toInvokers內(nèi)部針對成功生成invoker對象按照Url和invoker的kv形式進(jìn)行存儲。
- 如果protocol.refer()生成invoker異常,導(dǎo)致newUrlInvokerMap永遠(yuǎn)不存在Url對應(yīng)的invoker,那么該Url對應(yīng)的invoker會被重復(fù)創(chuàng)建,但是因?yàn)槭∮肋h(yuǎn)不會建成功。
- RegistryDirectory#toInvokers內(nèi)部針對newUrlInvokerMap中不存在的URL才會重新生成invoker。
RestProtocol#refer
public abstract class AbstractProxyProtocol extends AbstractProtocol {
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
// 執(zhí)行 doRefer(type, url)
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
// 生成AbstractInvoker對應(yīng)的invoker對象
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = target.invoke(invocation);
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
invokers.add(invoker);
return invoker;
}
}
- RestProtocol#refer()中會調(diào)用proxyFactory.getInvoker(doRefer())方法生成target對象,調(diào)用RestProtocol#doRefer的方法。
- 通過AbstractInvoker類二次封裝target返回invoker對象。
RestProtocol#doRefer
public class RestProtocol extends AbstractProxyProtocol {
private final List<ResteasyClient> clients = Collections.synchronizedList(new LinkedList<ResteasyClient>());
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
if (connectionMonitor == null) {
connectionMonitor = new ConnectionMonitor();
}
// TODO more configs to add
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
// 20 is the default maxTotal of current PoolingClientConnectionManager
connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));
connectionMonitor.addConnectionManager(connectionManager);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT))
.setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
.build();
SocketConfig socketConfig = SocketConfig.custom()
.setSoKeepAlive(true)
.setTcpNoDelay(true)
.build();
CloseableHttpClient httpClient = HttpClientBuilder.create()
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
return Long.parseLong(value) * 1000;
}
}
// TODO constant
return 30 * 1000;
}
})
.setDefaultRequestConfig(requestConfig)
.setDefaultSocketConfig(socketConfig)
.build();
ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);
// 創(chuàng)建client并添加到clients
ResteasyClient client = new ResteasyClientBuilder().httpEngine(engine).build();
clients.add(client);
client.register(RpcContextFilter.class);
for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(url.getParameter(Constants.EXTENSION_KEY, ""))) {
if (!StringUtils.isEmpty(clazz)) {
try {
client.register(Thread.currentThread().getContextClassLoader().loadClass(clazz.trim()));
} catch (ClassNotFoundException e) {
throw new RpcException("Error loading JAX-RS extension class: " + clazz.trim(), e);
}
}
}
// TODO protocol
ResteasyWebTarget target = client.target("http://" + url.getHost() + ":" + url.getPort() + "/" + getContextPath(url));
return target.proxy(serviceType);
}
}
- RestProtocol對象在Dubbo的上下文中只存在一個(gè)實(shí)例,所以類中的clients保存了所有的Rest的client對象。
- RestProtocol#doRefer方法內(nèi)部會創(chuàng)建ResteasyClient的client對象。
- RestProtocol#doRefer方法內(nèi)部通過client.target()方法返回target對象。
- 如果創(chuàng)建ResteasyClient對象成功但是創(chuàng)建ResteasyWebTarget失敗,那么client依然會增加,但是外層的invoker卻因?yàn)楫惓?dǎo)致無法創(chuàng)建成功。
- 上述的異常導(dǎo)致了provider的Url在每次創(chuàng)建invoker對象都會失敗進(jìn)而造成每次該Url重新發(fā)布就回走一次創(chuàng)建invoker的過程,最終結(jié)果client不停增加而OOM。