看負(fù)載均衡器這源碼,好繞,看的好累。
雖然Spring Cloud中定義了LoadBalancerClient作為負(fù)載均衡器的通用接口,并且針對Ribbon實(shí)現(xiàn)了RibbonLoadBalancerClient,但是它作為具體實(shí)現(xiàn)客戶端負(fù)載均衡時,是通過Ribbon的com.netflix.loadbalancer.ILoadBalancer接口實(shí)現(xiàn)的。
總結(jié)一下:
ILoadBalancer接口實(shí)現(xiàn)類做了以下的一些事情:
1.維護(hù)了存儲服務(wù)實(shí)例Server對象的二個列表。一個用于存儲所有服務(wù)實(shí)例的清單,一個用于存儲正常服務(wù)的實(shí)例清單
2.初始化得到可用的服務(wù)列表,啟動定時任務(wù)去實(shí)時的檢測服務(wù)列表中的服務(wù)的可用性,并且間斷性的去更新服務(wù)列表,結(jié)合注冊中心。
3.選擇可用的服務(wù)進(jìn)行調(diào)用(這個一般交給IRule去實(shí)現(xiàn),不同的輪詢策略)三個很重要的概念
- ServerList接口:定義用于獲取服務(wù)器列表的方法的接口,主要實(shí)現(xiàn)DomainExtractingServerList接口,每隔30s種執(zhí)行g(shù)etUpdatedListOfServers方法進(jìn)行服務(wù)列表的更新。
- ServerListUpdater接口:主要實(shí)現(xiàn)類EurekaNotificationServerListUpdater和PollingServerListUpdater(默認(rèn)使用的是PollingServerListUpdater,結(jié)合Eureka注冊中心,定時任務(wù)的方式進(jìn)行服務(wù)列表的更新)
- ServerListFilter接口:根據(jù)LoadBalancerStats然后根據(jù)一些規(guī)則去過濾部分服務(wù),比如根據(jù)zone(區(qū)域感知)去過濾。(主要實(shí)現(xiàn)類ZonePreferenceServerListFilter的getFilteredListOfServers會在更新服務(wù)列表的時候去執(zhí)行)。

