深度解析Spring Cloud Ribbon的實(shí)現(xiàn)源碼及原理

Spring Cloud Ribbon源碼分析

Ribbon的核心作用就是進(jìn)行請求的負(fù)載均衡,它的基本原理如下圖所示。就是客戶端集成Ribbon這個(gè)組件,Ribbon中會(huì)針對已經(jīng)配置的服務(wù)提供者地址列表進(jìn)行負(fù)載均衡的計(jì)算,得到一個(gè)目標(biāo)地址之后,再發(fā)起請求。

image-20211118135001876

那么接下來,我們從兩個(gè)層面去分析Ribbon的原理

  1. @LoadBalanced 注解如何讓普通的RestTemplate具備負(fù)載均衡的能力
  2. OpenFeign集成Ribbon的實(shí)現(xiàn)原理

@LoadBalancer注解解析過程分析

在使用RestTemplate的時(shí)候,我們加了一個(gè)@LoadBalance注解,就能讓這個(gè)RestTemplate在請求時(shí),就擁有客戶端負(fù)載均衡的能力。

@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}

然后,我們打開@LoadBalanced這個(gè)注解,可以發(fā)現(xiàn)該注解僅僅是聲明了一個(gè)@qualifier注解。

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

@qualifier注解的作用

我們平時(shí)在使用注解去注入一個(gè)Bean時(shí),都是采用@Autowired。并且大家應(yīng)該知道@Autowired是可以注入一個(gè)List或者M(jìn)ap的。給大家舉個(gè)例子(在一個(gè)springboot應(yīng)用中)

定義一個(gè)TestClass

@AllArgsConstructor
@Data
public class TestClass {
    private String name;
}

聲明一個(gè)配置類,并注入TestClass

@Configuration
public class TestConfig {

    @Bean("testClass1")
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}

定義一個(gè)Controller,用于測試, 注意,此時(shí)我們使用的是@Autowired來注入一個(gè)List集合

@RestController
public class TestController {

    @Autowired(required = false)
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

此時(shí)訪問:http://localhost:8080/test , 得到的結(jié)果是

[
    {
        name: "testClass1"
    },
    {
        name: "testClass2"
    }
]

修改TestConfigTestController

@Configuration
public class TestConfig {

    @Bean("testClass1")
    @Qualifier
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}
@RestController
public class TestController {

    @Autowired(required = false)
    @Qualifier
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

再次訪問:http://localhost:8080/test , 得到的結(jié)果是

[
    {
        name: "testClass1"
    }
]

@LoadBalancer注解篩選及攔截

了解了@qualifier注解的作用后,再回到@LoadBalancer注解上,就不難理解了。

因?yàn)槲覀冃枰獟呙璧皆黾恿?code>@LoadBalancer注解的RestTemplate實(shí)例,所以,@LoadBalancer可以完成這個(gè)動(dòng)作,它的具體的實(shí)現(xiàn)代碼如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();
}

從這個(gè)代碼中可以看出,在LoadBalancerAutoConfiguration這個(gè)配置類中,會(huì)使用同樣的方式,把配置了@LoadBalanced注解的RestTemplate注入到restTemplates集合中。

拿到了RestTemplate之后,在LoadBalancerInterceptorConfig配置類中,會(huì)針對這些RestTemplate進(jìn)行攔截,實(shí)現(xiàn)代碼如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    //省略....

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @Conditional(RetryMissingOrDisabledCondition.class)
    static class LoadBalancerInterceptorConfig {
        
        //裝載一個(gè)LoadBalancerInterceptor的實(shí)例到IOC容器。
        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
        
        //會(huì)遍歷所有加了@LoadBalanced注解的RestTemplate,在原有的攔截器之上,再增加了一個(gè)LoadBalancerInterceptor
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }
    //省略....
}

LoadBalancerInterceptor

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

RestTemplate調(diào)用過程

我們在程序中,使用下面的代碼發(fā)起遠(yuǎn)程請求時(shí)

restTemplate.getForObject(url,String.class);

它的整個(gè)調(diào)用過程如下。

RestTemplate.getForObject

 ----->  AbstractClientHttpRequest.execute()

                 ----->AbstractBufferingClientHttpRequest.executeInternal()

                              -----> InterceptingClientHttpRequest.executeInternal()

                                            -----> InterceptingClientHttpRequest.execute()

