Preface
This article draws on the book Spring Cloud 微服务实战 (Spring Cloud Microservices in Action), and extends it with a number of additional topics.

Source Code Analysis
The @LoadBalanced annotation is itself annotated with @Qualifier
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
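To make the role of the meta-annotation concrete, here is a minimal plain-Java sketch. It is not Spring's implementation: Marker and Balanced are hypothetical stand-ins for @Qualifier and @LoadBalanced, and the reflection loop only illustrates how a framework could recognise "any annotation that is itself annotated with the marker".

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class MetaAnnotationSketch {

    // Hypothetical stand-in for Spring's @Qualifier.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.ANNOTATION_TYPE})
    @interface Marker {}

    // Hypothetical stand-in for @LoadBalanced: it is itself annotated with @Marker.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @Marker
    @interface Balanced {}

    static class Holder {
        @Balanced Object balancedTemplate;
        Object plainTemplate;
    }

    // Returns the names of fields that carry an annotation meta-annotated with @Marker.
    static List<String> markedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            for (Annotation annotation : field.getAnnotations()) {
                if (annotation.annotationType().isAnnotationPresent(Marker.class)) {
                    names.add(field.getName());
                    break;
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Only the field annotated with @Balanced is selected.
        System.out.println(markedFields(Holder.class));
    }
}
```

The same idea lets a container collect only the RestTemplate instances that were explicitly marked for load balancing and leave the plain ones untouched.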
If you are not familiar with Spring's AnnotationMetadata, try running the following example.
public class MetaTest1 {
public static void main(String[] args) {
StandardAnnotationMetadata metadata = new StandardAnnotationMetadata(
MetaDemo.class, true
);
System.out.println("============ClassMetadata===================");
ClassMetadata classMetadata = metadata;
System.out.println(classMetadata.getClassName());
// name of the enclosing class (null for a top-level class)
System.out.println(classMetadata.getEnclosingClassName());
// names of the member (nested) classes
System.out.println(StringUtils.arrayToCommaDelimitedString(
classMetadata.getMemberClassNames()
));
// names of the implemented interfaces
System.out.println(StringUtils.arrayToCommaDelimitedString(classMetadata.getInterfaceNames()));
// whether the class has a superclass (true here, since MetaDemo extends HashMap)
System.out.println(classMetadata.hasSuperClass());
System.out.println(classMetadata.getSuperClassName());
System.out.println(classMetadata.isAnnotation());
System.out.println(classMetadata.isFinal());
// independent: can be instantiated on its own, i.e. a top-level class or a static nested class
System.out.println(classMetadata.isIndependent());
System.out.println("==========AnnotatedTypeMetadata====================");
AnnotatedTypeMetadata annotatedTypeMetadata = metadata;
System.out.println(annotatedTypeMetadata.isAnnotated(Service.class.getName()));
System.out.println(annotatedTypeMetadata.isAnnotated(Component.class.getName()));
System.out.println(annotatedTypeMetadata.isAnnotated(EnableAsync.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Service.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Component.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Repository.class.getName()));
System.out.println(annotatedTypeMetadata.getAnnotationAttributes(EnableAsync.class.getName()));
// values come back as lists; attributes are not merged
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Service.class.getName()));
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Component.class.getName()));
System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(EnableAsync.class.getName()));
System.out.println("=================AnnotationMetadata=================");
AnnotationMetadata annotationMetadata = metadata;
// annotation types directly present on the class
System.out.println(annotationMetadata.getAnnotationTypes());
// meta-annotations of @Service
System.out.println(annotationMetadata.getMetaAnnotationTypes(Service.class.getName()));
// meta-annotations of @Component
/**
* "meta" means the annotations declared on an annotation; java.lang annotations are excluded
*/
System.out.println(annotationMetadata.getMetaAnnotationTypes(Component.class.getName()));
// does not look at meta-annotations: true
System.out.println(annotationMetadata.hasAnnotation(Service.class.getName()));
// false
System.out.println(annotationMetadata.hasAnnotation(Component.class.getName()));
/**
* Determines whether the class has an annotation that is itself annotated with the given meta-annotation type.
*/
// false
System.out.println(annotationMetadata.hasMetaAnnotation(Service.class.getName()));
// true
System.out.println(annotationMetadata.hasMetaAnnotation(Component.class.getName()));
System.out.println(annotationMetadata.hasAnnotatedMethods(Autowired.class.getName()));
// StandardMethodMetadata
annotationMetadata.getAnnotatedMethods(Autowired.class.getName())
.forEach(method -> {
System.out.println(method.getClass());
System.out.println(method.getDeclaringClassName());
System.out.println(method.getMethodName());
System.out.println(method.getReturnTypeName());
});
}
@Repository("repository")
@Service("serviceName")
@EnableAsync
public static class MetaDemo extends HashMap<String, String> implements
Serializable {
private static class InnerClass {
}
@Autowired
private String getName() {
return "xiaoma";
}
}
}
The output is as follows:
============ClassMetadata===================
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo$InnerClass
java.io.Serializable
true
java.util.HashMap
false
false
true
==========AnnotatedTypeMetadata====================
true
true
true
{value=serviceName}
{value=repository}
{value=repository}
{order=2147483647, annotation=interface java.lang.annotation.Annotation, mode=PROXY, proxyTargetClass=false}
{value=[serviceName]}
{value=[, ]}
{order=[2147483647], annotation=[interface java.lang.annotation.Annotation], mode=[PROXY], proxyTargetClass=[false]}
=================AnnotationMetadata=================
[org.springframework.stereotype.Repository, org.springframework.stereotype.Service, org.springframework.scheduling.annotation.EnableAsync]
[org.springframework.stereotype.Component, org.springframework.stereotype.Indexed]
[]
true
false
false
true
true
class org.springframework.core.type.StandardMethodMetadata
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
getName
java.lang.String
The LoadBalancerClient interface (load balancer client)
// Executes the request against a service instance picked by the load balancer
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
// Builds a proper host:port URI for the system to use.
// The ServiceInstance is the concrete service instance carrying host and port;
// the original URI uses the logical service name as its host, e.g. http://SERVICE-PROVIDER/serviceprovider/hello
URI reconstructURI(ServiceInstance instance, URI original);
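The ServiceInstanceChooser interface (service instance selector)
// Picks a service instance from the load balancer
ServiceInstance choose(String serviceId);
The LoadBalancerAutoConfiguration class auto-configures the client-side load balancer
- RetryLoadBalancerInterceptor: intercepts requests sent by the client in order to apply client-side load balancing
- RestTemplateCustomizer: adds the LoadBalancerInterceptor to a RestTemplate
- It maintains a list of the RestTemplate objects annotated with @LoadBalanced and initializes them here: the RestTemplateCustomizer instances are invoked to add the LoadBalancerInterceptor to every RestTemplate that needs client-side load balancing
- RibbonLoadBalancedRetryFactory: adds a retry mechanism to client-side load balancing
- LoadBalancerRequestFactory: wraps the request into a LoadBalancerRequest, which calls execute(serviceRequest, body) on the ClientHttpRequestExecution and returns a ClientHttpResponse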
LoadBalancerInterceptor:
When a RestTemplate annotated with @LoadBalanced sends an HTTP request,
this class intercepts it. Because we use the service name as the host when calling RestTemplate,
the service name can be read directly from the HttpRequest's URI via getHost().
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
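The host lookup can be tried in isolation with java.net.URI. The sketch below is only an illustration (the class and method names are made up); it also shows one reason the Assert.state check above matters: java.net.URI does not accept an underscore in a hostname, so getHost() returns null for such a service name.

```java
import java.net.URI;

class ServiceNameSketch {

    // Mirrors the interceptor's lookup: the logical service name is the URI host.
    static String serviceNameOf(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // A hyphenated logical name parses as a regular host.
        System.out.println(serviceNameOf("http://SERVICE-PROVIDER/serviceprovider/hello"));
        // An underscore is not valid in a hostname for java.net.URI, so the host is null.
        System.out.println(serviceNameOf("http://service_provider/hello"));
    }
}
```

In practice this means service IDs used with a load-balanced RestTemplate are safer when they contain only letters, digits and hyphens.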
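In the execute method of RibbonLoadBalancerClient, getServer obtains the server through the ILoadBalancer interface.
One note here: if an application calls, say, services A and B, it creates one Ribbon context for A and another for B. These contexts are created lazily, so the first request to each service is noticeably slower and often times out.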
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
Ribbon's context factory is SpringClientFactory, which extends NamedContextFactory<RibbonClientSpecification>.
RibbonAutoConfiguration holds a field List<RibbonClientSpecification> configurations = new ArrayList<>() and passes it to SpringClientFactory.
So how does Spring Boot pick up the @RibbonClient annotation, and how does a RibbonClientSpecification get registered in the Spring container?
How is a service ID associated with its Ribbon configuration? See RibbonClientConfigurationRegistrar for the details.
There, each service ID and its Ribbon configuration are wrapped in a RibbonClientSpecification, which is registered in the Spring container.
private void registerClientConfiguration(BeanDefinitionRegistry registry,
Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(RibbonClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + ".RibbonClientSpecification",
builder.getBeanDefinition());
}
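This is where the Feign and Ribbon contexts are created. Their configuration classes are registered in the per-service context (the Spring Boot context holds a copy as well), and the Spring Boot context is set as the parent.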
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object> singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
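The "one context per service, created on first use" behaviour can be sketched without Spring. This is an illustration under stated assumptions, not the NamedContextFactory code: Context is a hypothetical stand-in for the child ApplicationContext, and a ConcurrentHashMap is used here for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class NamedContextSketch {

    // Hypothetical stand-in for a child ApplicationContext.
    static final class Context {
        final String name;
        Context(String name) { this.name = name; }
    }

    private final Map<String, Context> contexts = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    // Creates the context for a service on first access and reuses it afterwards.
    Context getContext(String name) {
        return contexts.computeIfAbsent(name, n -> {
            created.incrementAndGet(); // the expensive refresh() would happen here
            return new Context(n);
        });
    }

    public static void main(String[] args) {
        NamedContextSketch factory = new NamedContextSketch();
        Context a1 = factory.getContext("service-a");
        Context a2 = factory.getContext("service-a");
        Context b = factory.getContext("service-b");
        System.out.println(a1 == a2);              // the same context is reused for service-a
        System.out.println(a1 != b);               // service-b gets its own context
        System.out.println(factory.created.get()); // number of contexts actually built
    }
}
```

Because the creation cost is paid inside the first getContext call, the first request to a service is the one that pays for building its context.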
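The beans in the RibbonClientConfiguration class mostly determine how a client uses Ribbon.
The ILoadBalancer interface (load balancer)
// Adds service instances to the list maintained by the load balancer
public void addServers(List<Server> newServers);
// Picks one concrete instance from the load balancer according to some strategy
public Server chooseServer(Object key);
// Notifies the load balancer that a specific instance is down; otherwise the load balancer
// assumes every instance is healthy until the next time it fetches the instance list
public void markServerDown(Server server);
// Returns the instances that are currently serving normally
public List<Server> getReachableServers();
// Returns all known instances, both those serving normally and those that are down
public List<Server> getAllServers();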
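The BaseLoadBalancer class implements the basic load-balancing functionality,
and DynamicServerListLoadBalancer and ZoneAwareLoadBalancer extend it.
RibbonClientConfiguration shows that the default is ZoneAwareLoadBalancer.
It also declares beans for ILoadBalancer, IPing, IRule, ServerList, ServerListFilter and so on.
The PropertiesFactory class explains why a load-balancing rule can be configured with serviceId.ribbon.NFLoadBalancerRuleClassName=RuleImpl.class:
the instance is created reflectively through a constructor that takes an IClientConfig.
For IRule and IPing this may throw an exception, but it is ignored here.
If the class cannot be created through that constructor, BeanUtils.instantiate(clazz) is called instead.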
public class PropertiesFactory {
@Autowired
private Environment environment;
private Map<Class, String> classToProperty = new HashMap<>();
public PropertiesFactory() {
classToProperty.put(ILoadBalancer.class, "NFLoadBalancerClassName");
classToProperty.put(IPing.class, "NFLoadBalancerPingClassName");
classToProperty.put(IRule.class, "NFLoadBalancerRuleClassName");
classToProperty.put(ServerList.class, "NIWSServerListClassName");
classToProperty.put(ServerListFilter.class, "NIWSServerListFilterClassName");
}
public boolean isSet(Class clazz, String name) {
return StringUtils.hasText(getClassName(clazz, name));
}
public String getClassName(Class clazz, String name) {
if (this.classToProperty.containsKey(clazz)) {
String classNameProperty = this.classToProperty.get(clazz);
String className = environment.getProperty(name + "." + NAMESPACE + "." + classNameProperty);
return className;
}
return null;
}
@SuppressWarnings("unchecked")
public <C> C get(Class<C> clazz, IClientConfig config, String name) {
String className = getClassName(clazz, name);
if (StringUtils.hasText(className)) {
try {
Class<?> toInstantiate = Class.forName(className);
return (C) SpringClientFactory.instantiateWithConfig(toInstantiate, config);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown class to load "+className+" for class " + clazz + " named " + name);
}
}
return null;
}
}
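The key that getClassName() builds can be reproduced with plain string concatenation. In this sketch a HashMap stands in for Spring's Environment, the NAMESPACE value "ribbon" is an assumption inferred from the serviceId.ribbon.* property format, and com.example.MyRule is a hypothetical class name.

```java
import java.util.HashMap;
import java.util.Map;

class RibbonPropertyKeySketch {

    static final String NAMESPACE = "ribbon"; // assumed value of the NAMESPACE constant

    // Builds the key the same way getClassName() concatenates it.
    static String key(String serviceId, String classNameProperty) {
        return serviceId + "." + NAMESPACE + "." + classNameProperty;
    }

    public static void main(String[] args) {
        // A plain map stands in for Spring's Environment.
        Map<String, String> environment = new HashMap<>();
        environment.put("hello-service.ribbon.NFLoadBalancerRuleClassName",
                "com.example.MyRule"); // hypothetical rule class

        String k = key("hello-service", "NFLoadBalancerRuleClassName");
        System.out.println(k + " -> " + environment.get(k));
        // A service without an override resolves to null, so the default bean is used.
        System.out.println(environment.get(key("other-service", "NFLoadBalancerRuleClassName")));
    }
}
```

The lookup is scoped by service ID, which is why one service can override its rule while the others keep the default.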
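The getLoadBalancer(serviceId) call in RibbonLoadBalancerClient shown above ends up in the methods below.
It first looks for an ILoadBalancer implementation in the service's Ribbon context and in the Spring Boot context.
If none exists, it looks up the IClientConfig implementation instead.
With the IClientConfig in hand, the problem can be solved indirectly:
instantiateWithConfig(getContext(name), type, config) is called,
which creates the ILoadBalancer through a constructor that takes an IClientConfig.
You might object that creating the object reflectively leaves many properties uninjected, which is bound to cause problems.
In fact, properties such as ServerList, ServerListFilter, Rule and Ping
are obtained from DefaultClientConfigImpl, that is, from the IClientConfig argument of the constructor.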
public ILoadBalancer getLoadBalancer(String name) {
return getInstance(name, ILoadBalancer.class);
}
@Override
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
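The instantiateWithConfig method of SpringClientFactory contains the following code,
presumably to stay compatible with IPing, IRule, ServerList and ServerListFilter implementations while still injecting their properties.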
if (result == null) {
result = BeanUtils.instantiate(clazz);
if (result instanceof IClientConfigAware) {
((IClientConfigAware) result).initWithNiwsConfig(config);
}
if (context != null) {
context.getAutowireCapableBeanFactory().autowireBean(result);
}
}
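The "constructor with config first, no-arg constructor as fallback" pattern can be sketched with plain reflection. This is not the SpringClientFactory code: Config and ConfigAware are hypothetical stand-ins for IClientConfig and IClientConfigAware, and the Spring autowiring step is left out.

```java
class InstantiateSketch {

    // Hypothetical stand-in for IClientConfig.
    static final class Config {
        final String clientName;
        Config(String clientName) { this.clientName = clientName; }
    }

    // Hypothetical stand-in for IClientConfigAware.
    interface ConfigAware {
        void init(Config config);
    }

    // Offers a constructor that accepts the config directly.
    static final class WithConfigCtor {
        final String client;
        public WithConfigCtor(Config config) { this.client = config.clientName; }
    }

    // Only has a no-arg constructor; receives the config through the callback.
    static final class NoArgOnly implements ConfigAware {
        String client;
        public NoArgOnly() {}
        @Override public void init(Config config) { this.client = config.clientName; }
    }

    static <T> T instantiate(Class<T> type, Config config) {
        try {
            try {
                // Preferred path: a public constructor taking the config.
                return type.getConstructor(Config.class).newInstance(config);
            } catch (NoSuchMethodException noConfigConstructor) {
                // Fallback: no-arg constructor, then push the config in afterwards.
                T instance = type.getConstructor().newInstance();
                if (instance instanceof ConfigAware) {
                    ((ConfigAware) instance).init(config);
                }
                return instance;
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type, e);
        }
    }

    public static void main(String[] args) {
        Config config = new Config("hello-service");
        System.out.println(instantiate(WithConfigCtor.class, config).client);
        System.out.println(instantiate(NoArgOnly.class, config).client);
    }
}
```

Both paths end with an object that knows its client configuration, which is the point of the fallback branch shown above.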
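As mentioned above, Ribbon contexts are created lazily. Can they be created eagerly instead?
RibbonAutoConfiguration can register a RibbonApplicationContextInitializer,
which listens for the ApplicationReadyEvent that Spring Boot fires once startup is complete and initializes the contexts of the configured clients.
This avoids the timeout on the first call, at the cost of a longer application startup. It is a trade-off.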
@Configuration
@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {
...
}
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
@Bean
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
return new RibbonLoadBalancedRetryFactory(clientFactory);
}
@Bean
@ConditionalOnMissingBean
public PropertiesFactory propertiesFactory() {
return new PropertiesFactory();
}
@Bean
@ConditionalOnProperty(value = "ribbon.eager-load.enabled")
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(springClientFactory(),
ribbonEagerLoadProperties.getClients());
}
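Back to the main topic. In RibbonLoadBalancerClient.execute() we already have the ServiceInstance, so how is the request URL obtained?
Turn to the request.apply(serviceInstance) call.
It is the method implemented by the LoadBalancerRequest that LoadBalancerRequestFactory.createRequest(), mentioned above, returns.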
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
In the LoadBalancerInterceptor flow, when the ClientHttpRequestExecution instance
executes execute(serviceRequest, body), the call goes to the execute method of
InterceptingRequestExecution, an inner class of InterceptingClientHttpRequest. This is where the interceptor chain is processed.
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
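Remember that we configured a LoadBalancerInterceptor on the RestTemplate earlier?
The interceptor chain here is the one set up at that point.
A call to restTemplate.execute() eventually passes through here and is intercepted.
The request factory used by RestTemplate is InterceptingClientHttpRequestFactory.
Its createRequest method is as follows.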
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
RestTemplate.doExecute() eventually calls request.execute().
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
Now look at executeInternal in InterceptingClientHttpRequest:
the flow returns to the execute method of InterceptingRequestExecution shown earlier.
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
In the execute method of InterceptingRequestExecution,
note the line ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
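The chain handling above boils down to an iterator that is shared across recursive calls. The following sketch reproduces only that control flow with hypothetical String-based Interceptor and Execution types; the "sent:" prefix stands in for delegate.execute().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class InterceptorChainSketch {

    interface Execution {
        String execute(String request);
    }

    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    // Walks the interceptors in order; when none are left, performs the "real" call.
    static final class ChainExecution implements Execution {
        private final Iterator<Interceptor> iterator;

        ChainExecution(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Each interceptor decides whether and how to continue the chain.
                return iterator.next().intercept(request, this);
            }
            return "sent:" + request; // stand-in for delegate.execute()
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // First interceptor: replaces the logical service name with a concrete address.
        interceptors.add((req, exec) ->
                exec.execute(req.replace("SERVICE-PROVIDER", "10.0.0.1:8080")));
        // Second interceptor: appends a marker so the call order is visible.
        interceptors.add((req, exec) -> exec.execute(req + "?traced=true"));
        return new ChainExecution(interceptors).execute(request);
    }

    public static void main(String[] args) {
        System.out.println(run("http://SERVICE-PROVIDER/hello"));
    }
}
```

Passing `this` to each interceptor is what lets a load-balancing interceptor swap in a wrapped request before the remaining interceptors and the final delegate see it.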
The requestFactory here is not LoadBalancerRequestFactory; do not confuse the two.
This requestFactory implements the ClientHttpRequestFactory interface, which LoadBalancerRequestFactory does not.
It is set through the setRequestFactory(requestFactory) method.
Our application never calls that method, so the default implementation, SimpleClientHttpRequestFactory, is used.
Other common implementations include HttpComponentsClientHttpRequestFactory and RibbonClientHttpRequestFactory.
public abstract class HttpAccessor {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
...
}
The request.getURI() call here actually invokes getURI on ServiceRequestWrapper.
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
Earlier, the LoadBalancerInterceptor was given a loadBalancerClient and a requestFactory. Calling loadBalancer.chooseServer
returns the Server node, which is wrapped in a RibbonServer; that is the serviceInstance used here.
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
The getURI method of ServiceRequestWrapper is as follows.
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
LoadBalancerClient loadBalancer) {
super(request);
this.instance = instance;
this.loadBalancer = loadBalancer;
}
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
}
The returned URI actually comes from the LoadBalancerClient.
Here the RibbonLoadBalancerContext is fetched from the service's Ribbon context.
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
} else {
server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}
RibbonLoadBalancerContext combines the server returned by RibbonServer.getServer with the original URI to build the final URI, that is, the URI with the real host:port mentioned above.
public URI reconstructURIWithServer(Server server, URI original) {
String host = server.getHost();
int port = server.getPort();
String scheme = server.getScheme();
if (host.equals(original.getHost())
&& port == original.getPort()
&& scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
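The host replacement can be exercised with java.net.URI alone. This is a simplified sketch of the same string-building approach, assuming the original URI always has a scheme; the class name and the sample address are made up for illustration.

```java
import java.net.URI;

class ReconstructUriSketch {

    // Rebuilds the URI with a concrete host and port, keeping path, query and fragment.
    static URI withServer(URI original, String host, int port) {
        StringBuilder sb = new StringBuilder();
        sb.append(original.getScheme()).append("://"); // assumes the scheme is present
        if (original.getRawUserInfo() != null) {
            sb.append(original.getRawUserInfo()).append("@");
        }
        sb.append(host);
        if (port >= 0) {
            sb.append(":").append(port);
        }
        sb.append(original.getRawPath());
        if (original.getRawQuery() != null) {
            sb.append("?").append(original.getRawQuery());
        }
        if (original.getRawFragment() != null) {
            sb.append("#").append(original.getRawFragment());
        }
        return URI.create(sb.toString());
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://SERVICE-PROVIDER/serviceprovider/hello?name=a%20b#top");
        // The raw (still encoded) parts are copied, so "%20" is preserved as-is.
        System.out.println(withServer(logical, "192.168.1.10", 8080));
    }
}
```

Using the raw getters avoids decoding and re-encoding the path and query, which would otherwise risk changing the request.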
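That is already quite a lot of material. As the saying goes, do not skip a good deed because it is small, and do not commit a bad one because it is small. Let's keep going.
Why does each service get its own Ribbon context and Feign context? Most likely for resource isolation.
SpringClientFactory is the factory class for creating client-side load balancers; it creates a separate Spring context for each distinctly named Ribbon client.
RibbonLoadBalancerContext is a subclass of LoadBalancerContext. It stores context used by the load balancer and offers related operations (building the real URI, deciding whether to retry, reading the DefaultClientConfigImpl configuration, and so on).
Its configuration can be found in the RibbonClientConfiguration class.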
@Bean
@ConditionalOnMissingBean
public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
IClientConfig config, RetryHandler retryHandler) {
return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
}
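Let's review the whole request flow. RestTemplate.doExecute() actually calls request.execute().
A swap happens at this point: InterceptingClientHttpRequestFactory (which internally delegates to the requestFactory that really sends the request) creates an InterceptingClientHttpRequest that carries the interceptors.
When execute() is called on that object, LoadBalancerInterceptor intercepts the request, and the load balancer picks a suitable service instance address according to its rule.
The request is then handed to the requestFactory that InterceptingClientHttpRequestFactory delegates to.
ClientHttpRequestFactory has many implementations, backed by clients such as Netty, HttpClient, the Ribbon client and OkHttp, which send the request to the chosen instance address.
The rest of this article covers the implementations of the ILoadBalancer interface mentioned above.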
AbstractLoadBalancer
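An abstract implementation of the ILoadBalancer interface. It groups instances into:
- ALL: all service instances
- STATUS_UP: instances that are serving normally
- STATUS_NOT_UP: instances that have stopped serving
It also defines two abstract methods:
- getServerList(ServerGroup serverGroup): returns the instance list for the given group
- getLoadBalancerStats(): returns the current properties and statistics of each service instance in the load balancer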
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
/**
* delegate to {@link #chooseServer(Object)} with parameter null.
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* List of servers that this Loadbalancer knows about
*
* @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* Obtain LoadBalancer related Statistics
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
BaseLoadBalancer
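- Maintains two lists of Server objects: one holding all service instances and one holding the instances that are serving normally
- Defines a LoadBalancerStats object that stores the properties and statistics of each service instance
- Defines an IPing object that checks whether an instance is serving normally; it defaults to null in BaseLoadBalancer and must be injected at construction time
- Defines an IPingStrategy object that decides how the checks are executed; the default, SerialPingStrategy, pings the instances one after another
- Defines an IRule object that holds the load-balancing rule
- Starts the ping task: the default constructor of BaseLoadBalancer starts a scheduled task that checks whether each Server is healthy, with a default interval of 10s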
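The two server lists and the rule can be pictured with a small, self-contained sketch. It is a simplification under stated assumptions rather than the BaseLoadBalancer code: servers are plain strings, the rule is a hard-coded round robin, and there is no ping task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SimpleLoadBalancerSketch {

    private final List<String> allServers = new ArrayList<>();
    private final List<String> upServers = new ArrayList<>();
    private final AtomicInteger nextIndex = new AtomicInteger();

    // New servers are assumed to be up until they are marked down.
    synchronized void addServers(List<String> servers) {
        allServers.addAll(servers);
        upServers.addAll(servers);
    }

    // Removes the server from the reachable list but keeps it in the full list.
    synchronized void markServerDown(String server) {
        upServers.remove(server);
    }

    // Round robin over the reachable servers; null when none is available.
    synchronized String chooseServer() {
        if (upServers.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(nextIndex.getAndIncrement(), upServers.size());
        return upServers.get(index);
    }

    synchronized List<String> getReachableServers() {
        return new ArrayList<>(upServers);
    }

    synchronized List<String> getAllServers() {
        return new ArrayList<>(allServers);
    }

    public static void main(String[] args) {
        SimpleLoadBalancerSketch lb = new SimpleLoadBalancerSketch();
        lb.addServers(Arrays.asList("a:8080", "b:8080", "c:8080"));
        System.out.println(lb.chooseServer() + " " + lb.chooseServer() + " " + lb.chooseServer());
        lb.markServerDown("b:8080");
        System.out.println(lb.getReachableServers()); // b:8080 is no longer offered
        System.out.println(lb.getAllServers());       // but it is still a known server
    }
}
```

Keeping the "all" and "up" lists separate is what allows a server to be skipped for selection without being forgotten.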
DynamicServerListLoadBalancer
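Defines the ServerList object that operates on the server list:
- getInitialListOfServers returns the initial list of service instances
- getUpdatedListOfServers returns the updated list of service instances
Which ServerList implementation does DynamicServerListLoadBalancer use?
EurekaRibbonClientConfiguration gives a clue: its IPing and ServerList implementations are both Eureka-specific.
EurekaRibbonClientConfiguration and RibbonAutoConfiguration complement each other, and the former takes precedence over the latter.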
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
The getInitialListOfServers and getUpdatedListOfServers methods of DomainExtractingServerList
delegate to DiscoveryEnabledNIWSServerList.
DiscoveryEnabledNIWSServerList obtains the instances through service discovery in obtainServersViaDiscovery:
it uses EurekaClient to fetch the list of InstanceInfo objects from the service registry.
The vipAddress here can be a logical service name, such as hello-service.
It then iterates over the instances, converts those whose status is UP into DiscoveryEnabledServer objects, and returns them as a list.
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
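Stripped of the Eureka types, the selection logic above is a nested loop with a status check and an early exit. The sketch below models it with a hypothetical in-memory registry (a Map from vipAddress to instances); Instance, Status and the sample addresses are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DiscoveryFilterSketch {

    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE }

    // Hypothetical stand-in for InstanceInfo.
    static final class Instance {
        final String hostPort;
        final Status status;
        Instance(String hostPort, Status status) {
            this.hostPort = hostPort;
            this.status = status;
        }
    }

    // Keeps only UP instances; optionally stops at the first vipAddress that has servers.
    static List<String> obtainServers(Map<String, List<Instance>> registry,
                                      String vipAddresses, boolean prioritizeFirstMatch) {
        List<String> servers = new ArrayList<>();
        for (String vipAddress : vipAddresses.split(",")) {
            List<Instance> instances = registry.getOrDefault(vipAddress, Collections.emptyList());
            for (Instance instance : instances) {
                if (instance.status == Status.UP) {
                    servers.add(instance.hostPort);
                }
            }
            if (!servers.isEmpty() && prioritizeFirstMatch) {
                break; // later vipAddresses are ignored once servers were found
            }
        }
        return servers;
    }

    static Map<String, List<Instance>> sampleRegistry() {
        Map<String, List<Instance>> registry = new HashMap<>();
        registry.put("hello-service", Arrays.asList(
                new Instance("10.0.0.1:8080", Status.UP),
                new Instance("10.0.0.2:8080", Status.DOWN)));
        registry.put("backup-service", Arrays.asList(
                new Instance("10.0.1.1:8080", Status.UP)));
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(obtainServers(sampleRegistry(), "hello-service,backup-service", true));
        System.out.println(obtainServers(sampleRegistry(), "hello-service,backup-service", false));
    }
}
```

With prioritisation on, the backup address is only consulted when the first address yields no UP instance.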
DomainExtractingServerList then passes the list through its setZones method, converting each entry into a
DomainExtractingServer and setting the required properties, such as id, zone and isAliveFlag.
ServerListUpdater
Its use is defined in DynamicServerListLoadBalancer:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
ServerListUpdater has two implementations:
- PollingServerListUpdater: the default strategy for dynamic server list updates and the default in DynamicServerListLoadBalancer. It periodically invokes the doUpdate method of the UpdateAction.
- EurekaNotificationServerListUpdater: its trigger differs from PollingServerListUpdater. It relies on Eureka's event listener to drive updates: on receiving an EurekaEvent it asynchronously calls doUpdate to refresh the instance list.
PollingServerListUpdater is simple internally: the scheduled task first runs 1s after initialization and then repeats every 30s. It also records the last update time, whether it is active, and similar information.
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
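The same start-once, fixed-delay pattern can be reproduced with the JDK scheduler. This is a reduced sketch, not the PollingServerListUpdater class: the delays are passed in as parameters (the demo uses a few milliseconds instead of 1s and 30s), and demo() is a hypothetical helper that waits for a given number of update cycles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingUpdaterSketch {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "server-list-updater");
                thread.setDaemon(true); // do not keep the JVM alive
                return thread;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private volatile ScheduledFuture<?> future;
    private volatile long lastUpdated;

    // Returns false if the updater was already started; only the first call schedules work.
    boolean start(Runnable updateAction, long initialDelayMs, long intervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow the failure so that the next cycle still runs.
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    long lastUpdated() {
        return lastUpdated;
    }

    // Starts the updater twice and waits until the action has run `cycles` times.
    static boolean demo(int cycles) {
        PollingUpdaterSketch updater = new PollingUpdaterSketch();
        CountDownLatch latch = new CountDownLatch(cycles);
        boolean started = updater.start(latch::countDown, 10, 20);
        boolean startedAgain = updater.start(latch::countDown, 10, 20);
        boolean fired;
        try {
            fired = latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fired = false;
        }
        updater.stop();
        return started && !startedAgain && fired;
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

Catching the exception inside the task matters: with scheduleWithFixedDelay, an exception that escapes the task cancels all further runs.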
ServerListFilter
It filters server nodes inside the updateListOfServers() method shown above.
The default implementation in Ribbon is ZonePreferenceServerListFilter, whose parent class is ZoneAffinityServerListFilter.
That filter is zone-aware: it compares the zone of each service instance with the consumer's own zone and drops the instances that are not in the same zone.
It first selects the servers that share a zone with the consumer, but it does not return that result right away; the shouldEnableZoneAffinity(filteredServers) method decides whether zone affinity should be applied.
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
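The getZoneSnapshot method of LoadBalancerStats provides basic metrics for the filtered same-zone instances, such as instance count, number of tripped circuit breakers, active requests and average load per instance.
These metrics are compared with the configured thresholds; if any one of the conditions below is met, the zone-filtered instance list is not used.
As a result, when a zone fails, instances in other zones can still serve requests, which provides solid high availability.
blackOutServerPercentage: percentage of failed instances (tripped circuit breakers / instance count) >= 0.8
activeRequestPerServer: average load per instance >= 0.6
availableServers: available instances (instance count - tripped circuit breakers) < 2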
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
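The three threshold checks can be tried out in isolation. The following is a minimal sketch, not Ribbon code: the class name ZoneAffinityCheck and its method signature are hypothetical, and the thresholds are hard-coded to the values listed above (0.8, 0.6, 2) instead of being read from dynamic properties.

```java
/** Hypothetical stand-alone sketch of the threshold checks in shouldEnableZoneAffinity. */
final class ZoneAffinityCheck {
    // Assumed defaults, taken from the values listed in the text above.
    static final double BLACKOUT_PERCENTAGE_THRESHOLD = 0.8;
    static final double ACTIVE_REQUESTS_PER_SERVER_THRESHOLD = 0.6;
    static final int AVAILABLE_SERVERS_THRESHOLD = 2;

    /** Returns true when the same-zone server list should be used. */
    static boolean shouldEnableZoneAffinity(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        double blackOutPercentage = (double) circuitTrippedCount / instanceCount;
        int availableServers = instanceCount - circuitTrippedCount;
        // Any single condition is enough to override zone affinity.
        boolean override = blackOutPercentage >= BLACKOUT_PERCENTAGE_THRESHOLD
                || loadPerServer >= ACTIVE_REQUESTS_PER_SERVER_THRESHOLD
                || availableServers < AVAILABLE_SERVERS_THRESHOLD;
        return !override;
    }

    public static void main(String[] args) {
        System.out.println(shouldEnableZoneAffinity(10, 1, 0.2)); // true: healthy zone
        System.out.println(shouldEnableZoneAffinity(10, 8, 0.2)); // false: 80% of breakers tripped
        System.out.println(shouldEnableZoneAffinity(10, 0, 0.7)); // false: load per server too high
        System.out.println(shouldEnableZoneAffinity(2, 1, 0.1));  // false: only one server available
    }
}
```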
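ZonePreferenceServerListFilter
Filters out the service instances that are in the same zone as the consumer, using either the configured zone or the zone found in the Eureka instance metadata.
ZoneAwareLoadBalancer
An extension of DynamicServerListLoadBalancer.
DynamicServerListLoadBalancer reuses its parent class's chooseServer method,
which applies RoundRobinRule and picks the instance to call by linear round robin. It treats all instances as nodes of a single zone, so requests periodically cross zones.
ZoneAwareLoadBalancer overrides setServerListForZones(Map<String, List<Server>> zoneServersMap):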
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
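In DynamicServerListLoadBalancer this method groups the instance list by zone
and hands it to the zoneStatsMap in LoadBalancerStats, where each zone has a ZoneStats that stores the state of that zone.
The override in ZoneAwareLoadBalancer additionally assigns a BaseLoadBalancer to each zone, and each BaseLoadBalancer maintains the instance list of its own zone.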
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
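}
return loadBalancer;
}
The second loop in setServerListForZones checks every zone that already has a load balancer and empties the server list of any zone that no longer has instances, so that the zone metrics do not contain stale data.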
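Now let's look at how ZoneAwareLoadBalancer selects an instance:
- 1.If zone-aware logic is disabled or the number of zones currently maintained is not greater than 1, fall back to the chooseServer implementation of the parent class DynamicServerListLoadBalancer.
- 2.Take a snapshot of every zone from LoadBalancerStats.
- 3.Determine the available zones (zones that violate the rules are filtered out, judged by instance count, load, and instance failure rate).
- 4.If at least one zone was filtered out, randomly pick one of the available zones, weighted by instance count. Otherwise no server is chosen here and the parent class's chooseServer is used.
- 5.Get the load balancer of the chosen zone and select a concrete instance according to its IRule.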
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
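IRule
- 1.RandomRule: random selection
- 2.RoundRobinRule: round robin
- 3.RetryRule: round robin with retry
- 4.WeightedResponseTimeRule: selection weighted by response time. This strategy extends RoundRobinRule. It computes a weight for each instance from its runtime statistics and picks instances according to these weights to achieve a better distribution.
WeightedResponseTimeRule needs an ILoadBalancer to be injected. On initialization it creates a timer task that, by default, runs every 30s and recalculates the weight of each instance.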
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
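For example, suppose there are 4 instances A, B, C, D with average response times 10, 40, 80, 100. The total response time is 10+40+80+100=230. The weight of A is 230-10=220, so its cumulative weight is 220 and its interval is [0,220]. The weight of B is 230-40=190, so its cumulative weight is 220+190=410 and its interval is (220,410]. The weight of C is 230-80=150, cumulative weight 410+150=560, interval (410,560]. The weight of D is 230-100=130, cumulative weight 560+130=690, interval (560,690). The list that is stored holds the cumulative weights, not the individual ones.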
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
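To check the arithmetic of the A, B, C, D example, the two loops of maintainWeights can be reproduced without any Ribbon types. This is only a sketch under that assumption: the class CumulativeWeights and its method are hypothetical names, and the average response times are passed in as a plain array instead of being read from ServerStats.

```java
import java.util.Arrays;

/** Hypothetical stand-alone sketch of the cumulative weight calculation. */
final class CumulativeWeights {
    /** Returns the running sum of (total response time - own response time). */
    static double[] accumulate(double[] avgResponseTimes) {
        double totalResponseTime = 0;
        for (double t : avgResponseTimes) {
            totalResponseTime += t;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double weightSoFar = 0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            // The slower the server, the smaller its own weight.
            weightSoFar += totalResponseTime - avgResponseTimes[i];
            cumulative[i] = weightSoFar;
        }
        return cumulative;
    }

    public static void main(String[] args) {
        double[] result = accumulate(new double[] {10, 40, 80, 100});
        System.out.println(Arrays.toString(result)); // [220.0, 410.0, 560.0, 690.0]
    }
}
```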
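Selecting an instance: generate a random number in the interval [0, maximum cumulative weight), then walk the weight list and return the first Server whose cumulative weight is greater than or equal to that number. If the weights have not been calculated yet, or the weight list does not match the server list, it falls back to round robin.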
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
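The lookup in the else branch above can be isolated as follows. This is a rough sketch with hypothetical names (WeightedPick, pickIndex); it leaves out the liveness check, the interrupt handling and the round-robin fallback, and it uses the cumulative weights 220, 410, 560, 690 from the A, B, C, D example.

```java
import java.util.Random;

/** Hypothetical stand-alone sketch of picking a server index from cumulative weights. */
final class WeightedPick {
    /** Returns the index of the first cumulative weight that is >= randomWeight. */
    static int pickIndex(double[] cumulativeWeights, double randomWeight) {
        for (int i = 0; i < cumulativeWeights.length; i++) {
            if (cumulativeWeights[i] >= randomWeight) {
                return i;
            }
        }
        return 0; // like the original loop, default to index 0 when nothing matches
    }

    public static void main(String[] args) {
        double[] cumulative = {220, 410, 560, 690}; // servers A, B, C, D
        double maxTotalWeight = cumulative[cumulative.length - 1];
        int[] hits = new int[cumulative.length];
        Random random = new Random();
        for (int i = 0; i < 100_000; i++) {
            // random weight in [0, maxTotalWeight)
            hits[pickIndex(cumulative, random.nextDouble() * maxTotalWeight)]++;
        }
        // A (fastest) should be hit most often and D (slowest) least often.
        for (int i = 0; i < hits.length; i++) {
            System.out.println((char) ('A' + i) + ": " + hits[i]);
        }
    }
}
```

Because the interval widths are 220, 190, 150 and 130, the hit counts should be roughly proportional to those numbers.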
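- 5.ClientConfigEnabledRoundRobinRule: internally it still uses round robin.
- 6.BestAvailableRule: has LoadBalancerStats injected. It iterates over all service instances, skips the ones whose circuit breaker is tripped, and picks the least busy instance. If LoadBalancerStats is null, it uses the implementation of its parent class ClientConfigEnabledRoundRobinRule.
- 7.PredicateBasedRule: filter first, then round robin.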
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
/**
* Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
*
*/
public abstract AbstractServerPredicate getPredicate();
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
The implementation in AbstractServerPredicate:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
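The "filter first, then round robin" idea can be sketched without Ribbon or Guava. The class below is hypothetical: it uses java.util.Optional and java.util.function.Predicate instead of the Guava types, and since the body of incrementAndGetModulo is not shown in this article, the compare-and-set loop here is an assumption about how such a counter could work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical sketch: filter the candidates, then rotate over the survivors. */
final class FilterThenRoundRobin<T> {
    private final Predicate<T> predicate;
    private final AtomicInteger nextIndex = new AtomicInteger();

    FilterThenRoundRobin(Predicate<T> predicate) {
        this.predicate = predicate;
    }

    Optional<T> choose(List<T> servers) {
        List<T> eligible = new ArrayList<>();
        for (T server : servers) {
            if (predicate.test(server)) {
                eligible.add(server);
            }
        }
        if (eligible.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

    /** Assumed implementation: returns the current index and advances it modulo the given size. */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread moved the counter or the eligible list shrank.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }
}
```

Note that the counter runs over the filtered list, so the rotation shifts whenever the set of eligible servers changes between calls.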
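- 8.AvailabilityFilteringRule: extends the PredicateBasedRule described above. It first picks a Server by round robin and checks whether that Server satisfies the predicate. If it does, the Server is returned directly.
If it does not, the rule keeps trying, up to 11 times in total. If none of the 11 attempts yields a suitable Server, it falls back to the default implementation of the parent class PredicateBasedRule.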
@Override
public Server choose(Object key) {
int count = 0;
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
The predicate is a composite one: predicate.apply(new PredicateKey(server)) above actually invokes the AvailabilityPredicate wrapped in the CompositePredicate.
public AvailabilityFilteringRule() {
super();
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
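As the code shows, AvailabilityPredicate filters on the current state of the Server: a Server is skipped if its circuit breaker is tripped or if its active request count has reached the limit.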
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
The fallback mentioned above ends up in PredicateBasedRule. The predicate used there is the CompositePredicate created in its subclass AvailabilityFilteringRule.
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
The parent class of CompositePredicate is AbstractServerPredicate, so we end up in AbstractServerPredicate's chooseRoundRobinAfterFiltering again:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
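However, the getEligibleServers(servers, loadBalancerKey) method called there is overridden by the subclass CompositePredicate, so everything seems to come full circle. This is the fallback strategy mentioned above: the server list is first filtered with AvailabilityPredicate, and if the number of servers that pass does not satisfy the minimum count and minimum percentage, the fallback predicates are tried in order. Because the fallback predicate here is alwaysTrue, the result in that case is a plain round robin over all servers.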
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
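The fallback chain can be illustrated with plain java.util.function.Predicate objects. This is a simplified sketch with hypothetical names (FallbackFilter, eligible), not the CompositePredicate API; the values 1 and 0 passed for minimalFilteredServers and minimalFilteredPercentage in main are assumptions chosen for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of "primary predicate first, fallbacks if too few servers survive". */
final class FallbackFilter {
    static <T> List<T> filter(List<T> servers, Predicate<T> predicate) {
        List<T> result = new ArrayList<>();
        for (T server : servers) {
            if (predicate.test(server)) {
                result.add(server);
            }
        }
        return result;
    }

    static <T> List<T> eligible(List<T> servers, Predicate<T> primary, List<Predicate<T>> fallbacks,
                                int minimalFilteredServers, float minimalFilteredPercentage) {
        List<T> result = filter(servers, primary);
        for (Predicate<T> fallback : fallbacks) {
            boolean enough = result.size() >= minimalFilteredServers
                    && result.size() > (int) (servers.size() * minimalFilteredPercentage);
            if (enough) {
                break;
            }
            // Each fallback is applied to the full server list, not to the previous result.
            result = filter(servers, fallback);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("s1", "s2", "s3");
        Predicate<String> noneAvailable = s -> false; // pretend every server is unavailable
        List<Predicate<String>> fallbacks = new ArrayList<>();
        fallbacks.add(s -> true); // plays the role of alwaysTrue()
        System.out.println(eligible(servers, noneAvailable, fallbacks, 1, 0f)); // [s1, s2, s3]
    }
}
```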
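- 9.ZoneAvoidanceRule: still follows the idea of filtering first and then doing round robin. It first applies the two predicates ZoneAvoidancePredicate and AvailabilityPredicate, which keep the servers in suitable zones and, among those, the healthy Server instances. If too few servers pass, the fallback strategy is applied: first AvailabilityPredicate alone, then alwaysTrue.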
public class ZoneAvoidanceRule extends PredicateBasedRule {
private static final Random random = new Random();
private CompositePredicate compositePredicate;
public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
return CompositePredicate.withPredicates(p1, p2)
.addFallbackPredicate(p2)
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
for (String zone : lbStats.getAvailableZones()) {
ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
map.put(zone, snapshot);
}
return map;
}
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
double triggeringLoad, double triggeringBlackoutPercentage) {
if (lbStats == null) {
return null;
}
Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
return getAvailableZones(snapshot, triggeringLoad,
triggeringBlackoutPercentage);
}
@Override
public AbstractServerPredicate getPredicate() {
return compositePredicate;
}
}
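To see which zones getAvailableZones drops, a simplified stand-alone version can be run on sample numbers. Everything here is a sketch with hypothetical names: ZoneStat stands in for ZoneSnapshot with only three fields, an empty snapshot yields an empty set instead of null, and ties for the worst zone are resolved by keeping the first one seen, whereas the original picks randomly among them via randomChooseZone.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical simplified sketch of the zone avoidance decision. */
final class ZoneAvoidanceSketch {
    /** Stand-in for ZoneSnapshot with only the fields used here. */
    static final class ZoneStat {
        final int instanceCount;
        final int circuitTrippedCount;
        final double loadPerServer;

        ZoneStat(int instanceCount, int circuitTrippedCount, double loadPerServer) {
            this.instanceCount = instanceCount;
            this.circuitTrippedCount = circuitTrippedCount;
            this.loadPerServer = loadPerServer;
        }
    }

    static Set<String> availableZones(Map<String, ZoneStat> snapshot,
                                      double triggeringLoad, double triggeringBlackoutPercentage) {
        Set<String> available = new TreeSet<>(snapshot.keySet());
        if (available.size() <= 1) {
            return available;
        }
        String worstZone = null;
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;
        for (Map.Entry<String, ZoneStat> entry : snapshot.entrySet()) {
            ZoneStat z = entry.getValue();
            boolean unusable = z.instanceCount == 0
                    || (double) z.circuitTrippedCount / z.instanceCount >= triggeringBlackoutPercentage
                    || z.loadPerServer < 0;
            if (unusable) {
                // Empty or blacked-out zones are always removed.
                available.remove(entry.getKey());
                limitedZoneAvailability = true;
            } else if (worstZone == null || z.loadPerServer > maxLoadPerServer) {
                maxLoadPerServer = z.loadPerServer;
                worstZone = entry.getKey();
            }
        }
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            return available; // all zones are fine, nothing to avoid
        }
        if (worstZone != null) {
            available.remove(worstZone); // additionally avoid the most loaded zone
        }
        return available;
    }

    public static void main(String[] args) {
        Map<String, ZoneStat> snapshot = new LinkedHashMap<>();
        snapshot.put("a", new ZoneStat(4, 0, 0.1));
        snapshot.put("b", new ZoneStat(4, 1, 0.5));
        snapshot.put("c", new ZoneStat(2, 2, 0.0)); // every breaker tripped
        System.out.println(availableZones(snapshot, 0.2, 0.99999)); // [a]
    }
}
```

In this sample zone c is removed because all of its circuit breakers are tripped, and zone b is removed as the most loaded remaining zone, so only zone a is left.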
public class ZoneAvoidancePredicate extends AbstractServerPredicate {
private volatile DynamicDoubleProperty triggeringLoad = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.triggeringLoadPerServerThreshold", 0.2d);
private volatile DynamicDoubleProperty triggeringBlackoutPercentage = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.avoidZoneWithBlackoutPercetage", 0.99999d);
private static final Logger logger = LoggerFactory.getLogger(ZoneAvoidancePredicate.class);
private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory
.getInstance().getBooleanProperty(
"niws.loadbalancer.zoneAvoidanceRule.enabled", true);
public ZoneAvoidancePredicate(IRule rule, IClientConfig clientConfig) {
super(rule, clientConfig);
initDynamicProperties(clientConfig);
}
public ZoneAvoidancePredicate(LoadBalancerStats lbStats,
IClientConfig clientConfig) {
super(lbStats, clientConfig);
initDynamicProperties(clientConfig);
}
ZoneAvoidancePredicate(IRule rule) {
super(rule);
}
private void initDynamicProperties(IClientConfig clientConfig) {
if (clientConfig != null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + clientConfig.getClientName() + ".triggeringLoadPerServerThreshold", 0.2d);
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + clientConfig.getClientName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
}
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
}
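Notes
If a configuration should apply only to the Ribbon client of one particular service, the CustomRibbonConfiguration class must not be picked up by the @ComponentScan of the main application context, so it needs to be marked with a custom annotation.
Use the custom annotation together with excludeFilters so that CustomRibbonConfiguration is not scanned by @ComponentScan.
CustomRibbonConfiguration is then registered only in that service's own Spring container, that is, the one managed by the SpringClientFactory mentioned above.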
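One possible way to wire this up is sketched below. It is an illustration under assumptions, not the only valid setup: the annotation name ExcludeFromComponentScan, the service name "hello-service" and the choice of RandomRule are all hypothetical, and the types are shown in one listing for brevity although a real project would normally put each in its own file. It needs Spring Boot and Spring Cloud Netflix Ribbon on the classpath.

```java
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;

/** Hypothetical marker annotation: classes carrying it are skipped by component scanning. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface ExcludeFromComponentScan {
}

/** Ribbon configuration intended for a single client only. */
@Configuration
@ExcludeFromComponentScan
class CustomRibbonConfiguration {
    @Bean
    public IRule ribbonRule() {
        return new RandomRule(); // example choice; any IRule could be returned here
    }
}

/** Main application: excludes the marked class from scanning and binds it to one client. */
@SpringBootApplication
@ComponentScan(excludeFilters = @ComponentScan.Filter(
        type = FilterType.ANNOTATION, classes = ExcludeFromComponentScan.class))
@RibbonClient(name = "hello-service", configuration = CustomRibbonConfiguration.class)
class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```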
Closing words
I am not empty-headed, and I am not a freeloader!