spring cloud ribbon學(xué)習(xí)三:負(fù)載均衡器ILoadBalancer接口及其實(shí)現(xiàn)

看負(fù)載均衡器這源碼,好繞,看的好累。

雖然Spring Cloud中定義了LoadBalancerClient作為負(fù)載均衡器的通用接口,并且針對Ribbon實(shí)現(xiàn)了RibbonLoadBalancerClient,但是它作為具體實(shí)現(xiàn)客戶端負(fù)載均衡時,是通過Ribbon的com.netflix.loadbalancer.ILoadBalancer接口實(shí)現(xiàn)的。

總結(jié)一下:
ILoadBalancer接口實(shí)現(xiàn)類做了以下的一些事情:
1.維護(hù)了存儲服務(wù)實(shí)例Server對象的二個列表。一個用于存儲所有服務(wù)實(shí)例的清單,一個用于存儲正常服務(wù)的實(shí)例清單
2.初始化得到可用的服務(wù)列表,啟動定時任務(wù)去實(shí)時的檢測服務(wù)列表中的服務(wù)的可用性,并且間斷性的去更新服務(wù)列表,結(jié)合注冊中心。
3.選擇可用的服務(wù)進(jìn)行調(diào)用(這個一般交給IRule去實(shí)現(xiàn),不同的輪詢策略)

三個很重要的概念

  • ServerList接口:定義用于獲取服務(wù)器列表的方法的接口,主要實(shí)現(xiàn)DomainExtractingServerList接口,每隔30s種執(zhí)行g(shù)etUpdatedListOfServers方法進(jìn)行服務(wù)列表的更新。
  • ServerListUpdater接口:主要實(shí)現(xiàn)類EurekaNotificationServerListUpdater和PollingServerListUpdater(默認(rèn)使用的是PollingServerListUpdater,結(jié)合Eureka注冊中心,定時任務(wù)的方式進(jìn)行服務(wù)列表的更新)
  • ServerListFilter接口:根據(jù)LoadBalancerStats然后根據(jù)一些規(guī)則去過濾部分服務(wù),比如根據(jù)zone(區(qū)域感知)去過濾。(主要實(shí)現(xiàn)類ZonePreferenceServerListFilter的getFilteredListOfServers會在更新服務(wù)列表的時候去執(zhí)行)。
ILoadBalancer及其實(shí)現(xiàn)

com.netflix.loadbalancer.AbstractLoadBalancer

AbstractLoadBalancer contains features required for most loadbalancing
implementations.
An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
that are potentially bucketed based on a specific criteria. 2. A Class that
defines and implements a LoadBalacing Strategy via IRule 3. A
Class that defines and implements a mechanism to determine the
suitability/availability of the nodes/servers in the List.
AbstractLoadBalancer包含大多數(shù)負(fù)載均衡實(shí)現(xiàn)的特征。
典型的LoadBalancer(負(fù)載均衡器)包括
1.一個基于某些特征的服務(wù)列表。
2.一個通過IRule定義和實(shí)現(xiàn)負(fù)載均衡戰(zhàn)略的類。
3.一個用來確定列表節(jié)點(diǎn)/服務(wù)是否可用的類。

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
public enum ServerGroup{
       
   //所有服務(wù)實(shí)例     
   ALL,
   //正常服務(wù)的實(shí)例
   STATUS_UP,
   //停止服務(wù)的實(shí)例
   STATUS_NOT_UP        
}
        
/**
 * 選擇具體的服務(wù)實(shí)例,key為null,忽略key的條件判斷
 */
public Server chooseServer() {
    return chooseServer(null);
}
/**
 * 定義了根據(jù)分組類型來獲取不同的服務(wù)實(shí)例的列表。
 */
 public abstract List<Server> getServerList(ServerGroup serverGroup);
    
/**
 * 定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負(fù)載均衡器中
 * 各個服務(wù)實(shí)例當(dāng)前的屬性和統(tǒng)計信息。這些信息非常有用,我們可以利用這些信息來觀察負(fù)載均衡
 * 的運(yùn)行情況,同時這些信息也是用來制定負(fù)載均衡策略的重要依據(jù)。
 */
public abstract LoadBalancerStats getLoadBalancerStats();
    
}

com.netflix.loadbalancer.BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer類是Ribbon負(fù)載均衡器的基礎(chǔ)實(shí)現(xiàn)類,在該類中定義了很多關(guān)于負(fù)載均衡器相關(guān)的基礎(chǔ)內(nèi)容。

