
Ribbon的核心作用就是進(jìn)行請求的負(fù)載均衡,它的基本原理如下圖所示。就是客戶端集成Ribbon這個(gè)組件,Ribbon中會(huì)針對已經(jīng)配置的服務(wù)提供者地址列表進(jìn)行負(fù)載均衡的計(jì)算,得到一個(gè)目標(biāo)地址之后,再發(fā)起請求。

那么接下來,我們從兩個(gè)層面去分析Ribbon的原理
- @LoadBalanced 注解如何讓普通的
RestTemplate具備負(fù)載均衡的能力 - OpenFeign集成Ribbon的實(shí)現(xiàn)原理
@LoadBalancer注解解析過程分析
在使用RestTemplate的時(shí)候,我們加了一個(gè)@LoadBalance注解,就能讓這個(gè)RestTemplate在請求時(shí),就擁有客戶端負(fù)載均衡的能力。
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
然后,我們打開@LoadBalanced這個(gè)注解,可以發(fā)現(xiàn)該注解僅僅是聲明了一個(gè)@qualifier注解。
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
@qualifier注解的作用
我們平時(shí)在使用注解去注入一個(gè)Bean時(shí),都是采用@Autowired。并且大家應(yīng)該知道@Autowired是可以注入一個(gè)List或者M(jìn)ap的。給大家舉個(gè)例子(在一個(gè)springboot應(yīng)用中)
定義一個(gè)TestClass
@AllArgsConstructor
@Data
public class TestClass {
private String name;
}
聲明一個(gè)配置類,并注入TestClass
@Configuration
public class TestConfig {
@Bean("testClass1")
TestClass testClass(){
return new TestClass("testClass1");
}
@Bean("testClass2")
TestClass testClass2(){
return new TestClass("testClass2");
}
}
定義一個(gè)Controller,用于測試, 注意,此時(shí)我們使用的是@Autowired來注入一個(gè)List集合
@RestController
public class TestController {
@Autowired(required = false)
List<TestClass> testClasses= Collections.emptyList();
@GetMapping("/test")
public Object test(){
return testClasses;
}
}
此時(shí)訪問:http://localhost:8080/test , 得到的結(jié)果是
[
{
name: "testClass1"
},
{
name: "testClass2"
}
]
修改
TestConfig和TestController
@Configuration
public class TestConfig {
@Bean("testClass1")
@Qualifier
TestClass testClass(){
return new TestClass("testClass1");
}
@Bean("testClass2")
TestClass testClass2(){
return new TestClass("testClass2");
}
}
@RestController
public class TestController {
@Autowired(required = false)
@Qualifier
List<TestClass> testClasses= Collections.emptyList();
@GetMapping("/test")
public Object test(){
return testClasses;
}
}
再次訪問:http://localhost:8080/test , 得到的結(jié)果是
[
{
name: "testClass1"
}
]
@LoadBalancer注解篩選及攔截
了解了@qualifier注解的作用后,再回到@LoadBalancer注解上,就不難理解了。
因?yàn)槲覀冃枰獟呙璧皆黾恿?code>@LoadBalancer注解的RestTemplate實(shí)例,所以,@LoadBalancer可以完成這個(gè)動(dòng)作,它的具體的實(shí)現(xiàn)代碼如下:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
}
從這個(gè)代碼中可以看出,在LoadBalancerAutoConfiguration這個(gè)配置類中,會(huì)使用同樣的方式,把配置了@LoadBalanced注解的RestTemplate注入到restTemplates集合中。
拿到了RestTemplate之后,在LoadBalancerInterceptorConfig配置類中,會(huì)針對這些RestTemplate進(jìn)行攔截,實(shí)現(xiàn)代碼如下:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
//省略....
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
//裝載一個(gè)LoadBalancerInterceptor的實(shí)例到IOC容器。
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//會(huì)遍歷所有加了@LoadBalanced注解的RestTemplate,在原有的攔截器之上,再增加了一個(gè)LoadBalancerInterceptor
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
//省略....
}
LoadBalancerInterceptor
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
RestTemplate調(diào)用過程
我們在程序中,使用下面的代碼發(fā)起遠(yuǎn)程請求時(shí)
restTemplate.getForObject(url,String.class);
它的整個(gè)調(diào)用過程如下。
RestTemplate.getForObject
-----> AbstractClientHttpRequest.execute()
----->AbstractBufferingClientHttpRequest.executeInternal()
-----> InterceptingClientHttpRequest.executeInternal()
-----> InterceptingClientHttpRequest.execute()
InterceptingClientHttpRequest.execute()方法的代碼如下。
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) { //遍歷所有的攔截器,通過攔截器進(jìn)行逐個(gè)處理。
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
LoadBalancerInterceptor
LoadBalancerInterceptor是一個(gè)攔截器,當(dāng)一個(gè)被@Loadbalanced注解修飾的RestTemplate對象發(fā)起HTTP請求時(shí),會(huì)被LoadBalancerInterceptor的intercept方法攔截,
在這個(gè)方法中直接通過getHost方法就可以獲取到服務(wù)名(因?yàn)槲覀冊谑褂肦estTemplate調(diào)用服務(wù)的時(shí)候,使用的是服務(wù)名而不是域名,所以這里可以通過getHost直接拿到服務(wù)名然后去調(diào)用execute方法發(fā)起請求)
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
LoadBalancerClient其實(shí)是一個(gè)接口,我們看一下它的類圖,它有一個(gè)唯一的實(shí)現(xiàn)類:RibbonLoadBalancerClient。

