開頭
上一節(jié)講到了服務(wù)的導(dǎo)出,即服務(wù)端如何將自己的接口提供成dubbo服務(wù)的過程,這一節(jié)就是講服務(wù)的調(diào)用了,消費端是如何調(diào)用服務(wù)端的接口的呢?
主要流程
1.spring啟動時,會給@Reference注解的屬性賦值,賦值的時候會調(diào)用referenceBean.get方法
2.準(zhǔn)備初始化invoker對象,MockClusterInvoker,生成這個是最終目的
3.在注冊中心初始化服務(wù)目錄RegistryDirectory
4.將消費端信息注冊到zk
5.構(gòu)造路由鏈、服務(wù)訂閱
6.根據(jù)服務(wù)目錄得到最終的invoker對象MockClusterInvoker
8.最終調(diào)用MockClusterInvoker.invoke方法執(zhí)行請求發(fā)送數(shù)據(jù),里面調(diào)用了netty.send方法
9.通過netty channel,執(zhí)行nettyServerHandler方法處理請求和結(jié)果返回
源碼流程
流程圖地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184

1.程序入口
在spring啟動的時候,會對@Reference注解的屬性賦值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
可以看到,最終調(diào)用了 referenceBean.get()方法,這個方法最后返回了一個ref對象,這個ref對象看到最后就是一個Invoke代理對象,也就是主要流程的第二步,準(zhǔn)備初始化invoker對象,MockClusterInvoker,生成這個是最終目的
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}
private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
wrapInvocationHandler(referenceBeanName, referenceBean));
} else { // ReferenceBean should be initialized and get immediately
// 這里
return referenceBean.get();
}
}
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// 入口
init();
}
return ref; // Invoke代理
}
2.準(zhǔn)備初始化invoker對象,MockClusterInvoker
由init()->createProxy(map),這個方法太長了,留了三個主要的方法:
1.加載注冊中心url地址
- invoker = REF_PROTOCOL.refer調(diào)用registry.refer,這里又是spi機制,最終調(diào)用了registryProtocol.refer方法
private T createProxy(Map<String, String> map) {
List<URL> us = loadRegistries(false);
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
}
3.在注冊中心初始化服務(wù)目錄RegistryDirectory
留下了主要代碼,可以看到這里初始化了一個注冊目錄,也就是我們最終在zk上看到的consumers節(jié)點文件夾。
registry.register(directory.getRegisteredConsumerUrl());這里最終會調(diào)用ZookeeperRegistry.doRegister方法,用zk客戶端向zk服務(wù)端創(chuàng)建節(jié)點,將消費端信息注冊到zk,可以看到這里創(chuàng)建的是臨時節(jié)點
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
registry.register(directory.getRegisteredConsumerUrl());
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
return invoker;
}
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
4.構(gòu)造路由鏈、服務(wù)訂閱
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY
3.生成最終的invoker對象MockClusterInvoker
Invoker invoker = cluster.join(directory);
這里又是SPI機制,由于Cluster有一個包裝類,所以會先調(diào)用MockClusterWrapper.join方法,原理可參照我之前單獨寫的一節(jié)SPI源碼分析
可以看到,這里最終生成MockClusterInvoker
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
4.服務(wù)調(diào)用
第3步驟中生成了一個MockClusterInvoker對象,所以最終調(diào)用服務(wù)的方法實際上就是調(diào)用MockClusterInvoker.invoke方法,會依次調(diào)用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
}
return result;
}
我們直接看DubboInvoker.doInvoke方法
1.首先會拿到一個 ExchangeClient客戶端
2.異步請求currentClient.request,最終調(diào)用HeaderExchangeChannel.request->調(diào)用netty的方法channel.send
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
return asyncRpcResult;
}
}
}
5.服務(wù)請求處理
由于使用的netty通信,所有客戶端發(fā)送消息后,netty服務(wù)端會在NettyServerHandler.channelRead中接到消息,這里調(diào)用了很多handler,就不展開看了。
1.MultiMessageHandler
2.HeartbeatHandler
3.AllChannelHandler
4.DecodeHandler
5.HeaderExchangeHandler
6.ExchangeHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
總結(jié)
服務(wù)的引入的目的就是在消費端@Reference標(biāo)注一個服務(wù)端接口,這個注解會去將消費端消息注冊到zk,最終會生成一個調(diào)用服務(wù)端的代理對象invoker,消費端調(diào)用服務(wù)端接口的時候最后調(diào)用的就是invoker.invoke方法,而這個方法采用的通信框架是netty,實現(xiàn)了遠(yuǎn)程調(diào)用。
dubbo源碼寫的很好,比如里面的SPI機制運用的很巧妙,還有一些抽象工廠設(shè)計模式等,源碼值得品讀。