Dubbo 服務(wù)調(diào)用

一、前言

image.png

服務(wù)導(dǎo)出的過程中,我們已經(jīng)獲取了一個(gè)代理對(duì)象。服務(wù)調(diào)用就是通過調(diào)用這個(gè)代理對(duì)象的方法。

image.png

Dubbo官方文檔給出了服務(wù)調(diào)用的具體過程。簡述一下就是客戶端通過代理對(duì)象發(fā)起調(diào)用,提前構(gòu)造好協(xié)議頭,然后將對(duì)象序列化成協(xié)議體,通過client(Netty)進(jìn)行網(wǎng)絡(luò)傳輸。

服務(wù)提供者的NettyServer接收到這個(gè)請(qǐng)求后會(huì)分發(fā)給業(yè)務(wù)線程池。由業(yè)務(wù)線程池調(diào)用具體的實(shí)現(xiàn)方法。

二、源碼分析

客戶端調(diào)用代碼

客戶端調(diào)用的代碼如下

public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        CompletableFuture<String> hello = demoService.sayHelloAsync("world");
        System.out.println("result: " + hello.get());
    }

在服務(wù)導(dǎo)出結(jié)束完成后,我們獲取DemoService實(shí)際是一個(gè)代理對(duì)象。通過該代理對(duì)象完成方法調(diào)用。最終會(huì)生成一個(gè)RPCInvocation對(duì)象調(diào)用MockClusterInvoker#invoke方法。

image.png

MockClusterInvoker#invoke

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        
        // 獲取mock配置
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

這個(gè)方法主要是根據(jù)mock配置決定是否調(diào)用mock方法

  • mock無配置調(diào)用真實(shí)方法
  • mock為force則強(qiáng)制走mock方法
  • mock為true,真實(shí)方法調(diào)用失敗后執(zhí)行mock方法

AbstractClusterInvoker#invoke

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // 調(diào)用directory.list 主要是做路由過濾
        List<Invoker<T>> invokers = list(invocation);
        // 過濾完成通過SPI機(jī)制獲取loadBalance實(shí)現(xiàn)類
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 調(diào)用子類方法
        return doInvoke(invocation, invokers, loadbalance);
    }

這段模板代碼主要邏輯是:

  • 綁定attachement到invocation
  • 通過RegistryDirectory過濾Invoker
  • 通過SPI機(jī)制獲取負(fù)載均衡實(shí)現(xiàn)
  • 執(zhí)行子類的doInvoke方法

最終這里是會(huì)調(diào)用到FailoverClusterInvoker執(zhí)行doInvoker方法

FailoverClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 省略代碼
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {       
            // 負(fù)載均衡中選擇一個(gè)Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 執(zhí)行方法
                Result result = invoker.invoke(invocation);
                // 省略代碼
                return result;
            } catch (RpcException e) {
               // 省略代碼
            } catch (Throwable e) {
    // 省略代碼
   }
        }
        throw new RpcException();
    }

這個(gè)方法主要是完成了重試機(jī)制的邏輯

  • 獲取重試次數(shù)并循環(huán)執(zhí)行
  • 根據(jù)負(fù)載均衡策略選擇一個(gè)Invoker
  • 執(zhí)行子類的doInvoke方法

最終調(diào)用到DubboInvoker的doInvoke方法

DubboInvoker#doInvoke

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // 獲取client
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 判斷是否是oneWay方式調(diào)用
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發(fā)送
                currentClient.send(inv, isSent);
                // 返回null
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

這段代碼的主要邏輯是

  • 獲取client對(duì)象
  • 根據(jù)有無返回值判斷調(diào)用方式是否是oneway
  • oneway通過client發(fā)起請(qǐng)求,返回一個(gè)異步執(zhí)行結(jié)果的返回值
  • 非oneway則獲取回調(diào)線程池,發(fā)送請(qǐng)求,返回一個(gè)Future對(duì)象。

服務(wù)端調(diào)用代碼

