Ribbon源碼分析

前言

這篇文章參考了《Spring Cloud微服務實戰》這本書,但是在此基礎上延伸了很多知識點。

image.png

源碼分析

@LoadBalanced注解本身被@Qualifier元注解所標註

/**
 * Marker annotation with no attributes.
 * <p>
 * It is itself meta-annotated with {@code @Qualifier}, may be placed on fields,
 * parameters and methods, is retained at runtime, and is inherited by subclasses
 * ({@code @Inherited}).
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

對于Spring中的AnnotationMetadata不太熟悉的同學,可以跑一下下面的CASE

/**
 * Demo that builds a {@code StandardAnnotationMetadata} for the nested {@code MetaDemo}
 * class and prints what its three views expose: {@code ClassMetadata},
 * {@code AnnotatedTypeMetadata} and {@code AnnotationMetadata}.
 */
public class MetaTest1 {

    public static void main(String[] args) {
        StandardAnnotationMetadata metadata = new StandardAnnotationMetadata(
                MetaDemo.class, true
        );

        System.out.println("============ClassMetadata===================");
        ClassMetadata classMetadata = metadata;
        System.out.println(classMetadata.getClassName());
        // Name of the enclosing class, i.e. tells whether this is a nested class
        System.out.println(classMetadata.getEnclosingClassName());
        // Names of the member (nested) classes
        System.out.println(StringUtils.arrayToCommaDelimitedString(
                classMetadata.getMemberClassNames()
        ));
        // Names of the implemented interfaces
        System.out.println(StringUtils.arrayToCommaDelimitedString(classMetadata.getInterfaceNames()));

        // Whether there is a superclass; per the author's note this is false when the superclass is Object
        System.out.println(classMetadata.hasSuperClass());
        System.out.println(classMetadata.getSuperClassName());

        System.out.println(classMetadata.isAnnotation());
        System.out.println(classMetadata.isFinal());
        // "Independent" means it can be instantiated on its own: a top-level class or a static nested class
        System.out.println(classMetadata.isIndependent());

        System.out.println("==========AnnotatedTypeMetadata====================");
        AnnotatedTypeMetadata annotatedTypeMetadata = metadata;

        System.out.println(annotatedTypeMetadata.isAnnotated(Service.class.getName()));
        System.out.println(annotatedTypeMetadata.isAnnotated(Component.class.getName()));
        System.out.println(annotatedTypeMetadata.isAnnotated(EnableAsync.class.getName()));


        System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Service.class.getName()));
        System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Component.class.getName()));
        System.out.println(annotatedTypeMetadata.getAnnotationAttributes(Repository.class.getName()));
        System.out.println(annotatedTypeMetadata.getAnnotationAttributes(EnableAsync.class.getName()));

        // Values come back as a list per attribute; per the author's note no attribute merging is performed
        System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Service.class.getName()));
        System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(Component.class.getName()));
        System.out.println(annotatedTypeMetadata.getAllAnnotationAttributes(EnableAsync.class.getName()));


        System.out.println("=================AnnotationMetadata=================");
        AnnotationMetadata annotationMetadata = metadata;

        // Annotation types declared on the class (prints Repository, Service, EnableAsync in the sample run)
        System.out.println(annotationMetadata.getAnnotationTypes());
        // Meta-annotations of the @Service annotation
        System.out.println(annotationMetadata.getMetaAnnotationTypes(Service.class.getName()));
        // Meta-annotations of the @Component annotation
        /**
         * "Meta" means the annotations placed on an annotation; the java.lang ones are excluded
         * (author's note).
         */
        System.out.println(annotationMetadata.getMetaAnnotationTypes(Component.class.getName()));

        // Does not look at meta-annotations; prints true
        System.out.println(annotationMetadata.hasAnnotation(Service.class.getName()));
        // false
        System.out.println(annotationMetadata.hasAnnotation(Component.class.getName()));

        /**
         * Determines whether the underlying class has an annotation that is itself
         * annotated with the meta-annotation of the given type.
         */
        // false
        System.out.println(annotationMetadata.hasMetaAnnotation(Service.class.getName()));
        // true
        System.out.println(annotationMetadata.hasMetaAnnotation(Component.class.getName()));
        System.out.println(annotationMetadata.hasAnnotatedMethods(Autowired.class.getName()));

        // StandardMethodMetadata
        annotationMetadata.getAnnotatedMethods(Autowired.class.getName())
                .forEach(method -> {
                    System.out.println(method.getClass());
                    System.out.println(method.getDeclaringClassName());
                    System.out.println(method.getMethodName());
                    System.out.println(method.getReturnTypeName());
                });
    }


    /**
     * Sample target inspected by {@code main}: carries three annotations, extends
     * {@code HashMap}, implements {@code Serializable}, and has one nested class and
     * one {@code @Autowired} method.
     */
    @Repository("repository")
    @Service("serviceName")
    @EnableAsync
    public static class MetaDemo extends HashMap<String, String> implements
            Serializable {

        private static class InnerClass {

        }

        @Autowired
        private String getName() {
            return "xiaoma";
        }
    }
 }

運行結果如下:

============ClassMetadata===================
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo$InnerClass
java.io.Serializable
true
java.util.HashMap
false
false
true
==========AnnotatedTypeMetadata====================
true
true
true
{value=serviceName}
{value=repository}
{value=repository}
{order=2147483647, annotation=interface java.lang.annotation.Annotation, mode=PROXY, proxyTargetClass=false}
{value=[serviceName]}
{value=[, ]}
{order=[2147483647], annotation=[interface java.lang.annotation.Annotation], mode=[PROXY], proxyTargetClass=[false]}
=================AnnotationMetadata=================
[org.springframework.stereotype.Repository, org.springframework.stereotype.Service, org.springframework.scheduling.annotation.EnableAsync]
[org.springframework.stereotype.Component, org.springframework.stereotype.Indexed]
[]
true
false
false
true
true
class org.springframework.core.type.StandardMethodMetadata
com.cmazxiaoma.springcloud.zuul.msg.meta.MetaTest1$MetaDemo
getName
java.lang.String

負載均衡器客戶端LoadBalancerClient接口

        // Executes the request using an instance of the given service picked by the load balancer
        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
        <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
        // Builds a suitable host:port style URL for the system.
        // The ServiceInstance argument is the concrete service instance carrying host and port;
        // the URI argument is the URL that uses the logical service name as its host,
        // e.g. http://SERVICE-PROVIDER/serviceprovider/hello
        URI reconstructURI(ServiceInstance instance, URI original);

服務實例選擇器ServiceInstanceChooser接口

        // 從負載均衡器中挑選一個服務(wù)實例
        ServiceInstance choose(String serviceId)

LoadBalancerAutoConfiguration類實現客戶端負載均衡器的自動化配置

  • RetryLoadBalancerInterceptor:用于在客戶端發起請求時進行攔截,以實現客戶端負載均衡
  • RestTemplateCustomizer:用于給RestTemplate增加LoadBalancerInterceptor攔截器
  • 維護了一個被@LoadBalanced注解修飾的RestTemplate對象列表,并在這里進行初始化:通過調用RestTemplateCustomizer的實例,給需要客戶端負載均衡的RestTemplate增加LoadBalancerInterceptor攔截器
  • RibbonLoadBalancedRetryFactory:給客戶端負載均衡增加重試機制
  • LoadBalancerRequestFactory:對request進行包裝,加工成LoadBalancerRequest,調用ClientHttpRequestExecution中的execute(serviceRequest, body),返回ClientHttpResponse

LoadBalancerInterceptor:
當一個被@LoadBalanced注解修飾的RestTemplate對象向外發起Http請求時,
會被這個類所攔截。由于我們在使用RestTemplate時采用了服務名作為host,
所以直接從HttpRequest的URI對象中通過getHost就可以拿到服務名。

/**
 * Intercepts an outgoing request, treats the host part of its URI as the service
 * name, and hands the wrapped request to the load balancer for execution.
 *
 * @throws IllegalStateException if the request URI has no host
 * @throws IOException propagated from the load balancer execution
 */
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI requestUri = request.getURI();
        final String host = requestUri.getHost();
        // The host component doubles as the logical service name.
        Assert.state(host != null, "Request URI does not contain a valid hostname: " + requestUri);
        return this.loadBalancer.execute(
                host,
                this.requestFactory.createRequest(request, body, execution));
}

