什么是負(fù)載均衡器?
假設(shè)有一個(gè)分布式系統(tǒng),該系統(tǒng)由在不同計(jì)算機(jī)上運(yùn)行的許多服務(wù)組成。但是,當(dāng)用戶(hù)數(shù)量很大時(shí),通常會(huì)為服務(wù)創(chuàng)建搭建集群。集群中每個(gè)服務(wù)實(shí)例都在單獨(dú)一臺(tái)計(jì)算機(jī)上運(yùn)行。此時(shí),出現(xiàn) “Load Balancer(負(fù)載均衡器)”。它有助于在服務(wù)器之間平均分配傳入流量。
服務(wù)器端負(fù)載均衡器
傳統(tǒng)Load Balancers(例如Nginx、F5)是放置在服務(wù)器端的組件。當(dāng)請(qǐng)求來(lái)自 客戶(hù)端 時(shí),它們將轉(zhuǎn)到負(fù)載均衡器,負(fù)載均衡器將為請(qǐng)求指定服務(wù)器。負(fù)載均衡器使用的最簡(jiǎn)單的算法是隨機(jī)指定。在這種情況下,大多數(shù)負(fù)載平衡器是用于控制負(fù)載平衡的硬件集成軟件。

特點(diǎn):
- 對(duì)客戶(hù)端不透明,客戶(hù)端不知道服務(wù)器端的服務(wù)列表,甚至不知道自己發(fā)送請(qǐng)求的目標(biāo)地址存在負(fù)載均衡器。
- 服務(wù)器端維護(hù)負(fù)載均衡服務(wù)器,控制負(fù)載均衡策略和算法。
客戶(hù)端負(fù)載均衡器
當(dāng)負(fù)載均衡器位于客戶(hù)端時(shí),客戶(hù)端得到可用的服務(wù)器列表然后按照特定的負(fù)載均衡策略,分發(fā)請(qǐng)求到不同的服務(wù)器 。

特點(diǎn):
- 對(duì)客戶(hù)端透明,客戶(hù)端需要知道服務(wù)器端的服務(wù)列表,需要自行決定請(qǐng)求要發(fā)送的目標(biāo)地址。
- 客戶(hù)端維護(hù)負(fù)載均衡服務(wù)器,控制負(fù)載均衡策略和算法。
- 目前單獨(dú)提供的客戶(hù)端實(shí)現(xiàn)比較少(Ribbon是其中之一),大部分都是在框架內(nèi)部自行實(shí)現(xiàn)。
Ribbon
簡(jiǎn)介
Ribbon是Netflix公司開(kāi)源的一個(gè)客戶(hù)端負(fù)載均衡的項(xiàng)目,可以自動(dòng)與 Eureka 進(jìn)行交互。它提供下列特性:
- 負(fù)載均衡
- 容錯(cuò)
- 以異步和反應(yīng)式模型執(zhí)行多協(xié)議 (HTTP、TCP、UDP)
- 緩存和批量
Ribbon中的關(guān)鍵組件