客戶端默認(rèn)是通過Netty進(jìn)行發(fā)起請(qǐng)求調(diào)用,對(duì)于服務(wù)端主要是通過NettyServerHandler#channelRead方法進(jìn)行接收消息

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }

服務(wù)端接收消息默認(rèn)是所有消息派發(fā)至業(yè)務(wù)線程池,也就是AllChannelHandler#received

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

這里將消息封裝成ChannelEventRunnable扔到業(yè)務(wù)線程池執(zhí)行。接下來會(huì)將消息解碼后調(diào)用到HeaderExchangeHandler#handleRequest

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);
            return;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // 執(zhí)行方法
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }

這里調(diào)用具體的handler執(zhí)行reply方法,最終調(diào)用到DubboProtocol中ExchangeHandler的實(shí)現(xiàn)

@Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

這里就是獲取具體的Invoker對(duì)象執(zhí)行方法調(diào)用

三、總結(jié)

客戶端通過接口調(diào)用某個(gè)方法,實(shí)際調(diào)用到代理類。代理類從cluster中獲取Invoker對(duì)象,并進(jìn)行router進(jìn)行過濾。緊接著通過loadBalance進(jìn)行負(fù)載均衡。獲取到Invoker對(duì)象后會(huì)根據(jù)協(xié)議構(gòu)造請(qǐng)求頭,然后將參數(shù)序列化后構(gòu)造回請(qǐng)求體,最后通過Client進(jìn)行遠(yuǎn)程調(diào)用。

服務(wù)端通過NettryServer監(jiān)聽請(qǐng)求,根據(jù)協(xié)議反序列化成對(duì)象,再按照派發(fā)策略派發(fā)消息。默認(rèn)是All,也就是所有請(qǐng)求扔給業(yè)務(wù)線程池。業(yè)務(wù)線程會(huì)獲取Invoker對(duì)象,并調(diào)用真實(shí)類,最終將結(jié)果返回。

[toc]


一、前言

[圖片上傳失敗...(image-2b0c4e-1632559794086)]

服務(wù)導(dǎo)出的過程中,我們已經(jīng)獲取了一個(gè)代理對(duì)象。服務(wù)調(diào)用就是通過調(diào)用這個(gè)代理對(duì)象的方法。

[圖片上傳失敗...(image-5b88ab-1632559794086)]

Dubbo官方文檔給出了服務(wù)調(diào)用的具體過程。簡述一下就是客戶端通過代理對(duì)象發(fā)起調(diào)用,提前構(gòu)造好協(xié)議頭,然后將對(duì)象序列化成協(xié)議體,通過client(Netty)進(jìn)行網(wǎng)絡(luò)傳輸。

服務(wù)提供者的NettyServer接收到這個(gè)請(qǐng)求后會(huì)分發(fā)給業(yè)務(wù)線程池。由業(yè)務(wù)線程池調(diào)用具體的實(shí)現(xiàn)方法。

二、源碼分析

客戶端調(diào)用代碼

客戶端調(diào)用的代碼如下

public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        CompletableFuture<String> hello = demoService.sayHelloAsync("world");
        System.out.println("result: " + hello.get());
    }

在服務(wù)導(dǎo)出結(jié)束完成后,我們獲取DemoService實(shí)際是一個(gè)代理對(duì)象。通過該代理對(duì)象完成方法調(diào)用。最終會(huì)生成一個(gè)RPCInvocation對(duì)象調(diào)用MockClusterInvoker#invoke方法。

image.png

MockClusterInvoker#invoke

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        
        // 獲取mock配置
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

這個(gè)方法主要是根據(jù)mock配置決定是否調(diào)用mock方法

  • mock無配置調(diào)用真實(shí)方法
  • mock為force則強(qiáng)制走mock方法
  • mock為true,真實(shí)方法調(diào)用失敗后執(zhí)行mock方法