從RibbonLoadBalancerClient的execute方法可以看到,getServer中是通過ILoadBalancer接口來獲取服務。

對了,這里說明一下:在一個應用里面,比如調用了A、B兩個服務,那么應用會創建一個A的Ribbon容器和一個B的Ribbon容器。而且容器還是懶加載的,所以第一次請求容易超時。

    /**
     * Chooses a server for {@code serviceId} through its load balancer, wraps it in a
     * {@code RibbonServer}, and delegates to the three-argument {@code execute}.
     *
     * @throws IllegalStateException if no server could be chosen
     */
    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        Server chosen = getServer(getLoadBalancer(serviceId));
        if (chosen == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer target = new RibbonServer(
                serviceId,
                chosen,
                isSecure(chosen, serviceId),
                serverIntrospector(serviceId).getMetadata(chosen));

        return execute(serviceId, target, request);
    }
    
        /**
         * Asks the load balancer for a server using the fixed key {@code "default"}.
         *
         * @return the chosen server, or {@code null} when {@code loadBalancer} is {@code null}
         */
        protected Server getServer(ILoadBalancer loadBalancer) {
            // TODO: better handling of key
            return (loadBalancer != null) ? loadBalancer.chooseServer("default") : null;
        }
        
        

Ribbon的容器工廠是SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>

RibbonAutoConfiguration配置中,有將List<RibbonClientSpecification> configurations = new ArrayList<>()屬性設置給SpringClientFactory

那SpringBoot是怎么掃描@RibbonClient注解的呢? RibbonClientSpecification是怎么注冊到Spring容器呢?
服務ID和對應的ribbon配置是怎么關聯起來的呢?詳情可以看RibbonClientConfigurationRegistrar

可以看到我們將每個服務ID和對應的ribbon配置通過RibbonClientSpecification來維護,同時注冊到Spring容器中。

    /**
     * Registers a {@code RibbonClientSpecification} bean definition whose constructor
     * arguments are {@code name} and {@code configuration}. The bean is named
     * {@code name + ".RibbonClientSpecification"}.
     */
    private void registerClientConfiguration(BeanDefinitionRegistry registry,
            Object name, Object configuration) {
        BeanDefinitionBuilder specBuilder = BeanDefinitionBuilder
                .genericBeanDefinition(RibbonClientSpecification.class)
                .addConstructorArgValue(name)
                .addConstructorArgValue(configuration);
        String beanName = name + ".RibbonClientSpecification";
        registry.registerBeanDefinition(beanName, specBuilder.getBeanDefinition());
    }

Feign和Ribbon容器的創建就是在這里進行的,還把它們的配置類注冊到自己的容器里,此時SpringBoot容器里也有一份。同時將SpringBoot容器設置成父容器。

    /**
     * Creates and refreshes the child application context for the client {@code name}.
     * <p>
     * Registration order: the configuration classes stored under {@code name}, then those
     * of every entry whose key starts with {@code "default."}, then
     * {@code PropertyPlaceholderAutoConfiguration} and the default configuration type.
     * A property source mapping {@code propertyName} to {@code name} is added first, and
     * the parent context (if any) is attached before the refresh.
     */
    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();

        // Client-specific configuration classes.
        if (this.configurations.containsKey(name)) {
            for (Class<?> clientConfig : this.configurations.get(name).getConfiguration()) {
                child.register(clientConfig);
            }
        }

        // Configuration classes shared by all clients ("default." entries).
        for (Map.Entry<String, C> spec : this.configurations.entrySet()) {
            if (!spec.getKey().startsWith("default.")) {
                continue;
            }
            for (Class<?> sharedConfig : spec.getValue().getConfiguration()) {
                child.register(sharedConfig);
            }
        }

        child.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType);
        child.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                this.propertySourceName,
                Collections.<String, Object> singletonMap(this.propertyName, name)));

        if (this.parent != null) {
            // Uses Environment from parent as well as beans
            child.setParent(this.parent);
        }
        child.setDisplayName(generateDisplayName(name));
        child.refresh();
        return child;
    }

RibbonClientConfiguration配置類中的配置,基本都和客戶端怎么使用ribbon有關系。

負載均衡器ILoadBalancer接口

    // 向負載均衡器中維護的實例列表增加服務(wù)實例
    public void addServers(List<Server> newServers);
    // 通過某種策略,從負載均衡器中挑選出一個具體的實例
    public Server chooseServer(Object key);
    // 用來通知和標(biāo)識負載均衡器中某個具體實例已經(jīng)停止服務(wù),不然負載均衡器
    在下一次獲取服務(wù)實例清單前都會認(rèn)為服務(wù)實例均是正常
    public void markServerDown(Server server);
    // 獲取當(dāng)前正常服務(wù)的實例列表
    public List<Server> getReachableServers();
    // 獲取所有已知的服務(wù)實例列表,包括正常服務(wù)和停止服務(wù)的實例。
    public List<Server> getAllServers();

BaseLoadBalancer類實現了基礎的負載均衡功能,
DynamicServerListLoadBalancer和ZoneAwareLoadBalancer則在此基礎上做了擴展。

在RibbonClientConfiguration中可以看到使用的是ZoneAwareLoadBalancer

可以看到有ILoadBalancer,IPing,IRule,ServerList,ServerListFilter之類的配置

這里我們可以看到PropertiesFactory這個類就明白了為什么可以通過serviceId.ribbon.NFLoadBalancerRuleClassName=RuleImpl.class配置負載均衡策略

實際上是靠Constructor(參數是IClientConfig)反射去完成實例化的。
如果是IRule,IPing的話可能會拋出異常,但是這里忽略掉了。

不能靠有參構造器初始化的話,接著調用BeanUtils.instantiate(clazz)完成初始化;

/**
 * Resolves component implementations from environment properties of the form
 * {@code <name>.<NAMESPACE>.<property>}, where the property suffix is looked up per
 * component type (load balancer, ping, rule, server list, server list filter).
 */
public class PropertiesFactory {
    @Autowired
    private Environment environment;

    /** Component type to the property suffix that names its implementation class. */
    private Map<Class, String> classToProperty = new HashMap<>();

    public PropertiesFactory() {
        classToProperty.put(ILoadBalancer.class, "NFLoadBalancerClassName");
        classToProperty.put(IPing.class, "NFLoadBalancerPingClassName");
        classToProperty.put(IRule.class, "NFLoadBalancerRuleClassName");
        classToProperty.put(ServerList.class, "NIWSServerListClassName");
        classToProperty.put(ServerListFilter.class, "NIWSServerListFilterClassName");
    }

    /**
     * @return {@code true} when a non-blank implementation class name is configured
     *         for {@code clazz} under the client {@code name}
     */
    public boolean isSet(Class clazz, String name) {
        return StringUtils.hasText(getClassName(clazz, name));
    }

    /**
     * Looks up the configured implementation class name.
     *
     * @return the property value, or {@code null} when {@code clazz} is not a known
     *         component type or the property is absent
     */
    public String getClassName(Class clazz, String name) {
        if (!this.classToProperty.containsKey(clazz)) {
            return null;
        }
        String propertySuffix = this.classToProperty.get(clazz);
        return environment.getProperty(name + "." + NAMESPACE + "." + propertySuffix);
    }

    /**
     * Instantiates the configured implementation of {@code clazz} for the client
     * {@code name} through {@code SpringClientFactory.instantiateWithConfig}.
     *
     * @return the new instance, or {@code null} when nothing is configured
     * @throws IllegalArgumentException if the configured class cannot be loaded; the
     *         original {@code ClassNotFoundException} is attached as the cause
     */
    @SuppressWarnings("unchecked")
    public <C> C get(Class<C> clazz, IClientConfig config, String name) {
        String className = getClassName(clazz, name);
        if (!StringUtils.hasText(className)) {
            return null;
        }
        try {
            Class<?> toInstantiate = Class.forName(className);
            return (C) SpringClientFactory.instantiateWithConfig(toInstantiate, config);
        } catch (ClassNotFoundException e) {
            // Keep the cause so the class-loading failure is not lost from the stack trace.
            throw new IllegalArgumentException(
                    "Unknown class to load "+className+" for class " + clazz + " named " + name, e);
        }
    }
}

