Fegin-完整請求流程解析

這一章說說基于Fegin的聲明式調(diào)用請求是怎么個流程

首先我們從構(gòu)建流程中知道,大體上來說是基于JDK的動態(tài)代理機制實現(xiàn)的,那么在JDK的動態(tài)代理中,對方法進行增強的類就是InvocationHandler,核心方法就是invoke(),在Fegin中就是FeignInvocationHandler

我們看看這個類

feign.ReflectiveFeign$FeignInvocationHandler

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      //對Object原生方法做幾個判斷
      if ("equals".equals(method.getName())) {
        try {
          Object
              otherHandler =
              args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
          return equals(otherHandler);
        } catch (IllegalArgumentException e) {
          return false;
        }
      } else if ("hashCode".equals(method.getName())) {
        return hashCode();
      } else if ("toString".equals(method.getName())) {
        return toString();
      }
      //從methodHandle中依據(jù)接口的Method獲取SynchronousMethodHandler
      return dispatch.get(method).invoke(args);
    }

真正執(zhí)行的Method是解析好的SynchronousMethodHandler,args是請求的方法參數(shù),從這里看出,真正執(zhí)行的是構(gòu)建的
SynchronousMethodHandler

進入SynchronousMethodHandler#invoke方法中

  @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        //看名字就知道是執(zhí)行并解碼返回值
        return executeAndDecode(template);
      } catch (RetryableException e) {
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      //真正執(zhí)行方法的核心    
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    //省略……………………
}

我們知道在targetToHandlersByName.apply(target)方法中將接口的注解參數(shù)使用SpringMvcContract解析生成MethodMetadata,其中就有template,在上面executeAndDecode方法,第一步是將實際的請求地址進行拼裝,參數(shù)替換,最后形成的就像這樣:

http://serverA/1?name=tom?age=12

最后形成一個包含服務名的請求路徑,是不是很眼熟,和我們在ribbon中傳入的URL很像

這里方法嵌套的很多,沒辦法,只有一個個分析

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            //獲取請求URL 類似:http://serverA/1?name=tom?age=12
            URI asUri = URI.create(request.url());
            //獲取服務名 類似:serverA
            String clientName = asUri.getHost();
            //替換掉服務名,類似:http:///1?name=tom?age=12
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            //這一步是將請求封裝為RibbonRequest
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);
            //獲取當前服務的請求參數(shù)
            IClientConfig requestConfig = getClientConfig(options, clientName);
            //核心方法 細講
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

上面excute()方的核心lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse(),看第一個方法lbClient(clientName):

org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory#create

    public FeignLoadBalancer create(String clientName) {
        if (this.cache.containsKey(clientName)) {
            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }

第一步是先去緩存中獲取該服務對應的FeignLoadBalancer,如果沒有進行創(chuàng)建,我們看看創(chuàng)建的流程

  • 獲取該服務對應的配置類
  • 獲取服務對應的ILoadBalancer,其實在Ribbon中提到的,這里默認的走的是ZoneAwareLoadBalancer,注意啊,在ZoneAwareLoadBalancer初始話的時候已經(jīng)完成了和EurekaClient本地注冊表的拉取,保存在allServerList(BaseLoadBalancer)中,并啟動了定時調(diào)度任務,每30S進行一次全量更新。初始的話也是創(chuàng)建和服務對應的Ribbon上下文,從該上下文中獲取該服務實例的
  • 獲取服務攔截器
  • 是否配置重試機制,一般沒有配置,走FeignLoadBalancer

好了,lbClient(clientName)返回的是一個FeignLoadBalancer,接著執(zhí)行它的executeWithLoadBalancer方法

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //獲取在每個服務實例重試的的次數(shù)
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多嘗試幾個服務實例
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    //對于每個服務實例的調(diào)用邏輯
    //默認field server是null,通過selectServer()方法獲取一個Server
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                //對于每個Server,按順序映射為對于每個Server包含重試邏輯的請求調(diào)用
                public Observable<T> call(Server server) {
                    //設置上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //每個Server包含重試邏輯的請求調(diào)用
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在處理的請求計數(shù)
                                    loadBalancerContext.noteOpenConnection(stats);

                                    //監(jiān)聽器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    //計時器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是剛剛分析的AbstractLoadBalancerAwareClient傳過來的ServerOperation,就是直接對這個Server調(diào)用請求
                                    //doOnEach的操作就是記錄請求前后的一些數(shù)據(jù)用于負載均衡數(shù)據(jù)統(tǒng)計
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //記錄請求完成
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            //記錄請求結(jié)束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //發(fā)生了錯誤,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            //因為只有調(diào)用請求成功只有一個結(jié)果(只有一個請求), 這里的entity就是結(jié)果,只要收到結(jié)果就代表請求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回調(diào)用selectServer()返回下一個Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    //異常處理
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超過重試次數(shù),則拋異常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

首先這個Observable使用的是Java的rx包下面的組件,服務的選取采用selectServer,這個就是

Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   

采用Ribbon的服務選取進行的,上面那一大坨主要是對服務調(diào)用包裝一些重試策略

具體的重試講解,后續(xù)開文再說

這submit()方法,請求最終還是會回調(diào)call(Server server)方法

@Override
public Observable<T> call(Server server) {
   //將服務請求URL替換為真實URL,Ribbon中選取的服務    
   URI finalUri = reconstructURIWithServer(server, request.getUri());
    S requestForServer = (S) request.replaceUri(finalUri);
    try {
        //執(zhí)行請求邏輯
         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
        } catch (Exception e) {
          return Observable.error(e);
        }
  }
    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            //配置請求參數(shù)
            options = new Request.Options(
                    configOverride.get(CommonClientConfigKey.ConnectTimeout,
                            this.connectTimeout),
                    (configOverride.get(CommonClientConfigKey.ReadTimeout,
                            this.readTimeout)));
        }
        else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        //真實請求
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }

附上總流程圖:


Fegin-完整Http請求流程.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容