Ribbon客戶端負載均衡策略
ILoadBalancer接口,有如下幾個實現(xiàn)類:

其中BaseLoadBalancer實現(xiàn)了基本的負載均衡策略,NoOpLoadBalancer沒有進行任何負載均衡策略,
而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer都在BaseLoadBalancer的基礎上進行了相應的擴展。
而它們在chooser()方法中,是通過委托IRule規(guī)則實現(xiàn)類,來實現(xiàn)服務實例的選擇,如下:
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
其中IRule的實現(xiàn)類有如下圖:

如上圖所示,每一個實現(xiàn)類都代表了一個負載均衡策略。
RoundRobinRule:采取了最常見了負載均衡策略,即輪詢的方式:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
RandomRule:隨機選擇服務實例,可能因為高并發(fā)Bug導致該線程一直獲取不到服務實例而陷入死循環(huán),其具體代碼如下:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}
int index = chooseRandomInt(serverCount);
server = upList.get(index);
if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
RetryRule:重復嘗試負載均衡策略,在截止時間前,盡最大努力獲取服務實例,源碼如下:
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
Server answer = null;
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
WeightedResponseTimeRule:該Rule是RoundRobinRule的繼承類,在其基礎上進行了相應的功能擴展:
- 定時任務,啟動一個定時任務來統(tǒng)計每個服務實例響應情況,其源碼如下:
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
- 權重計算,權重計算算法是這樣的,(1)獲取所有實例總的響應時間total,計算 第i個服務實例的weight=total-server(i).avgResponseTime,(2)計算i個實例的加權值wSofar(i),加權值在每次計算完(i-1)實例的權重后都會加上該權重值即wSofar(i)+=weight(i),(3)加權值集合形成的權重區(qū)間[wSofar(i-1),wSofar(i)],如果該區(qū)間越寬的話,說明該區(qū)間越容易被選中
權重計算源碼如下:
class ServerWeight {
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// 計算每個實例的權重=加權值+totalReponseTime-實例平均響應時間
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;//加權值初始為0
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;//加權值每次都會加上一個計算過實例的權重值
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
選擇服務實例源碼如下:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// 在0到maxWeight中隨機生一個權重值
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
//獲取第一個權重值大于隨機權重值
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
ClientConfigEnableRoundRobinRule該負載均衡策略,本身沒有特殊之處,其內(nèi)部實現(xiàn)了一個RoundRobinRule來進行輪詢選擇服務實例,但是該策略繼承子類在進行一些高級策略時,如果遇到無法處理情況時,就可以個通過該策略進行服務實例選??;
BestAvailableRule:該策略時ClientConfigEnableRoundRobinRule子類,該策略會遍歷實例列表所有服務實例,過濾掉故障實例,并從中選擇并發(fā)請求數(shù)最小的實例,故該實例的策略時選擇最空閑的實例:
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
PredicateBaseRule:該策略主要是通過該子類實現(xiàn)getPredicate()方法,來過濾服務實例列表,然后從剩余的實例列表中以線性輪詢的方式獲取一個;
其源碼如下:
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
AvailabilityFilteringRule:該策略時PredicateBaseRule的子策略,其實現(xiàn)源碼如下:
@Override
public AbstractServerPredicate getPredicate() {
return predicate;
}
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers) {
return getEligibleServers(servers, null);
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
AvailabilityFilteringPredicate中實現(xiàn)了apply方法,該方法主要判斷該服務實例是否故障,或者該服務實例并發(fā)請求數(shù)是否超過閾值,如果超過了閾值,那么該服務實例將會被過濾掉,其源碼如下:
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}