而我們上面在RibbonLoadBalancerClient中的getLoadBalancer(serviceId)方法最后就是調用以下的方法

首先從該服務的Ribbon容器、SpringBoot容器中尋找ILoadBalancer的實現類

如果不存在的話, 接著找IClientConfig的實現類

找到了IClientConfig的實現類就可以圍魏救趙了

接著調用instantiateWithConfig(getContext(name), type, config);

通過有參構造器初始化ILoadBalancer, 參數是IClientConfig

這時候你就會問了:通過反射初始化會導致很多屬性沒有注入,那絕對有問題啊

其實ServerList,ServerListFilter,Rule,Ping之類的屬性
會從DefaultClientConfigImpl中去拿, 也就是構造器中的那個IClientConfig參數


    /**
     * Returns the {@code ILoadBalancer} for the client {@code name} by delegating to
     * {@code getInstance(name, ILoadBalancer.class)}.
     */
    public ILoadBalancer getLoadBalancer(String name) {
        return getInstance(name, ILoadBalancer.class);
    }
/**
 * Returns the instance of {@code type} supplied by the superclass lookup. When that
 * lookup yields {@code null}, the {@code IClientConfig} for {@code name} is fetched
 * and the type is created through {@code instantiateWithConfig}.
 */
@Override
    public <C> C getInstance(String name, Class<C> type) {
        C existing = super.getInstance(name, type);
        if (existing == null) {
            IClientConfig clientConfig = getInstance(name, IClientConfig.class);
            return instantiateWithConfig(getContext(name), type, clientConfig);
        }
        return existing;
    }

在SpringClientFactory的instantiateWithConfig方法中可以看到以下代碼,
可能是為了兼容 IPing,IRule,ServerList,ServerListFilter這些實現類,同時還注入屬性。

// Fallback when nothing has been produced yet: create the object reflectively.
if (result == null) {
            result = BeanUtils.instantiate(clazz);
            
            // Hand the client config to implementations that accept it.
            if (result instanceof IClientConfigAware) {
                ((IClientConfigAware) result).initWithNiwsConfig(config);
            }
            
            // With a context available, autowire the new object's dependencies from it.
            if (context != null) {
                context.getAutowireCapableBeanFactory().autowireBean(result);
            }
        }

上面說到Ribbon容器是懶加載,那么我們可不可以設置成急加載(eager-load)呢?
RibbonAutoConfiguration中可以配置RibbonApplicationContextInitializer,
它通過接收SpringBoot啟動完畢事件ApplicationReadyEvent來初始化clients中的容器。
這樣就可以避免第一次調用超時,但是增加了應用啟動時間,有得有失吧。

/**
 * Configuration class for Ribbon (body elided in this excerpt).
 * <p>
 * Per its annotations it is conditional on {@code IClient}, {@code RestTemplate},
 * {@code AsyncRestTemplate} and {@code Ribbon} being on the classpath, is ordered after
 * {@code EurekaClientAutoConfiguration} and before the two load-balancer
 * auto-configurations, and enables {@code RibbonEagerLoadProperties} and
 * {@code ServerIntrospectorProperties}.
 */
@Configuration
@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {
...
}
/**
 * Creates the {@code SpringClientFactory} bean and passes it this configuration's
 * {@code configurations}.
 */
@Bean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory clientFactory = new SpringClientFactory();
        clientFactory.setConfigurations(this.configurations);
        return clientFactory;
    }

    /**
     * Provides a {@code RibbonLoadBalancerClient} backed by {@code springClientFactory()};
     * skipped when a {@code LoadBalancerClient} bean already exists.
     */
    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }

    /**
     * Provides a {@code RibbonLoadBalancedRetryFactory} built on the given client factory.
     * Conditional on {@code org.springframework.retry.support.RetryTemplate} being on the
     * classpath and on no such bean being defined already.
     */
    @Bean
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    @ConditionalOnMissingBean
    public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
        return new RibbonLoadBalancedRetryFactory(clientFactory);
    }

    /** Provides a {@code PropertiesFactory} unless one is already defined. */
    @Bean
    @ConditionalOnMissingBean
    public PropertiesFactory propertiesFactory() {
        return new PropertiesFactory();
    }

    /**
     * Provides the {@code RibbonApplicationContextInitializer}, built from the client
     * factory and the client list of {@code ribbonEagerLoadProperties}. Conditional on the
     * {@code ribbon.eager-load.enabled} property.
     */
    @Bean
    @ConditionalOnProperty(value = "ribbon.eager-load.enabled")
    public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
        return new RibbonApplicationContextInitializer(springClientFactory(),
                ribbonEagerLoadProperties.getClients());
    }

回歸正題:在RibbonLoadBalancerClient的execute()中,我們已經獲取了ServiceInstance,那么怎么得到請求的url呢

把目光轉移到request.apply(serviceInstance)這個方法

也就是上文說到的LoadBalancerRequestFactory中的createRequest()所實現的該接口中的這個方法

/**
 * Applies {@code request} to {@code serviceInstance} and records the outcome (return
 * value or exception) with a {@code RibbonStatsRecorder}.
 *
 * @throws IllegalStateException if {@code serviceInstance} is not a {@code RibbonServer}
 *         or carries no server
 * @throws IOException rethrown unchanged from the request
 */
@Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server target = (serviceInstance instanceof RibbonServer)
                ? ((RibbonServer) serviceInstance).getServer()
                : null;
        if (target == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(serviceId);
        RibbonStatsRecorder recorder = new RibbonStatsRecorder(lbContext, target);

        try {
            T result = request.apply(serviceInstance);
            recorder.recordStats(result);
            return result;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ioFailure) {
            recorder.recordStats(ioFailure);
            throw ioFailure;
        }
        catch (Exception failure) {
            recorder.recordStats(failure);
            ReflectionUtils.rethrowRuntimeException(failure);
        }
        return null;
    }

在LoadBalancerInterceptor攔截器中,ClientHttpRequestExecution的實例
具體執行execute(serviceRequest, body)時,會調用InterceptingClientHttpRequest的內部類
InterceptingRequestExecution中的execute函數,攔截器的鏈式處理就體現在這里

// While interceptors remain, pass the request to the next one; `this` is handed over
// as the execution so that interceptor can continue the chain.
if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
}

還記得我們之前提到的在RestTemplate中設置LoadBalancerInterceptor嗎?
這里的攔截器責任鏈就是我們之前設置的。

我們調用restTemplate.execute()最終會經過這里被攔截
RestTemplate中的HttpRequestFactory就是InterceptingClientHttpRequestFactory

createRequest方法如下

    /**
     * Wraps the delegate {@code requestFactory} in an {@code InterceptingClientHttpRequest}
     * that carries this factory's interceptors together with the target URI and method.
     */
    @Override
    protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
        return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    }

可以看到RestTemplate中的doExecute()最終會執行request.execute()

// Create the request, let the optional callback prepare it, execute it, then run the
// response handling and the optional extractor.
ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            // Without an extractor the result is null.
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);

再看InterceptingClientHttpRequest中的executeInternal,
流程又回到我們剛才提到的InterceptingRequestExecution中的execute函數

    /**
     * Starts a fresh {@code InterceptingRequestExecution} and runs this request, with the
     * buffered body, through it.
     */
    @Override
    protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
        return new InterceptingRequestExecution().execute(this, bufferedOutput);
    }

InterceptingRequestExecution中的execute函數

注意到ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);

/**
 * Runs one step of the interceptor chain. While interceptors remain, the next one is
 * invoked with this execution as its continuation. Once the chain is exhausted, a
 * delegate request is created from the request factory, the headers and body are
 * copied onto it, and it is executed.
 */
@Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                return this.iterator.next().intercept(request, body, this);
            }

            HttpMethod httpMethod = request.getMethod();
            Assert.state(httpMethod != null, "No standard HTTP method");
            // request.getURI() is resolved here, at the moment the real request is created.
            ClientHttpRequest target = requestFactory.createRequest(request.getURI(), httpMethod);
            request.getHeaders().forEach((name, values) -> target.getHeaders().addAll(name, values));

            if (body.length > 0) {
                if (target instanceof StreamingHttpOutputMessage) {
                    ((StreamingHttpOutputMessage) target)
                            .setBody(out -> StreamUtils.copy(body, out));
                }
                else {
                    StreamUtils.copy(body, target.getBody());
                }
            }
            return target.execute();
        }