RibbonLoadBalancerClient.execute
RibbonLoadBalancerClient這個(gè)類的代碼比較長,我們主要看一下他的核心方法execute
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
上述代碼的實(shí)現(xiàn)邏輯如下:
- 根據(jù)serviceId獲得一個(gè)ILoadBalancer,實(shí)例為:ZoneAwareLoadBalancer
- 調(diào)用getServer方法去獲取一個(gè)服務(wù)實(shí)例
- 判斷Server的值是否為空。這里的Server實(shí)際上就是傳統(tǒng)的一個(gè)服務(wù)節(jié)點(diǎn),這個(gè)對象存儲(chǔ)了服務(wù)節(jié)點(diǎn)的一些元數(shù)據(jù),比如host、port等
getServer
getServer是用來獲得一個(gè)具體的服務(wù)節(jié)點(diǎn),它的實(shí)現(xiàn)如下
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
通過代碼可以看到,getServer實(shí)際調(diào)用了IloadBalancer.chooseServer這個(gè)方法,ILoadBalancer這個(gè)是一個(gè)負(fù)載均衡器接口。
public interface ILoadBalancer {
//addServers表示向負(fù)載均衡器中維護(hù)的實(shí)例列表增加服務(wù)實(shí)例
public void addServers(List<Server> newServers);
//chooseServer表示通過某種策略,從負(fù)載均衡服務(wù)器中挑選出一個(gè)具體的服務(wù)實(shí)例
public Server chooseServer(Object key);
//markServerDown表示用來通知和標(biāo)識(shí)負(fù)載均衡器中某個(gè)具體實(shí)例已經(jīng)停止服務(wù),否則負(fù)載均衡器在下一次獲取服務(wù)實(shí)例清單前都會(huì)認(rèn)為這個(gè)服務(wù)實(shí)例是正常工作的
public void markServerDown(Server server);
//getReachableServers表示獲取當(dāng)前正常工作的服務(wù)實(shí)例列表
public List<Server> getReachableServers();
//getAllServers表示獲取所有的服務(wù)實(shí)例列表,包括正常的服務(wù)和停止工作的服務(wù)
public List<Server> getAllServers();
}
ILoadBalancer的類關(guān)系圖如下:

從整個(gè)類的關(guān)系圖來看,BaseLoadBalancer類實(shí)現(xiàn)了基礎(chǔ)的負(fù)載均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer則是在負(fù)載均衡策略的基礎(chǔ)上做了一些功能擴(kuò)展。
- AbstractLoadBalancer實(shí)現(xiàn)了ILoadBalancer接口,它定義了服務(wù)分組的枚舉類/chooseServer(用來選取一個(gè)服務(wù)實(shí)例)/getServerList(獲取某一個(gè)分組中的所有服務(wù)實(shí)例)/getLoadBalancerStats用來獲得一個(gè)LoadBalancerStats對象,這個(gè)對象保存了每一個(gè)服務(wù)的狀態(tài)信息。
- BaseLoadBalancer,它實(shí)現(xiàn)了作為負(fù)載均衡器的基本功能,比如服務(wù)列表維護(hù)、服務(wù)存活狀態(tài)監(jiān)測、負(fù)載均衡算法選擇Server等。但是它只是完成基本功能,在有些復(fù)雜場景中還無法實(shí)現(xiàn),比如動(dòng)態(tài)服務(wù)列表、Server過濾、Zone區(qū)域意識(shí)(服務(wù)之間的調(diào)用希望盡可能是在同一個(gè)區(qū)域內(nèi)進(jìn)行,減少延遲)。
- DynamicServerListLoadBalancer是BaseLoadbalancer的一個(gè)子類,它對基礎(chǔ)負(fù)載均衡提供了擴(kuò)展,從名字上可以看出,它提供了動(dòng)態(tài)服務(wù)列表的特性
- ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的基礎(chǔ)上,增加了以Zone的形式來配置多個(gè)LoadBalancer的功能。
那在getServer方法中,loadBalancer.chooseServer具體的實(shí)現(xiàn)類是哪一個(gè)呢?我們找到RibbonClientConfiguration這個(gè)類
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
從上述聲明中,發(fā)現(xiàn)如果沒有自定義ILoadBalancer,則直接返回一個(gè)ZoneAwareLoadBalancer
ZoneAwareLoadBalancer
Zone表示區(qū)域的意思,區(qū)域指的就是地理區(qū)域的概念,一般較大規(guī)模的互聯(lián)網(wǎng)公司,都會(huì)做跨區(qū)域部署,這樣做有幾個(gè)好處,第一個(gè)是為不同地域的用戶提供最近的訪問節(jié)點(diǎn)減少訪問延遲,其次是為了保證高可用,做容災(zāi)處理。
而ZoneAwareLoadBalancer就是提供了具備區(qū)域意識(shí)的負(fù)載均衡器,它的主要作用是對Zone進(jìn)行了感知,保證每個(gè)Zone里面的負(fù)載均衡策略都是隔離的,它并不保證A區(qū)域過來的請求一定會(huì)發(fā)動(dòng)到A區(qū)域?qū)?yīng)的Server內(nèi)。真正實(shí)現(xiàn)這個(gè)需求的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter。
ZoneAwareLoadBalancer的核心功能是
- 若開啟了區(qū)域意識(shí),且zone的個(gè)數(shù) > 1,就繼續(xù)區(qū)域選擇邏輯
- 根據(jù)ZoneAvoidanceRule.getAvailableZones()方法拿到可用區(qū)們(會(huì)T除掉完全不可用的區(qū)域們,以及可用但是負(fù)載最高的一個(gè)區(qū)域)
- 從可用區(qū)zone們中,通過ZoneAvoidanceRule.randomChooseZone隨機(jī)選一個(gè)zone出來 (該隨機(jī)遵從權(quán)重規(guī)則:誰的zone里面Server數(shù)量最多,被選中的概率越大)
- 在選中的zone里面的所有Server中,采用該zone對對應(yīng)的Rule,進(jìn)行choose
@Override
public Server chooseServer(Object key) {
//ENABLED,表示是否用區(qū)域意識(shí)的choose選擇Server,默認(rèn)是true,
//如果禁用了區(qū)域、或者只有一個(gè)zone,就直接按照父類的邏輯來進(jìn)行處理,父類默認(rèn)采用輪詢算法
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//根據(jù)相關(guān)閾值計(jì)算可用區(qū)域
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
//從可用區(qū)域中隨機(jī)選擇一個(gè)區(qū)域,zone里面的服務(wù)器節(jié)點(diǎn)越多,被選中的概率越大
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
//根據(jù)zone獲得該zone中的LB,然后根據(jù)該Zone的負(fù)載均衡算法選擇一個(gè)server
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
BaseLoadBalancer.chooseServer
假設(shè)我們現(xiàn)在沒有使用多區(qū)域部署,那么負(fù)載策略會(huì)執(zhí)行到BaseLoadBalancer.chooseServer,
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
根據(jù)默認(rèn)的負(fù)載均衡算法來獲得指定的服務(wù)節(jié)點(diǎn)。默認(rèn)的算法是RoundBin。
rule.choose
rule代表負(fù)載均衡算法規(guī)則,它有很多實(shí)現(xiàn),IRule的實(shí)現(xiàn)類關(guān)系圖如下。

默認(rèn)情況下,rule的實(shí)現(xiàn)為ZoneAvoidanceRule,它是在RibbonClientConfiguration這個(gè)配置類中定義的,代碼如下:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
}
所以,在BaseLoadBalancer.chooseServer中調(diào)用rule.choose(key);,實(shí)際會(huì)進(jìn)入到ZoneAvoidanceRule的choose方法
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer(); //獲取負(fù)載均衡器
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); //通過該方法獲取目標(biāo)服務(wù)
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
復(fù)合判斷server所在區(qū)域的性能和server的可用性選擇server
主要分析chooseRoundRobinAfterFiltering方法。
chooseRoundRobinAfterFiltering
從方法名稱可以看出來,它是通過對目標(biāo)服務(wù)集群通過過濾算法過濾一遍后,再使用輪詢實(shí)現(xiàn)負(fù)載均衡。
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
CompositePredicate.getEligibleServers
使用主過濾條件對所有實(shí)例過濾并返回過濾后的清單,
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
//
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
//按照fallbacks中存儲(chǔ)的過濾器順序進(jìn)行過濾(此處就行先ZoneAvoidancePredicate然后AvailabilityPredicate)
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
依次使用次過濾條件對主過濾條件的結(jié)果進(jìn)行過濾*
- //不論是主過濾條件還是次過濾條件,都需要判斷下面兩個(gè)條件
- //只要有一個(gè)條件符合,就不再過濾,將當(dāng)前結(jié)果返回供線性輪詢
- 第1個(gè)條件:過濾后的實(shí)例總數(shù)>=最小過濾實(shí)例數(shù)(默認(rèn)為1)
- 第2個(gè)條件:過濾互的實(shí)例比例>最小過濾百分比(默認(rèn)為0)
getEligibleServers
這里的實(shí)現(xiàn)邏輯是,遍歷所有服務(wù)器列表,調(diào)用this.apply方法進(jìn)行驗(yàn)證,驗(yàn)證通過的節(jié)點(diǎn),會(huì)加入到results這個(gè)列表中返回。
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
this.apply,會(huì)進(jìn)入到CompositePredicate.apply方法中,代碼如下。
//CompositePredicate.apply
@Override
public boolean apply(@Nullable PredicateKey input) {
return delegate.apply(input);
}
delegate的實(shí)例是AbstractServerPredicate, 代碼如下!
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input);
}
};
}
也就是說,會(huì)通過AbstractServerPredicate.apply方法進(jìn)行過濾,其中,input表示目標(biāo)服務(wù)器集群的某一個(gè)具體節(jié)點(diǎn)。
其中p,表示AndPredicate實(shí)例,這里用到了組合predicate進(jìn)行判斷,而這里的組合判斷是and的關(guān)系,用到了AndPredicate實(shí)現(xiàn)。
private static class AndPredicate<T> implements Predicate<T>, Serializable {
private final List<? extends Predicate<? super T>> components;
private static final long serialVersionUID = 0L;
private AndPredicate(List<? extends Predicate<? super T>> components) {
this.components = components;
}
public boolean apply(@Nullable T t) {
for(int i = 0; i < this.components.size(); ++i) { //遍歷多個(gè)predicate,逐一進(jìn)行判斷。
if (!((Predicate)this.components.get(i)).apply(t)) {
return false;
}
}
return true;
}
}
上述代碼中,components是由兩個(gè)predicate組合而成
- AvailabilityPredicate,過濾熔斷狀態(tài)下的服務(wù)以及并發(fā)連接過多的服務(wù)。
- ZoneAvoidancePredicate,過濾掉無可用區(qū)域的節(jié)點(diǎn)。
所以在AndPredicate的apply方法中,需要遍歷這兩個(gè)predicate逐一進(jìn)行判斷。
AvailablilityPredicate
過濾熔斷狀態(tài)下的服務(wù)以及并發(fā)連接過多的服務(wù),代碼如下:
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
判斷是否要跳過這個(gè)目標(biāo)節(jié)點(diǎn),實(shí)現(xiàn)邏輯如下。
private boolean shouldSkipServer(ServerStats stats) {
//niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped是否為true
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) //該Server是否為斷路狀態(tài)
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {//本機(jī)發(fā)往這個(gè)Server未處理完的請求個(gè)數(shù)是否大于Server實(shí)例最大的活躍連接數(shù)
return true;
}
return false;
}
Server是否為斷路狀態(tài)是如何判斷的呢?
ServerStats源碼,這里詳細(xì)源碼我們不貼了,說一下機(jī)制:
斷路是通過時(shí)間判斷實(shí)現(xiàn)的,每次失敗記錄上次失敗時(shí)間。如果失敗了,則觸發(fā)判斷,是否大于斷路的最小失敗次數(shù),則判斷:
計(jì)算斷路持續(xù)時(shí)間: (2^失敗次數(shù))* 斷路時(shí)間因子,如果大于最大斷路時(shí)間,則取最大斷路時(shí)間。
判斷當(dāng)前時(shí)間是否大于上次失敗時(shí)間+短路持續(xù)時(shí)間,如果小于,則是斷路狀態(tài)。
這里又涉及三個(gè)配置(這里需要將default替換成你調(diào)用的微服務(wù)名稱):
- niws.loadbalancer.default.connectionFailureCountThreshold,默認(rèn)為3, 觸發(fā)判斷是否斷路的最小失敗次數(shù),也就是,默認(rèn)如果失敗三次,就會(huì)判斷是否要斷路了。
- niws.loadbalancer.default.circuitTripTimeoutFactorSeconds, 默認(rèn)為10, 斷路時(shí)間因子,
- niws.loadbalancer.default.circuitTripMaxTimeoutSeconds,默認(rèn)為30,最大斷路時(shí)間
ZoneAvoidancePredicate
ZoneAvoidancePredicate,過濾掉不可用區(qū)域的節(jié)點(diǎn),代碼如下!
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {//查看niws.loadbalancer.zoneAvoidanceRule.enabled配置的熟悉是否為true(默認(rèn)為true)如果為false沒有開啟分片過濾 則不進(jìn)行過濾
return true;
}
////獲取配置的分區(qū)字符串 默認(rèn)為UNKNOWN
String serverZone = input.getServer().getZone();
if (serverZone == null) { //如果沒有分區(qū),則不需要進(jìn)行過濾,直接返回即可
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
//獲取負(fù)載均衡的狀態(tài)信息
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
//如果可用區(qū)域小于等于1,也不需要進(jìn)行過濾直接返回
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
//針對當(dāng)前負(fù)載信息,創(chuàng)建一個(gè)區(qū)域快照,后續(xù)會(huì)用快照數(shù)據(jù)進(jìn)行計(jì)算(避免后續(xù)因?yàn)閿?shù)據(jù)變更導(dǎo)致判斷計(jì)算不準(zhǔn)確問題)
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) { //如果快照信息中沒有包含當(dāng)前服務(wù)器所在區(qū)域,則也不需要進(jìn)行判斷。
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
//獲取有效區(qū)域
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) { //有效區(qū)域如果包含當(dāng)前節(jié)點(diǎn),則返回true,否則返回false, 返回false表示這個(gè)區(qū)域不可用,不需要進(jìn)行目標(biāo)節(jié)點(diǎn)分發(fā)。
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
LoadBalancerStats,在每次發(fā)起通訊的時(shí)候,狀態(tài)信息會(huì)在控制臺(tái)打印如下!
DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown; Instance count:2; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]
},Server stats: [[Server:localhost:9091; Zone:UNKNOWN; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]
, [Server:localhost:9081; Zone:UNKNOWN; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]
]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a
getAvailableZones方法的代碼如下,用來計(jì)算有效可用區(qū)域。
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) { //如果快照信息為空,返回空
return null;
}
//定義一個(gè)集合存儲(chǔ)有效區(qū)域節(jié)點(diǎn)
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) { //如果有效區(qū)域的集合只有1個(gè),直接返回
return availableZones;
}
//記錄有問題的區(qū)域集合
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0; //定義一個(gè)變量,保存所有zone中,平均負(fù)載最高值
// true:zone有限可用
// false:zone全部可用
boolean limitedZoneAvailability = false; //
//遍歷所有的區(qū)域信息. 對每個(gè)zone進(jìn)行逐一分析
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey(); //得到zone字符串
ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //得到該zone的快照信息
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) { //若該zone內(nèi)一個(gè)實(shí)例都木有了,那就是完全不可用,那就移除該zone,然后標(biāo)記zone是有限可用的(并非全部可用)
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer(); //獲取該區(qū)域的平均負(fù)載
// 機(jī)器的熔斷總數(shù) / 總實(shí)例數(shù)已經(jīng)超過了閾值(默認(rèn)為1,也就是全部熔斷才會(huì)認(rèn)為該zone完全不可用)
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) { //loadPerServer表示當(dāng)前區(qū)域所有節(jié)點(diǎn)都熔斷了。
availableZones.remove(zone);
limitedZoneAvailability = true;
} else { // 進(jìn)入到這個(gè)邏輯,說明并不是完全不可用,就看看區(qū)域的狀態(tài)
// 如果當(dāng)前負(fù)載和最大負(fù)載相當(dāng),那認(rèn)為當(dāng)前區(qū)域狀態(tài)很不好,加入到worstZones中
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {// 或者若當(dāng)前負(fù)載大于最大負(fù)載了。
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
// 如果最大負(fù)載小于設(shè)定的負(fù)載閾值 并且limitedZoneAvailability=false
// 說明全部zone都可用,并且最大負(fù)載都還沒有達(dá)到閾值,那就把全部zone返回
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
//若最大負(fù)載超過閾值, 就不能全部返回,則直接從負(fù)載最高的區(qū)域中隨機(jī)返回一個(gè),這么處理的目的是把負(fù)載最高的那個(gè)哥們T除掉,再返回結(jié)果。
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
上述邏輯還是比較復(fù)雜的,我們通過一個(gè)簡單的文字進(jìn)行說明:
- 如果
zone為null,那么也就是沒有可用區(qū)域,直接返回null - 如果
zone的可用區(qū)域?yàn)?,也沒有什么可以選擇的,直接返回這一個(gè) - 使用
Set<String> worstZones記錄所有zone中比較狀態(tài)不好的的zone列表,用maxLoadPerServer表示所有zone中負(fù)載最高的區(qū)域;用limitedZoneAvailability表示是否是部分zone可用(true:部分可用,false:全部可用),接著我們需要遍歷所有的zone信息,逐一進(jìn)行判斷從而對有效zone的結(jié)果進(jìn)行處理。- 如果當(dāng)前
zone的instanceCount為0,那就直接把這個(gè)區(qū)域移除就行,并且標(biāo)記limitedZoneAvailability為部分可用,沒什么好說的。 - 獲取當(dāng)前總的平均負(fù)載
loadPerServer,如果zone內(nèi)的熔斷實(shí)例數(shù) / 總實(shí)例數(shù) >= triggeringBlackoutPercentage或者loadPerServer < 0的話,說明當(dāng)前區(qū)域有問題,直接執(zhí)行remove移除當(dāng)前zone,并且limitedZoneAvailability=true.- (
熔斷實(shí)例數(shù) / 總實(shí)例數(shù) >= 閾值,標(biāo)記為當(dāng)前zone就不可用了(移除掉),這個(gè)很好理解。這個(gè)閾值為0.99999d也就說所有的Server實(shí)例被熔斷了,該zone才算不可用了). -
loadPerServer = -1,也就說當(dāng)所有實(shí)例都熔斷了。這兩個(gè)條件判斷都差不多,都是判斷這個(gè)區(qū)域的可用性。
- (
- 如果當(dāng)前zone沒有達(dá)到閾值,則判斷區(qū)域的負(fù)載情況,從所有
zone中找到負(fù)載最高的區(qū)域(負(fù)載差值在0.000001d),則把這些區(qū)域加入到worstZones列表,也就是這個(gè)集合保存的是負(fù)載較高的區(qū)域。
- 如果當(dāng)前
- 通過上述遍歷對區(qū)域數(shù)據(jù)進(jìn)行計(jì)算后,最后要設(shè)置返回的有效區(qū)域數(shù)據(jù)。
- 最高負(fù)載
maxLoadPerServer仍舊小于提供的triggeringLoad閾值,并且并且limitedZoneAvailability=false(就是說所有zone都可用的情況下),那就返回所有的zone:availableZones。 (也就是所有區(qū)域的負(fù)載都在閾值范圍內(nèi)并且每個(gè)區(qū)域內(nèi)的節(jié)點(diǎn)都還存活狀態(tài),就全部返回) - 否則,最大負(fù)載超過閾值或者某些區(qū)域存在部分不可用的節(jié)點(diǎn)時(shí),就從這些負(fù)載較高的節(jié)點(diǎn)
worstZones中隨機(jī)移除一個(gè)
- 最高負(fù)載
AbstractServerPredicate
在回答下面的代碼,通過getEligibleServers判斷可用服務(wù)節(jié)點(diǎn)后,如果可用節(jié)點(diǎn)不為0 ,則執(zhí)行incrementAndGetModulo方法進(jìn)行輪詢。
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
該方法是通過輪詢來實(shí)現(xiàn),代碼如下!
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
服務(wù)列表的加載過程
在本實(shí)例中,我們將服務(wù)列表配置在application.properties文件中,意味著在某個(gè)時(shí)候會(huì)加載這個(gè)列表,保存在某個(gè)位置,那它是在什么時(shí)候加載的呢?
在RibbonClientConfiguration這個(gè)配置類中,有下面這個(gè)Bean的聲明,(該Bean是條件觸發(fā))它用來定義默認(rèn)的負(fù)載均衡實(shí)現(xiàn)。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
前面分析過,它的類關(guān)系圖如下!

當(dāng)ZoneAwareLoadBalancer在初始化時(shí),會(huì)調(diào)用父類DynamicServerListLoadBalancer的構(gòu)造方法,代碼如下。
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
restOfInit
restOfInit方法主要做兩件事情。
- 開啟動(dòng)態(tài)更新Server的功能
- 更新Server列表
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature(); //開啟動(dòng)態(tài)更新Server
updateListOfServers(); //更新Server列表
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
updateListOfServers
全量更新一次服務(wù)列表。
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
上述代碼解釋如下
- 由于我們是通過
application.properties文件配置的靜態(tài)服務(wù)地址列表,所以此時(shí)serverListImpl的實(shí)例為:ConfigurationBasedServerList,調(diào)用getUpdatedListOfServers方法時(shí),返回的是在application.properties文件中定義的服務(wù)列表。 - 判斷是否需要
filter,如果有,則通過filter進(jìn)行服務(wù)列表過濾。
最后調(diào)用updateAllServerList,更新所有Server到本地緩存中。
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
動(dòng)態(tài)Ping機(jī)制
在Ribbon中,基于Ping機(jī)制,目標(biāo)服務(wù)地址也會(huì)發(fā)生動(dòng)態(tài)變更,具體的實(shí)現(xiàn)方式在DynamicServerListLoadBalancer.restOfInit方法中
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature(); //開啟定時(shí)任務(wù)動(dòng)態(tài)更新
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
注意,這里會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),而定時(shí)任務(wù)所執(zhí)行的程序是updateAction,它是一個(gè)匿名內(nèi)部類,定義如下。
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
定時(shí)任務(wù)的啟動(dòng)方法如下,這個(gè)任務(wù)每隔30s執(zhí)行一次。
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate(); //執(zhí)行具體的任務(wù)。
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs, //1000
refreshIntervalMs, //30000
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
當(dāng)30s之后觸發(fā)了doUpdate方法后,最終進(jìn)入到updateAllServerList方法
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
其中,會(huì)調(diào)用super.forceQuickPing();進(jìn)行心跳健康檢測。
public void forceQuickPing() {
if (canSkipPing()) {
return;
}
logger.debug("LoadBalancer [{}]: forceQuickPing invoking", name);
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
}
}
RibbonLoadBalancerClient.execute
經(jīng)過上述分析,再回到RibbonLoadBalancerClient.execute方法!
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
此時(shí),Server server = getServer(loadBalancer, hint);這行代碼,會(huì)返回一個(gè)具體的目標(biāo)服務(wù)器。
其中,在調(diào)用execute方法之前,會(huì)包裝一個(gè)RibbonServer對象傳遞下去,它的主要作用是用來記錄請求的負(fù)載信息。
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal); //記錄請求狀態(tài)
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex); //記錄請求狀態(tài)
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
request.apply
request是LoadBalancerRequest接口,它里面提供了一個(gè)apply方法,但是從代碼中我們發(fā)現(xiàn)這個(gè)方法并沒有實(shí)現(xiàn)類,那么它是在哪里實(shí)現(xiàn)的呢?
繼續(xù)又往前分析發(fā)現(xiàn),這個(gè)request對象是從LoadBalancerInterceptor的intercept方法中傳遞過來的.
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
而request的傳遞,是通過this.requestFactory.createRequest(request, body, execution)創(chuàng)建而來,于是我們找到這個(gè)方法。
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
return (instance) -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
LoadBalancerRequestTransformer transformer;
if (this.transformers != null) {
for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
transformer = (LoadBalancerRequestTransformer)var6.next();
}
}
return execution.execute((HttpRequest)serviceRequest, body);
};
}
從代碼中發(fā)現(xiàn),它是一個(gè)用lambda表達(dá)式實(shí)現(xiàn)的匿名內(nèi)部類。在該內(nèi)部類中,創(chuàng)建了一個(gè)ServiceRequestWrapper,這個(gè)ServiceRequestWrapper實(shí)際上就是HttpRequestWrapper的一個(gè)子類,ServiceRequestWrapper重寫了HttpRequestWrapper的getURI()方法,重寫的URI實(shí)際上就是通過調(diào)用LoadBalancerClient接口的reconstructURI函數(shù)來重新構(gòu)建一個(gè)URI進(jìn)行訪問。
InterceptingClientHttpRequest.execute
上述代碼執(zhí)行的execution.execute,又會(huì)進(jìn)入到InterceptingClientHttpRequest.execute方法中,代碼如下。
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); //注意這里
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
此時(shí)需要注意,request對象的實(shí)例是HttpRequestWrapper。
request.getURI()
當(dāng)調(diào)用request.getURI()獲取目標(biāo)地址創(chuàng)建http請求時(shí),會(huì)調(diào)用ServiceRequestWrapper中的.getURI()方法。
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
在這個(gè)方法中,調(diào)用RibbonLoadBalancerClient實(shí)例中的reconstructURI方法,根據(jù)service-id生成目標(biāo)服務(wù)地址。
RibbonLoadBalancerClient.reconstructURI
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId(); //獲取實(shí)例id,也就是服務(wù)名稱
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId); //獲取RibbonLoadBalancerContext上下文,這個(gè)是從spring容器中獲取的對象實(shí)例。
URI uri;
Server server;
if (instance instanceof RibbonServer) { //如果instance為RibbonServer
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer(); //獲取目標(biāo)服務(wù)器的Server信息
uri = updateToSecureConnectionIfNeeded(original, ribbonServer); //判斷是否需要更新成一個(gè)安全連接。
}
else { //如果是一個(gè)普通的http地址
server = new Server(instance.getScheme(), instance.getHost(),
instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri); //調(diào)用這個(gè)方法拼接成一個(gè)真實(shí)的目標(biāo)服務(wù)器地址。
}