這一章說說基于Fegin的聲明式調(diào)用請求是怎么個流程
首先我們從構(gòu)建流程中知道,大體上來說是基于JDK的動態(tài)代理機制實現(xiàn)的,那么在JDK的動態(tài)代理中,對方法進行增強的類就是InvocationHandler,核心方法就是invoke(),在Fegin中就是FeignInvocationHandler
我們看看這個類
feign.ReflectiveFeign$FeignInvocationHandler
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//對Object原生方法做幾個判斷
if ("equals".equals(method.getName())) {
try {
Object
otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
//從methodHandle中依據(jù)接口的Method獲取SynchronousMethodHandler
return dispatch.get(method).invoke(args);
}
真正執(zhí)行的Method是解析好的SynchronousMethodHandler,args是請求的方法參數(shù),從這里看出,真正執(zhí)行的是構(gòu)建的
SynchronousMethodHandler
進入SynchronousMethodHandler#invoke方法中
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
//看名字就知道是執(zhí)行并解碼返回值
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
//真正執(zhí)行方法的核心
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
//省略……………………
}
我們知道在targetToHandlersByName.apply(target)方法中將接口的注解參數(shù)使用SpringMvcContract解析生成MethodMetadata,其中就有template,在上面executeAndDecode方法,第一步是將實際的請求地址進行拼裝,參數(shù)替換,最后形成的就像這樣:
最后形成一個包含服務名的請求路徑,是不是很眼熟,和我們在ribbon中傳入的URL很像
這里方法嵌套的很多,沒辦法,只有一個個分析
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
//獲取請求URL 類似:http://serverA/1?name=tom?age=12
URI asUri = URI.create(request.url());
//獲取服務名 類似:serverA
String clientName = asUri.getHost();
//替換掉服務名,類似:http:///1?name=tom?age=12
URI uriWithoutHost = cleanUrl(request.url(), clientName);
//這一步是將請求封裝為RibbonRequest
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
//獲取當前服務的請求參數(shù)
IClientConfig requestConfig = getClientConfig(options, clientName);
//核心方法 細講
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
上面excute()方的核心lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse(),看第一個方法lbClient(clientName):
org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory#create
public FeignLoadBalancer create(String clientName) {
if (this.cache.containsKey(clientName)) {
return this.cache.get(clientName);
}
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
第一步是先去緩存中獲取該服務對應的FeignLoadBalancer,如果沒有進行創(chuàng)建,我們看看創(chuàng)建的流程
- 獲取該服務對應的配置類
- 獲取服務對應的ILoadBalancer,其實在Ribbon中提到的,這里默認的走的是ZoneAwareLoadBalancer,注意啊,在ZoneAwareLoadBalancer初始話的時候已經(jīng)完成了和EurekaClient本地注冊表的拉取,保存在allServerList(BaseLoadBalancer)中,并啟動了定時調(diào)度任務,每30S進行一次全量更新。初始的話也是創(chuàng)建和服務對應的Ribbon上下文,從該上下文中獲取該服務實例的
- 獲取服務攔截器
- 是否配置重試機制,一般沒有配置,走FeignLoadBalancer
好了,lbClient(clientName)返回的是一個FeignLoadBalancer,接著執(zhí)行它的executeWithLoadBalancer方法
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//獲取在每個服務實例重試的的次數(shù)
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
//最多嘗試幾個服務實例
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
//對于每個服務實例的調(diào)用邏輯
//默認field server是null,通過selectServer()方法獲取一個Server
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
//對于每個Server,按順序映射為對于每個Server包含重試邏輯的請求調(diào)用
public Observable<T> call(Server server) {
//設置上下文
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
//每個Server包含重試邏輯的請求調(diào)用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
//增加Server正在處理的請求計數(shù)
loadBalancerContext.noteOpenConnection(stats);
//監(jiān)聽器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//計時器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//operation.call(server)就是剛剛分析的AbstractLoadBalancerAwareClient傳過來的ServerOperation,就是直接對這個Server調(diào)用請求
//doOnEach的操作就是記錄請求前后的一些數(shù)據(jù)用于負載均衡數(shù)據(jù)統(tǒng)計
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
//記錄請求完成
recordStats(tracer, stats, entity, null);
}
@Override
public void onError(Throwable e) {
//記錄請求結(jié)束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
//發(fā)生了錯誤,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
//因為只有調(diào)用請求成功只有一個結(jié)果(只有一個請求), 這里的entity就是結(jié)果,只要收到結(jié)果就代表請求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
//是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
//是否retry,如果retry回調(diào)用selectServer()返回下一個Server
o = o.retry(retryPolicy(maxRetrysNext, false));
//異常處理
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
//如果超過重試次數(shù),則拋異常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
首先這個Observable使用的是Java的rx包下面的組件,服務的選取采用selectServer,這個就是
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
采用Ribbon的服務選取進行的,上面那一大坨主要是對服務調(diào)用包裝一些重試策略
具體的重試講解,后續(xù)開文再說
這submit()方法,請求最終還是會回調(diào)call(Server server)方法
@Override
public Observable<T> call(Server server) {
//將服務請求URL替換為真實URL,Ribbon中選取的服務
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//執(zhí)行請求邏輯
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception e) {
return Observable.error(e);
}
}
@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
throws IOException {
Request.Options options;
if (configOverride != null) {
//配置請求參數(shù)
options = new Request.Options(
configOverride.get(CommonClientConfigKey.ConnectTimeout,
this.connectTimeout),
(configOverride.get(CommonClientConfigKey.ReadTimeout,
this.readTimeout)));
}
else {
options = new Request.Options(this.connectTimeout, this.readTimeout);
}
//真實請求
Response response = request.client().execute(request.toRequest(), options);
return new RibbonResponse(request.getUri(), response);
}
附上總流程圖:
