08-dubbo服務(wù)引入和調(diào)用源碼分析

開頭

上一節(jié)講到了服務(wù)的導(dǎo)出,即服務(wù)端如何將自己的接口提供成dubbo服務(wù)的過程,這一節(jié)就是講服務(wù)的調(diào)用了,消費端是如何調(diào)用服務(wù)端的接口的呢?

主要流程

1.spring啟動時,會給@Reference注解的屬性賦值,賦值的時候會調(diào)用referenceBean.get方法
2.準(zhǔn)備初始化invoker對象,MockClusterInvoker,生成這個是最終目的
3.在注冊中心初始化服務(wù)目錄RegistryDirectory
4.將消費端信息注冊到zk
5.構(gòu)造路由鏈、服務(wù)訂閱
6.根據(jù)服務(wù)目錄得到最終的invoker對象MockClusterInvoker
8.最終調(diào)用MockClusterInvoker.invoke方法執(zhí)行請求發(fā)送數(shù)據(jù),里面調(diào)用了netty.send方法
9.通過netty channel,執(zhí)行nettyServerHandler方法處理請求和結(jié)果返回

源碼流程
流程圖地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184

服務(wù)引入.jpg

1.程序入口

在spring啟動的時候,會對@Reference注解的屬性賦值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
可以看到,最終調(diào)用了 referenceBean.get()方法,這個方法最后返回了一個ref對象,這個ref對象看到最后就是一個Invoke代理對象,也就是主要流程的第二步,準(zhǔn)備初始化invoker對象,MockClusterInvoker,生成這個是最終目的

@Override
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {

    
        return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
    }
 private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else {                                    // ReferenceBean should be initialized and get immediately
            // 這里
            return referenceBean.get();
        }
    }

public synchronized T get() {
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            // 入口
            init();
        }
        return ref;  // Invoke代理
    }

2.準(zhǔn)備初始化invoker對象,MockClusterInvoker

由init()->createProxy(map),這個方法太長了,留了三個主要的方法:
1.加載注冊中心url地址

  1. invoker = REF_PROTOCOL.refer調(diào)用registry.refer,這里又是spi機制,最終調(diào)用了registryProtocol.refer方法
private T createProxy(Map<String, String> map) {
     List<URL> us = loadRegistries(false);
    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
     invoker = CLUSTER.join(new StaticDirectory(u, invokers));

    }

3.在注冊中心初始化服務(wù)目錄RegistryDirectory
留下了主要代碼,可以看到這里初始化了一個注冊目錄,也就是我們最終在zk上看到的consumers節(jié)點文件夾。
registry.register(directory.getRegisteredConsumerUrl());這里最終會調(diào)用ZookeeperRegistry.doRegister方法,用zk客戶端向zk服務(wù)端創(chuàng)建節(jié)點,將消費端信息注冊到zk,可以看到這里創(chuàng)建的是臨時節(jié)點

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

         
          registry.register(directory.getRegisteredConsumerUrl());
       

    
        directory.buildRouterChain(subscribeUrl);

      
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
         
    

        return invoker;
    }

 @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

4.構(gòu)造路由鏈、服務(wù)訂閱
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY

3.生成最終的invoker對象MockClusterInvoker

    Invoker invoker = cluster.join(directory);

這里又是SPI機制,由于Cluster有一個包裝類,所以會先調(diào)用MockClusterWrapper.join方法,原理可參照我之前單獨寫的一節(jié)SPI源碼分析
可以看到,這里最終生成MockClusterInvoker

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

4.服務(wù)調(diào)用

第3步驟中生成了一個MockClusterInvoker對象,所以最終調(diào)用服務(wù)的方法實際上就是調(diào)用MockClusterInvoker.invoke方法,會依次調(diào)用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            }
        return result;
    }

我們直接看DubboInvoker.doInvoke方法
1.首先會拿到一個 ExchangeClient客戶端
2.異步請求currentClient.request,最終調(diào)用HeaderExchangeChannel.request->調(diào)用netty的方法channel.send

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

 
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
           
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

        try {
         
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);

                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);

                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);

                asyncRpcResult.subscribeTo(responseFuture);

                return asyncRpcResult;
            }
        }
    }

5.服務(wù)請求處理

由于使用的netty通信,所有客戶端發(fā)送消息后,netty服務(wù)端會在NettyServerHandler.channelRead中接到消息,這里調(diào)用了很多handler,就不展開看了。
1.MultiMessageHandler
2.HeartbeatHandler
3.AllChannelHandler
4.DecodeHandler
5.HeaderExchangeHandler
6.ExchangeHandlerAdapter

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {

            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

總結(jié)

服務(wù)的引入的目的就是在消費端@Reference標(biāo)注一個服務(wù)端接口,這個注解會去將消費端消息注冊到zk,最終會生成一個調(diào)用服務(wù)端的代理對象invoker,消費端調(diào)用服務(wù)端接口的時候最后調(diào)用的就是invoker.invoke方法,而這個方法采用的通信框架是netty,實現(xiàn)了遠(yuǎn)程調(diào)用。
dubbo源碼寫的很好,比如里面的SPI機制運用的很巧妙,還有一些抽象工廠設(shè)計模式等,源碼值得品讀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容