com.netflix.loadbalancer.AbstractLoadBalancer
AbstractLoadBalancer contains features required for most loadbalancing
implementations.
An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
that are potentially bucketed based on a specific criteria. 2. A Class that
defines and implements a LoadBalacing Strategy via IRule 3. A
Class that defines and implements a mechanism to determine the
suitability/availability of the nodes/servers in the List.
AbstractLoadBalancer包含大多數(shù)負(fù)載均衡實(shí)現(xiàn)的特征。
典型的LoadBalancer(負(fù)載均衡器)包括
1.一個基于某些特征的服務(wù)列表。
2.一個通過IRule定義和實(shí)現(xiàn)負(fù)載均衡戰(zhàn)略的類。
3.一個用來確定列表節(jié)點(diǎn)/服務(wù)是否可用的類。
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
//所有服務(wù)實(shí)例
ALL,
//正常服務(wù)的實(shí)例
STATUS_UP,
//停止服務(wù)的實(shí)例
STATUS_NOT_UP
}
/**
* 選擇具體的服務(wù)實(shí)例,key為null,忽略key的條件判斷
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* 定義了根據(jù)分組類型來獲取不同的服務(wù)實(shí)例的列表。
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* 定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負(fù)載均衡器中
* 各個服務(wù)實(shí)例當(dāng)前的屬性和統(tǒng)計信息。這些信息非常有用,我們可以利用這些信息來觀察負(fù)載均衡
* 的運(yùn)行情況,同時這些信息也是用來制定負(fù)載均衡策略的重要依據(jù)。
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
com.netflix.loadbalancer.BaseLoadBalancer
com.netflix.loadbalancer.BaseLoadBalancer類是Ribbon負(fù)載均衡器的基礎(chǔ)實(shí)現(xiàn)類,在該類中定義了很多關(guān)于負(fù)載均衡器相關(guān)的基礎(chǔ)內(nèi)容。
定義并維護(hù)了兩種存儲服務(wù)實(shí)例Server對象的列表。一個用于存儲所有服務(wù)實(shí)例的清單,一個用于存儲正常服務(wù)的實(shí)例清單。
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
- 定義了之前我們提到的用來存儲負(fù)載均衡實(shí)例屬性和統(tǒng)計信息的
LoadBalancerStates對象。 - 定義了檢查服務(wù)實(shí)例是否正常服務(wù)的
IPing對象,在BaseLoadBalancer中默認(rèn)為null,需要在構(gòu)造時注入它的具體實(shí)現(xiàn)。
protected IPing ping = null;
- 定義了檢查服務(wù)實(shí)例操作的執(zhí)行策略對象
IPingStrategy,在BaseLoadBalancer中默認(rèn)使用了該類中定義的靜態(tài)內(nèi)部類SerialPingStrategy實(shí)現(xiàn)。根據(jù)源碼,我們可以看到該策略采用線性遍歷ping服務(wù)實(shí)例的方式實(shí)現(xiàn)檢查。該策略在當(dāng)IPing對象的實(shí)現(xiàn)速度不理想,或者是Server列表過大時,可能會影響到系統(tǒng)性能,這時候需要通過實(shí)現(xiàn)IPingStrategy接口并重寫pingServer(IPing ping Server[] servers)函數(shù)去擴(kuò)展ping的執(zhí)行策略。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();

- 定義了負(fù)載均衡的處理原則
IRule對象,從BaseLoadBalancer中chooseServer(Object key)的實(shí)現(xiàn)源碼可以知道,負(fù)載均衡器實(shí)際將服務(wù)實(shí)例選擇的任務(wù)委托給IRule實(shí)例中的choose函數(shù)來實(shí)現(xiàn)。
默認(rèn)初始化了RoundRobinRule實(shí)現(xiàn),RoundRobinRule實(shí)現(xiàn)了最基本且常用的線性負(fù)載均衡規(guī)則。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();

- 啟動ping任務(wù):在
BaseLoadBalancer的默認(rèn)構(gòu)造函數(shù)中,會直接啟動一個用于定時檢查Server是否健康的任務(wù)。該任務(wù)默認(rèn)的執(zhí)行間隔式10s。
/**
* Default constructor which sets name as "default", sets null ping, and
* {@link RoundRobinRule} as the rule.
* <p>
* This constructor is mainly used by {@link ClientFactory}. Calling this
* constructor must be followed by calling {@link #init()} or
* {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
* This constructor is provided for reflection. When constructing
* programatically, it is recommended to use other constructors.
*/
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
定時任務(wù)
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
- 實(shí)現(xiàn)了
ILoadBalancer接口定義的一系列的基本操作:
addServers(List<Server> newServers):向負(fù)載均衡器中增加新的服務(wù)實(shí)例列表,該實(shí)現(xiàn)將原本已經(jīng)維護(hù)的所有服務(wù)實(shí)例清單allServerList和新傳入的服務(wù)實(shí)例清單newServers都加入了newList中,然后通過調(diào)用setServersList函數(shù)對newList進(jìn)行處理,在BaseLoadBalancer中實(shí)現(xiàn)的時候會使用新的列表覆蓋舊的列表。
/**
* Add a list of servers to the 'allServer' list; does not verify
* uniqueness, so you could give a server a greater share by adding it more
* than once
*/
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("Exception while adding Servers", e);
}
}
}
chooseServer(Object key) :挑選一個具體的服務(wù)實(shí)例,
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Throwable t) {
return null;
}
}
}
markServerDown(Server server):標(biāo)記某個服務(wù)實(shí)例為暫停服務(wù)。
public void markServerDown(Server server) {
if (server == null) {
return;
}
if (!server.isAlive()) {
return;
}
logger.error("LoadBalancer: markServerDown called on ["
+ server.getId() + "]");
server.setAlive(false);
//forceQuickPing();
notifyServerStatusChangeListener(singleton(server));
}
getReachableServers():獲取可用的服務(wù)實(shí)例列表。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個正常服務(wù)的實(shí)例清單,所以直接返回即可
@Override
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}
getAllServers():獲取所有的服務(wù)實(shí)例列表。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個正常服務(wù)的實(shí)例清單,所以直接返回即可。
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
com.netflix.loadbalancer.DynamicServerListLoadBalancer
com.netflix.loadbalancer.DynamicServerListLoadBalancer類繼承com.netflix.loadbalancer.BaseLoadBalancer類,它是對基礎(chǔ)負(fù)載均衡器的擴(kuò)展。
該負(fù)載均衡器中,實(shí)現(xiàn)了服務(wù)實(shí)例清單在運(yùn)行期的動態(tài)更新能力;同時,它還具備了對服務(wù)實(shí)例清單的過濾功能,也就是說,我們可以通過過濾器來選擇性的獲取一批服務(wù)實(shí)例清單。
ServerList
從DynamicServerListLoadBalancer的成員定義中,我們馬上可以發(fā)現(xiàn)新增了一個關(guān)于服務(wù)列表的操作對象ServerList<T> serverListImpl。從類名DynamicServerListLoadBalancer<T extends Server>發(fā)現(xiàn)T泛型是Server子類,即代表了一個具體的服務(wù)實(shí)例的擴(kuò)展類,而ServerList接口定義如下:
volatile ServerList<T> serverListImpl;
ServerList接口定義如下
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
//用于獲取初始化的服務(wù)實(shí)例清單
public List<T> getInitialListOfServers();
//獲取更新的服務(wù)實(shí)例清單,每隔30s更新一次
public List<T> getUpdatedListOfServers();
}
其實(shí)現(xiàn)類:

DynamicServerListLoadBalancer中的ServerList默認(rèn)配置到底使用了哪些具體的實(shí)現(xiàn)呢?既然是該負(fù)載均衡器中實(shí)現(xiàn)服務(wù)實(shí)例的動態(tài)更新,那么勢必需要Ribbon訪問Eureka來獲取服務(wù)實(shí)例的能力,可以從Ribbon整合Eureka的包下去尋找,
org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration中的,
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
可以看出創(chuàng)建了DomainExtractingServerList實(shí)例,其內(nèi)部也維護(hù)了ServerList list,同時DomainExtractingServerList類中對getInitialListOfServers和getUpdatedListOfServers的具體實(shí)現(xiàn),其實(shí)是委托給內(nèi)部定義的ServerList<DiscoveryEnabledServer> list對象,而該對象是通過創(chuàng)建DiscoveryEnabledNIWSServerList實(shí)例傳遞進(jìn)去的
org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList的源碼:

那么DiscoveryEnabledNIWSServerList是如何實(shí)現(xiàn)這兩個服務(wù)實(shí)例獲取的呢?從DiscoveryEnabledNIWSServerList其源碼的私有方法obtainServersViaDiscovery通過服務(wù)發(fā)現(xiàn)機(jī)制來實(shí)現(xiàn)服務(wù)實(shí)例的獲取的,
com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList的obtainServersViaDiscovery方法,
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
//vipAddresses可以理解為邏輯上的服務(wù)名,對這些服務(wù)名進(jìn)行遍歷,將狀態(tài)為UP(正常服務(wù))的實(shí)例轉(zhuǎn)換成DiscoveryEnabledServer對象
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
//將這些實(shí)例組織成列表返回。
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
ServerListUpdater
com.netflix.loadbalancer.DynamicServerListLoadBalancer負(fù)載均衡器使用不同的策略進(jìn)行列表更新的策略。
上面分析了如何從Eureka Server中獲取服務(wù)實(shí)例清單,那么它又是如何觸發(fā)向Eureka Server去獲取服務(wù)實(shí)例清單以及如何在獲取到服務(wù)實(shí)例清單后更新本地實(shí)例清單呢?
回到com.netflix.loadbalancer.DynamicServerListLoadBalancer
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
//實(shí)現(xiàn)對服務(wù)列表的更新
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
updateListOfServers方法
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
ServerListUpdater的二種實(shí)現(xiàn):