InterceptingClientHttpRequest.execute()方法的代碼如下。

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) { //遍歷所有的攔截器,通過攔截器進(jìn)行逐個(gè)處理。
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

LoadBalancerInterceptor

LoadBalancerInterceptor是一個(gè)攔截器,當(dāng)一個(gè)被@Loadbalanced注解修飾的RestTemplate對象發(fā)起HTTP請求時(shí),會(huì)被LoadBalancerInterceptorintercept方法攔截,

在這個(gè)方法中直接通過getHost方法就可以獲取到服務(wù)名(因?yàn)槲覀冊谑褂肦estTemplate調(diào)用服務(wù)的時(shí)候,使用的是服務(wù)名而不是域名,所以這里可以通過getHost直接拿到服務(wù)名然后去調(diào)用execute方法發(fā)起請求)

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

LoadBalancerClient其實(shí)是一個(gè)接口,我們看一下它的類圖,它有一個(gè)唯一的實(shí)現(xiàn)類:RibbonLoadBalancerClient。

image-20211211152356718

RibbonLoadBalancerClient.execute

RibbonLoadBalancerClient這個(gè)類的代碼比較長,我們主要看一下他的核心方法execute

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

上述代碼的實(shí)現(xiàn)邏輯如下:

  • 根據(jù)serviceId獲得一個(gè)ILoadBalancer,實(shí)例為:ZoneAwareLoadBalancer
  • 調(diào)用getServer方法去獲取一個(gè)服務(wù)實(shí)例
  • 判斷Server的值是否為空。這里的Server實(shí)際上就是傳統(tǒng)的一個(gè)服務(wù)節(jié)點(diǎn),這個(gè)對象存儲(chǔ)了服務(wù)節(jié)點(diǎn)的一些元數(shù)據(jù),比如host、port等

getServer

getServer是用來獲得一個(gè)具體的服務(wù)節(jié)點(diǎn),它的實(shí)現(xiàn)如下

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

通過代碼可以看到,getServer實(shí)際調(diào)用了IloadBalancer.chooseServer這個(gè)方法,ILoadBalancer這個(gè)是一個(gè)負(fù)載均衡器接口。

public interface ILoadBalancer {
    //addServers表示向負(fù)載均衡器中維護(hù)的實(shí)例列表增加服務(wù)實(shí)例
    public void addServers(List<Server> newServers);
    //chooseServer表示通過某種策略,從負(fù)載均衡服務(wù)器中挑選出一個(gè)具體的服務(wù)實(shí)例
    public Server chooseServer(Object key);
    //markServerDown表示用來通知和標(biāo)識(shí)負(fù)載均衡器中某個(gè)具體實(shí)例已經(jīng)停止服務(wù),否則負(fù)載均衡器在下一次獲取服務(wù)實(shí)例清單前都會(huì)認(rèn)為這個(gè)服務(wù)實(shí)例是正常工作的
    public void markServerDown(Server server);
    //getReachableServers表示獲取當(dāng)前正常工作的服務(wù)實(shí)例列表
    public List<Server> getReachableServers();
    //getAllServers表示獲取所有的服務(wù)實(shí)例列表,包括正常的服務(wù)和停止工作的服務(wù)
    public List<Server> getAllServers();
}

ILoadBalancer的類關(guān)系圖如下:

image-20211211153617850