這里的requestFactory可不是LoadBalancerRequestFactory, 別搞混了

這里的requestFactory是實現了ClientHttpRequestFactory接口的factory,而LoadBalancerRequestFactory沒有實現這個接口。

這里的requestFactory其實是通過setRequestFactory(requestFactory)函數去設置的。

在我們應用里面沒有調用這個函數,其默認的實現是SimpleClientHttpRequestFactory

比較常見的實現有HttpComponentsClientHttpRequestFactory,RibbonClientHttpRequestFactory等等

/**
 * Abstract base holding the {@code ClientHttpRequestFactory} used to create requests
 * (remaining members elided in this excerpt).
 */
public abstract class HttpAccessor {

    /** Logger available to subclasses */
    protected final Log logger = LogFactory.getLog(getClass());

    /** Request factory; initialised to a {@code SimpleClientHttpRequestFactory}. */
    private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    ...
}

這里的request.getURI其實調用的是ServiceRequestWrapper中的getURI方法

ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);

先前我們在LoadBalancerInterceptor中設置了loadBalancerClient和requestFactory屬性,我們調用loadBalancer.chooseServer
獲取Server節點信息, 把它包裝成RibbonServer, 也就是這里的serviceInstance

HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);

ServiceRequestWrapper中的getURI方法如下

/**
 * Request wrapper whose URI is rewritten by the load balancer client for a concrete
 * service instance each time {@link #getURI()} is called.
 */
public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    /**
     * @param request      the request being wrapped
     * @param instance     the service instance passed to {@code reconstructURI}
     * @param loadBalancer the client that reconstructs the URI
     */
    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    /** Returns the wrapped request's URI after reconstruction for the instance. */
    @Override
    public URI getURI() {
        URI logicalUri = getRequest().getURI();
        return this.loadBalancer.reconstructURI(this.instance, logicalUri);
    }
}

返回的URI實則是LoadBalancerClient中獲取得到的。

這里我們可以看到,是從服務的ribbon容器中獲取其RibbonLoadBalancerContext上下文信息

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    /**
     * @param clientFactory factory stored for later per-service lookups
     */
    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    /**
     * Rewrites {@code original} so that it points at the concrete server behind
     * {@code instance}.
     * <p>
     * The URI is first passed through {@code updateToSecureConnectionIfNeeded}, then the
     * service's {@code RibbonLoadBalancerContext} rebuilds it for the server. A
     * {@code RibbonServer} supplies its own server; for any other instance a
     * {@code Server} is built from the instance's scheme, host and port.
     *
     * @throws IllegalArgumentException if {@code instance} is {@code null}
     */
    @Override
    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext lbContext = this.clientFactory
                .getLoadBalancerContext(serviceId);

        final Server target;
        final URI adjusted;
        if (!(instance instanceof RibbonServer)) {
            target = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig config = clientFactory.getClientConfig(serviceId);
            ServerIntrospector introspector = serverIntrospector(serviceId);
            adjusted = updateToSecureConnectionIfNeeded(original, config,
                    introspector, target);
        } else {
            RibbonServer wrapped = (RibbonServer) instance;
            target = wrapped.getServer();
            adjusted = updateToSecureConnectionIfNeeded(original, wrapped);
        }
        return lbContext.reconstructURIWithServer(target, adjusted);
    }

RibbonLoadBalancerContext根據RibbonServer.getServer返回的信息和原始的URI得到最終的URI信息, 也就是上文提到的帶有服務HOST:PORT的URI

/**
 * Rebuilds {@code original} so that its scheme, host and port are those of
 * {@code server}, keeping the user info, path, query and fragment.
 * <p>
 * When host, port and scheme already match, {@code original} is returned as is. A
 * missing server scheme falls back to the original URI's scheme, and then to the
 * scheme derived by {@code deriveSchemeAndPortFromPartialUri}.
 *
 * @param server   the target server
 * @param original the URI to rewrite
 * @return {@code original} when nothing changes, otherwise a new URI
 * @throws RuntimeException wrapping a {@code URISyntaxException} if the rebuilt string
 *         is not a valid URI
 */
public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        
        // Compare schemes by value and null-safely; '==' only tested reference identity,
        // so equal scheme strings held in different objects were treated as different.
        if (host.equals(original.getHost()) 
                && port == original.getPort()
                && java.util.Objects.equals(scheme, original.getScheme())) {
            return original;
        }
        if (scheme == null) {
            scheme = original.getScheme();
        }
        if (scheme == null) {
            scheme = deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            // A negative port means "no explicit port"; leave it out of the authority.
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            return new URI(sb.toString());
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

不知不覺,已經寫了這么多。勿以善小而不為,勿以惡小而為之,再接再厲吧

為什么一個服務對應一個ribbon容器、Feign容器呢? 我覺得應該是為了資源隔離吧

SpringClientFactory類是一個用來創建客戶端負載均衡器的工廠類,該工廠類會為每一個不同名的ribbon客戶端生成不同的Spring上下文

RibbonLoadBalancerContext類是LoadBalancerContext的子類,該類用于存儲一些被負載均衡器使用的上下文內容和API操作(比如得到真實的URI, 重試機制的判斷, 獲取DefaultClientConfigImpl類的配置信息等等)

關于RibbonLoadBalancerContext的配置可以看RibbonClientConfiguration配置類

    /**
     * Provides a {@code RibbonLoadBalancerContext} built from the load balancer, client
     * config and retry handler, unless such a bean is already defined.
     */
    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
                                                               IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }

我們回顧一下整個請求的過程:首先RestTemplate.doExecute()實際上是調用request.execute()方法,

此時我們要進行狸貓換太子的操作:通過InterceptingClientHttpRequestFactory(內部委托了一個真正執行發起請求的requestFactory)創建出帶攔截屬性的InterceptingClientHttpRequest對象

該對象調用execute()時,請求會被LoadBalancerInterceptor所攔截,我們的負載均衡器根據rule選出一個適合的服務實例地址,
再把請求交給InterceptingClientHttpRequestFactory中委托的requestFactory處理

ClientHttpRequestFactory有很多實現,比如nettyClient,HttpClient,RibbonClient,OkHttpClient, 通過這些客戶端發起對服務實例地址的請求。


接下來的篇幅會講到上文所提到的負載均衡器ILoadBalancer接口的實現

AbstractLoadBalancer

ILoadBalancer接口的抽象實現,它把實例進行了分組

  • ALL:所有服務實例
  • STATUS_UP:正常服務的實例
  • STATUS_NOT_UP:停止服務的實例

最后定義了2個抽象函數

  • getServerList(ServerGroup serverGroup):根據分組類型獲取服務實例列表
  • getLoadBalancerStats():獲取當前負載均衡器各個服務實例當前的屬性和統計信息
/**
 * Abstract {@code ILoadBalancer} that groups servers by status and leaves the server
 * list and statistics lookups to subclasses.
 */
public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
    /** Grouping of servers by status. */
    public enum ServerGroup{
        // every server
        ALL,
        // servers that are serving normally
        STATUS_UP,
        // servers that have stopped serving
        STATUS_NOT_UP        
    }
        
    /**
     * delegate to {@link #chooseServer(Object)} with parameter null.
     *
     * @return whatever {@code chooseServer(null)} returns
     */
    public Server chooseServer() {
        return chooseServer(null);
    }

    
    /**
     * List of servers that this Loadbalancer knows about
     * 
     * @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
     * @return the servers belonging to the requested group
     */
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    
    /**
     * Obtain LoadBalancer related Statistics
     *
     * @return the statistics object of this load balancer
     */
    public abstract LoadBalancerStats getLoadBalancerStats();    
}

BaseLoadBalancer

  • 定義并維護了2個存儲服務實例Server對象的列表,一個用于存儲所有服務實例的清單,一個用于存儲正常服務的實例清單
  • 定義了用來存儲負載均衡器各服務實例屬性和統計信息的LoadBalancerStats對象
  • 定義了檢查服務實例是否正常服務的IPing對象,在BaseLoadBalancer中默認為null,需要在構造時注入實現
  • 定義了檢查服務實例操作的執行策略對象IPingStrategy,默認實現是SerialPingStrategy,采用的是線性遍歷ping服務實例的方式實現檢查
  • 定義了負載均衡的處理規則IRule對象
  • 啟動ping任務:在BaseLoadBalancer的默認構造函數中,會直接啟動一個用于定時檢查Server是否健康的任務,該任務默認的執行間隔是10s