- ServerList:可以響應(yīng)客戶(hù)端的特定服務(wù)的服務(wù)器列表。
- ServerListFilter:可以動(dòng)態(tài)獲得的具有所需特征的候選服務(wù)器列表的過(guò)濾器。
- ServerListUpdater:用于執(zhí)行動(dòng)態(tài)服務(wù)器列表更新。
- Rule:負(fù)載均衡策略,用于確定從服務(wù)器列表返回哪個(gè)服務(wù)器。
- Ping:客戶(hù)端用于快速檢查服務(wù)器當(dāng)時(shí)是否處于活動(dòng)狀態(tài)。
- LoadBalancer:負(fù)載均衡器,負(fù)責(zé)負(fù)載均衡調(diào)度的管理。
源碼分析
LoadBalancerClient
實(shí)際應(yīng)用中,通常將 RestTemplate 和 Ribbon 結(jié)合使用,例如:
@Configuration
public class RibbonConfig {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
消費(fèi)者調(diào)用服務(wù)接口:
@Service
public class RibbonService {
@Autowired
private RestTemplate restTemplate;
public String hi(String name) {
return restTemplate.getForObject("http://service-hi/hi?name="+name,String.class);
}
}
@LoadBalanced,通過(guò)源碼可以發(fā)現(xiàn)這是一個(gè)標(biāo)記注解:
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
通過(guò)注釋可以知道@LoadBalanced注解是用來(lái)給RestTemplate做標(biāo)記,方便我們對(duì)RestTemplate添加一個(gè)LoadBalancerClient,以實(shí)現(xiàn)客戶(hù)端負(fù)載均衡。
自動(dòng)裝載核心配置類(lèi)
SpringCloud對(duì)EurekaServer的封裝使得發(fā)布一個(gè)EurekaServer無(wú)比簡(jiǎn)單,根據(jù)自動(dòng)裝載原則可以在spring-cloud-netflix-ribbon-2.2.5.RELEASE.jar下的META-INF目錄下找到 spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
RibbonAutoConfiguration
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(
name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
}
先決條件
- @ConditionalOnClass:當(dāng)前環(huán)境必須存在這幾個(gè)類(lèi):IClient、RestTemplate、AsyncRestTemplate、Ribbon
- @RibbonClients:這個(gè)注解上面已經(jīng)講過(guò)了。
- @AutoConfigureAfter:負(fù)載均衡肯定是要基于注冊(cè)中心來(lái)做的,所以自動(dòng)裝配是在Eureka初始化完畢之后初始化的。
- @AutoConfigureBefore:這里的兩個(gè)類(lèi)先不說(shuō),保持神秘。
- @EnableConfigurationProperties,兩個(gè)配置類(lèi),其中:
- RibbonEagerLoadProperties類(lèi)中是關(guān)于Ribbon的饑餓加載模式的屬性
- ServerIntrospectorProperties類(lèi)中是關(guān)于安全端口的屬性
@RibbonClients
@RibbonClients注解使用@Import注解引入了配置類(lèi)RibbonClientConfigurationRegistrar
@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(RibbonClientConfigurationRegistrar.class)
public @interface RibbonClients {
RibbonClient[] value() default {};
Class<?>[] defaultConfiguration() default {};
}
RibbonClientConfigurationRegistrar
RibbonClientConfigurationRegistrar是一個(gè) ImportBeanDefinitionRegistrar,為配置了注冊(cè)了對(duì)應(yīng) RibbonClientSpecification 的 BeanDefinition。
public class RibbonClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
//RibbonClients 注解解析, 遍歷注冊(cè)所有 RibbonClient 配置類(lèi)的
Map<String, Object> attrs = metadata
.getAnnotationAttributes(RibbonClients.class.getName(), true);
if (attrs != null && attrs.containsKey("value")) {
AnnotationAttributes[] clients = (AnnotationAttributes[]) attrs.get("value");
for (AnnotationAttributes client : clients) {
registerClientConfiguration(registry, getClientName(client),
client.get("configuration"));
}
}
// 全局默認(rèn)配置
if (attrs != null && attrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
}
else {
name = "default." + metadata.getClassName();
}
registerClientConfiguration(registry, name,
attrs.get("defaultConfiguration"));
}
// 單個(gè) RibbonClient 的解析,注冊(cè)對(duì)應(yīng)配置類(lèi)的 BD
Map<String, Object> client = metadata
.getAnnotationAttributes(RibbonClient.class.getName(), true);
String name = getClientName(client);
if (name != null) {
registerClientConfiguration(registry, name, client.get("configuration"));
}
}
// 注冊(cè)類(lèi)型為 RibbonClientSpecification
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(RibbonClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + ".RibbonClientSpecification",
builder.getBeanDefinition());
}
}
首先會(huì)判斷是否存在注解@RibbonClients,注意,這里可是多了一個(gè)s的
然后判斷@RibbonClients注解上是否存在屬性value和defaultConfiguration,如果存在的話分別注冊(cè)他們。接著最后才是處理@RibbonClient注解
這里我們就可以猜測(cè)RibbonClientConfigurationRegistrar這個(gè)類(lèi)應(yīng)該是可以同時(shí)處理這兩個(gè)注解的,觀察一下@RibbonClients注解的源碼發(fā)現(xiàn)它確實(shí)是引入的也是這個(gè)類(lèi)
這兩個(gè)注解的區(qū)別應(yīng)該也可以猜測(cè)出來(lái),單數(shù)和雙數(shù)觀察最后注冊(cè)的代碼,可以看到最后注冊(cè)bean的類(lèi)型都是RibbonClientSpecification。
RibbonAutoConfiguration
該類(lèi)由 自動(dòng)裝配 加載,對(duì)應(yīng)于 OpenFeign 的 FeignContext,所有 RibbonContext 的 上下文 由 SpringClientFactory 創(chuàng)建和管理
// 掃描的所有 RibbonClientSpecification
@Autowired(required = false)
private List<RibbonClientSpecification> configurations = new ArrayList<>();
/**
* 將所有的 RibbonClientSpecification 交給 SpringClientFactory
* 由 SpringClientFactory 創(chuàng)建和管理對(duì)應(yīng)的 RibbonClient 上下文
*/
@Bean
@ConditionalOnMissingBean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
這跟 FeignContext 的原理一模一樣,因此不再過(guò)多解讀,RibbonClient 對(duì)應(yīng)的 上下文 創(chuàng)建與管理由 SpringClientFactory 實(shí)現(xiàn)。
SpringClientFactory,每一個(gè)微服務(wù)在都會(huì)調(diào)用多個(gè)微服務(wù),而調(diào)用各個(gè)微服務(wù)的配置可能是不一樣的,所以就需要這個(gè)創(chuàng)建客戶(hù)端負(fù)載均衡器的工廠類(lèi),它可以為每一個(gè)ribbon客戶(hù)端生成不同的Spring上下文,而觀察這個(gè)類(lèi)的configurations屬性也驗(yàn)證了這一點(diǎn)
同時(shí),RibbonAutoConfiguration 還創(chuàng)建了 LoadBalancerClient 的實(shí)例,為 RibbonLoadBalancerClient
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
RestTemplateCustomizer RestTemplate定制器
//Ribbon的http請(qǐng)求配置
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HttpRequest.class)
@ConditionalOnRibbonRestClient
protected static class RibbonClientHttpRequestFactoryConfiguration {
@Autowired
private SpringClientFactory springClientFactory;
//RestTemplate定制器
@Bean
public RestTemplateCustomizer restTemplateCustomizer(
final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
return restTemplate -> restTemplate
.setRequestFactory(ribbonClientHttpRequestFactory);
}
//注冊(cè)RibbonClientHttpRequestFactory ,聽(tīng)過(guò)它來(lái)創(chuàng)建ClientHttpRequest用來(lái)發(fā)http請(qǐng)求的,
//后續(xù)Ribbon執(zhí)行流程中會(huì)用到ClientHttpRequest
@Bean
public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
return new RibbonClientHttpRequestFactory(this.springClientFactory);
}
}
上方雖然看了Ribbon的自動(dòng)裝配功能,但是好像離真相還有一些距離,這是因?yàn)殡m然Ribbon準(zhǔn)備好了,但是負(fù)載均衡還沒(méi)看呢。SpringCloud把負(fù)載均衡相關(guān)的自動(dòng)配置放在了spring-cloud-commons包下,負(fù)載均衡的配置類(lèi)是LoadBalancerAutoConfiguration
@AutoConfigureBefore注解會(huì)加載LoadBalancerAutoConfiguration類(lèi)
@Configuration(proxyBeanMethods = false)
//必須存在RestTemplate類(lèi)
@ConditionalOnClass(RestTemplate.class)
//必須存在LoadBalancerClient類(lèi)型的bean
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//所有被@LoadBalanced注解修飾的RestTemplate
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
//對(duì)于所有被@LoadBalanced注解修飾的RestTemplate,
//調(diào)用SmartInitializingSingleton的customize方法
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
//產(chǎn)生一個(gè)LoadBalancerInterceptor類(lèi)型的bean,包含loadBalancerClient類(lèi)型的bean
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//對(duì)于所有被@LoadBalanced注解修飾的RestTemplate,增加loadBalancerInterceptor屬性
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
LoadBalancerAutoConfiguration配置類(lèi)的作用是將所有被@LoadBalanced注解修飾的RestTemplate bean增加LoadBalancerInterceptor攔截器bean,而LoadBalancerInterceptor又包含loadBalancerClient,這樣當(dāng)用RestTemplate調(diào)用時(shí),會(huì)首先調(diào)用攔截器方法,在攔截器方法里使用loadBalancerClient真正實(shí)現(xiàn)負(fù)載均衡以及url轉(zhuǎn)換,達(dá)到服務(wù)名到真正的host之間的轉(zhuǎn)換和負(fù)載均衡。
@LoadBalanced
使用該注解配置 RestTemplate Bean,通過(guò)注解源碼可以看到,該注解的本質(zhì)是一個(gè) @Qualifier 注解。
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
Qualifier的意思是合格者,通過(guò)這個(gè)標(biāo)示,表明了哪個(gè)實(shí)現(xiàn)類(lèi)才是我們所需要的,添加@Qualifier注解,需要注意的是@Qualifier的參數(shù)名稱(chēng)為我們之前定義@Service注解的名稱(chēng)之一。
攔截器LoadBalancerInterceptor
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
//注入LoadBalancerClient 的實(shí)現(xiàn) (唯一的實(shí)現(xiàn)就是RibbonLoadBalancerClient
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
//攔截
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
//從請(qǐng)求url里面拿到服務(wù)名
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
//LoadBalancerClient執(zhí)行 根據(jù)服務(wù)名選擇實(shí)例 發(fā)起請(qǐng)求 的過(guò)程
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}
從代碼可以看出 LoadBalancerInterceptor 攔截了請(qǐng)求后,通過(guò)LoadBalancerClient執(zhí)行具體的請(qǐng)求發(fā)送。
LoadBalancerClient
LoadBalancerClient接口,有如下三個(gè)方法,其中excute()為執(zhí)行請(qǐng)求,reconstructURI()用來(lái)重構(gòu)url。
public interface LoadBalancerClient extends ServiceInstanceChooser {
//父接口方法
ServiceInstance choose(String serviceId);
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
接口說(shuō)明:
- ServiceInstance choose(String serviceId):根據(jù)傳入的服務(wù)id,從負(fù)載均衡器中為指定的服務(wù)選擇一個(gè)服務(wù)實(shí)例。
- T execute(String serviceId, LoadBalancerRequest request):根據(jù)傳入的服務(wù)id,指定的負(fù)載均衡器中的服務(wù)實(shí)例執(zhí)行請(qǐng)求。
- T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request):根據(jù)傳入的服務(wù)實(shí)例,執(zhí)行請(qǐng)求。
RibbonLoadBalancerClient
public class RibbonLoadBalancerClient implements LoadBalancerClient {
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//獲取負(fù)載均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//負(fù)載均衡器ILoadBalancer根據(jù)負(fù)載均衡算法選取一個(gè)Server
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
//
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
}
getLoadBalancer(serviceId)
- 獲取負(fù)載均衡器
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
}
實(shí)際負(fù)載均衡的是通過(guò) ILoadBalancer 來(lái)實(shí)現(xiàn)的。
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
@Deprecated
public List<Server> getServerList(boolean availableOnly);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
接口說(shuō)明:
- addServers:向負(fù)載均衡器中添加一個(gè)服務(wù)實(shí)例集合。
- chooseServer:跟據(jù)key,從負(fù)載均衡器獲取服務(wù)實(shí)例。
- markServerDown:用來(lái)標(biāo)記某個(gè)服務(wù)實(shí)例下線。
- getReachableServers:獲取可用的服務(wù)實(shí)例集合。
- getAllServers():獲取所有服務(wù)實(shí)例集合,包括下線的服務(wù)實(shí)例。
ILoadBalancer 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