從整個(gè)類的關(guān)系圖來看,BaseLoadBalancer類實(shí)現(xiàn)了基礎(chǔ)的負(fù)載均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer則是在負(fù)載均衡策略的基礎(chǔ)上做了一些功能擴(kuò)展。

  • AbstractLoadBalancer實(shí)現(xiàn)了ILoadBalancer接口,它定義了服務(wù)分組的枚舉類/chooseServer(用來選取一個(gè)服務(wù)實(shí)例)/getServerList(獲取某一個(gè)分組中的所有服務(wù)實(shí)例)/getLoadBalancerStats用來獲得一個(gè)LoadBalancerStats對象,這個(gè)對象保存了每一個(gè)服務(wù)的狀態(tài)信息。
  • BaseLoadBalancer,它實(shí)現(xiàn)了作為負(fù)載均衡器的基本功能,比如服務(wù)列表維護(hù)、服務(wù)存活狀態(tài)監(jiān)測、負(fù)載均衡算法選擇Server等。但是它只是完成基本功能,在有些復(fù)雜場景中還無法實(shí)現(xiàn),比如動(dòng)態(tài)服務(wù)列表、Server過濾、Zone區(qū)域意識(shí)(服務(wù)之間的調(diào)用希望盡可能是在同一個(gè)區(qū)域內(nèi)進(jìn)行,減少延遲)。
  • DynamicServerListLoadBalancer是BaseLoadbalancer的一個(gè)子類,它對基礎(chǔ)負(fù)載均衡提供了擴(kuò)展,從名字上可以看出,它提供了動(dòng)態(tài)服務(wù)列表的特性
  • ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的基礎(chǔ)上,增加了以Zone的形式來配置多個(gè)LoadBalancer的功能。

那在getServer方法中,loadBalancer.chooseServer具體的實(shí)現(xiàn)類是哪一個(gè)呢?我們找到RibbonClientConfiguration這個(gè)類

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

從上述聲明中,發(fā)現(xiàn)如果沒有自定義ILoadBalancer,則直接返回一個(gè)ZoneAwareLoadBalancer

ZoneAwareLoadBalancer

Zone表示區(qū)域的意思,區(qū)域指的就是地理區(qū)域的概念,一般較大規(guī)模的互聯(lián)網(wǎng)公司,都會(huì)做跨區(qū)域部署,這樣做有幾個(gè)好處,第一個(gè)是為不同地域的用戶提供最近的訪問節(jié)點(diǎn)減少訪問延遲,其次是為了保證高可用,做容災(zāi)處理。

而ZoneAwareLoadBalancer就是提供了具備區(qū)域意識(shí)的負(fù)載均衡器,它的主要作用是對Zone進(jìn)行了感知,保證每個(gè)Zone里面的負(fù)載均衡策略都是隔離的,它并不保證A區(qū)域過來的請求一定會(huì)發(fā)動(dòng)到A區(qū)域?qū)?yīng)的Server內(nèi)。真正實(shí)現(xiàn)這個(gè)需求的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter

ZoneAwareLoadBalancer的核心功能是

  • 若開啟了區(qū)域意識(shí),且zone的個(gè)數(shù) > 1,就繼續(xù)區(qū)域選擇邏輯
  • 根據(jù)ZoneAvoidanceRule.getAvailableZones()方法拿到可用區(qū)們(會(huì)T除掉完全不可用的區(qū)域們,以及可用但是負(fù)載最高的一個(gè)區(qū)域)
  • 從可用區(qū)zone們中,通過ZoneAvoidanceRule.randomChooseZone隨機(jī)選一個(gè)zone出來 (該隨機(jī)遵從權(quán)重規(guī)則:誰的zone里面Server數(shù)量最多,被選中的概率越大)
  • 在選中的zone里面的所有Server中,采用該zone對對應(yīng)的Rule,進(jìn)行choose
@Override
public Server chooseServer(Object key) {
    //ENABLED,表示是否用區(qū)域意識(shí)的choose選擇Server,默認(rèn)是true,
    //如果禁用了區(qū)域、或者只有一個(gè)zone,就直接按照父類的邏輯來進(jìn)行處理,父類默認(rèn)采用輪詢算法
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        //根據(jù)相關(guān)閾值計(jì)算可用區(qū)域
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            //從可用區(qū)域中隨機(jī)選擇一個(gè)區(qū)域,zone里面的服務(wù)器節(jié)點(diǎn)越多,被選中的概率越大
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                //根據(jù)zone獲得該zone中的LB,然后根據(jù)該Zone的負(fù)載均衡算法選擇一個(gè)server
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

BaseLoadBalancer.chooseServer

假設(shè)我們現(xiàn)在沒有使用多區(qū)域部署,那么負(fù)載策略會(huì)執(zhí)行到BaseLoadBalancer.chooseServer,

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

根據(jù)默認(rèn)的負(fù)載均衡算法來獲得指定的服務(wù)節(jié)點(diǎn)。默認(rèn)的算法是RoundBin。

rule.choose

rule代表負(fù)載均衡算法規(guī)則,它有很多實(shí)現(xiàn),IRule的實(shí)現(xiàn)類關(guān)系圖如下。

image-20211211155112400

默認(rèn)情況下,rule的實(shí)現(xiàn)為ZoneAvoidanceRule,它是在RibbonClientConfiguration這個(gè)配置類中定義的,代碼如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
        RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}