AbstractClusterInvoker#invoke

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // 調(diào)用directory.list 主要是做路由過濾
        List<Invoker<T>> invokers = list(invocation);
        // 過濾完成通過SPI機(jī)制獲取loadBalance實(shí)現(xiàn)類
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 調(diào)用子類方法
        return doInvoke(invocation, invokers, loadbalance);
    }

這段模板代碼主要邏輯是:

  • 綁定attachement到invocation
  • 通過RegistryDirectory過濾Invoker
  • 通過SPI機(jī)制獲取負(fù)載均衡實(shí)現(xiàn)
  • 執(zhí)行子類的doInvoke方法

最終這里是會(huì)調(diào)用到FailoverClusterInvoker執(zhí)行doInvoker方法

FailoverClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 省略代碼
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {       
            // 負(fù)載均衡中選擇一個(gè)Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 執(zhí)行方法
                Result result = invoker.invoke(invocation);
                // 省略代碼
                return result;
            } catch (RpcException e) {
               // 省略代碼
            } catch (Throwable e) {
    // 省略代碼
   }
        }
        throw new RpcException();
    }

這個(gè)方法主要是完成了重試機(jī)制的邏輯

  • 獲取重試次數(shù)并循環(huán)執(zhí)行
  • 根據(jù)負(fù)載均衡策略選擇一個(gè)Invoker
  • 執(zhí)行子類的doInvoke方法

最終調(diào)用到DubboInvoker的doInvoke方法

DubboInvoker#doInvoke

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // 獲取client
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 判斷是否是oneWay方式調(diào)用
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發(fā)送
                currentClient.send(inv, isSent);
                // 返回null
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

這段代碼的主要邏輯是

  • 獲取client對(duì)象
  • 根據(jù)有無返回值判斷調(diào)用方式是否是oneway
  • oneway通過client發(fā)起請(qǐng)求,返回一個(gè)異步執(zhí)行結(jié)果的返回值
  • 非oneway則獲取回調(diào)線程池,發(fā)送請(qǐng)求,返回一個(gè)Future對(duì)象。

服務(wù)端調(diào)用代碼

客戶端默認(rèn)是通過Netty進(jìn)行發(fā)起請(qǐng)求調(diào)用,對(duì)于服務(wù)端主要是通過NettyServerHandler#channelRead方法進(jìn)行接收消息

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }

服務(wù)端接收消息默認(rèn)是所有消息派發(fā)至業(yè)務(wù)線程池,也就是AllChannelHandler#received

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

這里將消息封裝成ChannelEventRunnable扔到業(yè)務(wù)線程池執(zhí)行。接下來會(huì)將消息解碼后調(diào)用到HeaderExchangeHandler#handleRequest

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);
            return;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // 執(zhí)行方法
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }

這里調(diào)用具體的handler執(zhí)行reply方法,最終調(diào)用到DubboProtocol中ExchangeHandler的實(shí)現(xiàn)

@Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

這里就是獲取具體的Invoker對(duì)象執(zhí)行方法調(diào)用

三、總結(jié)

客戶端通過接口調(diào)用某個(gè)方法,實(shí)際調(diào)用到代理類。代理類從cluster中獲取Invoker對(duì)象,并進(jìn)行router進(jìn)行過濾。緊接著通過loadBalance進(jìn)行負(fù)載均衡。獲取到Invoker對(duì)象后會(huì)根據(jù)協(xié)議構(gòu)造請(qǐng)求頭,然后將參數(shù)序列化后構(gòu)造回請(qǐng)求體,最后通過Client進(jìn)行遠(yuǎn)程調(diào)用。

服務(wù)端通過NettryServer監(jiān)聽請(qǐng)求,根據(jù)協(xié)議反序列化成對(duì)象,再按照派發(fā)策略派發(fā)消息。默認(rèn)是All,也就是所有請(qǐng)求扔給業(yè)務(wù)線程池。業(yè)務(wù)線程會(huì)獲取Invoker對(duì)象,并調(diào)用真實(shí)類,最終將結(jié)果返回。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者。

友情鏈接更多精彩內(nèi)容