- NoOpLoadBalancer:啥都不做
-
BaseLoadBalancer:
- 一個(gè)負(fù)載均衡器的基本實(shí)現(xiàn),其中有一個(gè)任意列表,可以將服務(wù)器設(shè)置為服務(wù)器池。
- 可以設(shè)置一個(gè)ping來(lái)確定服務(wù)器的活力。
- 在內(nèi)部,該類(lèi)維護(hù)一個(gè)“all”服務(wù)器列表,以及一個(gè)“up”服務(wù)器列表,并根據(jù)調(diào)用者的要求使用它們。
-
DynamicServerListLoadBalancer:
- 通過(guò)動(dòng)態(tài)的獲取服務(wù)器的候選列表的負(fù)載平衡器。
- 可以通過(guò)篩選標(biāo)準(zhǔn)來(lái)傳遞服務(wù)器列表,以過(guò)濾不符合所需條件的服務(wù)器。
-
ZoneAwareLoadBalancer:
- 用于測(cè)量區(qū)域條件的關(guān)鍵指標(biāo)是平均活動(dòng)請(qǐng)求,它根據(jù)每個(gè)rest客戶(hù)機(jī)和每個(gè)區(qū)域聚合。這是區(qū)域內(nèi)未完成的請(qǐng)求總數(shù)除以可用目標(biāo)實(shí)例的數(shù)量(不包括斷路器跳閘實(shí)例)。當(dāng)在壞區(qū)上緩慢發(fā)生超時(shí)時(shí),此度量非常有效。
- 該負(fù)載均衡器將計(jì)算并檢查所有可用區(qū)域的區(qū)域狀態(tài)。如果任何區(qū)域的平均活動(dòng)請(qǐng)求已達(dá)到配置的閾值,則該區(qū)域?qū)幕顒?dòng)服務(wù)器列表中刪除。如果超過(guò)一個(gè)區(qū)域達(dá)到閾值,則將刪除每個(gè)服務(wù)器上活動(dòng)請(qǐng)求最多的區(qū)域。一旦去掉最壞的區(qū)域,將在其余區(qū)域中選擇一個(gè)區(qū)域,其概率與其實(shí)例數(shù)成正比。服務(wù)器將使用給定的規(guī)則從所選區(qū)域返回。對(duì)于每個(gè)請(qǐng)求,將重復(fù)上述步驟。也就是說(shuō),每個(gè)與區(qū)域相關(guān)的負(fù)載平衡決策都是實(shí)時(shí)做出的,最新的統(tǒng)計(jì)數(shù)據(jù)可以幫助進(jìn)行選擇。
RibbonClientConfiguration配置類(lèi)可以看到,在整合Ribbon的時(shí)候Spring Cloud默認(rèn)采用ZoneAwareLoadBalancer來(lái)實(shí)現(xiàn)負(fù)載均衡器。
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
}
從這段代碼 ,也可以看出,負(fù)載均衡器所需的主要配置項(xiàng)是IClientConfig、ServerList、ServerListFilter、IRule、IPing、ServerListUpdater。
IClientConfig
IClientConfig 用于對(duì)客戶(hù)端或者負(fù)載均衡的配置,它的默認(rèn)實(shí)現(xiàn)類(lèi)為 DefaultClientConfigImpl。
IRule
為L(zhǎng)oadBalancer定義“負(fù)載均衡策略”的接口。
public interface IRule{
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
IRule 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

- BestAvailableRule:選擇具有最低并發(fā)請(qǐng)求的服務(wù)器。
- ClientConfigEnabledRoundRobinRule:輪詢(xún)。
- RandomRule:隨機(jī)選擇一個(gè)服務(wù)器。
- RoundRobinRule:輪詢(xún)選擇服務(wù)器。
- RetryRule:具備重試機(jī)制的輪詢(xún)。
- WeightedResponseTimeRule:根據(jù)使用平均響應(yīng)時(shí)間去分配一個(gè)weight(權(quán)重) ,weight越低,被選擇的可能性就越低。
- ZoneAvoidanceRule:根據(jù)區(qū)域和可用性篩選,再輪詢(xún)選擇服務(wù)器。
IPing
定義如何 “ping” 服務(wù)器以檢查其是否存活。
public interface IPing {
public boolean isAlive(Server server);
}
IPing 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

ServerList
定義獲取所有的服務(wù)實(shí)例清單。
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
}
ServerList 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