所以,在BaseLoadBalancer.chooseServer中調(diào)用rule.choose(key);,實(shí)際會(huì)進(jìn)入到ZoneAvoidanceRulechoose方法

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer(); //獲取負(fù)載均衡器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); //通過該方法獲取目標(biāo)服務(wù)
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

復(fù)合判斷server所在區(qū)域的性能和server的可用性選擇server

主要分析chooseRoundRobinAfterFiltering方法。

chooseRoundRobinAfterFiltering

從方法名稱可以看出來,它是通過對目標(biāo)服務(wù)集群通過過濾算法過濾一遍后,再使用輪詢實(shí)現(xiàn)負(fù)載均衡。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

CompositePredicate.getEligibleServers

使用主過濾條件對所有實(shí)例過濾并返回過濾后的清單,

@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    //
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    
    //按照fallbacks中存儲(chǔ)的過濾器順序進(jìn)行過濾(此處就行先ZoneAvoidancePredicate然后AvailabilityPredicate)
    Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
           && i.hasNext()) {
        AbstractServerPredicate predicate = i.next();
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}

依次使用次過濾條件對主過濾條件的結(jié)果進(jìn)行過濾*

  • //不論是主過濾條件還是次過濾條件,都需要判斷下面兩個(gè)條件
  • //只要有一個(gè)條件符合,就不再過濾,將當(dāng)前結(jié)果返回供線性輪詢
    • 第1個(gè)條件:過濾后的實(shí)例總數(shù)>=最小過濾實(shí)例數(shù)(默認(rèn)為1)
    • 第2個(gè)條件:過濾互的實(shí)例比例>最小過濾百分比(默認(rèn)為0)

getEligibleServers

這里的實(shí)現(xiàn)邏輯是,遍歷所有服務(wù)器列表,調(diào)用this.apply方法進(jìn)行驗(yàn)證,驗(yàn)證通過的節(jié)點(diǎn),會(huì)加入到results這個(gè)列表中返回。

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

this.apply,會(huì)進(jìn)入到CompositePredicate.apply方法中,代碼如下。

//CompositePredicate.apply

@Override
public boolean apply(@Nullable PredicateKey input) {
    return delegate.apply(input);
}

delegate的實(shí)例是AbstractServerPredicate, 代碼如下!

public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
            public boolean apply(PredicateKey input) {
            return p.apply(input);
        }            
    };        
}

也就是說,會(huì)通過AbstractServerPredicate.apply方法進(jìn)行過濾,其中,input表示目標(biāo)服務(wù)器集群的某一個(gè)具體節(jié)點(diǎn)。

其中p,表示AndPredicate實(shí)例,這里用到了組合predicate進(jìn)行判斷,而這里的組合判斷是and的關(guān)系,用到了AndPredicate實(shí)現(xiàn)。

 private static class AndPredicate<T> implements Predicate<T>, Serializable {
        private final List<? extends Predicate<? super T>> components;
        private static final long serialVersionUID = 0L;

        private AndPredicate(List<? extends Predicate<? super T>> components) {
            this.components = components;
        }

        public boolean apply(@Nullable T t) {
            for(int i = 0; i < this.components.size(); ++i) { //遍歷多個(gè)predicate,逐一進(jìn)行判斷。
                if (!((Predicate)this.components.get(i)).apply(t)) {
                    return false;
                }
            }

            return true;
        }
 }

上述代碼中,components是由兩個(gè)predicate組合而成

  1. AvailabilityPredicate,過濾熔斷狀態(tài)下的服務(wù)以及并發(fā)連接過多的服務(wù)。
  2. ZoneAvoidancePredicate,過濾掉無可用區(qū)域的節(jié)點(diǎn)。

所以在AndPredicateapply方法中,需要遍歷這兩個(gè)predicate逐一進(jìn)行判斷。