DynamicServerListLoadBalancer

定義了服務列表操作對象ServerList

  • getInitialListOfServers用于獲取初始化的服務實例清單
  • getUpdatedListOfServers用于獲取更新的服務實例清單

DynamicServerListLoadBalancer中使用的是哪個ServerList的實現呢

從EurekaRibbonClientConfiguration中可以看到端倪, 可以看到IPing、ServerList的實現都是和Eureka相關的。

EurekaRibbonClientConfiguration和RibbonAutoConfiguration是相輔相成的,前者優先級高于后者。


    /**
     * Provides the {@code IPing} bean. An implementation configured through the
     * properties factory for this service id is used when present; otherwise a
     * {@code NIWSDiscoveryPing} initialised with the client config is returned.
     */
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (!this.propertiesFactory.isSet(IPing.class, serviceId)) {
            NIWSDiscoveryPing discoveryPing = new NIWSDiscoveryPing();
            discoveryPing.initWithNiwsConfig(config);
            return discoveryPing;
        }
        return this.propertiesFactory.get(IPing.class, config, serviceId);
    }

    /**
     * Provides the {@code ServerList} bean. An implementation configured through the
     * properties factory for this service id is used when present; otherwise a
     * {@code DiscoveryEnabledNIWSServerList} is created and wrapped in a
     * {@code DomainExtractingServerList}.
     */
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (!this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            DiscoveryEnabledNIWSServerList eurekaBacked = new DiscoveryEnabledNIWSServerList(
                    config, eurekaClientProvider);
            return new DomainExtractingServerList(
                    eurekaBacked, config, this.approximateZoneFromHostname);
        }
        return this.propertiesFactory.get(ServerList.class, config, serviceId);
    }

DomainExtractingServerList中的getInitialListOfServers和getUpdatedListOfServers
的具體實現,其實委托給了DiscoveryEnabledNIWSServerList

DiscoveryEnabledNIWSServerList是在obtainServersViaDiscovery中通過服務發現機制來實現服務實例的獲取

主要通過EurekaClient從服務注冊中心獲取具體的服務實例InstanceInfo列表
這里的vipAddress可以是邏輯上的服務名,比如hello-service

接著對這些服務實例進行遍歷, 將狀態為UP的實例轉換成DiscoveryEnabledServer對象,最后將這些實例組成列表返回.

/**
 * Builds the list of servers for this client by querying Eureka.
 * <p>
 * Each comma-separated entry of {@code vipAddresses} is looked up in turn; only
 * instances whose status is {@code UP} are converted to
 * {@link DiscoveryEnabledServer}. When {@code shouldUseOverridePort} is set, the
 * port (or secure port) is replaced on a private copy of the instance info.
 *
 * @return the discovered servers; empty when the Eureka client is not available
 *         or {@code vipAddresses} is null
 */
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses == null) {
            return serverList;
        }

        for (String vipAddress : vipAddresses.split(",")) {
            // a null targetRegion is interpreted as the client's own region
            List<InstanceInfo> instances = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo instance : instances) {
                if (!instance.getStatus().equals(InstanceStatus.UP)) {
                    continue;
                }

                InstanceInfo effective = instance;
                if (shouldUseOverridePort) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                    }

                    // Work on a copy: the builder keeps the reference it is given, and the
                    // shared Eureka instance may be used by other clients in the system.
                    InstanceInfo copy = new InstanceInfo(instance);

                    if (isSecure) {
                        effective = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                    } else {
                        effective = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                    }
                }

                DiscoveryEnabledServer discovered = new DiscoveryEnabledServer(effective, isSecure, shouldUseIpAddr);
                discovered.setZone(DiscoveryClient.getZone(effective));
                serverList.add(discovered);
            }
            // once one vipAddress yielded servers, later vipAddresses are not consulted
            if (serverList.size() > 0 && prioritizeVipAddressBasedServers) {
                break;
            }
        }
        return serverList;
    }

DomainExtractingServerList后續將這些list通過setZones函數繼續處理,轉換成
DomainExtractingServer, 設置一些必要的屬性,比如id,zone,isAliveFlag等等

ServerListUpdater

DynamicServerListLoadBalancer中可以看到定義

    // Update callback in the shape expected by ServerListUpdater: every call to
    // doUpdate() simply delegates to updateListOfServers() of the enclosing class.
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    /**
     * Refreshes the balancer's server list.
     * <p>
     * Fetches the updated list from {@code serverListImpl}, runs it through
     * {@code filter} when one is set, and passes the result to
     * {@code updateAllServerList}. Without a {@code serverListImpl} an empty list
     * is passed on.
     */
    public void updateListOfServers() {
        if (serverListImpl == null) {
            updateAllServerList(new ArrayList<T>());
            return;
        }

        List<T> refreshed = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), refreshed);

        if (filter != null) {
            refreshed = filter.getFilteredListOfServers(refreshed);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), refreshed);
        }
        updateAllServerList(refreshed);
    }

對于ServerListUpdater的實現有2個

  • PollingServerListUpdater:動態服務列表更新的默認策略,DynamicServerListLoadBalancer負載均衡器默認使用的就是它,它通過定時任務周期性地回調UpdateAction中的doUpdate函數

  • EurekaNotificationServerListUpdater:它的觸發機制與PollingServerListUpdater不同,它需要利用Eureka的事件監聽器來驅動服務的更新操作。通過接收EurekaEvent事件,異步回調doUpdate函數完成刷新實例列表。

PollingServerListUpdater內部實現比較簡單, 定時任務初始化1s后執行, 并以30s為周期重復執行,同時還會記錄最后更新時間,是否存活等信息。

/**
 * Starts the periodic refresh, unless it is already running.
 * <p>
 * The first caller flips {@code isActive} and schedules a task with fixed delay
 * ({@code initialDelayMs}, then every {@code refreshIntervalMs}). Each run
 * invokes {@code updateAction.doUpdate()} and records the time in
 * {@code lastUpdated}; a failing run is logged and does not stop the schedule.
 * If the updater has been deactivated, the run cancels the scheduled future
 * instead of updating.
 *
 * @param updateAction callback performing the actual server list update
 */
@Override
    public synchronized void start(final UpdateAction updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            logger.info("Already active, no-op");
            return;
        }

        final Runnable refreshTask = new Runnable() {
            @Override
            public void run() {
                if (isActive.get()) {
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                    return;
                }
                // deactivated in the meantime: stop the schedule
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                refreshTask,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    }

ServerListFilter

會在上文中的updateListOfServers()函數中過濾一些server節點。

ribbon中默認實現就是ZonePreferenceServerListFilter, 這個類的父類是ZoneAffinityServerListFilter

該過濾器基于區域感知的方式實現服務實例的過濾,也就是說,它會根據服務的實例所處的Zone和消費者自身的所處Zone進行比較,過濾掉這些不是同處一個區域的實例。

首先過濾出與消費者處于同一個zone的server節點,但不會馬上返回過濾的結果, 而是通過shouldEnableZoneAffinity(filteredServers)函數來判斷是否要啟用區域感知

    /**
     * Restricts {@code servers} to those accepted by the zone affinity predicate,
     * provided zone filtering is configured and
     * {@link #shouldEnableZoneAffinity} approves the filtered result.
     * <p>
     * When the filtered result is rejected and {@code zoneAffinity} is on, the
     * override counter is incremented and the original list is returned.
     *
     * @param servers candidate servers, may be null or empty
     * @return the zone-filtered list, or {@code servers} unchanged
     */
    @Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        boolean zoneFilteringConfigured = zone != null && (zoneAffinity || zoneExclusive);
        if (!zoneFilteringConfigured || servers == null || servers.size() <= 0) {
            return servers;
        }

        List<T> sameZoneServers = Lists.newArrayList(Iterables.filter(
                servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        if (shouldEnableZoneAffinity(sameZoneServers)) {
            return sameZoneServers;
        }
        if (zoneAffinity) {
            overrideCounter.increment();
        }
        return servers;
    }

使用LoadBalancerStats的getZoneSnapshot方法來獲取這些過濾后的同區域實例的基礎指標。比如實例數量,斷路器斷開數,活動請求數,實例平均負載等

根據這些指標和設置的閾值進行對比,如果有一個條件符合, 就不返回區域感知過濾后的服務實例清單。

當集群出現區域故障時,依然可以依靠其他區域的實例進行正常服務,提供了完善的高可用保障

blackOutServerPercentage:故障實例百分比(斷路器斷開數/實例數量)>=0.8
activeRequestPerServer:實例平均負載>=0.6
availableServers:可用實例數(實例數-斷路器斷開數) < 2

    /**
     * Decides whether the zone-filtered server list should be used.
     * <p>
     * {@code zoneExclusive} always enables it; with neither flag set it is
     * disabled. Otherwise the zone snapshot of {@code filtered} is checked against
     * three thresholds (tripped-circuit ratio, load per server, number of
     * non-tripped servers); breaching any one of them disables zone affinity.
     *
     * @param filtered the servers that passed the zone affinity predicate
     * @return true if the filtered list should be used
     */
    private boolean shouldEnableZoneAffinity(List<T> filtered) {
        if (zoneExclusive) {
            return true;
        }
        if (!zoneAffinity) {
            return false;
        }

        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            // no statistics available: fall back to the configured flag
            return zoneAffinity;
        }

        logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
        ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
        double loadPerServer = snapshot.getLoadPerServer();
        int instanceCount = snapshot.getInstanceCount();
        int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();

        double blackOutRatio = ((double) circuitBreakerTrippedCount) / instanceCount;
        int healthyServers = instanceCount - circuitBreakerTrippedCount;

        boolean thresholdBreached = blackOutRatio >= blackOutServerPercentageThreshold.get()
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || healthyServers < availableServersThreshold.get();
        if (thresholdBreached) {
            logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
                    new Object[] {blackOutRatio, loadPerServer, healthyServers});
        }
        return !thresholdBreached;
    }