- DomainExtractingServerList:代理類(lèi),根據(jù)傳入的ServerList的值,實(shí)現(xiàn)具體的邏輯。
- ConfigurationBasedServerList:從配置文件中加載服務(wù)器列表。
- DiscoveryEnabledNIWSServerList:從Eureka注冊(cè)中心中獲取服務(wù)器列表。
- StaticServerList:通過(guò)靜態(tài)配置來(lái)維護(hù)服務(wù)器列表。
ServerListFilter
允許根據(jù)過(guò)濾配置動(dòng)態(tài)獲得的具有所需特性的候選服務(wù)器列表。
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
ServerListFilter 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

ServerListUpdater
用于執(zhí)行動(dòng)態(tài)服務(wù)器列表更新。
public interface ServerListUpdater {
public interface UpdateAction {
void doUpdate();
}
void start(UpdateAction updateAction);
void stop();
String getLastUpdate();
long getDurationSinceLastUpdateMs();
int getNumberMissedCycles();
int getCoreThreads();
}
ServerListUpdater 的實(shí)現(xiàn) 依賴(lài)關(guān)系示意圖如下:

- PollingServerListUpdater:默認(rèn)的實(shí)現(xiàn)策略,會(huì)啟動(dòng)一個(gè)定時(shí)線程池,定時(shí)執(zhí)行更新策略。
- EurekaNotificationServerListUpdater:利用Eureka的事件監(jiān)聽(tīng)器來(lái)驅(qū)動(dòng)服務(wù)列表的更新操作。
getServer(loadBalancer, hint)
在RibbonLoadBalancerClient 中的execute方法調(diào)用getServer(loadBalancer, hint)方法,
即負(fù)載均衡器ILoadBalancer根據(jù)負(fù)載均衡算法選取一個(gè)Server。
public class RibbonLoadBalancerClient implements LoadBalancerClient {
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
}
/**
* Ribbon負(fù)載均衡器的基礎(chǔ)實(shí)現(xiàn)類(lèi)
*/
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
//默認(rèn)使用RoundRobinRule
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
//負(fù)載均衡的處理規(guī)則,默認(rèn)使用RoundRobinRule規(guī)則,該規(guī)則實(shí)現(xiàn)了最基本且常用的線性負(fù)載均衡規(guī)則。
protected IRule rule = DEFAULT_RULE;
//檢查服務(wù)實(shí)例操作時(shí)的執(zhí)行策略對(duì)象,使用的策略是SerialPingStrategy
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
//用來(lái)檢查服務(wù)實(shí)例是否正常,默認(rèn)為null,需要在構(gòu)造時(shí)注入它的具體實(shí)現(xiàn)
protected IPing ping = null;
//存儲(chǔ)所有服務(wù)實(shí)例清單
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
//存儲(chǔ)正常服務(wù)實(shí)例清單
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
//用來(lái)存儲(chǔ)負(fù)載均衡器各服務(wù)實(shí)例屬性和統(tǒng)計(jì)信息
protected LoadBalancerStats lbStats;
/*
* 負(fù)載均衡器實(shí)際將服務(wù)實(shí)例選擇任務(wù)委托給了IRule實(shí)例中的choose函數(shù)來(lái)實(shí)現(xiàn),挑選一個(gè)具體的服務(wù)實(shí)例
*/
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}
RoundRobinRule
Ribbon 默認(rèn)的規(guī)則為 RoundRobinRule (輪詢(xún))
public class RoundRobinRule extends AbstractLoadBalancerRule {
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
//最多選擇十次,就會(huì)結(jié)束嘗試
while (server == null && count++ < 10) {
//獲取所有可用的服務(wù)器
List<Server> reachableServers = lb.getReachableServers();
//獲取所有服務(wù)器
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
//獲取下一個(gè)提供服務(wù)的下標(biāo)
int nextServerIndex = incrementAndGetModulo(serverCount);
//獲取指定下標(biāo)的服務(wù)
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
//如果輪詢(xún)次數(shù)Server超過(guò)10次,選擇不到實(shí)例的話,會(huì)報(bào)警告信息。
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
private int incrementAndGetModulo(int modulo) {
for (;;) {
//獲取當(dāng)前已有的請(qǐng)求總數(shù)
int current = nextServerCyclicCounter.get();
//獲取服務(wù)的下標(biāo)
int next = (current + 1) % modulo;
//比較交換
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
}
RibbonClientConfiguration
在Spring Cloud中,Ribbon默認(rèn)的配置類(lèi)是RibbonClientConfiguration。也可使用一個(gè)POJO自定義Ribbon的配置(自定義配置會(huì)覆蓋默認(rèn)配置)。這種配置是細(xì)粒度的,不同的Ribbon客戶(hù)端可以使用不同的配置。
- 在SpringBoot啟動(dòng)類(lèi)以外新建ribbonconfiguration包,并新建RibbonConfiguration類(lèi)
/**
* @author: huangyibo
* @Date: 2019/11/2 18:08
* @Description: 如果將此類(lèi)放進(jìn)啟動(dòng)類(lèi)的包下,那么此工程的所有ribbon都會(huì)使用這種負(fù)載均衡規(guī)則
*/
@Configuration
public class RibbonConfiguration {
//Ribbon提供的負(fù)載均衡策略
@Bean
public IRule ribbonRule(){
return new RandomRule();
}
}
- Java代碼配置
@Configuration
@RibbonClient(name="user-center",configuration = RibbonConfiguration.class)
public class UserCenterRibbonConfiguration {
}
- 用配置屬性配置
# 通過(guò)配置文件指定user-center實(shí)例的ribbon負(fù)載均衡策略為RandomRule,和java代碼方式指定效果一樣
user-center:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
RibbonClientConfiguration會(huì)初始化負(fù)載均衡器所需的主要配置項(xiàng)是IClientConfig、ServerList、ServerListFilter、IRule、IPing、ServerListUpdater并初始化ZoneAwareLoadBalancer。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
return this.propertiesFactory.get(ServerListFilter.class, config, name);
}
ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
filter.initWithNiwsConfig(config);
return filter;
}
}
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer的構(gòu)造函數(shù)初始化父類(lèi)DynamicServerListLoadBalancer
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
}
DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
//初始化BaseLoadBalancer
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
}
BaseLoadBalancer
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
}
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
//初始化最長(zhǎng)Ping間隔時(shí)間pingIntervalTime
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
//初始化最大Ping時(shí)間maxTotalPingTime
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
setRule(rule);
setPing(ping);
setLoadBalancerStats(stats);
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
}
logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
boolean enablePrimeConnections = clientConfig.get(
CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
//獲取是否啟用連接器驗(yàn)活標(biāo)識(shí)enablePrimeConnections 默認(rèn)為false。
//如果該值為true 會(huì)在加載的時(shí)候?qū)κ褂盟蟹?wù)器進(jìn)行檢測(cè),
//通過(guò)PrimeConnections 來(lái)設(shè)置服務(wù)器的readyToServe 狀態(tài)
if (enablePrimeConnections) {
this.setEnablePrimingConnections(true);
PrimeConnections primeConnections = new PrimeConnections(
this.getName(), clientConfig);
this.setPrimeConnections(primeConnections);
}
init();
}
public void setPingInterval(int pingIntervalSeconds) {
if (pingIntervalSeconds < 1) {
return;
}
this.pingIntervalSeconds = pingIntervalSeconds;
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancer [{}]: pingIntervalSeconds set to {}",
name, this.pingIntervalSeconds);
}
//設(shè)置Server的定時(shí)Ping任務(wù)
setupPingTask(); // since ping data changed
}
}
- 初始化最長(zhǎng)Ping間隔時(shí)間pingIntervalTime和最大Ping時(shí)間maxTotalPingTime 沒(méi)有地方使用到。
- setPingInterval調(diào)用setupPingTask方法,啟動(dòng)Ping任務(wù)
- 獲取是否啟用連接器驗(yàn)活標(biāo)識(shí)enablePrimeConnections 默認(rèn)為false。如果該值為true 會(huì)在加載的時(shí)候?qū)κ褂盟蟹?wù)器進(jìn)行檢測(cè),通過(guò)PrimeConnections 來(lái)設(shè)置服務(wù)器的readyToServe 狀態(tài)
setupPingTask
- 設(shè)置Server的定時(shí)Ping任務(wù)
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
protected int pingIntervalSeconds = 10;
void setupPingTask() {
if (canSkipPing()) {
return;
}
// 如果已經(jīng)有了定時(shí)任務(wù),則取消
if (lbTimer != null) {
lbTimer.cancel();
}
// 第二個(gè)參數(shù)為true,表示它是一個(gè)deamon線程
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
// 創(chuàng)建 PingTask, 它繼承于 TimerTask,定時(shí)執(zhí)行 run 方法
// 啟動(dòng)PingTask任務(wù),每10秒執(zhí)行一次
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
}
PingTask 任務(wù)
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
class PingTask extends TimerTask {
public void run() {
try {
// 默認(rèn) pingStrategy = new SerialPingStrategy()
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
class Pinger {
public void runPinger() throws Exception {
// 如果正在ping,則返回
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// 所有的服務(wù),包括不可用的服務(wù)
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
// 所有服務(wù)的數(shù)量
int numCandidates = allServers.length;
// 所有服務(wù)ping的結(jié)果
results = pingerStrategy.pingServers(ping, allServers);
// 狀態(tài)可用的服務(wù)列表
final List<Server> newUpList = new ArrayList<Server>();
// 狀態(tài)改變的服務(wù)列表
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
// 最新的狀態(tài)
boolean isAlive = results[i];
Server svr = allServers[i];
// 老的狀態(tài)
boolean oldIsAlive = svr.isAlive();
// 更新?tīng)顟B(tài)
svr.setAlive(isAlive);
// 如果狀態(tài)改變了,則放到集合中,會(huì)進(jìn)行重新拉取
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
// 狀態(tài)可用的服務(wù)
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
// 變態(tài)改變監(jiān)聽(tīng)器
notifyServerStatusChangeListener(changedServers);
} finally {
// ping 完成
pingInProgress.set(false);
}
}
}
}
pingServers檢測(cè)服務(wù)的狀態(tài)
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private static class SerialPingStrategy implements IPingStrategy {
// 檢測(cè)服務(wù)的狀態(tài)
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
}
Ribbon 每10秒向 EurekaClient 發(fā)送 ping 來(lái)判斷服務(wù)的可用性,如果服務(wù)的可用性發(fā)生了改變或服務(wù)的數(shù)量和之前的不一致,則會(huì)更新或重新拉取服務(wù)。有了這些服務(wù)之后,會(huì)根據(jù)負(fù)載均衡策略 IRule 來(lái)選擇一個(gè)可用的服務(wù)。
DynamicServerListLoadBalancer#restOfInit(clientConfig)
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
//定時(shí)更新Eureka Client實(shí)例列表
enableAndInitLearnNewServersFeature();
//獲取所有Eureka Client實(shí)例列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
}
enableAndInitLearnNewServersFeature()
- 每30秒定時(shí)更新Eureka Client實(shí)例列表
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
}
public class PollingServerListUpdater implements ServerListUpdater {
//更新服務(wù)實(shí)例在初始化之后延遲1秒后開(kāi)始執(zhí)行
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
//以30秒為周期重復(fù)執(zhí)行
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
//以定時(shí)任務(wù)的方式進(jìn)行服務(wù)列表的更新。
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
//創(chuàng)建一個(gè)Runnable的線程wrapperRunnable
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//具體更新服務(wù)實(shí)例列表的方法
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//為wrapperRunnable線程啟動(dòng)一個(gè)定時(shí)任務(wù)
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs, //1秒
refreshIntervalMs, //30秒
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
class NamelessClass_1 implements UpdateAction {
NamelessClass_1() {
}
public void doUpdate() {
//獲取所有Eureka Client的服務(wù)實(shí)例列表
DynamicServerListLoadBalancer.this.updateListOfServers();
}
}
this.updateAction = new NamelessClass_1();
this.initWithNiwsConfig(clientConfig);
}
}
DynamicServerListLoadBalancer#updateListOfServers()
- 獲取所有Eureka Client實(shí)例列表
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList();
if (this.serverListImpl != null) {
//實(shí)現(xiàn)從Eureka Server中獲取服務(wù)可用實(shí)例列表
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers((List)servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
//更新服務(wù)實(shí)例列表
this.updateAllServerList((List)servers);
}
}
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
//從DiscoveryClient中的localRegionApps中獲取服務(wù)實(shí)例緩存列表
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
}
@Singleton
public class DiscoveryClient implements EurekaClient {
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private volatile Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<>();
@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
@Nullable String region) {
if (vipAddress == null) {
throw new IllegalArgumentException(
"Supplied VIP Address cannot be null");
}
Applications applications;
if (instanceRegionChecker.isLocalRegion(region)) {
//獲取服務(wù)實(shí)例緩存列表
applications = this.localRegionApps.get();
} else {
//獲取服務(wù)實(shí)例緩存列表
applications = remoteRegionVsApps.get(region);
if (null == applications) {
logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
+ "address {}.", region, vipAddress);
return Collections.emptyList();
}
}
if (!secure) {
return applications.getInstancesByVirtualHostName(vipAddress);
} else {
return applications.getInstancesBySecureVirtualHostName(vipAddress);
}
}
}
DynamicServerListLoadBalancer#updateAllServerList
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected void updateAllServerList(List<T> ls) {
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
// 狀態(tài)設(shè)置為可用
s.setAlive(true);
}
//設(shè)置服務(wù)實(shí)例列表
setServersList(ls);
// 強(qiáng)制檢測(cè)服務(wù)狀態(tài)
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
@Override
public void setServersList(List lsrv) {
//將服務(wù)實(shí)例列表設(shè)置到父類(lèi)(BaseLoadBalancer)的allServerList中
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
}
正如名所示,DynamicServerListLoadBalancer可以動(dòng)態(tài)的加載后端服務(wù)列表,DynamicServerListLoadBalancer中使用一個(gè)ServerListRefreshExecutorThread任務(wù)線程定期的更新后端服務(wù)列表。
參考:
https://www.cnblogs.com/huanchupkblog/p/10923229.html