AvailablilityPredicate

過濾熔斷狀態(tài)下的服務(wù)以及并發(fā)連接過多的服務(wù),代碼如下:

@Override
public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}

判斷是否要跳過這個(gè)目標(biāo)節(jié)點(diǎn),實(shí)現(xiàn)邏輯如下。

private boolean shouldSkipServer(ServerStats stats) {  
        //niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped是否為true
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) //該Server是否為斷路狀態(tài)
        || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {//本機(jī)發(fā)往這個(gè)Server未處理完的請求個(gè)數(shù)是否大于Server實(shí)例最大的活躍連接數(shù)
        return true;
    }
    return false;
}

Server是否為斷路狀態(tài)是如何判斷的呢?

ServerStats源碼,這里詳細(xì)源碼我們不貼了,說一下機(jī)制:

斷路是通過時(shí)間判斷實(shí)現(xiàn)的,每次失敗記錄上次失敗時(shí)間。如果失敗了,則觸發(fā)判斷,是否大于斷路的最小失敗次數(shù),則判斷:

計(jì)算斷路持續(xù)時(shí)間: (2^失敗次數(shù))* 斷路時(shí)間因子,如果大于最大斷路時(shí)間,則取最大斷路時(shí)間。
判斷當(dāng)前時(shí)間是否大于上次失敗時(shí)間+短路持續(xù)時(shí)間,如果小于,則是斷路狀態(tài)。
這里又涉及三個(gè)配置(這里需要將default替換成你調(diào)用的微服務(wù)名稱):

  • niws.loadbalancer.default.connectionFailureCountThreshold,默認(rèn)為3, 觸發(fā)判斷是否斷路的最小失敗次數(shù),也就是,默認(rèn)如果失敗三次,就會(huì)判斷是否要斷路了。
  • niws.loadbalancer.default.circuitTripTimeoutFactorSeconds, 默認(rèn)為10, 斷路時(shí)間因子,
  • niws.loadbalancer.default.circuitTripMaxTimeoutSeconds,默認(rèn)為30,最大斷路時(shí)間

ZoneAvoidancePredicate

ZoneAvoidancePredicate,過濾掉不可用區(qū)域的節(jié)點(diǎn),代碼如下!