ZonePreferenceServerListFilter

實現通過配置Zone或者Eureka實例元數據的Zone來過濾出同區域的服務實例

ZoneAwareLoadBalancer

是對DynamicServerListLoadBalancer的擴展

DynamicServerListLoadBalancer是重用其父類的chooseServer方法,
采用RoundRobinRule規則,以線性輪詢的方式來選擇調用的服務實例, 它會把所有實例視為一個Zone下的節點來看待,這樣會周期性的產生跨區域Zone訪問

ZoneAwareLoadBalancer重寫了setServerListForZones(Map<String,List<Server>> zoneServersMap)

/**
 * Distributes the per-zone server lists to the per-zone load balancers.
 * <p>
 * After delegating to the superclass, each zone in {@code zoneServersMap} has
 * its list pushed to the balancer returned by {@code getLoadBalancer} for the
 * lower-cased zone name. Balancers whose zone key is absent from the map get an
 * empty list so their zone metrics do not keep stale data.
 *
 * @param zoneServersMap servers grouped by zone name
 */
@Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }

        // push the current servers of every known zone to that zone's balancer
        for (Map.Entry<String, List<Server>> zoneEntry : zoneServersMap.entrySet()) {
            String zoneKey = zoneEntry.getKey().toLowerCase();
            List<Server> zoneServers = zoneEntry.getValue();
            getLoadBalancer(zoneKey).setServersList(zoneServers);
        }

        // clear balancers of zones that no longer appear in the map
        for (Map.Entry<String, BaseLoadBalancer> known : balancers.entrySet()) {
            boolean stillPresent = zoneServersMap.keySet().contains(known.getKey());
            if (stillPresent) {
                continue;
            }
            known.getValue().setServersList(Collections.emptyList());
        }
    }    

DynamicServerListLoadBalancer中這個方法是根據Zone劃分實例列表,
交給LoadBalancerStats中的zoneStatsMap集合管理,每個Zone對應一個ZoneStats,用于存儲每個Zone節點的狀態。

為每個Zone分配一個BaseLoadBalancer,每個BaseLoadBalancer維護各自Zone的服務實例列表

@VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 

第二個循環對Zone中服務實例列表進行檢查: 如果某個Zone已經不在最新的Zone列表中, 就把它的實例清單置空, 避免統計信息中殘留過期數據

再來看看ZoneAwareLoadBalancer選擇實例的邏輯

  • 1.如果沒有啟用區域感知,或者當前維護的Zone個數不大于1,默認走父類DynamicServerListLoadBalancer的chooseServer實現
  • 2.從LoadBalancerStats拿出所有Zone快照信息
  • 3.獲取所有可用的Zone(會過濾一些不符合規則的Zone,從實例數,負載,實例故障率維度去考量)
  • 4.隨機選擇一個Zone
  • 5.獲取當前Zone的負載均衡器, 根據IRule選擇具體的服務實例
/**
 * Chooses a server, preferring zone-aware selection when it applies.
 * <p>
 * Delegates straight to the superclass when the ENABLED flag is off or the
 * statistics report at most one available zone. Otherwise a zone is picked from
 * the available zones and the choice is delegated to that zone's balancer.
 * Any exception raised by the zone-aware path is logged, and whenever that path
 * yields no server the superclass implementation is used instead.
 *
 * @param key load balancer key passed through to the underlying balancers
 * @return the chosen server (result of the zone balancer or of the superclass)
 */
@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            // Both thresholds are dynamic properties, looked up lazily on first use
            // under a key that includes this balancer's name.
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            // The zone-specific path is taken only when at least one zone was
            // excluded; if every zone is still available, server stays null and
            // the superclass choice below is used.
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            // best effort: a failure here must not prevent the fallback below
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

IRule

  • 1.RandomRule 隨機
  • 2.RoundRobinRule 輪詢
  • 3.RetryRule 帶重試的輪詢
  • 4.WeightedResponseTimeRule 帶權重的輪詢.該策略是對RoundRobinRule的擴展, 根據實例的運行情況來計算權重,并且根據權重來挑選實例達到更優的分配效果。WeightedResponseTimeRule 需要注入ILoadBalancer屬性。在初始化的時候 會創建定時任務, 每30s執行一次 計算每個服務實例的權重。
        // Create a daemon timer (second constructor argument) named after this
        // rule and schedule the weight task: first run with no delay, then
        // repeatedly with a period of serverWeightTaskTimerInterval.
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
    /**
     * Timer task that recomputes the server weights on every run.
     * A failure during recomputation is logged and does not propagate to the timer.
     */
    class DynamicServerWeightTask extends TimerTask {
        @Override
        public void run() {
            ServerWeight weightCalculator = new ServerWeight();
            try {
                weightCalculator.maintainWeights();
            } catch (Exception e) {
                logger.error("Error running DynamicServerWeightTask for {}", name, e);
            }
        }
    }

比如有4個實例A,B,C,D,它們平均響應時間為10,40,80,100. 總響應時間為10+40+80+100=230。權重列表里保存的是累計權重: A的累計權重是230-10=220, 對應區間[0,220];B的累計權重是220+230-40=410, 對應區間(220,410];C的累計權重是410+230-80=560, 對應區間(410,560];D的累計權重是560+230-100=690, 對應區間(560,690)

        /**
         * Recomputes the cumulative weight list used for weighted server selection.
         * <p>
         * The weight of a server is (sum of the average response times of all
         * servers) minus (its own average response time), so slower servers get a
         * smaller weight. The stored list holds running totals, i.e. entry i is the
         * sum of the weights of servers 0..i, and is swapped in via
         * {@code setWeights}.
         * <p>
         * Does nothing when no load balancer or no statistics are available, or
         * when another computation is already in progress. Any exception is
         * logged; the in-progress flag is always reset.
         */
        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }

            // only one weight computation at a time
            if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }

            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                // sum of the average response times of all servers
                double totalResponseTime = 0;
                for (Server server : nlb.getAllServers()) {
                    // this will automatically load the stats if not in cache
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // weight for each server is (sum of responseTime of all servers - responseTime)
                // so that the longer the response time, the less the weight and the less likely to be chosen
                // primitive accumulator: avoids unboxing/re-boxing on every iteration
                double weightSoFar = 0.0;

                // create new list and hot swap the reference
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }

        }
    }

實例的選擇, 生成一個[0, 最大權重值)區間內的隨機數。遍歷權重列表,找到匹配的Server節點。

    /**
     * Picks a live server, weighted by the accumulated weight list.
     * <p>
     * Loops until a server reporting {@code isAlive()} is found. Returns null when
     * {@code lb} is null, the thread was interrupted, the balancer has no servers,
     * or the round-robin fallback itself yields no server.
     *
     * @param lb  load balancer providing the server list
     * @param key load balancer key, passed on to the round-robin fallback
     * @return a live server, or null as described above
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // get hold of the current reference in case it is changed from the other thread
            List<Double> currentWeights = accumulatedWeights;
            // note: Thread.interrupted() also clears the interrupt flag
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // last one in the list is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
            // No server has been hit yet and total weight is not initialized
            // fallback to use round robin
            // (also taken when the weight list and the server list differ in size)
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                // NOTE(review): the fallback uses getLoadBalancer() rather than the lb argument
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the index of the first accumulated weight that is >= randomWeight
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            // chosen server is not alive: reset and try again
            server = null;
        }
        return server;
    }
  • 5.ClientConfigEnabledRoundRobinRule: 內部的實現還是輪詢
  • 6.BestAvailableRule: 注入了LoadBalancerStats,遍歷所有服務實例, 過濾故障的實例,選擇最空閑的實例。如果LoadBalancerStats為空的話,采用父類ClientConfigEnabledRoundRobinRule實現
  • 7.PredicateBasedRule:先過濾,后輪詢
/**
 * Base class for rules that first filter the servers through a predicate and
 * then choose among the remaining ones.
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * Supplies the {@link AbstractServerPredicate} this rule filters servers with.
     *
     * @return the predicate used by {@link #choose(Object)}
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Chooses a server by calling
     * {@link AbstractServerPredicate#chooseRoundRobinAfterFiltering(java.util.List, Object)}
     * on all servers of the load balancer.
     *
     * @param key load balancer key passed to the predicate
     * @return the chosen server, or null when no server was selected
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer balancer = getLoadBalancer();
        Optional<Server> selection =
                getPredicate().chooseRoundRobinAfterFiltering(balancer.getAllServers(), key);
        return selection.isPresent() ? selection.get() : null;
    }
}

AbstractServerPredicate中的實現

    /**
     * Filters {@code servers} down to the eligible ones and picks one of them at
     * the position given by {@code incrementAndGetModulo}.
     *
     * @param servers         candidate servers
     * @param loadBalancerKey key forwarded to the eligibility check
     * @return the picked server, or absent when no server is eligible
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> candidates = getEligibleServers(servers, loadBalancerKey);
        int candidateCount = candidates.size();
        if (candidateCount == 0) {
            return Optional.absent();
        }
        int position = incrementAndGetModulo(candidateCount);
        return Optional.of(candidates.get(position));
    }
    
     /**
      * Returns the servers accepted by this predicate.
      * <p>
      * Without a key the server-only view of the predicate is applied and an
      * immutable copy is returned; with a key each server is tested together
      * with the key.
      *
      * @param servers         candidate servers
      * @param loadBalancerKey optional key, may be null
      * @return the servers that passed the predicate
      */
     public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
            if (loadBalancerKey == null) {
                return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
            }

            List<Server> accepted = Lists.newArrayList();
            for (Server candidate : servers) {
                PredicateKey probe = new PredicateKey(loadBalancerKey, candidate);
                if (!this.apply(probe)) {
                    continue;
                }
                accepted.add(candidate);
            }
            return accepted;
        }
  • 8.AvailabilityFilteringRule繼承上文的PredicateBasedRule, 先輪詢一個Server,看這個Server是否滿足判斷條件。如果滿足直接返回,
    如果不滿足則繼續輪詢, 最多嘗試11次。 11次都沒有選出合適的Server, 就執行降級策略, 調用父類PredicateBasedRule的默認實現
    /**
     * Tries up to eleven round-robin candidates and returns the first one the
     * predicate accepts; if none is accepted, defers to the superclass.
     *
     * @param key load balancer key
     * @return the accepted candidate, or the superclass's choice
     */
    @Override
    public Server choose(Object key) {
        Server candidate = roundRobinRule.choose(key);
        for (int attempt = 0; attempt <= 10; attempt++) {
            if (predicate.apply(new PredicateKey(candidate))) {
                return candidate;
            }
            candidate = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

這是一個組合Predicate,在上文中的predicate.apply(new PredicateKey(server))實際上會調用CompositePredicate中的AvailabilityPredicate

    public AvailabilityFilteringRule() {
        super();
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }
    

可見AvailabilityPredicate是根據當前Server的狀態來過濾的。

    /**
     * Accepts the server in {@code input} unless its statistics say it should be
     * skipped. When no load balancer statistics are available every server is
     * accepted.
     *
     * @param input key carrying the server to test
     * @return true if the server may be used
     */
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats lbStats = getLBStats();
        return lbStats == null
                || !shouldSkipServer(lbStats.getSingleServerStat(input.getServer()));
    }
    
    
    /**
     * Tells whether a server should be skipped: either circuit breaker filtering
     * is on and its circuit breaker is tripped, or its active request count has
     * reached the configured connection limit.
     *
     * @param stats statistics of the server under test
     * @return true if the server should not be used
     */
    private boolean shouldSkipServer(ServerStats stats) {
        boolean circuitTripped = CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped();
        return circuitTripped
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get();
    }

上文中的降級策略最終會走到PredicateBasedRule, 這個規則中的predicate是其子類AvailabilityFilteringRule中的CompositePredicate

    /**
     * Chooses a server by letting the rule's predicate filter all servers of the
     * load balancer and pick one of the remaining ones.
     *
     * @param key load balancer key passed to the predicate
     * @return the chosen server, or null when no server was selected
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer balancer = getLoadBalancer();
        Optional<Server> selection =
                getPredicate().chooseRoundRobinAfterFiltering(balancer.getAllServers(), key);
        return selection.isPresent() ? selection.get() : null;
    }

CompositePredicate的父類是AbstractServerPredicate,最終又回到AbstractServerPredicate的chooseRoundRobinAfterFiltering

    /**
     * Filters {@code servers} down to the eligible ones and picks one of them at
     * the position given by {@code incrementAndGetModulo}.
     *
     * @param servers         candidate servers
     * @param loadBalancerKey key forwarded to the eligibility check
     * @return the picked server, or absent when no server is eligible
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> candidates = getEligibleServers(servers, loadBalancerKey);
        int candidateCount = candidates.size();
        if (candidateCount == 0) {
            return Optional.absent();
        }
        int position = incrementAndGetModulo(candidateCount);
        return Optional.of(candidates.get(position));
    }

但是其中的getEligibleServers(servers, loadBalancerKey)方法又被子類CompositePredicate重寫,一切仿佛又回到了原點, 這里就是上文所說的降級策略, 因為fallbackPredicate默認實現為true,所以這里的邏輯是先走一遍AvailabilityPredicate, 得到所有可用的server列表, 如果數量還不滿足要求的話, 就依次嘗試fallback predicate, 最終退化為對全部server直接輪詢。

/**
 * Returns the eligible servers, relaxing the filter through the fallback
 * predicates when the primary result is too small.
 * <p>
 * A result is large enough when it holds at least {@code minimalFilteredServers}
 * entries and more than {@code servers.size() * minimalFilteredPercentage}
 * (truncated to int). Until that holds, the fallbacks are tried in order, each
 * against the full {@code servers} list; the last result obtained is returned.
 *
 * @param servers         candidate servers
 * @param loadBalancerKey key forwarded to the predicates
 * @return the servers accepted by the primary predicate or by a fallback
 */
@Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> accepted = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> remainingFallbacks = fallbacks.iterator();
        while ((accepted.size() < minimalFilteredServers
                        || accepted.size() <= (int) (servers.size() * minimalFilteredPercentage))
                && remainingFallbacks.hasNext()) {
            AbstractServerPredicate fallback = remainingFallbacks.next();
            accepted = fallback.getEligibleServers(servers, loadBalancerKey);
        }
        return accepted;
    }
  • 9.ZoneAvoidanceRule:還是遵循先過濾后輪詢思想, 首先執行ZoneAvoidancePredicate和AvailabilityPredicate這2個邏輯,先找出合適的Zone,再找Zone下面健康的Server實例。如果都沒找到,執行降級策略。
/**
 * Predicate-based rule whose predicate combines a {@link ZoneAvoidancePredicate}
 * with an {@link AvailabilityPredicate}. Also hosts the static helpers that
 * build zone snapshots and decide which zones are currently usable.
 */