定義并維護(hù)了兩種存儲服務(wù)實(shí)例Server對象的列表。一個用于存儲所有服務(wù)實(shí)例的清單,一個用于存儲正常服務(wù)的實(shí)例清單。

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
         .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
         .synchronizedList(new ArrayList<Server>());
  • 定義了之前我們提到的用來存儲負(fù)載均衡實(shí)例屬性和統(tǒng)計信息的LoadBalancerStates對象。
  • 定義了檢查服務(wù)實(shí)例是否正常服務(wù)的IPing對象,在BaseLoadBalancer中默認(rèn)為null,需要在構(gòu)造時注入它的具體實(shí)現(xiàn)。
protected IPing ping = null;
  • 定義了檢查服務(wù)實(shí)例操作的執(zhí)行策略對象IPingStrategy,在BaseLoadBalancer中默認(rèn)使用了該類中定義的靜態(tài)內(nèi)部類SerialPingStrategy實(shí)現(xiàn)。根據(jù)源碼,我們可以看到該策略采用線性遍歷ping服務(wù)實(shí)例的方式實(shí)現(xiàn)檢查。該策略在當(dāng)IPing對象的實(shí)現(xiàn)速度不理想,或者是Server列表過大時,可能會影響到系統(tǒng)性能,這時候需要通過實(shí)現(xiàn)IPingStrategy接口并重寫pingServer(IPing ping Server[] servers)函數(shù)去擴(kuò)展ping的執(zhí)行策略。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
SerialPingStrategy實(shí)現(xiàn)
  • 定義了負(fù)載均衡的處理原則IRule對象,從BaseLoadBalancerchooseServer(Object key)的實(shí)現(xiàn)源碼可以知道,負(fù)載均衡器實(shí)際將服務(wù)實(shí)例選擇的任務(wù)委托給IRule實(shí)例中的choose函數(shù)來實(shí)現(xiàn)。
    默認(rèn)初始化了RoundRobinRule實(shí)現(xiàn),RoundRobinRule實(shí)現(xiàn)了最基本且常用的線性負(fù)載均衡規(guī)則。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();
BaseLoadBalancer的chooseServer方法
  • 啟動ping任務(wù):在BaseLoadBalancer的默認(rèn)構(gòu)造函數(shù)中,會直接啟動一個用于定時檢查Server是否健康的任務(wù)。該任務(wù)默認(rèn)的執(zhí)行間隔式10s。
/**
  * Default constructor which sets name as "default", sets null ping, and
  * {@link RoundRobinRule} as the rule.
  * <p>
  * This constructor is mainly used by {@link ClientFactory}. Calling this
  * constructor must be followed by calling {@link #init()} or
  * {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
  * This constructor is provided for reflection. When constructing
  * programatically, it is recommended to use other constructors.
  */
public BaseLoadBalancer() {
     this.name = DEFAULT_NAME;
     this.ping = null;
     setRule(DEFAULT_RULE);
     setupPingTask();
     lbStats = new LoadBalancerStats(DEFAULT_NAME);
}

定時任務(wù)

void setupPingTask() {
   if (canSkipPing()) {
       return;
   }
   if (lbTimer != null) {
      lbTimer.cancel();
   }
   lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
         true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
  • 實(shí)現(xiàn)了ILoadBalancer接口定義的一系列的基本操作:

addServers(List<Server> newServers):向負(fù)載均衡器中增加新的服務(wù)實(shí)例列表,該實(shí)現(xiàn)將原本已經(jīng)維護(hù)的所有服務(wù)實(shí)例清單allServerList和新傳入的服務(wù)實(shí)例清單newServers都加入了newList中,然后通過調(diào)用setServersList函數(shù)對newList進(jìn)行處理,在BaseLoadBalancer中實(shí)現(xiàn)的時候會使用新的列表覆蓋舊的列表。

    /**
     * Add a list of servers to the 'allServer' list; does not verify
     * uniqueness, so you could give a server a greater share by adding it more
     * than once
     */
    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("Exception while adding Servers", e);
            }
        }
    }

chooseServer(Object key) :挑選一個具體的服務(wù)實(shí)例,

  public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Throwable t) {
                return null;
            }
        }
    }

markServerDown(Server server):標(biāo)記某個服務(wù)實(shí)例為暫停服務(wù)。

public void markServerDown(Server server) {
   if (server == null) {
      return;
   }

   if (!server.isAlive()) {
       return;
   }

   logger.error("LoadBalancer:  markServerDown called on ["
        + server.getId() + "]");
   server.setAlive(false);
   //forceQuickPing();

   notifyServerStatusChangeListener(singleton(server));
}