PollingServerListUpdater:動態(tài)服務(wù)列表更新的默認(rèn)策略,也就是說,DynamicServerListLoadBalancer負(fù)載均衡器中的默認(rèn)實(shí)現(xiàn)就是它,它通過定時任務(wù)的方式進(jìn)行服務(wù)列表的更新。
EurekaNotificationServerListUpdater:該更新器也可以服務(wù)于DynamicServerListLoadBalancer負(fù)載均衡器,但是它的觸發(fā)機(jī)制與PollingServerListUpdater不同,它需要利用Eureka的事件監(jiān)聽來驅(qū)動服務(wù)列表的更新操作。
查看PollingServerListUpdater的實(shí)現(xiàn),
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
public PollingServerListUpdater() {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
public PollingServerListUpdater(IClientConfig clientConfig) {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
initialDelayMs和refreshIntervalMs的默認(rèn)定義是1000和30*1000,單位是毫秒。就是說,更新服務(wù)實(shí)例在初始化之后延遲1s后開始執(zhí)行,并以30s位周期重復(fù)執(zhí)行,還會記錄最后更新時間,是否存活等信息。
ServerListFilter
volatile ServerListFilter<T> filter;
ServerListFilter接口的定義:
This interface allows for filtering the configured or dynamically obtained List of candidate servers with desirable characteristics.
該接口允許用配置或動態(tài)獲取的具有所需特性的候選服務(wù)器列表進(jìn)行過濾。
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}

其中,除了org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter的實(shí)現(xiàn)是Spring Cloud Ribbon中對Netflix Ribbon的擴(kuò)展實(shí)現(xiàn)外,其他均是Netflix Ribbon中的原生實(shí)現(xiàn)類,
-
com.netflix.loadbalancer.AbstractServerListFilter:這是一個抽象的接口,接收一個重要的依據(jù)對象LoadBalancerStats

-
com.netflix.loadbalancer.ZoneAffinityServerListFilter:該過濾器基于"區(qū)域感知(Zone Affinity)"的方式實(shí)現(xiàn)服務(wù)實(shí)例的過濾,也就說,它會根據(jù)提供服務(wù)的實(shí)例所處于的區(qū)域(Zone)與消費(fèi)者自身所處區(qū)域(Zone)進(jìn)行比較,過濾掉那些不是同處一個區(qū)域的實(shí)例
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
com.netflix.niws.loadbalancer.DefaultNIWSServerListFilter:
該過濾器完全繼承自ZoneAffinityServerListFilter,是默認(rèn)的NIWS(Netfilx Internal Web Service)過濾器。com.netflix.loadbalancer.ServerListSubsetFilter:
該過濾器也繼承自ZoneAffinityServerListFilter,它非常適用于擁有大規(guī)模服務(wù)器集群(上百或者更多)的系統(tǒng)。因為它可以產(chǎn)生一個“區(qū)域感知”org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter:
spring cloud整合時新增的過濾器,若使用Spring Cloud整合Eureka和Ribbon時會默認(rèn)使用該過濾器,它實(shí)現(xiàn)了通過配置或者Eureka實(shí)例元數(shù)據(jù)的所屬區(qū)域(Zone)來過濾同區(qū)域的服務(wù)實(shí)例,它的實(shí)現(xiàn)非常簡單,首先通過ZoneAffinityServerListFilter的過濾器來獲得"區(qū)域感知"的服務(wù)實(shí)例列表,然后遍歷這個結(jié)果,取出根據(jù)消費(fèi)者配置預(yù)設(shè)的區(qū)域Zone來進(jìn)行過濾,如果過濾的結(jié)果是空就直接返回父類的結(jié)果,如果不為空就返回通過消費(fèi)者的Zone過濾后的結(jié)果。
@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
List<Server> output = super.getFilteredListOfServers(servers);
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<Server>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
com.netflix.loadbalancer.ZoneAwareLoadBalancer
ZoneAwareLoadBalancer負(fù)載均衡器是對DynamicServerListLoadBalancer的擴(kuò)展。在DynamicServerListLoadBalancer中,我們可以看到它并沒有重寫選擇具體服務(wù)實(shí)例的chooseServer函數(shù),所以它依然會采用在BaseLoadBalancer中實(shí)現(xiàn)的算法。使用RoundRobinRule規(guī)則,以線性輪詢的方式來選擇調(diào)用的服務(wù)實(shí)例,該算法實(shí)現(xiàn)簡單并沒有區(qū)域(Zone)的概念,所以它會把所有實(shí)例視為一個Zone下的節(jié)點(diǎn)來看待,這樣就會周期性的跨區(qū)域(Zone)訪問的情況,由于跨區(qū)域會產(chǎn)生更高的延遲,這些實(shí)例主要以防止區(qū)域性故障實(shí)現(xiàn)高可用為目的而不能作為常規(guī)訪問的實(shí)例,所以在多區(qū)域部署的情況會出現(xiàn)一定的性能問題,而該負(fù)載均衡器則可以規(guī)避這樣的問題。
在ZoneAwareLoadBalancer中,并沒有重寫setServersList方法,說明實(shí)現(xiàn)服務(wù)實(shí)例清單的更新的主要邏輯沒有變化。但是重寫了setServerListForZones方法,DynamicServerListLoadBalancer中的定義:

setServerListForZones函數(shù)的調(diào)用位于更新服務(wù)實(shí)例清單函數(shù)setServersList最后,根據(jù)區(qū)域Zone分組的實(shí)例列表,為負(fù)載均衡器中的LoadBalancerStats對象創(chuàng)建ZoneStats并放入Map zoneServersMap集合中,每一個區(qū)域Zone對應(yīng)一個ZoneStats,它用于存儲每個Zone的一些狀態(tài)和統(tǒng)計信息。
在ZoneAwareLoadBalancer中對setServerListForZones重寫如下:
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);,它存儲每個Zone區(qū)域?qū)?yīng)的負(fù)載均衡器。
if (balancers == null) {
//創(chuàng)建一個ConcurrentHashMap類型的balancers對象
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
//具體的負(fù)載均衡器在getLoadBalancer函數(shù)中完成,同時在創(chuàng)建負(fù)載均衡器的時候會創(chuàng)建它的規(guī)則
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
//創(chuàng)建完負(fù)載均衡器的時候會馬上調(diào)用setServersList函數(shù)為其設(shè)置對應(yīng)Zone區(qū)域的實(shí)例清單
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
//對Zone區(qū)域中實(shí)例清單的檢查,看看是否有Zone區(qū)域的實(shí)例清單已經(jīng)沒有實(shí)例了,是的話就將balancers中對應(yīng)的Xone區(qū)域的實(shí)例列表清空,該操作
//的作用是為了后續(xù)選擇節(jié)點(diǎn)時,防止過多的Zone區(qū)域統(tǒng)計信息干擾具體實(shí)例的選擇算法
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
getLoadBalancer方法
//如果當(dāng)前實(shí)現(xiàn)中沒有IRule的實(shí)例,就創(chuàng)建一個AvailabilityFilteringRule規(guī)則,如果有實(shí)現(xiàn)克隆一個
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
return loadBalancer;
}
在了解了負(fù)載均衡器如何擴(kuò)展服務(wù)實(shí)例清單的時候,看其怎樣挑選服務(wù)實(shí)例,來實(shí)現(xiàn)對區(qū)域的識別的,
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
//只有當(dāng)負(fù)載均衡器中維護(hù)的實(shí)例所屬的Zone區(qū)域的個數(shù)大于1的時候才會執(zhí)行這里的策略
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
//調(diào)用ZoneAvoidanceRule.createSnapshot方法,當(dāng)前的負(fù)載均衡器中所有的Zone區(qū)域分布創(chuàng)建快照,保存在Map zoneSnapshot中
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//調(diào)用ZoneAvoidanceRule.getAvailableZones方法,來獲取可用Zone區(qū)域集合,在該函數(shù)中會通過Zone區(qū)域快照的統(tǒng)計數(shù)據(jù)實(shí)現(xiàn)可用區(qū)的挑選。
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
//當(dāng)獲得的可用Zone區(qū)域集合不為空,并且個數(shù)小于Zone區(qū)域總數(shù),就隨機(jī)選擇一個Zone區(qū)域
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
//在確定了某個Zone區(qū)域后,則獲取了對應(yīng)Zone區(qū)域服務(wù)均衡器,并調(diào)用zoneLoadBalancer.chooseServer來選擇具體的服務(wù)實(shí)例,而在
//zoneLoadBalancer.chooseServer中將使用IRule接口的choose函數(shù)來選擇具體的服務(wù)實(shí)例,在這里,IRule接口的實(shí)現(xiàn)會使用ZoneAvoidanceRule來挑選具體的服務(wù)實(shí)例。
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Throwable e) {
logger.error("Unexpected exception when choosing server using zone aware logic", e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
//否則實(shí)現(xiàn)父類的策略
return super.chooseServer(key);
}
}