public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();
    
    // Predicate returned by getPredicate(); rebuilt in initWithNiwsConfig.
    private CompositePredicate compositePredicate;
    
    /**
     * Creates the rule with predicates bound to this rule only (no client config).
     */
    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
    
    /**
     * Builds the composite: {@code p1} and {@code p2} are the primary predicates,
     * with {@code p2} alone as first fallback and an always-true predicate as
     * last fallback.
     */
    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();
        
    }
    
    
    /**
     * Rebuilds the composite predicate with predicates that also receive
     * {@code clientConfig}.
     *
     * @param clientConfig the client configuration passed to both predicates
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    /**
     * Collects a snapshot for every zone reported by
     * {@code lbStats.getAvailableZones()}.
     *
     * @param lbStats statistics to read the zones and snapshots from
     * @return map from zone name to its snapshot
     */
    static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
        Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
        for (String zone : lbStats.getAvailableZones()) {
            ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
            map.put(zone, snapshot);
        }
        return map;
    }

    /**
     * Randomly picks one zone out of {@code chooseFrom}, with a probability
     * proportional to the zone's instance count in {@code snapshot}.
     *
     * @param snapshot   per-zone snapshots; expected to contain every zone in chooseFrom
     * @param chooseFrom zones to choose among
     * @return the picked zone; the only element when there is exactly one;
     *         null when chooseFrom is null or empty
     */
    static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        // index is in 1..totalServerCount; the zone whose cumulative instance
        // count first reaches it is selected
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }

    /**
     * Determines the zones that may currently be used.
     * <p>
     * Zones with no instances, with a tripped-circuit ratio of at least
     * {@code triggeringBlackoutPercentage}, or with a negative load per server are
     * removed. Among the remaining zones those with the highest load per server
     * are tracked as "worst" zones. If the highest load stays below
     * {@code triggeringLoad} and no zone was removed, all zones are returned;
     * otherwise one randomly chosen worst zone is removed as well.
     *
     * @param snapshot                     per-zone snapshots
     * @param triggeringLoad               load per server from which a worst zone is dropped
     * @param triggeringBlackoutPercentage tripped-circuit ratio from which a zone is dropped
     * @return the usable zones; the single zone when there is only one;
     *         null when {@code snapshot} is empty
     */
    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        // set as soon as any zone is removed for being empty or unhealthy
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        // new maximum: restart the worst-zone set with this zone
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

    /**
     * Convenience overload that first builds the zone snapshots from
     * {@code lbStats}.
     *
     * @return the usable zones, or null when {@code lbStats} is null or has no zones
     */
    public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
            double triggeringLoad, double triggeringBlackoutPercentage) {
        if (lbStats == null) {
            return null;
        }
        Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
        return getAvailableZones(snapshot, triggeringLoad,
                triggeringBlackoutPercentage);
    }

    /**
     * @return the composite predicate built by the constructor or by
     *         {@link #initWithNiwsConfig(IClientConfig)}
     */
    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }    
}
/**
 * Server predicate that accepts a server only when its zone is among the zones
 * returned by {@code ZoneAvoidanceRule.getAvailableZones(...)}.
 *
 * <p>The predicate is permissive whenever it lacks the information needed to
 * judge a server: it accepts the server when the feature flag is off, when the
 * server has no zone, when no load balancer statistics are available, when at
 * most one zone is known, or when the server's zone is absent from the zone
 * snapshot.
 */
public class ZoneAvoidancePredicate extends AbstractServerPredicate {

    private static final Logger logger = LoggerFactory.getLogger(ZoneAvoidancePredicate.class);

    /** Global switch; when it reads {@code false}, {@link #apply} accepts every server. */
    private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory
            .getInstance().getBooleanProperty(
                    "niws.loadbalancer.zoneAvoidanceRule.enabled", true);

    /**
     * Load threshold handed to {@code ZoneAvoidanceRule.getAvailableZones}.
     * Defaults to 0.2; replaced by a client-specific property when a client
     * configuration is supplied.
     */
    private volatile DynamicDoubleProperty triggeringLoad = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.triggeringLoadPerServerThreshold", 0.2d);

    /**
     * Blackout threshold handed to {@code ZoneAvoidanceRule.getAvailableZones}.
     * Defaults to 0.99999; replaced by a client-specific property when a client
     * configuration is supplied.
     *
     * <p>NOTE: the spelling "Percetage" is part of the property key that is
     * looked up at runtime and must not be "corrected" here.
     */
    private volatile DynamicDoubleProperty triggeringBlackoutPercentage = new DynamicDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer.avoidZoneWithBlackoutPercetage", 0.99999d);

    /**
     * Creates a predicate bound to a rule and switches the thresholds to the
     * client-specific properties when {@code clientConfig} is not {@code null}.
     *
     * @param rule the rule passed to the superclass constructor
     * @param clientConfig client configuration; may be {@code null}, in which
     *        case the default threshold properties stay in effect
     */
    public ZoneAvoidancePredicate(IRule rule, IClientConfig clientConfig) {
        super(rule, clientConfig);
        initDynamicProperties(clientConfig);
    }

    /**
     * Creates a predicate bound to load balancer statistics and switches the
     * thresholds to the client-specific properties when {@code clientConfig}
     * is not {@code null}.
     *
     * @param lbStats the statistics passed to the superclass constructor
     * @param clientConfig client configuration; may be {@code null}, in which
     *        case the default threshold properties stay in effect
     */
    public ZoneAvoidancePredicate(LoadBalancerStats lbStats,
            IClientConfig clientConfig) {
        super(lbStats, clientConfig);
        initDynamicProperties(clientConfig);
    }

    /**
     * Creates a predicate bound to a rule, keeping the default threshold
     * properties.
     *
     * @param rule the rule passed to the superclass constructor
     */
    ZoneAvoidancePredicate(IRule rule) {
        super(rule);
    }

    /**
     * Replaces both threshold properties with ones whose keys embed the client
     * name from {@code clientConfig}. Does nothing when {@code clientConfig}
     * is {@code null}.
     */
    private void initDynamicProperties(IClientConfig clientConfig) {
        if (clientConfig == null) {
            return;
        }
        String keyPrefix = "ZoneAwareNIWSDiscoveryLoadBalancer." + clientConfig.getClientName();
        DynamicPropertyFactory propertyFactory = DynamicPropertyFactory.getInstance();
        triggeringLoad = propertyFactory.getDoubleProperty(
                keyPrefix + ".triggeringLoadPerServerThreshold", 0.2d);
        // The misspelling in the key below is intentional: it is the key read at runtime.
        triggeringBlackoutPercentage = propertyFactory.getDoubleProperty(
                keyPrefix + ".avoidZoneWithBlackoutPercetage", 0.99999d);
    }

    /**
     * Decides whether the server carried by {@code input} passes the zone
     * filter.
     *
     * <p>The server's zone is read exactly once, so the "unknown zone" check and
     * the final membership check always look at the same value.
     *
     * @param input key wrapping the candidate server. Although annotated
     *        {@code @Nullable}, a {@code null} key (or a key without a server)
     *        causes a {@link NullPointerException} whenever the feature flag is
     *        enabled, because the server is dereferenced unconditionally.
     * @return {@code true} when the server should be kept; {@code false} when
     *         its zone is not among the available zones (or the available-zone
     *         set is {@code null})
     */
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // No zone information on the server: do not filter it out.
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // No statistics available: do not filter.
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // At most one zone is known: there is nothing to avoid.
            return true;
        }
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.containsKey(serverZone)) {
            // The server's zone is unknown to the load balancer: do not filter it out.
            return true;
        }
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(
                zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        // Reuse the zone validated above instead of reading it from the server again.
        return availableZones != null && availableZones.contains(serverZone);
    }
}

注意事項

如果配置只對某個服務的Ribbon客戶端生效,則CustomRibbonConfiguration類不能包含在主應用程序上下文的@ComponentScan中,需要添加自定義注解。
使用自定義注解和excludeFilters使CustomRibbonConfiguration類不被@ComponentScan掃描到!
CustomRibbonConfiguration只會注冊在該服務的Spring容器中!也就是上文中提到的SpringClientFactory!


尾言

我不是頭腦空空,我不是一只米蟲!

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容