Spring Cloud Ribbon Design Principles

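Ribbon is an open-source, client-side load-balancing component from Netflix and a very important module in the Spring Cloud family. It is also one of the more complex modules in that family, and it directly affects the quality and performance of service dispatch. A thorough understanding of Ribbon helps us see every step that service dispatch has to account for in a distributed microservice cluster.
This article analyzes Ribbon's design in detail, to help readers build a better understanding of Spring Cloud.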

I. How Ribbon Is Structured under Spring Integration

Here is an overview diagram showing how Spring integrates Ribbon:

image.png

Ribbon in Spring Cloud integration mode has the following characteristics:

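  1. How Ribbon services are configured
    Each service configuration has its own Spring ApplicationContext, which loads the instances for that service.
    For example, suppose the current Spring Cloud system contains the following services:

    | Service name | Role | Depends on |
    |---|---|---|
    | order | Order module | user |
    | user | User module | none |
    | mobile-bff | Mobile BFF | order, user |

    In practice the mobile-bff service calls the order and user modules, so the Spring context of mobile-bff creates a separate child ApplicationContext for each of order and user, and each child context loads the configuration for its own service module. In other words, the clients' configurations are independent and do not affect one another.

  2. Integration with Feign
    When Feign is used as the client, each request ends up in the form http://<service-name>/<relative-path-to-service>. LoadBalancerFeignClient extracts the service identifier <service-name>, then looks up the load balancer for that service, FeignLoadBalancer, in the context by service name. The load balancer picks the most suitable service instance based on the statistics it already holds for the service instances.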
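
The two characteristics above can be illustrated with a small, self-contained sketch: the service name is taken from the host part of the request URI, and each service name gets its own isolated context. ClientContext and ClientContextRegistry are hypothetical names invented for this illustration; they are not Spring or Ribbon classes, and the real child ApplicationContext holds far more than a settings map.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-service child context (not a Spring or Ribbon class).
final class ClientContext {
    final String serviceName;
    final Map<String, String> settings = new ConcurrentHashMap<>();

    ClientContext(String serviceName) {
        this.serviceName = serviceName;
    }
}

// Hypothetical registry that keeps one isolated context per service name.
final class ClientContextRegistry {
    private final Map<String, ClientContext> contexts = new ConcurrentHashMap<>();

    // In http://<service-name>/<path>, the host part is the service name.
    static String serviceNameOf(URI requestUri) {
        String host = requestUri.getHost();
        if (host == null) {
            throw new IllegalArgumentException("No service name in URI: " + requestUri);
        }
        return host;
    }

    // Creates the context on first use and reuses it afterwards.
    ClientContext contextFor(URI requestUri) {
        return contexts.computeIfAbsent(serviceNameOf(requestUri), ClientContext::new);
    }
}
```

With this shape, a setting written into the context of order is never visible from the context of user, which is the isolation property described in item 1.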

II. How the Feign Integration Is Implemented in Spring Cloud

When combined with Feign, each Feign call is wrapped in a LoadBalancerCommand, and the request is then sent using RxJava's event-based programming style. The Spring Cloud framework extracts the service name from the URL of the Feign request, finds the load balancer implementation for that service, FeignLoadBalancer, in the context, uses it to pick a suitable Server instance, and forwards the call to that instance. Along the way it records call statistics for the chosen Server instance.

/**
     * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
     * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
     * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
     * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
     * result during execution and retries will be emitted.
     */
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
        
        // Maximum number of retries on the same server
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        // Maximum number of retries on the next server
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        // Pick a suitable Server with the load balancer, execute the request against it, and fold the request's data and behavior into ServerStats
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        // Fetch the statistics for this Server
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry (the actual service call)
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount(); // count this attempt
                                        loadBalancerContext.noteOpenConnection(stats); // connection statistics
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        // Execution tracer that records the elapsed time
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        // A suitable server has been found, so execute the request;
                                        // once the underlying call produces a result, handle the events
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // Record the statistics
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e); // record the failure
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity; // keep the result value
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop(); // stop timing
                                                // Mark the request as completed and update the statistics
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
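                        // On failure, trigger the retry logic according to the retry policy
                        // The retry is done through the Observable, with a predicate deciding whether to retry; this one covers retries on the same server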
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
        // Handling for the next server, also driven by the retry operator
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
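
Stripped of RxJava, the "retry on the same server, then move to the next server" behavior can be approximated with two nested loops. The following is a simplified, synchronous sketch and not Ribbon's implementation: RetryingCaller is a hypothetical class, servers are plain strings, and every RuntimeException is treated as retriable, whereas Ribbon asks its RetryHandler whether an error may be retried.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, synchronous approximation of the two retry levels.
final class RetryingCaller {
    private final int maxRetriesSameServer;
    private final int maxRetriesNextServer;

    RetryingCaller(int maxRetriesSameServer, int maxRetriesNextServer) {
        this.maxRetriesSameServer = maxRetriesSameServer;
        this.maxRetriesNextServer = maxRetriesNextServer;
    }

    <T> T call(Supplier<String> chooseServer, Function<String, T> operation) {
        RuntimeException lastError = null;
        // Outer loop: one initial server plus up to maxRetriesNextServer further servers.
        for (int serverAttempt = 0; serverAttempt <= maxRetriesNextServer; serverAttempt++) {
            String server = chooseServer.get();
            // Inner loop: one initial attempt plus up to maxRetriesSameServer retries.
            for (int attempt = 0; attempt <= maxRetriesSameServer; attempt++) {
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    lastError = e; // assumption: every failure is retriable
                }
            }
        }
        throw new IllegalStateException("All retries exhausted", lastError);
    }
}
```

With one retry at each level, a call makes at most four attempts in total: two on the first server and two on the second.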

Selecting a suitable Server from the ServerList:

    /**
     * Compute the final URI from a partial URI in the request. The following steps are performed:
     * <ul>
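     * <li> if host is missing and there is a load balancer, get the host/port from the server chosen by the load balancer
     * <li> if host is missing and there is no load balancer, try to derive host/port from the virtual address set with the client
     * <li> if host is present, the authority part of the URI is a virtual address set for the client, and there is a load balancer, get the host/port from the server chosen by the load balancer (the host given in the URI is ignored)
     * <li> if host is present but neither a load balancer nor a virtual address applies, interpret the host as the actual physical address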
     * <li> if host is missing but none of the above applies, throws ClientException
     * </ul>
     *
     * @param original Original URI passed from caller
     */
    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI supplied, with no host
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey); // choose a Server through the load balancer
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                // Take the host from the Server chosen by the load balancer
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // No Full URL - and we dont have a LoadBalancer registered to
                // obtain a server
                // if we have a vipAddress that came with the registration, we
                // can use that else we
                // bail out
                // Derive the host from the configured virtual address
                if (vipAddresses != null && vipAddresses.contains(",")) {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                            + original
                            + ") with no load balancer configured."
                            + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                            + " to complete this partial uri");
                } else if (vipAddresses != null) {
                    try {
                        Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                        host = hostAndPort.first();
                        port = hostAndPort.second();
                    } catch (URISyntaxException e) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured. "
                                + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                    }
                } else {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            this.clientName
                            + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                            + " Also has no vipAddress registered");
                }
            }
        } else {
            // Full URL case: the URL carries a full address, which may be a virtual address or a host and port
            // This could either be a vipAddress or a hostAndPort or a real DNS
            // if vipAddress or hostAndPort, we just have to consult the loadbalancer
            // but if it does not return a server, we should just proceed anyways
            // and assume its a DNS
            // For restClients registered using a vipAddress AND executing a request
            // by passing in the full URL (including host and port), we should only
            // consult lb IFF the URL passed is registered as vipAddress in Discovery
            boolean shouldInterpretAsVip = false;

            if (lb != null) {
                shouldInterpretAsVip = isVipRecognized(original.getAuthority());
            }
            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null){
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                } else {
                    // just fall back as real DNS
                    logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                }
            } else {
                // consult LB to obtain vipAddress backed instance given full URL
                //Full URL execute request - where url!=vipAddress
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }
        // end of creating final URL
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL

        return new Server(host, port);
    }
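
The branching in getServerFromLoadBalancer is easier to follow when reduced to the decision it makes: where does the final host come from? The sketch below condenses that decision under simplifying assumptions. HostResolution and its parameters are hypothetical names, and two details of the real method are left out: the load balancer returning no server, and an unparseable vipAddress.

```java
// Hypothetical condensation of the host-resolution branches; not a Ribbon class.
final class HostResolution {
    enum Source { LOAD_BALANCER, VIP_ADDRESS, URI_AS_IS }

    static Source resolve(String uriHost, boolean hasLoadBalancer,
                          String vipAddresses, boolean hostIsRecognizedVip) {
        if (uriHost == null) {
            // Partial URI: the load balancer wins when there is one.
            if (hasLoadBalancer) {
                return Source.LOAD_BALANCER;
            }
            // Without a load balancer, only a single vipAddress can complete the URI.
            if (vipAddresses != null && !vipAddresses.contains(",")) {
                return Source.VIP_ADDRESS;
            }
            throw new IllegalStateException(
                    "Partial URI with no load balancer and no single vipAddress");
        }
        // Full URI: consult the load balancer only if the host is a known vipAddress.
        if (hasLoadBalancer && hostIsRecognizedVip) {
            return Source.LOAD_BALANCER;
        }
        return Source.URI_AS_IS;
    }
}
```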

III. LoadBalancer: The Core of Load Balancing

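The LoadBalancer has three main responsibilities:

  1. Maintain the membership of the Server list (adding, updating, and removing servers)
  2. Maintain the status of the Servers in the list (status updates)
  3. Return the most suitable Server instance when one is requested

This chapter covers these three aspects in detail.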

3.1 Basic Internal Design of the Load Balancer

First, get familiar with the design diagram of the LoadBalancer:


image.png

| Component | Responsibility | See section |
|---|---|---|
| Server | Represents a service instance and records its details, such as service address, zone, service name, and instance ID | |
| ServerList | Maintains a list of Server instances. While the application runs, Ribbon offers the instances in the ServerList to the load balancer to choose from. The list may change dynamically at runtime | 3.2 |
| ServerStats | Runtime statistics for the corresponding Server, typically the average response time, accumulated failure count, and circuit-breaker timing observed during calls. Each ServerStats instance maps to exactly one Server instance | |
| LoadBalancerStats | Container that holds and maintains all ServerStats instances | |
| ServerListUpdater | Used by the load balancer to update the ServerList, for example through a scheduled task that fetches the latest list of Server instances at a fixed interval | 3.2 |
| Pinger | Service status checker that maintains the status of the servers in the ServerList. Note: the Pinger only maintains Server status; it cannot decide whether a Server is removed | |
| PingerStrategy | Defines how the servers are checked, for example serially or in parallel | |
| IPing | The ping itself: the method used to check whether a service is available, commonly an HTTP or TCP/IP request that checks whether the service answers normally | |

image.png
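
To make the three responsibilities concrete, here is a minimal sketch of a load balancer that keeps a server list, tracks which servers are alive, and picks among the alive ones in round-robin order. SimpleLoadBalancer is a hypothetical class written for this article; servers are plain host:port strings, and none of Ribbon's statistics, zones, or rules are modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal load balancer; not a Ribbon class.
final class SimpleLoadBalancer {
    private volatile List<String> allServers = Collections.emptyList();
    private final Map<String, Boolean> alive = new ConcurrentHashMap<>();
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Responsibility 1: maintain the membership of the server list.
    void setServers(List<String> servers) {
        List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(servers));
        alive.keySet().retainAll(snapshot);      // forget servers that were removed
        for (String server : snapshot) {
            alive.putIfAbsent(server, Boolean.TRUE); // new servers start as alive
        }
        allServers = snapshot;
    }

    // Responsibility 2: maintain the status of known servers.
    void markAlive(String server, boolean isAlive) {
        alive.computeIfPresent(server, (key, old) -> isAlive);
    }

    // Responsibility 3: return a suitable server, or null if none is alive.
    String chooseServer() {
        List<String> reachable = new ArrayList<>();
        for (String server : allServers) {
            if (alive.getOrDefault(server, Boolean.FALSE)) {
                reachable.add(server);
            }
        }
        if (reachable.isEmpty()) {
            return null;
        }
        return reachable.get(Math.floorMod(nextIndex.getAndIncrement(), reachable.size()));
    }
}
```

Note that marking a server as not alive only removes it from selection; it stays in the list until the list itself is refreshed, which matches the split between status checking and list maintenance described in the table.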

3.2 How Is the Server List Maintained? (Adding, Updating, Removing)

Looking only at server list maintenance, Ribbon is structured as follows:


image.png

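In terms of implementation, Server list maintenance falls into two categories:

  1. Configuration-based server list
    The server list is configured statically, usually in a configuration file. This is the simpler approach, but it does not mean the list stays fixed while the application is running. Netflix's components use the distributed configuration framework Netflix Archaius, which watches configuration files for changes and pushes them to the applications. So if a configuration-based server list is modified without stopping the service, the list can be refreshed directly.
  2. Server list maintained dynamically from the registrations of a service discovery component (such as Eureka)
    In the Spring Cloud framework, service registration and discovery is an indispensable part of a distributed service cluster; it tracks the service instances (registration, renewal, deregistration). This article describes how, when integrated with Eureka, Ribbon's server list is refreshed dynamically from Eureka's registration information.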

Ribbon uses the configuration item <service-name>.ribbon.NIWSServerListClassName to decide which implementation to use:

| Strategy | ServerList implementation |
|---|---|
| Configuration-based | com.netflix.loadbalancer.ConfigurationBasedServerList |
| Service discovery | com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList |

The Server list may be updated dynamically at runtime. How it is updated is determined by <service-name>.ribbon.ServerListUpdaterClassName, for which there are currently two implementations:

| Update strategy | ServerListUpdater implementation |
|---|---|
| Pulling the server list with a scheduled task | com.netflix.loadbalancer.PollingServerListUpdater |
| Updating through Eureka event notifications | com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater |

  • Pulling the server list with a scheduled task
    This approach is implemented by com.netflix.loadbalancer.PollingServerListUpdater. It maintains a periodic scheduled task that pulls the latest server list and writes it into the ServerList. Its core logic is as follows:
public class PollingServerListUpdater implements ServerListUpdater {
    private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);

    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
    // Thread pool for the updater, plus its shutdown hook
    private static class LazyHolder {
        private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
        private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
        private static Thread _shutdownThread;

        static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

        static {
            int coreSize = poolSizeProp.get();
            ThreadFactory factory = (new ThreadFactoryBuilder())
                    .setNameFormat("PollingServerListUpdater-%d")
                    .setDaemon(true)
                    .build();
            _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
            poolSizeProp.addCallback(new Runnable() {
                @Override
                public void run() {
                    _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
                }

            });
            _shutdownThread = new Thread(new Runnable() {
                public void run() {
                    logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
                    shutdownExecutorPool();
                }
            });
            Runtime.getRuntime().addShutdownHook(_shutdownThread);
        }

        private static void shutdownExecutorPool() {
            if (_serverListRefreshExecutor != null) {
                _serverListRefreshExecutor.shutdown();

                if (_shutdownThread != null) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                    } catch (IllegalStateException ise) { // NOPMD
                        // this can happen if we're in the middle of a real
                        // shutdown,
                        // and that's 'ok'
                    }
                }

            }
        }
    }
    // Some code omitted
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            // Create the scheduled task that runs the update at a fixed interval
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        // Run the update; the update action itself is defined in the LoadBalancer
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
            // Schedule the task
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs, // initial delay
                    refreshIntervalMs, // delay between refreshes
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
// Some code omitted
}

As the code above shows, the ServerListUpdater only defines how updates are triggered; what an update actually does is encapsulated in an UpdateAction:

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }
    
    // DynamicServerListLoadBalancer supplies the concrete action:
    public DynamicServerListLoadBalancer() {
        this.isSecure = false;
        this.useTunnel = false;
        this.serverListUpdateInProgress = new AtomicBoolean(false);
        this.updateAction = new UpdateAction() {
            public void doUpdate() {
                // Update the server list
                DynamicServerListLoadBalancer.this.updateListOfServers();
            }
        };
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
       // Fetch the latest server list through the ServerList
       if (this.serverListImpl != null) {
            servers = this.serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            // Pass the result through the filter
            if (this.filter != null) {
                servers = this.filter.getFilteredListOfServers((List)servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            }
        }
        // Update the list
        this.updateAllServerList((List)servers);
    }

    protected void updateAllServerList(List<T> ls) {
        if (this.serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                Iterator var2 = ls.iterator();

                while(var2.hasNext()) {
                    T s = (Server)var2.next();
                    s.setAlive(true);
                }

                this.setServersList(ls);
                super.forceQuickPing();
            } finally {
                this.serverListUpdateInProgress.set(false);
            }
        }

    }
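  • Updating through Eureka event notifications
    The Eureka-based approach works differently. When service registration information in the Eureka registry changes, a notification reaches EurekaNotificationServerListUpdater (as a CacheRefreshedEvent from the Eureka client), and the updater then triggers a refresh of the ServerList: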
public class EurekaNotificationServerListUpdater implements ServerListUpdater {

   // Some code omitted
  
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
          // Create the Eureka event listener; it fires when Eureka's data changes
          this.updateListener = new EurekaEventListener() {
                @Override
                public void onEvent(EurekaEvent event) {
                    if (event instanceof CacheRefreshedEvent) {
                        // Flag marking that an update is already queued
                       if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                // Submit the update to the executor
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            updateAction.doUpdate(); // perform the actual update
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }
                }
            };
            // EurekaClient instance
            if (eurekaClient == null) {
                eurekaClient = eurekaClientProvider.get();
            }
            // Register the event listener with the EurekaClient
            if (eurekaClient != null) {
                eurekaClient.registerEventListener(updateListener);
            } else {
                logger.error("Failed to register an updateListener to eureka client, eureka client is null");
                throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
            }
        } else {
            logger.info("Update listener already registered, no-op");
        }
    }

}
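
The interesting part of the event-driven updater is how it avoids piling up work: while one update is queued or running, further notifications are dropped. The pattern can be isolated in a few lines. CoalescingUpdater below is a hypothetical class, not part of Ribbon; the Executor and the update action are passed in so the behavior can be exercised without Eureka.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical isolation of the "at most one queued update" pattern.
final class CoalescingUpdater {
    private final Executor executor;
    private final Runnable updateAction;
    private final AtomicBoolean updateQueued = new AtomicBoolean(false);

    CoalescingUpdater(Executor executor, Runnable updateAction) {
        this.executor = executor;
        this.updateAction = updateAction;
    }

    // Returns true if an update was queued, false if the notification was dropped.
    boolean onCacheRefreshed() {
        if (!updateQueued.compareAndSet(false, true)) {
            return false; // an update is already queued, nothing to do
        }
        try {
            executor.execute(() -> {
                try {
                    updateAction.run();
                } finally {
                    updateQueued.set(false); // allow the next notification through
                }
            });
            return true;
        } catch (RuntimeException e) {
            updateQueued.set(false); // submission failed, so nothing is queued
            return false;
        }
    }
}
```

Dropping notifications is safe here because each update pulls the full current list rather than applying a delta, so one update after a burst of events is enough.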

3.2.1 Related Configuration Items

| Configuration item | Description | Applies when | Default |
|---|---|---|---|
| <service-name>.ribbon.NIWSServerListClassName | ServerList implementation, see above | | ConfigurationBasedServerList |
| <service-name>.ribbon.listOfServers | Server list as comma-separated hostname:port entries | The ServerList implementation is configuration-based | |
| <service-name>.ribbon.ServerListUpdaterClassName | Server list update strategy, see above | | PollingServerListUpdater |
| <service-name>.ribbon.ServerListRefreshInterval | Server list refresh interval | The list is pulled by a scheduled task | 30s |

3.2.2 Ribbon's Default Implementation

By default Ribbon uses the configuration below: a configuration-based server list, refreshed by a scheduled task that pulls the list every 30 s.

<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
<service-name>.ribbon.listOfServers=<ip:port>,<ip:port>
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.PollingServerListUpdater
### Refresh interval in milliseconds (30 s)
<service-name>.ribbon.ServerListRefreshInterval=30000
### Size of the update thread pool
DynamicServerListLoadBalancer.ThreadPoolSize=2

3.2.3 Configuration under Spring Cloud Integration

Under Spring Cloud integration with Eureka, the configuration below is used instead: the server list comes from service discovery and is refreshed through Eureka event notifications.

<service-name>.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater
### Size of the update thread pool
EurekaNotificationServerListUpdater.ThreadPoolSize=2
### Capacity of the notification queue
EurekaNotificationServerListUpdater.queueSize=1000

3.3 How Does the Load Balancer Maintain the Status of Service Instances?

The Ribbon load balancer delegates status maintenance of service instances to the Pinger, PingerStrategy, and IPing, which interact as follows:

image.png

/**
 * Defines the strategy for pinging servers to check their status: serially, one after another, or in parallel. Either way, the checks must not interfere with one another.
 *
 */
public interface IPingStrategy {

    boolean[] pingServers(IPing ping, Server[] servers);
}


/**
 * Defines how to ping a single server to determine whether it is alive
 * @author stonse
 *
 */
public interface IPing {
    
    /**
     * Checks whether the given <code>Server</code> is "alive" i.e. should be
     * considered a candidate while loadbalancing
     * Returns whether the server is alive
     */
    public boolean isAlive(Server server);
}

3.3.1 Creating the Ping Scheduled Task

By default, the load balancer creates a periodic scheduled task internally, controlled by the following parameters:

| Parameter | Description | Default |
|---|---|---|
| <service-name>.ribbon.NFLoadBalancerPingInterval | Period of the Ping scheduled task | 30s |
| <service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping timeout | 2s |
| <service-name>.ribbon.NFLoadBalancerPingClassName | IPing implementation class | DummyPing, which always returns true |

The default PingStrategy is serial: it checks the service instances one after another:

/**
     * Default implementation for <c>IPingStrategy</c>, performs ping
     * serially, which may not be desirable, if your <c>IPing</c>
     * implementation is slow, or you have large number of servers.
     */
    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // Check each server in order and store the result at the matching array index
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
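
When a single ping is slow or there are many servers, the serial strategy can take too long, and pinging in parallel is the obvious alternative. The following is one possible sketch of that idea, not code from Ribbon: ParallelPingSketch is a hypothetical class, the ping is modeled as a Predicate over host:port strings, and the pool size limit of 8 is an arbitrary choice. As in the serial strategy, a server counts as dead unless its ping succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical parallel counterpart of a serial ping strategy.
final class ParallelPingSketch {
    static boolean[] pingServers(Predicate<String> ping, String[] servers, long timeoutMs) {
        boolean[] results = new boolean[servers.length]; // default answer is dead (false)
        if (servers.length == 0) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(servers.length, 8));
        try {
            // Start all pings first so they run concurrently.
            List<Future<Boolean>> futures = new ArrayList<>();
            for (String server : servers) {
                Callable<Boolean> task = () -> ping.test(server);
                futures.add(pool.submit(task));
            }
            // Collect results in order; a failed or timed-out ping counts as dead.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results[i] = futures.get(i).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException | TimeoutException e) {
                    results[i] = false;
                }
            }
        } finally {
            pool.shutdownNow(); // cancel pings that are still running
        }
        return results;
    }
}
```

The timeout here applies to each wait in turn, so the worst-case total is still bounded only by the number of servers times the timeout; a production version would track a single overall deadline.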

3.3.2 Ribbon's Default IPing Implementation: DummyPing

The default IPing implementation simply returns true:

public class DummyPing extends AbstractLoadBalancerPing {

    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

Beyond this, IPing has the following other implementations:

image.png
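
As a contrast to DummyPing, a real check has to touch the network. The sketch below shows the simplest version of that, a TCP connect with a timeout. TcpConnectPing is a hypothetical class written for illustration; it does not implement Ribbon's IPing interface, and it takes host and port directly instead of a Server object.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical TCP-level liveness check; not a Ribbon class.
final class TcpConnectPing {
    private final int connectTimeoutMs;

    TcpConnectPing(int connectTimeoutMs) {
        this.connectTimeoutMs = connectTimeoutMs;
    }

    // A server counts as alive if a TCP connection can be opened within the timeout.
    boolean isAlive(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }
}
```

A successful connect only proves that something is listening on the port; an HTTP-level check against a health endpoint says more about whether the service can actually answer requests.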

3.3.3 The IPing Implementation under Spring Cloud Integration: NIWSDiscoveryPing

Once integrated with Spring Cloud, where Eureka handles service registration and discovery, the default IPing implementation is NIWSDiscoveryPing. It decides whether a service is available from the instance status reported by Eureka, so Ribbon's Server status follows Eureka's updates:

/**
 * "Ping" Discovery Client
 * i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
 * @author stonse
 *
 */
public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
            
        BaseLoadBalancer lb = null; 
        

        public NIWSDiscoveryPing() {
        }
        
        public BaseLoadBalancer getLb() {
            return lb;
        }

        /**
         * Non IPing interface method - only set this if you care about the "newServers Feature"
         * @param lb
         */
        public void setLb(BaseLoadBalancer lb) {
            this.lb = lb;
        }

        public boolean isAlive(Server server) {
            boolean isAlive = true;
            // Use the status of the Eureka instance as the status of the Ribbon server
            if (server!=null && server instanceof DiscoveryEnabledServer){
                DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
                InstanceInfo instanceInfo = dServer.getInstanceInfo();
                if (instanceInfo!=null){                    
                    InstanceStatus status = instanceInfo.getStatus();
                    if (status!=null){
                        isAlive = status.equals(InstanceStatus.UP);
                    }
                }
            }
            return isAlive;
        }

        @Override
        public void initWithNiwsConfig(
                IClientConfig clientConfig) {
        }
        
}

The entry point of this default under Spring Cloud:

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
            return this.propertiesFactory.get(IPing.class, config, serviceId);
        }
        NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
        ping.initWithNiwsConfig(config);
        return ping;
    }

3.4 How is a suitable service instance picked from the server list?

3.4.1 The service instance container: maintaining the ServerList

The load balancer maintains all service instances through a ServerList, as shown below:


image.png

The base interface definition is very simple:

/**
 * Interface that defines the methods used to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {
    // Get the initial list of servers
    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     */
    public List<T> getUpdatedListOfServers();   

}

In Ribbon's implementation, the ServerList holds the Server instances and returns the latest List<Server> for the LoadBalancer to use.

image.png

The role of ServerList<Server>
It maintains the service instances and uses a ServerListFilter to filter out the List<Server> that meets the requirements.

3.4.2 The server list filter: ServerListFilter

The job of ServerListFilter is simple:
given a list of service instances, return the ones that satisfy the filter condition.

public interface ServerListFilter<T extends Server> {
    public List<T> getFilteredListOfServers(List<T> servers);
}

3.4.2.1 Ribbon's default ServerListFilter implementation: ZoneAffinityServerListFilter

By default Ribbon uses a zone-affinity filtering strategy: from the Server list it keeps the servers whose zone matches the zone of the current instance.
Ribbon has the following related configuration parameters:

Parameter | Description | Default
<service-name>.ribbon.EnableZoneAffinity | Whether zone affinity is enabled | false
<service-name>.ribbon.EnableZoneExclusivity | Whether zone exclusivity is enabled, that is, only instances in the current zone are returned | false
<service-name>.ribbon.zoneAffinity.maxLoadPerServer | Threshold for the average number of active requests per Server | 0.6
<service-name>.ribbon.zoneAffinity.maxBlackOutServesrPercentage | Maximum share of circuit-tripped servers | 0.8
<service-name>.ribbon.zoneAffinity.minAvailableServers | Minimum number of available service instances | 2

Its core implementation is shown below:

public class ZoneAffinityServerListFilter<T extends Server> extends
        AbstractServerListFilter<T> implements IClientConfigAware {

    @Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        // Filter only when a zone is set, zone affinity or exclusivity is enabled,
        // and the server list is not empty
        if (zone != null && (zoneAffinity || zoneExclusive) && servers != null && servers.size() > 0) {
            // Filter the server list with the zone-affinity predicate
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            // Return the filtered list if zone affinity should stay in effect
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }

    // Decide whether the zone-affinity filter should be applied
    private boolean shouldEnableZoneAffinity(List<T> filtered) {
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        // Get the statistics
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            // Snapshot of the servers in the zone, including their statistics
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            // Average load: the average number of active requests per server in the snapshot
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            // Zone affinity is overridden when any of the following holds:
            // 1. the share of circuit-tripped servers reaches the limit (default 0.8)
            // 2. the load per server reaches the limit (default 0.6)
            // 3. fewer servers are available than the configured minimum (default 2)
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
        }
    }
}

The decision flow is shown below:

image.png
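
To make the three override conditions concrete, the following is a small worked sketch of the same decision using the default thresholds from the parameter table (0.8, 0.6 and 2). `ZoneAffinityDecisionSketch` and `keepZoneFilter` are hypothetical names used for illustration only; the real filter reads these values from `LoadBalancerStats` and dynamic properties.

```java
// Hypothetical sketch of the decision made in shouldEnableZoneAffinity; not Ribbon code.
final class ZoneAffinityDecisionSketch {
    // Defaults taken from the parameter table in this section.
    static final double MAX_BLACKOUT_SERVERS_PERCENTAGE = 0.8;
    static final double MAX_LOAD_PER_SERVER = 0.6;
    static final int MIN_AVAILABLE_SERVERS = 2;

    // Returns true when the zone-filtered list should be kept,
    // false when the filter is overridden and the full list is used.
    static boolean keepZoneFilter(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        boolean tooManyTripped =
                (double) circuitTrippedCount / instanceCount >= MAX_BLACKOUT_SERVERS_PERCENTAGE;
        boolean tooLoaded = loadPerServer >= MAX_LOAD_PER_SERVER;
        boolean tooFewAvailable = (instanceCount - circuitTrippedCount) < MIN_AVAILABLE_SERVERS;
        return !(tooManyTripped || tooLoaded || tooFewAvailable);
    }

    public static void main(String[] args) {
        System.out.println(keepZoneFilter(10, 1, 0.2)); // true: healthy zone
        System.out.println(keepZoneFilter(10, 8, 0.2)); // false: 80% of servers tripped
        System.out.println(keepZoneFilter(10, 0, 0.6)); // false: load per server at the limit
        System.out.println(keepZoneFilter(2, 1, 0.1));  // false: only one server available
    }
}
```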

3.4.2.2 ServerListFilter implementation 2: ZonePreferenceServerListFilter

ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter. Starting from the result that ZoneAffinityServerListFilter returns, it further keeps only the servers in the same zone as the local service.

Core logic: when does it take effect?
When the zone of the current service is specified and ZoneAffinityServerListFilter did not filter anything out, ZonePreferenceServerListFilter returns the servers of the current zone.

public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {

    private String zone;

    @Override
    public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
        super.initWithNiwsConfig(niwsClientConfig);
        if (ConfigurationManager.getDeploymentContext() != null) {
            this.zone = ConfigurationManager.getDeploymentContext().getValue(
                    ContextKey.zone);
        }
    }

    @Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        // Start from the result filtered by the parent class
        List<Server> output = super.getFilteredListOfServers(servers);
        // If the parent filtered nothing out, filter by the local zone
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }

    public String getZone() {
        return zone;
    }

    public void setZone(String zone) {
        this.zone = zone;
    }
}
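
The fallback behaviour is easier to see in isolation. The sketch below reproduces only the second stage (the local-zone pass) under the assumption that the parent filter left the list untouched. `ZonePreferenceSketch` and its `Node` type are hypothetical stand-ins, not Ribbon or Spring Cloud classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the local-zone pass; Node stands in for Ribbon's Server.
final class ZonePreferenceSketch {

    static final class Node {
        final String host;
        final String zone;

        Node(String host, String zone) {
            this.host = host;
            this.zone = zone;
        }
    }

    // Keeps only the servers in localZone; falls back to the full list when
    // no zone is configured or no server matches.
    static List<Node> preferLocalZone(String localZone, List<Node> servers) {
        if (localZone == null) {
            return servers;
        }
        List<Node> local = new ArrayList<>();
        for (Node node : servers) {
            if (localZone.equalsIgnoreCase(node.zone)) {
                local.add(node);
            }
        }
        return local.isEmpty() ? servers : local;
    }

    public static void main(String[] args) {
        List<Node> all = Arrays.asList(
                new Node("h1", "zone-a"), new Node("h2", "zone-b"), new Node("h3", "zone-a"));
        System.out.println(preferLocalZone("zone-a", all).size()); // 2
        System.out.println(preferLocalZone("zone-c", all).size()); // 3 (no match, full list)
    }
}
```

Returning the full list when nothing matches means a misconfigured zone degrades to cross-zone calls instead of leaving the client with no servers.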

3.4.2.3 ServerListFilter implementation 3: ServerListSubsetFilter

This filter is meant for very large server lists (for example, hundreds of Server instances). Keeping HTTP connections open to all of them for a long time is not appropriate, so the filter keeps a subset of the servers and drops the rest, which releases unnecessary connections.
It also extends ZoneAffinityServerListFilter. It is rarely used in practice, so it is left for a later discussion.

3.4.3 How the LoadBalancer selects a service instance

The core job of the LoadBalancer is to pick the most suitable service instance from the server list according to load. Internally it relies on the components shown below:

image.png

The selection flow of the LoadBalancer

  1. Get the currently available service instances from the ServerList;
  2. Pass the list from step 1 through the ServerListFilter, which returns the instances that satisfy the filter condition;
  3. Apply the Rule, together with the instance statistics, and return one instance that satisfies the rule.

As the flow shows, there are in fact two chances to filter during selection: first through the ServerListFilter, and then through the selection rule of the IRule.
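
The three steps can be sketched as a small pipeline. This is an illustration of the flow only, with hypothetical names: a `Supplier` plays the role of the ServerList, a `UnaryOperator` plays the ServerListFilter, and a `Function` plays the IRule. Servers are plain strings of the assumed form `zone/host`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical sketch of the list -> filter -> rule flow; not Ribbon's API.
final class ChoosePipelineSketch {

    static <S> Optional<S> choose(Supplier<List<S>> serverList,
                                  UnaryOperator<List<S>> filter,
                                  Function<List<S>, S> rule) {
        List<S> all = serverList.get();           // step 1: current server list
        List<S> candidates = filter.apply(all);   // step 2: first filtering pass
        if (candidates == null || candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(rule.apply(candidates)); // step 3: the rule picks one
    }

    public static void main(String[] args) {
        Optional<String> chosen = ChoosePipelineSketch.<String>choose(
                () -> Arrays.asList("zone-a/host1", "zone-b/host2", "zone-a/host3"),
                list -> list.stream()
                        .filter(s -> s.startsWith("zone-a/"))
                        .collect(Collectors.toList()),
                list -> list.get(list.size() - 1));
        System.out.println(chosen.orElse("none")); // zone-a/host3
    }
}
```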

Filtering through the ServerListFilter has been covered in detail above. Next we look at how a Rule chooses a server from the list.
Before introducing Rule, one concept needs explaining: Server statistics.

After the LoadBalancer hands a suitable Server to the application, the application sends its request to that Server and records statistics along the way, such as the response time and the state of the network connection. The next time the application asks the LoadBalancer for a Server, the LoadBalancer combines these per-server statistics with the Rule to decide which server is the most suitable.

3.4.3.1 What statistics does the LoadBalancer keep about each Server instance?

ServerStats | Description | Type | Default
zone | Availability zone the server belongs to | config | set via eureka.instance.metadataMap.zone
totalRequests | Total number of requests; incremented on every client call | real-time | 0
activeRequestsCountTimeout | Time window for the active request count (niws.loadbalancer.serverStats.activeRequestsCount.effectiveWindowSeconds); if activeRequestsCount does not change within the window, it is reset to 0 | config | 60*10 (seconds)
successiveConnectionFailureCount | Number of consecutive connection failures | real-time |
connectionFailureThreshold | Connection failure threshold, configured via niws.loadbalancer.default.connectionFailureCountThreshold | config | 3
circuitTrippedTimeoutFactor | Circuit-breaker timeout factor, niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | config | 10 (seconds)
maxCircuitTrippedTimeout | Maximum circuit-breaker timeout in seconds, niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | config | 30 (seconds)
totalCircuitBreakerBlackOutPeriod | Cumulative circuit-breaker blackout time | real-time | milliseconds
lastAccessedTimestamp | Time of the last access | real-time |
lastConnectionFailedTimestamp | Time of the last connection failure | real-time |
firstConnectionTimestamp | Time of the first connection | real-time |
activeRequestsCount | Current number of active requests | real-time |
failureCountSlidingWindowInterval | Sliding time window for counting failures | config | 1000 (ms)
serverFailureCounts | Number of connection failures in the current window | real-time |
responseTimeDist.mean | Mean response time | real-time | (ms)
responseTimeDist.max | Maximum response time | real-time | (ms)
responseTimeDist.minimum | Minimum response time | real-time | (ms)
responseTimeDist.stddev | Standard deviation of the response time | real-time | (ms)
dataDist.sampleSize | QoS sample size | real-time |
dataDist.timestamp | Time of the last QoS computation | real-time |
dataDist.timestampMillis | Time of the last QoS computation in milliseconds since 1970-01-01 | real-time |
dataDist.mean | QoS: mean response time within the most recent window | real-time |
dataDist.10thPercentile | QoS: 10th percentile of request handling time | real-time | ms
dataDist.25thPercentile | QoS: 25th percentile of request handling time | real-time | ms
dataDist.50thPercentile | QoS: 50th percentile of request handling time | real-time | ms
dataDist.75thPercentile | QoS: 75th percentile of request handling time | real-time | ms
dataDist.95thPercentile | QoS: 95th percentile of request handling time | real-time | ms
dataDist.99thPercentile | QoS: 99th percentile of request handling time | real-time | ms
dataDist.99.5thPercentile | QoS: 99.5th percentile of request handling time | real-time | ms

3.4.3.2 How the server circuit breaker works

When a service has several instances, the load balancer records the outcome of every request (response time, whether a network error occurred, and so on). When failures accumulate on one instance, the load balancer marks that instance and sets a blackout period for it. While the period lasts, incoming requests skip that instance: it is temporarily removed from the list of available instances and other instances are preferred.

The relevant statistics are:

ServerStats | Description | Type | Default
successiveConnectionFailureCount | Number of consecutive connection failures | real-time |
connectionFailureThreshold | Connection failure threshold, configured via niws.loadbalancer.default.connectionFailureCountThreshold; once successiveConnectionFailureCount reaches this limit, a blackout period is computed | config | 3
circuitTrippedTimeoutFactor | Circuit-breaker timeout factor, niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | config | 10 (seconds)
maxCircuitTrippedTimeout | Maximum circuit-breaker timeout in seconds, niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | config | 30 (seconds)
totalCircuitBreakerBlackOutPeriod | Cumulative circuit-breaker blackout time | real-time | milliseconds
lastAccessedTimestamp | Time of the last access | real-time |
lastConnectionFailedTimestamp | Time of the last connection failure | real-time |
firstConnectionTimestamp | Time of the first connection | real-time |

    @Monitor(name="CircuitBreakerTripped", type = DataSourceType.INFORMATIONAL)
    public boolean isCircuitBreakerTripped() {
        return isCircuitBreakerTripped(System.currentTimeMillis());
    }

    public boolean isCircuitBreakerTripped(long currentTime) {
        // Timestamp until which the circuit stays open
        long circuitBreakerTimeout = getCircuitBreakerTimeout();
        if (circuitBreakerTimeout <= 0) {
            return false;
        }
        // Still inside the blackout window: report the circuit as tripped
        return circuitBreakerTimeout > currentTime;
    }

    // Timestamp at which the blackout ends
    private long getCircuitBreakerTimeout() {
        long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
        if (blackOutPeriod <= 0) {
            return 0;
        }
        return lastConnectionFailedTimestamp + blackOutPeriod;
    }

    // Length of the blackout in milliseconds
    private long getCircuitBreakerBlackoutPeriod() {
        int failureCount = successiveConnectionFailureCount.get();
        int threshold = connectionFailureThreshold.get();
        // Consecutive failures are still below the threshold: the blackout period is 0, the server is usable
        if (failureCount < threshold) {
            return 0;
        }
        // Once the failures reach the threshold the circuit trips; the blackout period is computed as follows
        int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
        int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
        if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
            blackOutSeconds = maxCircuitTrippedTimeout.get();
        }
        return blackOutSeconds * 1000L;
    }

How the blackout period is computed

  1. Compare the consecutive connection failure count successiveConnectionFailureCount with the connection failure threshold connectionFailureThreshold. If successiveConnectionFailureCount < connectionFailureThreshold, the limit has not been reached and the blackout period is 0; otherwise continue with step 2;
  2. Compute the exponent, which may not exceed 16: diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold)
  3. Compute the blackout period from the timeout factor circuitTrippedTimeoutFactor: int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
  4. The blackout period may not exceed the upper limit maxCircuitTrippedTimeout.

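With the default settings (circuitTrippedTimeoutFactor = 10, maxCircuitTrippedTimeout = 30), a server whose circuit has tripped is ignored for between 10s and 30s.
That is:
Minimum blackout period: (1 << 0) * 10 = 10s, when the failure count has just reached the threshold
Maximum blackout period: 30s, because (1 << 2) * 10 = 40s already exceeds maxCircuitTrippedTimeout; without that cap the formula would allow up to (1 << 16) * 10 = 655360s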
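
As a worked check of the formula, the sketch below re-implements the arithmetic of `getCircuitBreakerBlackoutPeriod` as a pure function and evaluates it with the defaults from the statistics table (threshold 3, factor 10 seconds, cap 30 seconds). `BlackoutPeriodSketch` is a hypothetical helper written for this article, not part of Ribbon.

```java
// Hypothetical re-implementation of the blackout arithmetic as a pure function.
final class BlackoutPeriodSketch {

    static long blackoutMillis(int failureCount, int threshold, int factorSeconds, int maxSeconds) {
        if (failureCount < threshold) {
            return 0; // below the threshold: no blackout
        }
        int diff = Math.min(failureCount - threshold, 16); // exponent is capped at 16
        int blackOutSeconds = (1 << diff) * factorSeconds;
        if (blackOutSeconds > maxSeconds) {
            blackOutSeconds = maxSeconds; // capped by the maximum timeout
        }
        return blackOutSeconds * 1000L;
    }

    public static void main(String[] args) {
        // Defaults: threshold 3, factor 10s, maximum 30s.
        for (int failures = 2; failures <= 6; failures++) {
            System.out.println(failures + " failures -> " + blackoutMillis(failures, 3, 10, 30) + " ms");
        }
        // 2 failures -> 0 ms
        // 3 failures -> 10000 ms
        // 4 failures -> 20000 ms
        // 5 failures -> 30000 ms
        // 6 failures -> 30000 ms
    }
}
```

With these defaults the doubling is only visible for the first two steps; after that the cap dominates.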

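When are the circuit-breaker statistics cleared?

The circuit-breaker statistics have their own clearing policy. They are reset to zero in the following cases:

  1. When a request fails with an exception that is not a circuit-tripping exception (how an exception is classified as circuit-tripping can be customized);
  2. When a request completes without an exception and returns a result.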
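
The counting side of this policy can be sketched in a few lines. The class below is a hypothetical model, not Ribbon's `ServerStats`: it tracks only the consecutive-failure counter, increments it on a connection failure, and clears it in the two cases listed above. Timestamps and the blackout calculation are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the consecutive-failure counter; not Ribbon's ServerStats.
final class FailureCounterSketch {
    private final AtomicInteger successiveConnectionFailures = new AtomicInteger(0);
    private final int connectionFailureThreshold;

    FailureCounterSketch(int connectionFailureThreshold) {
        this.connectionFailureThreshold = connectionFailureThreshold;
    }

    // Called when a request fails with a circuit-tripping (connection) error.
    void recordConnectionFailure() {
        successiveConnectionFailures.incrementAndGet();
    }

    // Called on a successful response, or on an error that is not circuit-tripping.
    void clear() {
        successiveConnectionFailures.set(0);
    }

    int failures() {
        return successiveConnectionFailures.get();
    }

    // True once the consecutive failures have reached the threshold.
    boolean reachedThreshold() {
        return successiveConnectionFailures.get() >= connectionFailureThreshold;
    }

    public static void main(String[] args) {
        FailureCounterSketch counter = new FailureCounterSketch(3);
        counter.recordConnectionFailure();
        counter.recordConnectionFailure();
        counter.recordConnectionFailure();
        System.out.println(counter.reachedThreshold()); // true
        counter.clear(); // a later request succeeded
        System.out.println(counter.reachedThreshold()); // false
    }
}
```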

3.4.3.3 IRule: choosing the most suitable Server instance from the server list
image.png

As the figure above shows, an IRule returns the most suitable Server instance from the server list according to the rule it defines. Its interface is defined as follows:

public interface IRule{
    /*
     * choose one alive server from lb.allServers or
     * lb.upServers according to key
     * 
     * @return choosen Server object. NULL is returned if none
     *  server is available 
     */

    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

Ribbon defines several common rules:

Implementation | Description
RoundRobinRule | Round-robin selection; the selection loop retries at most 10 times
RandomRule | Random selection: picks a server from the list at random
ZoneAvoidanceRule | A rule built on ZoneAvoidancePredicate and AvailabilityPredicate. ZoneAvoidancePredicate works out which zone performs worst and removes its servers from the list; AvailabilityPredicate filters out servers whose circuit is currently tripped. From what remains after both predicates, a server is picked by round-robin
BestAvailableRule | Best-match rule: picks the Server with the fewest concurrent requests
RetryRule | Uses the decorator pattern to add a retry mechanism to another Rule
WeightedResponseTimeRule | Weights servers by response time; if the weights are not in effect, it falls back to the RoundRobinRule strategy
image.png
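
As an illustration of the simplest statistics-driven rule in the table, here is a minimal sketch of the idea behind BestAvailableRule: skip servers whose circuit is tripped and pick the one with the fewest active requests. The class, the map of request counts and the set of tripped servers are hypothetical inputs for this sketch; the real rule reads them from `LoadBalancerStats`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of "lowest concurrent requests wins"; not Ribbon code.
final class LeastActiveSketch {

    // activeRequests maps a server name to its current number of active requests.
    static Optional<String> choose(Map<String, Integer> activeRequests, Set<String> circuitTripped) {
        return activeRequests.entrySet().stream()
                .filter(e -> !circuitTripped.contains(e.getKey())) // skip tripped servers
                .min(Map.Entry.comparingByValue())                 // fewest active requests
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, Integer> active = new LinkedHashMap<>();
        active.put("host1", 5);
        active.put("host2", 1);
        active.put("host3", 3);
        System.out.println(choose(active, Collections.emptySet()));         // Optional[host2]
        System.out.println(choose(active, Collections.singleton("host2"))); // Optional[host3]
    }
}
```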

3.4.3.3.1 The RoundRobinRule implementation

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        // Retry at most 10 times
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // Compute the next round-robin index
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }
}
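
The heart of the rule is the lock-free counter in `incrementAndGetModulo`. The sketch below isolates that loop in a hypothetical `RoundRobinCounterSketch` class so its output can be observed directly: with three servers and a counter starting at 0, the first index handed out is 1, then 2, then 0, and so on.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class that isolates the compare-and-set loop from RoundRobinRule.
final class RoundRobinCounterSketch {
    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    // Advances the counter by one, wrapping at modulo, and returns the new value.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            // Retry if another thread changed the counter in the meantime.
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinCounterSketch counter = new RoundRobinCounterSketch();
        StringBuilder indexes = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            indexes.append(counter.incrementAndGetModulo(3)).append(' ');
        }
        System.out.println(indexes.toString().trim()); // 1 2 0 1 2 0
    }
}
```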

3.4.3.3.2 The ZoneAvoidanceRule implementation

How ZoneAvoidanceRule works:

  1. ZoneAvoidancePredicate works out which zone performs worst and removes the servers of that zone from the list;
  2. AvailabilityPredicate removes the servers whose circuit is currently tripped;
  3. From the servers left after these two steps, one instance is picked by round-robin and returned.

How ZoneAvoidancePredicate removes the worst zone:

public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                // Remove the zone if the share of circuit-tripped servers reaches the
                // threshold, or if the load per server is negative (unknown)
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    // Track the zone(s) with the highest load per server
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        // If the highest load is below the trigger and no zone was removed, return all available zones
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        // Otherwise drop one randomly chosen zone from the worst zones; the rest stay available to keep the service highly available
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }
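
To see the method at work on concrete numbers, here is a simplified, self-contained sketch of the same logic. It is not the Ribbon source: `ZoneStats` is a hypothetical stand-in for `ZoneSnapshot`, an empty snapshot yields an empty set instead of null, and where the original picks one of the worst zones at random, this sketch drops the alphabetically first one so the result is reproducible. The trigger values used in `main` are chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical sketch of getAvailableZones; not the Ribbon source.
final class AvailableZonesSketch {

    static final class ZoneStats {
        final int instanceCount;
        final int circuitTrippedCount;
        final double loadPerServer;

        ZoneStats(int instanceCount, int circuitTrippedCount, double loadPerServer) {
            this.instanceCount = instanceCount;
            this.circuitTrippedCount = circuitTrippedCount;
            this.loadPerServer = loadPerServer;
        }
    }

    static Set<String> availableZones(Map<String, ZoneStats> snapshot,
                                      double triggeringLoad,
                                      double triggeringBlackoutPercentage) {
        TreeSet<String> available = new TreeSet<>(snapshot.keySet());
        if (available.size() <= 1) {
            return available;
        }
        TreeSet<String> worstZones = new TreeSet<>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;
        for (Map.Entry<String, ZoneStats> entry : snapshot.entrySet()) {
            String zone = entry.getKey();
            ZoneStats stats = entry.getValue();
            // Unusable: no instances, too many tripped servers, or unknown load.
            boolean unusable = stats.instanceCount == 0
                    || (double) stats.circuitTrippedCount / stats.instanceCount >= triggeringBlackoutPercentage
                    || stats.loadPerServer < 0;
            if (unusable) {
                available.remove(zone);
                limitedZoneAvailability = true;
            } else if (Math.abs(stats.loadPerServer - maxLoadPerServer) < 0.000001d) {
                worstZones.add(zone); // ties with the current maximum
            } else if (stats.loadPerServer > maxLoadPerServer) {
                maxLoadPerServer = stats.loadPerServer;
                worstZones.clear();
                worstZones.add(zone);
            }
        }
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            return available; // nothing needs to be avoided
        }
        // Assumption of this sketch: drop the first worst zone instead of a random one.
        if (!worstZones.isEmpty()) {
            available.remove(worstZones.first());
        }
        return available;
    }

    public static void main(String[] args) {
        Map<String, ZoneStats> snapshot = new LinkedHashMap<>();
        snapshot.put("zone-a", new ZoneStats(4, 0, 0.1));
        snapshot.put("zone-b", new ZoneStats(4, 0, 0.3));
        snapshot.put("zone-c", new ZoneStats(4, 0, 0.1));
        // zone-b carries the highest load and exceeds the trigger of 0.2.
        System.out.println(availableZones(snapshot, 0.2, 0.99999)); // [zone-a, zone-c]
    }
}
```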

4. Ribbon configuration parameters

Parameter | Description | Default
<service-name>.ribbon.NFLoadBalancerPingInterval | Period of the scheduled ping task | 30 s
<service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping timeout | 2 s
<service-name>.ribbon.NFLoadBalancerRuleClassName | IRule implementation class | RoundRobinRule, which selects instances by round-robin scheduling
<service-name>.ribbon.NFLoadBalancerPingClassName | IPing implementation class | DummyPing, which always returns true
<service-name>.ribbon.NFLoadBalancerClassName | Load balancer implementation class (can be set to a custom one) | ZoneAwareLoadBalancer
<service-name>.ribbon.NIWSServerListClassName | ServerList implementation class | ConfigurationBasedServerList, a server list based on configuration
<service-name>.ribbon.ServerListUpdaterClassName | Server list updater class | PollingServerListUpdater
<service-name>.ribbon.NIWSServerListFilterClassName | Server list filter class | ZoneAffinityServerListFilter
<service-name>.ribbon.ServerListRefreshInterval | Refresh interval of the server list | 30 s

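5. Conclusion

Ribbon is a core module of the Spring Cloud framework and is responsible for load-balanced service calls. Ribbon can also be used on its own, without Spring Cloud.
Ribbon is also a client-side load balancing framework: every client keeps its own call statistics, isolated from the others. In other words, Ribbon's load balancing does not behave identically on every machine.
Ribbon is one of the most complex parts of the whole framework. To keep services highly available, its control flow exposes many fine-grained parameters. When using it, you need to understand how each stage works; doing so makes troubleshooting far more efficient.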


Yishan Notes, focused on microservices and middleware