getReachableServers():獲取可用的服務(wù)實(shí)例列表。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個正常服務(wù)的實(shí)例清單,所以直接返回即可

@Override
public List<Server> getReachableServers() {
   return Collections.unmodifiableList(upServerList);
}

getAllServers():獲取所有的服務(wù)實(shí)例列表。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個正常服務(wù)的實(shí)例清單,所以直接返回即可。

@Override
public List<Server> getAllServers() {
   return Collections.unmodifiableList(allServerList);
}

com.netflix.loadbalancer.DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer類繼承com.netflix.loadbalancer.BaseLoadBalancer類,它是對基礎(chǔ)負(fù)載均衡器的擴(kuò)展。
該負(fù)載均衡器中,實(shí)現(xiàn)了服務(wù)實(shí)例清單在運(yùn)行期的動態(tài)更新能力;同時,它還具備了對服務(wù)實(shí)例清單的過濾功能,也就是說,我們可以通過過濾器來選擇性的獲取一批服務(wù)實(shí)例清單。

ServerList
DynamicServerListLoadBalancer的成員定義中,我們馬上可以發(fā)現(xiàn)新增了一個關(guān)于服務(wù)列表的操作對象ServerList<T> serverListImpl。從類名DynamicServerListLoadBalancer<T extends Server>發(fā)現(xiàn)T泛型是Server子類,即代表了一個具體的服務(wù)實(shí)例的擴(kuò)展類,而ServerList接口定義如下:

volatile ServerList<T> serverListImpl;

ServerList接口定義如下

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    //用于獲取初始化的服務(wù)實(shí)例清單
    public List<T> getInitialListOfServers();
    
    //獲取更新的服務(wù)實(shí)例清單,每隔30s更新一次
    public List<T> getUpdatedListOfServers();   

}

其實(shí)現(xiàn)類:

DynamicServerListLoadBalancer中的ServerList默認(rèn)配置到底使用了哪些具體的實(shí)現(xiàn)呢?既然是該負(fù)載均衡器中實(shí)現(xiàn)服務(wù)實(shí)例的動態(tài)更新,那么勢必需要Ribbon訪問Eureka來獲取服務(wù)實(shí)例的能力,可以從Ribbon整合Eureka的包下去尋找,
org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration中的,

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
}

可以看出創(chuàng)建了DomainExtractingServerList實(shí)例,其內(nèi)部也維護(hù)了ServerList list,同時DomainExtractingServerList類中對getInitialListOfServersgetUpdatedListOfServers的具體實(shí)現(xiàn),其實(shí)是委托給內(nèi)部定義的ServerList<DiscoveryEnabledServer> list對象,而該對象是通過創(chuàng)建DiscoveryEnabledNIWSServerList實(shí)例傳遞進(jìn)去的

org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList的源碼:

那么DiscoveryEnabledNIWSServerList是如何實(shí)現(xiàn)這兩個服務(wù)實(shí)例獲取的呢?從DiscoveryEnabledNIWSServerList其源碼的私有方法obtainServersViaDiscovery通過服務(wù)發(fā)現(xiàn)機(jī)制來實(shí)現(xiàn)服務(wù)實(shí)例的獲取的,
com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList的obtainServersViaDiscovery方法,

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
}

EurekaClient eurekaClient = eurekaClientProvider.get();
        //vipAddresses可以理解為邏輯上的服務(wù)名,對這些服務(wù)名進(jìn)行遍歷,將狀態(tài)為UP(正常服務(wù))的實(shí)例轉(zhuǎn)換成DiscoveryEnabledServer對象
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        //將這些實(shí)例組織成列表返回。
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
}

ServerListUpdater
com.netflix.loadbalancer.DynamicServerListLoadBalancer負(fù)載均衡器使用不同的策略進(jìn)行列表更新的策略。

上面分析了如何從Eureka Server中獲取服務(wù)實(shí)例清單,那么它又是如何觸發(fā)向Eureka Server去獲取服務(wù)實(shí)例清單以及如何在獲取到服務(wù)實(shí)例清單后更新本地實(shí)例清單呢?

回到com.netflix.loadbalancer.DynamicServerListLoadBalancer

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            //實(shí)現(xiàn)對服務(wù)列表的更新
            updateListOfServers();
        }
    };
    
    protected volatile ServerListUpdater serverListUpdater;

updateListOfServers方法

     @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

ServerListUpdater的二種實(shí)現(xiàn):

ServerListUpdater的二種實(shí)現(xiàn)

