本篇博客只是分析一下基本的服裝均衡策略,com.netflix.loadbalancer.IRule接口的一些實(shí)現(xiàn)。
負(fù)載均衡策略
負(fù)載均衡接口com.netflix.loadbalancer.IRule

com.netflix.loadbalancer.AbstractLoadBalancerRule
負(fù)載均衡策略的抽象類,在該抽象類中定義了負(fù)載均衡器ILoadBalancer對(duì)象,該對(duì)象能夠在具體實(shí)現(xiàn)選擇服務(wù)策略時(shí),獲取到一些負(fù)載均衡器中一些維護(hù)的信息來作為分配的依據(jù),并以此設(shè)計(jì)一些算法來實(shí)現(xiàn)針對(duì)特定場(chǎng)景的高效率策略。
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
private ILoadBalancer lb;
@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
com.netflix.loadbalancer.RandomRule
該策略實(shí)現(xiàn)了從服務(wù)清單中隨機(jī)選擇一個(gè)服務(wù)實(shí)例的功能。
節(jié)選了部分源碼,也是通過負(fù)載均衡器:

com.netflix.loadbalancer.RoundRobinRule
該策略實(shí)現(xiàn)按照線性輪詢的方式依次選擇實(shí)例的功能。它的具體實(shí)現(xiàn)如下,在循環(huán)中增加了一個(gè)count計(jì)數(shù)變量,該變量會(huì)在每次輪詢之后累加,如果輪詢次數(shù)Server超過10次,選擇不到實(shí)例的話,會(huì)報(bào)警告信息。

com.netflix.loadbalancer.RetryRule
該策略實(shí)現(xiàn)了一個(gè)具備重試機(jī)制的實(shí)例選擇功能。具有RoundRobinRule實(shí)例的一個(gè)引用。也就是在一段時(shí)間內(nèi)通過RoundRobinRule選擇服務(wù)實(shí)例,一段時(shí)間內(nèi)沒有選擇出服務(wù)則線程終止。

com.netflix.loadbalancer.WeightedResponseTimeRule
該策略是對(duì)RoundRobinRule的擴(kuò)展,增加了根據(jù)實(shí)例的運(yùn)行情況來計(jì)算權(quán)重,并根據(jù)權(quán)重來挑選實(shí)例,以達(dá)到更優(yōu)的分配效果,增加了三個(gè)核心的內(nèi)容:
會(huì)在啟動(dòng)的時(shí)候啟動(dòng)一個(gè)定時(shí)任務(wù),去計(jì)算每個(gè)實(shí)例的權(quán)重,默認(rèn)30s執(zhí)行一次
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
具體計(jì)算權(quán)重的算法
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
//具體計(jì)算權(quán)重的方法
serverWeight.maintainWeights();
}catch (Throwable t) {
logger.error(
"Throwable caught while running DynamicServerWeightTask for "
+ name, t);
}
}
}
權(quán)重算法
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (serverWeightAssignmentInProgress.get()) {
return; // Ping in progress - nothing to do
} else {
serverWeightAssignmentInProgress.set(true);
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Throwable t) {
logger.error("Exception while dynamically calculating server weights", t);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
舉個(gè)簡(jiǎn)單的列子,就是4個(gè)實(shí)例,A,B,C,D平均響應(yīng)時(shí)間為10,40,80,100,所以總響應(yīng)時(shí)間是10+40+80+100=230,每個(gè)實(shí)例權(quán)重為總響應(yīng)時(shí)間與實(shí)際自身的平均響應(yīng)時(shí)間的差的累積所得,所以A,B,C,D的權(quán)重分別如下:
實(shí)例A: 230-10=220
實(shí)例B:220+(230-40)=410
實(shí)例C:410+(230-80)=560
實(shí)例D:560+(230-100)=690
所以實(shí)例A:[0.220]
實(shí)例B:(220,410]
實(shí)例C:(410,560]
實(shí)例D:(560,690)
com.netflix.loadbalancer.ClientConfigEnabledRoundRobinRule
該策略較為特殊,一般不直接使用,本身沒有實(shí)現(xiàn)什么特殊的邏輯,內(nèi)部也定義了一個(gè)RoundRobinRule策略,而choose函數(shù)的實(shí)現(xiàn)也正是使用了RoundRobinRule的線性輪詢機(jī)制,所以它實(shí)現(xiàn)的功能實(shí)際上與RoundRobinRule相同。在子類中做一些高級(jí)策略時(shí)通常有可能會(huì)存在一些無法實(shí)施的情況,使用父類的實(shí)現(xiàn)作為備選。

com.netflix.loadbalancer.BestAvailableRule
該策略繼承ClientConfigEnabledRoundRobinRule,在實(shí)現(xiàn)中它注入了負(fù)載均衡的統(tǒng)計(jì)對(duì)象LoadBalancerStats,同時(shí)在具體的choose算法中利用LoadBalancerStats保存的實(shí)例統(tǒng)計(jì)信息來選擇滿足要求的實(shí)例。它通過遍歷負(fù)載均衡器中的維護(hù)的所有實(shí)例,會(huì)過濾掉故障的實(shí)例,并找出并發(fā)請(qǐng)求數(shù)最小的一個(gè),所以該策略的特性時(shí)可選出最空閑的實(shí)例。
同時(shí),該算法核心依賴與LoadBalancerStats,當(dāng)其為空時(shí)候,策略是無法執(zhí)行,執(zhí)行父類的線性輪詢機(jī)制。

com.netflix.loadbalancer.PredicateBasedRule
這是個(gè)抽象策略,他也繼承了ClientConfigEnabledRoundRobinRule,這是一個(gè)基于Predicate實(shí)現(xiàn)的策略,Predicate是Google Guava Collection工具對(duì)集合進(jìn)行過濾的條件接口(java8也引入了這個(gè)概念)


com.netflix.loadbalancer.AvailabilityFilteringRule
繼承PredicateBasedRule類,過濾邏輯是AvailabilityPredicate類。

AvailabilityPredicate的過濾邏輯:

AvailabilityFilteringRule的輪詢算法:

com.netflix.loadbalancer.ZoneAvoidanceRule
該策略是com.netflix.loadbalancer.PredicateBasedRule的具體實(shí)現(xiàn)類。它使用了CompositePredicate來進(jìn)行服務(wù)實(shí)例清單的過濾。這是一個(gè)組合過濾條件,在其構(gòu)造函數(shù)中,它以ZoneAvoidancePredicate為主要過濾條件,AvailabilityPredicate為次要過濾條件初始化了組合過濾條件的實(shí)例。

ZoneAvoidanceRule并沒有重寫choose方法,完全遵循了父類的過濾主邏輯:“先過濾清單,再輪詢選擇”。
public class CompositePredicate extends AbstractServerPredicate {
private AbstractServerPredicate delegate;
private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
}
在獲取過濾結(jié)果的實(shí)現(xiàn)函數(shù)getEligibleServers中,處理邏輯如下:
- 使用主過濾條件對(duì)所有實(shí)例過濾并返回過濾后的實(shí)例清單。
- 依次使用過濾條件進(jìn)行過濾
- 每次過濾后都去判斷二個(gè)條件:過濾后的實(shí)例總數(shù) >=最小過濾實(shí)例數(shù)(minimalFilteredServers,默認(rèn)值是1),過濾后的實(shí)例比例 > 最小過濾百分比(minimalFilteredPercentage,默認(rèn)值為0)