啟動(dòng)流程
消費(fèi)者在啟動(dòng)之后,會(huì)通過ReferenceConfig#get()來生成遠(yuǎn)程調(diào)用代理類。在get方法中,會(huì)啟動(dòng)一系列調(diào)用函數(shù),我們來一個(gè)個(gè)解析。
配置同樣包含2種:
- XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="first-consumer-xml"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181/"/>
<dubbo:reference proxy="javassist" scope="remote"
id="demoService"
generic="false"
check="false"
async="false"
group="dubbo-sxzhongf-group"
version="1.0.0"
interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService">
<dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" />
<dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" />
</dubbo:reference>
</beans>
- Java API
public class ApiConsumerApplication {
public static void main(String[] args) {
// 1. 創(chuàng)建服務(wù)引用對(duì)象實(shí)例
ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>();
// 2. 設(shè)置應(yīng)用程序信息
referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer"));
// 3. 設(shè)置注冊(cè)中心
referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/"));
// 4. 設(shè)置服務(wù)接口和超時(shí)時(shí)間
referenceConfig.setInterface(IGreetingService.class);
// 默認(rèn)重試3次
referenceConfig.setTimeout(5000);
// 5 設(shè)置服務(wù)分組和版本
referenceConfig.setGroup("dubbo-sxzhongf-group");
referenceConfig.setVersion("1.0.0");
// 6. 引用服務(wù)
IGreetingService greetingService = referenceConfig.get();
// 7. 設(shè)置隱式參數(shù)
RpcContext.getContext().setAttachment("company", "sxzhongf");
// 獲取provider端傳遞的隱式參數(shù)(FIXME: 需要后續(xù)追蹤)
// System.out.println("年齡是:" + RpcContext.getContext().getAttachment("age"));
//8. 調(diào)用服務(wù)
System.out.println(greetingService.sayHello("pan"));
}
}
1. new ReferenceConfig
在此階段,會(huì)初始化org.apache.dubbo.config.AbstractConfig & org.apache.dubbo.config.ReferenceConfig的靜態(tài)變量以及靜態(tài)代碼塊。
2. ReferenceConfig#get
-
ReferenceConfig#init- 通過
DubboBootstrap啟動(dòng)dubbo。 - 繼而初始化服務(wù)的元數(shù)據(jù)信息,
URL.buildKey(interfaceName, group, version)這段用來生成唯一服務(wù)的key,所以我們之前說dubbo的唯一標(biāo)識(shí)是通過全路徑和group以及version來決定的。 - 接下來通過
org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock來檢查我們mock是否設(shè)置正確。 - 設(shè)置一系列要用的參數(shù)(系統(tǒng)運(yùn)行參數(shù)、是否為consumer、是否為泛型調(diào)用等等),檢查
dubbo的注冊(cè)地址,默認(rèn)為當(dāng)前主機(jī)IP
- 通過
ReferenceConfig#createProxy創(chuàng)建調(diào)用代理開始
ReferenceConfig#shouldJvmRefer首先判斷是否為Injvm調(diào)用如果不為
injvm,判斷是否為peer to peer端對(duì)端設(shè)置,如果為p2p,那么就直連url檢查注冊(cè)中心是否存在(注冊(cè)中心有可能有多個(gè))
循環(huán)檢查注冊(cè)中心是否有效
-
配置轉(zhuǎn)換
URLregistry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0®istry=zookeeper&release=2.7.5×tamp=1582961922459 -
如果只有一個(gè)注冊(cè)中心,執(zhí)行
REF_PROTOCOL.refer(interfaceClass, urls.get(0));來將URL轉(zhuǎn)為Invoker對(duì)象,因?yàn)?code>private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();是擴(kuò)展是Adaptive,因此在這里會(huì)執(zhí)行Protocol$Adaptive#refer方法,又由于protocol參數(shù)值為registry,因此會(huì)只是RegistryProtocol#refer,又由于被Wrapper類裝配,因此會(huì)先執(zhí)行三個(gè)Wrapper類,最后才能執(zhí)行到RegistryProtocol#refer -> RegistryProtocol#doRefer,在doRefer方法中會(huì)訂閱服務(wù)提供者地址,最后返回Invoker對(duì)象。1582964279895.png
1582977260255.png
那么究竟是如何生成的Invoker對(duì)象呢?我們來看下具體代碼:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 1.可以查尋RegistryDirectory & StaticDirectory 區(qū)別
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//2. 封裝訂閱所用URL
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
//3.build路由規(guī)則鏈
directory.buildRouterChain(subscribeUrl);
//4.訂閱服務(wù)提供者地址
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//5.封裝集群策略到虛擬invoker
Invoker invoker = cluster.join(directory);
return invoker;
}
上述代碼中,步驟1根據(jù)URL生成了一個(gè)RegistryDirectory(關(guān)于Directory接口的作用,可以自行查詢一些,直白一些就是將一堆Invoker對(duì)象封裝成一個(gè)List<Invoker>,只有2種實(shí)現(xiàn)RegistryDirectory & StaticDirectory,從命名可看出一個(gè)是動(dòng)態(tài)可變,一個(gè)不可變),代碼2 封裝了訂閱所要使用的參數(shù)信息,代碼3則是封裝綁定路由規(guī)則鏈,代碼4訂閱。代碼5調(diào)用 Cluster$Adaptive#join方法生成Invoker對(duì)象。
在代碼2種從zk獲取服務(wù)提供者信息:

一旦zk返回服務(wù)提供者列表之后,就會(huì)調(diào)用RegistryDirectory#notify,如下:

在org.apache.dubbo.common.utils.UrlUtils#isMatch中對(duì)provider和consumer的api進(jìn)行匹配校驗(yàn)。繼續(xù)跟蹤:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokers 在toInvokers正式將URL轉(zhuǎn)換為Invoker,通過invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); 在這里protocol#refer同樣執(zhí)行順序如:
(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer,調(diào)用代碼如下:
```java
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
```
關(guān)注getClients,其中執(zhí)行了DubboProtocol#getSharedClient -> DubboProtocol#initClient 創(chuàng)建netty client進(jìn)行連接。
因?yàn)檫@里使用的是明確的DubboInvoker,在回調(diào)的時(shí)候,Wrapper會(huì)對(duì)該Invoker進(jìn)行包裝,執(zhí)行效果如下:

因此,會(huì)執(zhí)行到ProtocolFilterWrapper#buildInvokerChain,該函數(shù)會(huì)對(duì)服務(wù)進(jìn)行調(diào)用鏈跟蹤:
```java
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 獲取所有在MATA-INF文件中配置的激活的責(zé)任鏈接口
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onMessage(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
```
所有的負(fù)載均衡、容錯(cuò)策略等都是在這里綁定的。
7.如果有多個(gè)注冊(cè)中心,將會(huì)循環(huán)執(zhí)行步驟6,將URL轉(zhuǎn)換為Invoker對(duì)象,然后添加到一個(gè)List,分別進(jìn)行注冊(cè)之后,然后判斷最后一個(gè)注冊(cè)中心url是否有效,針對(duì)多訂閱的場(chǎng)景,URL中添加cluster參數(shù),默認(rèn)使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster策略,使用org.apache.dubbo.rpc.cluster.Cluster#join將多個(gè)Invoker對(duì)象封裝一個(gè)虛擬的Invoker中,否則如果最后一個(gè)注冊(cè)中心是無效的,直接封裝Invoker對(duì)象。
8.創(chuàng)建服務(wù)代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>),因?yàn)?code>ProxyFactory是一個(gè)適配類。那么同樣這里會(huì)調(diào)用ProxyFactory$Adaptive#getProxy,這里最終就是返回一個(gè)代理服務(wù)的Invoker對(duì)象。
至此,我們的消費(fèi)端的綁定遠(yuǎn)程zk的服務(wù)就已經(jīng)結(jié)束了。
那么,我們?cè)谡{(diào)用服務(wù)器方法的時(shí)候服務(wù)器端和客戶端都是如何處理的呢?下節(jié)我們將繼續(xù)分析。