PollingServerListUpdater:動態(tài)服務(wù)列表更新的默認(rèn)策略,也就是說,DynamicServerListLoadBalancer負(fù)載均衡器中的默認(rèn)實(shí)現(xiàn)就是它,它通過定時任務(wù)的方式進(jìn)行服務(wù)列表的更新。

EurekaNotificationServerListUpdater:該更新器也可以服務(wù)于DynamicServerListLoadBalancer負(fù)載均衡器,但是它的觸發(fā)機(jī)制與PollingServerListUpdater不同,它需要利用Eureka的事件監(jiān)聽來驅(qū)動服務(wù)列表的更新操作。

查看PollingServerListUpdater的實(shí)現(xiàn),

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

    public PollingServerListUpdater() {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }

    public PollingServerListUpdater(IClientConfig clientConfig) {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
    }

    public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

initialDelayMsrefreshIntervalMs的默認(rèn)定義是1000和30*1000,單位是毫秒。就是說,更新服務(wù)實(shí)例在初始化之后延遲1s后開始執(zhí)行,并以30s位周期重復(fù)執(zhí)行,還會記錄最后更新時間,是否存活等信息。

ServerListFilter

volatile ServerListFilter<T> filter;

ServerListFilter接口的定義:
This interface allows for filtering the configured or dynamically obtained List of candidate servers with desirable characteristics.
該接口允許用配置或動態(tài)獲取的具有所需特性的候選服務(wù)器列表進(jìn)行過濾。

public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

}
ServerListFilter接口的實(shí)現(xiàn)

其中,除了org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter的實(shí)現(xiàn)是Spring Cloud Ribbon中對Netflix Ribbon的擴(kuò)展實(shí)現(xiàn)外,其他均是Netflix Ribbon中的原生實(shí)現(xiàn)類,

  • com.netflix.loadbalancer.AbstractServerListFilter:這是一個抽象的接口,接收一個重要的依據(jù)對象LoadBalancerStats
AbstractServerListFilter
  • com.netflix.loadbalancer.ZoneAffinityServerListFilter:該過濾器基于"區(qū)域感知(Zone Affinity)"的方式實(shí)現(xiàn)服務(wù)實(shí)例的過濾,也就說,它會根據(jù)提供服務(wù)的實(shí)例所處于的區(qū)域(Zone)與消費(fèi)者自身所處區(qū)域(Zone)進(jìn)行比較,過濾掉那些不是同處一個區(qū)域的實(shí)例
@Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
}
  • com.netflix.niws.loadbalancer.DefaultNIWSServerListFilter
    該過濾器完全繼承自ZoneAffinityServerListFilter,是默認(rèn)的NIWS(Netfilx Internal Web Service)過濾器。

  • com.netflix.loadbalancer.ServerListSubsetFilter
    該過濾器也繼承自ZoneAffinityServerListFilter,它非常適用于擁有大規(guī)模服務(wù)器集群(上百或者更多)的系統(tǒng)。因為它可以產(chǎn)生一個“區(qū)域感知”

  • org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter
    spring cloud整合時新增的過濾器,若使用Spring Cloud整合EurekaRibbon時會默認(rèn)使用該過濾器,它實(shí)現(xiàn)了通過配置或者Eureka實(shí)例元數(shù)據(jù)的所屬區(qū)域(Zone)來過濾同區(qū)域的服務(wù)實(shí)例,它的實(shí)現(xiàn)非常簡單,首先通過ZoneAffinityServerListFilter的過濾器來獲得"區(qū)域感知"的服務(wù)實(shí)例列表,然后遍歷這個結(jié)果,取出根據(jù)消費(fèi)者配置預(yù)設(shè)的區(qū)域Zone來進(jìn)行過濾,如果過濾的結(jié)果是空就直接返回父類的結(jié)果,如果不為空就返回通過消費(fèi)者的Zone過濾后的結(jié)果。

@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<Server>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
}

com.netflix.loadbalancer.ZoneAwareLoadBalancer

ZoneAwareLoadBalancer負(fù)載均衡器是對DynamicServerListLoadBalancer的擴(kuò)展。在DynamicServerListLoadBalancer中,我們可以看到它并沒有重寫選擇具體服務(wù)實(shí)例的chooseServer函數(shù),所以它依然會采用在BaseLoadBalancer中實(shí)現(xiàn)的算法。使用RoundRobinRule規(guī)則,以線性輪詢的方式來選擇調(diào)用的服務(wù)實(shí)例,該算法實(shí)現(xiàn)簡單并沒有區(qū)域(Zone)的概念,所以它會把所有實(shí)例視為一個Zone下的節(jié)點(diǎn)來看待,這樣就會周期性的跨區(qū)域(Zone)訪問的情況,由于跨區(qū)域會產(chǎn)生更高的延遲,這些實(shí)例主要以防止區(qū)域性故障實(shí)現(xiàn)高可用為目的而不能作為常規(guī)訪問的實(shí)例,所以在多區(qū)域部署的情況會出現(xiàn)一定的性能問題,而該負(fù)載均衡器則可以規(guī)避這樣的問題。