@Override
public boolean apply(@Nullable PredicateKey input) {
    if (!ENABLED.get()) {//查看niws.loadbalancer.zoneAvoidanceRule.enabled配置的熟悉是否為true(默認(rèn)為true)如果為false沒有開啟分片過濾 則不進(jìn)行過濾
        return true;
    }
    ////獲取配置的分區(qū)字符串 默認(rèn)為UNKNOWN
    String serverZone = input.getServer().getZone();
    if (serverZone == null) { //如果沒有分區(qū),則不需要進(jìn)行過濾,直接返回即可
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    //獲取負(fù)載均衡的狀態(tài)信息
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    //如果可用區(qū)域小于等于1,也不需要進(jìn)行過濾直接返回
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    //針對當(dāng)前負(fù)載信息,創(chuàng)建一個(gè)區(qū)域快照,后續(xù)會(huì)用快照數(shù)據(jù)進(jìn)行計(jì)算(避免后續(xù)因?yàn)閿?shù)據(jù)變更導(dǎo)致判斷計(jì)算不準(zhǔn)確問題)
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) { //如果快照信息中沒有包含當(dāng)前服務(wù)器所在區(qū)域,則也不需要進(jìn)行判斷。
        // The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    //獲取有效區(qū)域
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) { //有效區(qū)域如果包含當(dāng)前節(jié)點(diǎn),則返回true,否則返回false, 返回false表示這個(gè)區(qū)域不可用,不需要進(jìn)行目標(biāo)節(jié)點(diǎn)分發(fā)。
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
} 

LoadBalancerStats,在每次發(fā)起通訊的時(shí)候,狀態(tài)信息會(huì)在控制臺(tái)打印如下!

DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown;   Instance count:2;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},Server stats: [[Server:localhost:9091;    Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
, [Server:localhost:9081;   Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a

getAvailableZones方法的代碼如下,用來計(jì)算有效可用區(qū)域。

public static Set<String> getAvailableZones(
    Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
    double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) { //如果快照信息為空,返回空
        return null;
    }
    //定義一個(gè)集合存儲(chǔ)有效區(qū)域節(jié)點(diǎn)
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) { //如果有效區(qū)域的集合只有1個(gè),直接返回
        return availableZones;
    }
    //記錄有問題的區(qū)域集合
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0; //定義一個(gè)變量,保存所有zone中,平均負(fù)載最高值
    // true:zone有限可用
    // false:zone全部可用
    boolean limitedZoneAvailability = false; //
    
    //遍歷所有的區(qū)域信息. 對每個(gè)zone進(jìn)行逐一分析
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();  //得到zone字符串
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //得到該zone的快照信息
        int instanceCount = zoneSnapshot.getInstanceCount();
        if (instanceCount == 0) { //若該zone內(nèi)一個(gè)實(shí)例都木有了,那就是完全不可用,那就移除該zone,然后標(biāo)記zone是有限可用的(并非全部可用)
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer(); //獲取該區(qū)域的平均負(fù)載
            // 機(jī)器的熔斷總數(shù) / 總實(shí)例數(shù)已經(jīng)超過了閾值(默認(rèn)為1,也就是全部熔斷才會(huì)認(rèn)為該zone完全不可用)
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                / instanceCount >= triggeringBlackoutPercentage
                || loadPerServer < 0) { //loadPerServer表示當(dāng)前區(qū)域所有節(jié)點(diǎn)都熔斷了。
                availableZones.remove(zone); 
                limitedZoneAvailability = true;
            } else { // 進(jìn)入到這個(gè)邏輯,說明并不是完全不可用,就看看區(qū)域的狀態(tài)
                // 如果當(dāng)前負(fù)載和最大負(fù)載相當(dāng),那認(rèn)為當(dāng)前區(qū)域狀態(tài)很不好,加入到worstZones中
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                   
                } else if (loadPerServer > maxLoadPerServer) {// 或者若當(dāng)前負(fù)載大于最大負(fù)載了。
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    // 如果最大負(fù)載小于設(shè)定的負(fù)載閾值 并且limitedZoneAvailability=false
    // 說明全部zone都可用,并且最大負(fù)載都還沒有達(dá)到閾值,那就把全部zone返回   
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    //若最大負(fù)載超過閾值, 就不能全部返回,則直接從負(fù)載最高的區(qū)域中隨機(jī)返回一個(gè),這么處理的目的是把負(fù)載最高的那個(gè)哥們T除掉,再返回結(jié)果。
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;

}

上述邏輯還是比較復(fù)雜的,我們通過一個(gè)簡單的文字進(jìn)行說明:

  1. 如果zone為null,那么也就是沒有可用區(qū)域,直接返回null
  2. 如果zone的可用區(qū)域?yàn)?,也沒有什么可以選擇的,直接返回這一個(gè)
  3. 使用Set<String> worstZones記錄所有zone中比較狀態(tài)不好的的zone列表,用maxLoadPerServer表示所有zone中負(fù)載最高的區(qū)域;用limitedZoneAvailability表示是否是部分zone可用(true:部分可用,false:全部可用),接著我們需要遍歷所有的zone信息,逐一進(jìn)行判斷從而對有效zone的結(jié)果進(jìn)行處理。
    1. 如果當(dāng)前zoneinstanceCount為0,那就直接把這個(gè)區(qū)域移除就行,并且標(biāo)記limitedZoneAvailability為部分可用,沒什么好說的。
    2. 獲取當(dāng)前總的平均負(fù)載loadPerServer,如果zone內(nèi)的熔斷實(shí)例數(shù) / 總實(shí)例數(shù) >= triggeringBlackoutPercentage 或者 loadPerServer < 0的話,說明當(dāng)前區(qū)域有問題,直接執(zhí)行remove移除當(dāng)前zone,并且limitedZoneAvailability=true .
      1. (熔斷實(shí)例數(shù) / 總實(shí)例數(shù) >= 閾值,標(biāo)記為當(dāng)前zone就不可用了(移除掉),這個(gè)很好理解。這個(gè)閾值為0.99999d也就說所有的Server實(shí)例被熔斷了,該zone才算不可用了).
      2. loadPerServer = -1,也就說當(dāng)所有實(shí)例都熔斷了。這兩個(gè)條件判斷都差不多,都是判斷這個(gè)區(qū)域的可用性。
    3. 如果當(dāng)前zone沒有達(dá)到閾值,則判斷區(qū)域的負(fù)載情況,從所有zone中找到負(fù)載最高的區(qū)域(負(fù)載差值在0.000001d),則把這些區(qū)域加入到worstZones列表,也就是這個(gè)集合保存的是負(fù)載較高的區(qū)域。
  4. 通過上述遍歷對區(qū)域數(shù)據(jù)進(jìn)行計(jì)算后,最后要設(shè)置返回的有效區(qū)域數(shù)據(jù)。
    1. 最高負(fù)載maxLoadPerServer仍舊小于提供的triggeringLoad閾值,并且并且limitedZoneAvailability=false(就是說所有zone都可用的情況下),那就返回所有的zone:availableZones。 (也就是所有區(qū)域的負(fù)載都在閾值范圍內(nèi)并且每個(gè)區(qū)域內(nèi)的節(jié)點(diǎn)都還存活狀態(tài),就全部返回)
    2. 否則,最大負(fù)載超過閾值或者某些區(qū)域存在部分不可用的節(jié)點(diǎn)時(shí),就從這些負(fù)載較高的節(jié)點(diǎn)worstZones中隨機(jī)移除一個(gè)

AbstractServerPredicate

在回答下面的代碼,通過getEligibleServers判斷可用服務(wù)節(jié)點(diǎn)后,如果可用節(jié)點(diǎn)不為0 ,則執(zhí)行incrementAndGetModulo方法進(jìn)行輪詢。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

該方法是通過輪詢來實(shí)現(xiàn),代碼如下!

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

服務(wù)列表的加載過程

在本實(shí)例中,我們將服務(wù)列表配置在application.properties文件中,意味著在某個(gè)時(shí)候會(huì)加載這個(gè)列表,保存在某個(gè)位置,那它是在什么時(shí)候加載的呢?

RibbonClientConfiguration這個(gè)配置類中,有下面這個(gè)Bean的聲明,(該Bean是條件觸發(fā))它用來定義默認(rèn)的負(fù)載均衡實(shí)現(xiàn)。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

前面分析過,它的類關(guān)系圖如下!

image-20211211153617850

當(dāng)ZoneAwareLoadBalancer在初始化時(shí),會(huì)調(diào)用父類DynamicServerListLoadBalancer的構(gòu)造方法,代碼如下。

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

restOfInit

restOfInit方法主要做兩件事情。

  1. 開啟動(dòng)態(tài)更新Server的功能
  2. 更新Server列表
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); //開啟動(dòng)態(tài)更新Server

    updateListOfServers(); //更新Server列表
    
    
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers

全量更新一次服務(wù)列表。

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                     getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                         getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

上述代碼解釋如下

  1. 由于我們是通過application.properties文件配置的靜態(tài)服務(wù)地址列表,所以此時(shí)serverListImpl的實(shí)例為:ConfigurationBasedServerList,調(diào)用getUpdatedListOfServers方法時(shí),返回的是在application.properties文件中定義的服務(wù)列表。
  2. 判斷是否需要filter,如果有,則通過filter進(jìn)行服務(wù)列表過濾。

最后調(diào)用updateAllServerList,更新所有Server到本地緩存中。

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

動(dòng)態(tài)Ping機(jī)制

在Ribbon中,基于Ping機(jī)制,目標(biāo)服務(wù)地址也會(huì)發(fā)生動(dòng)態(tài)變更,具體的實(shí)現(xiàn)方式在DynamicServerListLoadBalancer.restOfInit方法中

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();  //開啟定時(shí)任務(wù)動(dòng)態(tài)更新

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

注意,這里會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),而定時(shí)任務(wù)所執(zhí)行的程序是updateAction,它是一個(gè)匿名內(nèi)部類,定義如下。

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

定時(shí)任務(wù)的啟動(dòng)方法如下,這個(gè)任務(wù)每隔30s執(zhí)行一次。

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();  //執(zhí)行具體的任務(wù)。
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,  //1000
            refreshIntervalMs,  //30000 
            TimeUnit.MILLISECONDS 
        );
    } else {
        logger.info("Already active, no-op");
    }
}