ZoneAwareLoadBalancer中,并沒有重寫setServersList方法,說明實(shí)現(xiàn)服務(wù)實(shí)例清單的更新的主要邏輯沒有變化。但是重寫了setServerListForZones方法,DynamicServerListLoadBalancer中的定義:

DynamicServerListLoadBalancer

setServerListForZones函數(shù)的調(diào)用位于更新服務(wù)實(shí)例清單函數(shù)setServersList最后,根據(jù)區(qū)域Zone分組的實(shí)例列表,為負(fù)載均衡器中的LoadBalancerStats對象創(chuàng)建ZoneStats并放入Map zoneServersMap集合中,每一個區(qū)域Zone對應(yīng)一個ZoneStats,它用于存儲每個Zone的一些狀態(tài)和統(tǒng)計信息。

ZoneAwareLoadBalancer中對setServerListForZones重寫如下:

@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);,它存儲每個Zone區(qū)域?qū)?yīng)的負(fù)載均衡器。
        if (balancers == null) {
            //創(chuàng)建一個ConcurrentHashMap類型的balancers對象
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
        //具體的負(fù)載均衡器在getLoadBalancer函數(shù)中完成,同時在創(chuàng)建負(fù)載均衡器的時候會創(chuàng)建它的規(guī)則
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
              //創(chuàng)建完負(fù)載均衡器的時候會馬上調(diào)用setServersList函數(shù)為其設(shè)置對應(yīng)Zone區(qū)域的實(shí)例清單
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        //對Zone區(qū)域中實(shí)例清單的檢查,看看是否有Zone區(qū)域的實(shí)例清單已經(jīng)沒有實(shí)例了,是的話就將balancers中對應(yīng)的Xone區(qū)域的實(shí)例列表清空,該操作
        //的作用是為了后續(xù)選擇節(jié)點(diǎn)時,防止過多的Zone區(qū)域統(tǒng)計信息干擾具體實(shí)例的選擇算法
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }    

getLoadBalancer方法

    //如果當(dāng)前實(shí)現(xiàn)中沒有IRule的實(shí)例,就創(chuàng)建一個AvailabilityFilteringRule規(guī)則,如果有實(shí)現(xiàn)克隆一個
    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

在了解了負(fù)載均衡器如何擴(kuò)展服務(wù)實(shí)例清單的時候,看其怎樣挑選服務(wù)實(shí)例,來實(shí)現(xiàn)對區(qū)域的識別的,

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        //只有當(dāng)負(fù)載均衡器中維護(hù)的實(shí)例所屬的Zone區(qū)域的個數(shù)大于1的時候才會執(zhí)行這里的策略
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //調(diào)用ZoneAvoidanceRule.createSnapshot方法,當(dāng)前的負(fù)載均衡器中所有的Zone區(qū)域分布創(chuàng)建快照,保存在Map zoneSnapshot中
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            //調(diào)用ZoneAvoidanceRule.getAvailableZones方法,來獲取可用Zone區(qū)域集合,在該函數(shù)中會通過Zone區(qū)域快照的統(tǒng)計數(shù)據(jù)實(shí)現(xiàn)可用區(qū)的挑選。
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //當(dāng)獲得的可用Zone區(qū)域集合不為空,并且個數(shù)小于Zone區(qū)域總數(shù),就隨機(jī)選擇一個Zone區(qū)域
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                //在確定了某個Zone區(qū)域后,則獲取了對應(yīng)Zone區(qū)域服務(wù)均衡器,并調(diào)用zoneLoadBalancer.chooseServer來選擇具體的服務(wù)實(shí)例,而在
                //zoneLoadBalancer.chooseServer中將使用IRule接口的choose函數(shù)來選擇具體的服務(wù)實(shí)例,在這里,IRule接口的實(shí)現(xiàn)會使用ZoneAvoidanceRule來挑選具體的服務(wù)實(shí)例。
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Throwable e) {
            logger.error("Unexpected exception when choosing server using zone aware logic", e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            //否則實(shí)現(xiàn)父類的策略
            return super.chooseServer(key);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容