當(dāng)30s之后觸發(fā)了doUpdate方法后,最終進(jìn)入到updateAllServerList方法

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

其中,會(huì)調(diào)用super.forceQuickPing();進(jìn)行心跳健康檢測。

public void forceQuickPing() {
    if (canSkipPing()) {
        return;
    }
    logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

    try {
        new Pinger(pingStrategy).runPinger();
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}

RibbonLoadBalancerClient.execute

經(jīng)過上述分析,再回到RibbonLoadBalancerClient.execute方法!

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

此時(shí),Server server = getServer(loadBalancer, hint);這行代碼,會(huì)返回一個(gè)具體的目標(biāo)服務(wù)器。

其中,在調(diào)用execute方法之前,會(huì)包裝一個(gè)RibbonServer對象傳遞下去,它的主要作用是用來記錄請求的負(fù)載信息。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                     LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);  //記錄請求狀態(tài)
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {
        statsRecorder.recordStats(ex); //記錄請求狀態(tài)
        throw ex;
    }
    catch (Exception ex) {
        statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}

request.apply

request是LoadBalancerRequest接口,它里面提供了一個(gè)apply方法,但是從代碼中我們發(fā)現(xiàn)這個(gè)方法并沒有實(shí)現(xiàn)類,那么它是在哪里實(shí)現(xiàn)的呢?

繼續(xù)又往前分析發(fā)現(xiàn),這個(gè)request對象是從LoadBalancerInterceptor的intercept方法中傳遞過來的.

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
    URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

而request的傳遞,是通過this.requestFactory.createRequest(request, body, execution)創(chuàng)建而來,于是我們找到這個(gè)方法。

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
    return (instance) -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        LoadBalancerRequestTransformer transformer;
        if (this.transformers != null) {
            for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
                transformer = (LoadBalancerRequestTransformer)var6.next();
            }
        }

        return execution.execute((HttpRequest)serviceRequest, body);
    };
}

從代碼中發(fā)現(xiàn),它是一個(gè)用lambda表達(dá)式實(shí)現(xiàn)的匿名內(nèi)部類。在該內(nèi)部類中,創(chuàng)建了一個(gè)ServiceRequestWrapper,這個(gè)ServiceRequestWrapper實(shí)際上就是HttpRequestWrapper的一個(gè)子類,ServiceRequestWrapper重寫了HttpRequestWrapper的getURI()方法,重寫的URI實(shí)際上就是通過調(diào)用LoadBalancerClient接口的reconstructURI函數(shù)來重新構(gòu)建一個(gè)URI進(jìn)行訪問。

InterceptingClientHttpRequest.execute

上述代碼執(zhí)行的execution.execute,又會(huì)進(jìn)入到InterceptingClientHttpRequest.execute方法中,代碼如下。

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); //注意這里
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

此時(shí)需要注意,request對象的實(shí)例是HttpRequestWrapper

request.getURI()

當(dāng)調(diào)用request.getURI()獲取目標(biāo)地址創(chuàng)建http請求時(shí),會(huì)調(diào)用ServiceRequestWrapper中的.getURI()方法。

@Override
public URI getURI() {
    URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}

在這個(gè)方法中,調(diào)用RibbonLoadBalancerClient實(shí)例中的reconstructURI方法,根據(jù)service-id生成目標(biāo)服務(wù)地址。

RibbonLoadBalancerClient.reconstructURI

public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId(); //獲取實(shí)例id,也就是服務(wù)名稱
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId); //獲取RibbonLoadBalancerContext上下文,這個(gè)是從spring容器中獲取的對象實(shí)例。

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) { //如果instance為RibbonServer
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();  //獲取目標(biāo)服務(wù)器的Server信息
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer); //判斷是否需要更新成一個(gè)安全連接。
        }
        else { //如果是一個(gè)普通的http地址
            server = new Server(instance.getScheme(), instance.getHost(),
                    instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);  //調(diào)用這個(gè)方法拼接成一個(gè)真實(shí)的目標(biāo)服務(wù)器地址。
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容