Ribbon is Netflix's open-source client-side load-balancing component and one of the most important modules in the Spring Cloud family. It is also arguably one of the more complex modules in that family, and it directly affects the quality and performance of service invocation. Mastering Ribbon helps us understand every aspect that service routing must account for in a distributed microservice cluster.
This article dissects Ribbon's design in detail, to give readers a better understanding of Spring Cloud.
I. How Ribbon Works inside Spring
Let's start with an overview diagram of how Spring integrates Ribbon:

Ribbon under Spring Cloud integration has the following characteristics:
- Per-service configuration
Each service client gets its own Spring ApplicationContext, which loads the configuration and instances for that service.
For example, suppose the current Spring Cloud system contains the following services:
| Service | Role | Depends on |
|---|---|---|
| order | Order module | user |
| user | User module | none |
| mobile-bff | Mobile BFF | order, user |
In practice, the mobile-bff service calls the order and user modules, so within mobile-bff's Spring context a child ApplicationContext is created for each of order and user to load that service's configuration. In other words, each client's configuration is independent and does not affect the others.
- Integration with Feign
When Feign is used as the client, each request is ultimately translated into the form http://<service-name>/<relative-path-to-service>. LoadBalancerFeignClient extracts the service identifier <service-name>, looks up that service's load balancer FeignLoadBalancer in the context, and the load balancer then uses the accumulated statistics of the service instances to pick the most suitable one.
II. How Ribbon Integrates with Feign under Spring Cloud
When combined with Feign, each Feign invocation is wrapped into a LoadBalancerCommand and issued through RxJava's event-driven style. Spring Cloud extracts the service name from the Feign request URL, finds that service's load balancer implementation FeignLoadBalancer in the context, lets the load balancer pick a suitable Server instance, and forwards the request to it. Along the way, the call statistics for that Server instance are recorded.
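To make the URL-to-service-name step concrete, here is a minimal standalone sketch, assuming nothing beyond the JDK. `ServiceNameExtractor` is a made-up helper, not Ribbon or Feign code; it only illustrates that the "host" of a logical URL such as `http://order/api/v1/orders/42` is the service name:

```java
import java.net.URI;

// Hypothetical helper illustrating how a client such as LoadBalancerFeignClient
// can decompose a logical URL: the URI "host" is the service name, and the
// path is what gets forwarded to whichever instance the load balancer picks.
public class ServiceNameExtractor {

    // Returns the service name embedded in a logical URL.
    public static String serviceName(String url) {
        String host = URI.create(url).getHost();
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("URL carries no service name: " + url);
        }
        return host;
    }

    // Returns the relative path to forward to the chosen instance.
    public static String relativePath(String url) {
        return URI.create(url).getRawPath();
    }

    public static void main(String[] args) {
        System.out.println(serviceName("http://order/api/v1/orders/42"));   // order
        System.out.println(relativePath("http://order/api/v1/orders/42"));  // /api/v1/orders/42
    }
}
```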
/**
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
* exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*/
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
// Max retries on the same server
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// Max retries on the next (newly chosen) server
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer to pick a suitable Server, execute the request against it,
// and fold the request's data and outcome into that Server's ServerStats
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
// Fetch the statistics for this Server
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry: the actual service invocation
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount(); // count this attempt
loadBalancerContext.noteOpenConnection(stats); // connection accounting
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
// Start the execution tracer to record elapsed time
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
// With a suitable server chosen, execute the request;
// once the underlying call yields a result, handle it below
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null); // record statistics
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e); // record the failure
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity; // keep the result value
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop(); // stop timing
// mark the request as finished and update statistics
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
// On failure, apply the retry policy for this same server
// (Observable.retry with a predicate decides whether to retry)
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
// Retry on the next server, driven by the retry policy
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
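Stripped of RxJava, the retry budget that `submit()` encodes can be restated in plain Java. The sketch below is illustrative only: `RetryBudgetSketch` and its interfaces are invented names, it models the `server == null` path (where moving to a "next" server re-runs server selection), and it omits the per-attempt `ServerStats` bookkeeping:

```java
import java.util.function.Function;

// Invented sketch of Ribbon's two-level retry budget: up to maxRetriesSame
// extra attempts against one server, and up to maxRetriesNext moves to a
// freshly chosen server, i.e. at most (maxRetriesNext + 1) * (maxRetriesSame + 1)
// attempts in total.
public class RetryBudgetSketch {

    public interface ServerChooser { String choose(); }

    public static <T> T execute(ServerChooser chooser,
                                Function<String, T> call,
                                int maxRetriesSame,
                                int maxRetriesNext) {
        RuntimeException last = null;
        // one initial server plus maxRetriesNext "next server" moves
        for (int next = 0; next <= maxRetriesNext; next++) {
            String server = chooser.choose();
            // one initial attempt plus maxRetriesSame retries on this server
            for (int same = 0; same <= maxRetriesSame; same++) {
                try {
                    return call.apply(server);
                } catch (RuntimeException e) {
                    last = e; // Ribbon would record this into ServerStats here
                }
            }
        }
        throw new RuntimeException("Number of retries exceeded", last);
    }
}
```

With `maxRetriesSame = 1` and `maxRetriesNext = 1`, a call that keeps failing is attempted at most four times across two chosen servers before the exhaustion error is raised, mirroring the `NUMBEROF_RETRIES_*_EXCEEDED` cases above.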
Selecting a suitable Server from a ServerList:
/**
* Compute the final URI from a partial URI in the request. The following steps are performed:
* <ul>
* <li> if host is missing, resolve host/port from the load balancer
* <li> if host is missing and there is no load balancer, try to derive host/port from the vip address
* <li> if host is present, the URI's authority was set via a vip address, and a load balancer exists, resolve host/port through the load balancer (the given host is ignored)
* <li> if host is present but neither a load balancer nor a vip address is configured, use the real host as-is
* <li> if host is missing but none of the above applies, throws ClientException
* </ul>
*
* @param original Original URI passed from caller
*/
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI, with no HOST supplied
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey); // let the load balancer choose a Server
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
// take the host from the server the load balancer chose
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {
// No Full URL - and we dont have a LoadBalancer registered to
// obtain a server
// if we have a vipAddress that came with the registration, we
// can use that else we
// bail out
// resolve the host from the configured vip address
if (vipAddresses != null && vipAddresses.contains(",")) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured."
+ " Also, there are multiple vipAddresses and hence no vip address can be chosen"
+ " to complete this partial uri");
} else if (vipAddresses != null) {
try {
Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
host = hostAndPort.first();
port = hostAndPort.second();
} catch (URISyntaxException e) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured. "
+ " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
}
} else {
throw new ClientException(
ClientException.ErrorType.GENERAL,
this.clientName
+ " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
+ " Also has no vipAddress registered");
}
}
} else {
// Full URL case: the URL carries a complete address, which may be a vip address or host:port
// This could either be a vipAddress or a hostAndPort or a real DNS
// if vipAddress or hostAndPort, we just have to consult the loadbalancer
// but if it does not return a server, we should just proceed anyways
// and assume its a DNS
// For restClients registered using a vipAddress AND executing a request
// by passing in the full URL (including host and port), we should only
// consult lb IFF the URL passed is registered as vipAddress in Discovery
boolean shouldInterpretAsVip = false;
if (lb != null) {
shouldInterpretAsVip = isVipRecognized(original.getAuthority());
}
if (shouldInterpretAsVip) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc != null){
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("using LB returned Server: {} for request: {}", svc, original);
return svc;
} else {
// just fall back as real DNS
logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
}
} else {
// consult LB to obtain vipAddress backed instance given full URL
//Full URL execute request - where url!=vipAddress
logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
}
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL
return new Server(host, port);
}
III. LoadBalancer: the Core of Load Balancing
A LoadBalancer has three main responsibilities:
- Maintaining the Server list itself (additions, updates, removals)
- Maintaining the state of each Server in the list (status updates)
- Returning the most suitable Server instance when one is requested
This section walks through these three aspects in detail.
3.1 How the Load Balancer Works Internally
First, a diagram of LoadBalancer's internal structure:

| Component | Responsibility | See section |
|---|---|---|
| Server | Represents a service instance and records its details, such as address, zone, service name, and instance ID | |
| ServerList | Maintains the list of Server instances from which the load balancer chooses; the list may change dynamically while the application runs | 3.2 |
| ServerStats | Runtime statistics for one Server, such as average response time, accumulated failure count, and circuit-breaker timing; each ServerStats instance maps to exactly one Server | |
| LoadBalancerStats | Container that holds and manages the ServerStats instances | |
| ServerListUpdater | Used by the load balancer to refresh the ServerList, e.g. a scheduled task that periodically fetches the latest instance list | 3.2 |
| Pinger | Checks and maintains the status of servers in the ServerList. Note: Pinger only tracks Server status; it has no power to decide removal | |
| PingerStrategy | Defines how servers are checked, e.g. serially or in parallel | |
| IPing | The actual health check for one server, commonly over HTTP or TCP/IP, to see whether the service responds normally | |

3.2 How Is the Server List Maintained? (Add, Update, Remove)
From the perspective of server-list maintenance alone, Ribbon is structured as follows:
There are two ways the Server list can be maintained:
- Configuration-based server list
Servers are listed statically, usually in a configuration file. This is the simpler approach, but it does not mean the list can never change at runtime: when Netflix built its Spring Cloud suite it used its distributed configuration framework Archaius, which watches configuration files for changes and pushes updates out to each application. So if the configured server list is modified while the service is running, the list is refreshed without a restart.
- Server list driven by a service-discovery component (such as Eureka)
In Spring Cloud, service registration and discovery is an indispensable part of a distributed service cluster; it maintains the lifecycle of service instances (register, renew, deregister). This article covers the Eureka integration: how Eureka's registration data is used to refresh Ribbon's server list dynamically.
Ribbon decides which implementation to use through the configuration key <service-name>.ribbon.NIWSServerListClassName:
| Strategy | ServerList implementation |
|---|---|
| Configuration-based | com.netflix.loadbalancer.ConfigurationBasedServerList |
| Discovery-based | com.netflix.loadbalancer.DiscoveryEnabledNIWSServerList |
The Server list may be updated dynamically at runtime; the update mechanism is chosen via <service-name>.ribbon.ServerListUpdaterClassName. Two implementations currently exist:
| Update strategy | ServerListUpdater implementation |
|---|---|
| Poll the server list with a scheduled task | com.netflix.loadbalancer.PollingServerListUpdater |
| Update on Eureka event notifications | com.netflix.loadbalancer.EurekaNotificationServerListUpdater |
- Polling the server list with a scheduled task
This strategy is implemented by com.netflix.loadbalancer.PollingServerListUpdater, which runs a periodic task that fetches the latest server list and pushes it into the ServerList. Its core logic:
public class PollingServerListUpdater implements ServerListUpdater {
private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
// Thread pool for the updater, plus a shutdown hook
private static class LazyHolder {
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private static Thread _shutdownThread;
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
static {
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder())
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
if (_shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
// this can happen if we're in the middle of a real
// shutdown,
// and that's 'ok'
}
}
}
}
}
// some code omitted
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Create the scheduled task that runs the update on a fixed cycle
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
// run the update action; the action itself is defined by the LoadBalancer
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// schedule the task
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs, // initial delay
refreshIntervalMs, // refresh interval
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
// some code omitted
}
As the code above shows, ServerListUpdater only defines when updates happen; what an update actually does is encapsulated in an UpdateAction:
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
// DynamicServerListLoadBalancer supplies the concrete action:
public DynamicServerListLoadBalancer() {
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
this.updateAction = new UpdateAction() {
public void doUpdate() {
// refresh the server list
DynamicServerListLoadBalancer.this.updateListOfServers();
}
};
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList();
// fetch the latest server list through ServerList
if (this.serverListImpl != null) {
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
// run the result through the configured filter
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers((List)servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
// apply the new list
this.updateAllServerList((List)servers);
}
protected void updateAllServerList(List<T> ls) {
if (this.serverListUpdateInProgress.compareAndSet(false, true)) {
try {
Iterator var2 = ls.iterator();
while(var2.hasNext()) {
T s = (Server)var2.next();
s.setAlive(true);
}
this.setServersList(ls);
super.forceQuickPing();
} finally {
this.serverListUpdateInProgress.set(false);
}
}
}
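Stripped of the Archaius property callbacks and the shared executor shown above, the scheduling core of the polling updater reduces to the following sketch (class and method names are illustrative, not Ribbon's API):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal stand-in for PollingServerListUpdater: a daemon scheduler runs an
// update action at a fixed delay, and start() is guarded by a CAS so the
// task is only installed once.
public class PollingSketch {

    public interface UpdateAction { void doUpdate(); }

    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "polling-sketch");
                t.setDaemon(true); // don't keep the JVM alive just for refreshes
                return t;
            });
    private volatile long lastUpdated;

    public void start(UpdateAction action, long initialDelayMs, long refreshMs) {
        if (!active.compareAndSet(false, true)) {
            return; // already active, no-op (mirrors the real updater)
        }
        executor.scheduleWithFixedDelay(() -> {
            try {
                action.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                // the real updater logs "Failed one update cycle" and carries on
            }
        }, initialDelayMs, refreshMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        active.set(false);
        executor.shutdown();
    }
}
```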
- Updating on Eureka event notifications
The Eureka-driven mode works differently: when server registration data changes in the Eureka registry, a notification is delivered to EurekaNotificationServerListUpdater, which then triggers a ServerList refresh:
public class EurekaNotificationServerListUpdater implements ServerListUpdater {
// some code omitted
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Create a Eureka event listener; when Eureka's cache changes, the logic below fires
this.updateListener = new EurekaEventListener() {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
// guard so that only one update is queued at a time
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
// submit the update task to the refresh executor
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate(); // perform the real update
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
// obtain the EurekaClient instance
if (eurekaClient == null) {
eurekaClient = eurekaClientProvider.get();
}
// register the event listener with the EurekaClient
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
} else {
logger.error("Failed to register an updateListener to eureka client, eureka client is null");
throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
}
} else {
logger.info("Update listener already registered, no-op");
}
}
}
3.2.1 Related configuration keys
| Config key | Description | Applies when | Default |
|---|---|---|---|
| <service-name>.ribbon.NIWSServerListClassName | ServerList implementation, see above | | ConfigurationBasedServerList |
| <service-name>.ribbon.listOfServers | Server list, as comma-separated hostname:port entries | ServerList is configuration-based | |
| <service-name>.ribbon.ServerListUpdaterClassName | Server-list update strategy, see above | | PollingServerListUpdater |
| <service-name>.ribbon.ServerListRefreshInterval | Server-list refresh interval | polling with a scheduled task | 30s |
3.2.2 Ribbon's defaults
Out of the box, Ribbon uses the following settings: a configuration-based server list, refreshed by a polling task every 30s.
<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
<service-name>.ribbon.listOfServers=<ip:port>,<ip:port>
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.PollingServerListUpdater
<service-name>.ribbon.ServerListRefreshInterval=30
### update thread pool size
DynamicServerListLoadBalancer.ThreadPoolSize=2
3.2.3 Defaults under Spring Cloud integration
When integrated with Spring Cloud and Eureka, the defaults change to a discovery-based server list refreshed by Eureka notifications:
<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.DiscoveryEnabledNIWSServerList
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.EurekaNotificationServerListUpdater
### update thread pool size
EurekaNotificationServerListUpdater.ThreadPoolSize=2
### notification queue capacity
EurekaNotificationServerListUpdater.queueSize=1000
3.3 How Does the Load Balancer Maintain Instance Status?
Ribbon's load balancer delegates instance-status maintenance to Pinger, PingerStrategy, and IPing, which interact as follows:

/**
* Strategy that decides how servers are pinged - serially or in parallel - while ensuring pings do not interfere with one another
*
*/
public interface IPingStrategy {
boolean[] pingServers(IPing ping, Server[] servers);
}
/**
* Defines how to "ping" a single server to check whether it is alive
* @author stonse
*
*/
public interface IPing {
/**
* Checks whether the given <code>Server</code> is "alive" i.e. should be
* considered a candidate while loadbalancing
* i.e. verifies liveness
*/
public boolean isAlive(Server server);
}
3.3.1 The scheduled ping task
By default, the load balancer runs a periodic ping task internally:
| Parameter | Description | Default |
|---|---|---|
| <service-name>.ribbon.NFLoadBalancerPingInterval | Ping task interval | 30s |
| <service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping timeout | 2s |
| <service-name>.ribbon.NFLoadBalancerPingClassName | IPing implementation | DummyPing, which simply returns true |
The default PingStrategy pings serially, checking each service instance in turn:
/**
* Default implementation for <c>IPingStrategy</c>, performs ping
* serially, which may not be desirable, if your <c>IPing</c>
* implementation is slow, or you have large number of servers.
*/
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// check each server in turn and record the result in the corresponding slot
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
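The javadoc above warns that serial pinging "may not be desirable" when `IPing` is slow or there are many servers. As a contrast, here is a hypothetical parallel strategy over simplified stand-in types; this is not a class that ships with Ribbon:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Invented parallel counterpart to SerialPingStrategy: all servers are pinged
// concurrently, so a slow or failing ping only costs its own timeout instead
// of delaying every server queued behind it.
public class ParallelPingSketch {

    public interface Ping { boolean isAlive(String server); }

    public static boolean[] pingServers(Ping ping, String[] servers)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, servers.length));
        try {
            @SuppressWarnings("unchecked")
            Future<Boolean>[] futures = new Future[servers.length];
            for (int i = 0; i < servers.length; i++) {
                final String server = servers[i];
                futures[i] = pool.submit(() -> ping.isAlive(server));
            }
            boolean[] results = new boolean[servers.length];
            for (int i = 0; i < servers.length; i++) {
                try {
                    results[i] = futures[i].get(2, TimeUnit.SECONDS); // per-ping timeout
                } catch (ExecutionException | TimeoutException e) {
                    results[i] = false; // default answer is DEAD, like the serial version
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```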
3.3.2 Ribbon's default IPing: DummyPing
The default IPing implementation simply returns true:
public class DummyPing extends AbstractLoadBalancerPing {
public DummyPing() {
}
public boolean isAlive(Server server) {
return true;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
Besides DummyPing, IPing has several other implementations:

3.3.3 The IPing used under Spring Cloud: NIWSDiscoveryPing
With Spring Cloud, the default IPing becomes NIWSDiscoveryPing. Since Eureka handles registration and discovery, liveness is not probed directly; instead, Ribbon's Server status follows the instance status reported by Eureka. NIWSDiscoveryPing implements exactly that:
/**
* "Ping" Discovery Client
* i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
* @author stonse
*
*/
public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
BaseLoadBalancer lb = null;
public NIWSDiscoveryPing() {
}
public BaseLoadBalancer getLb() {
return lb;
}
/**
* Non IPing interface method - only set this if you care about the "newServers Feature"
* @param lb
*/
public void setLb(BaseLoadBalancer lb) {
this.lb = lb;
}
public boolean isAlive(Server server) {
boolean isAlive = true;
// use the instance status from the Eureka server as the Ribbon server's status
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
@Override
public void initWithNiwsConfig(
IClientConfig clientConfig) {
}
}
The default wiring entry point under Spring Cloud:
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
3.4 How Is the Most Suitable Instance Picked from the Server List?
3.4.1 The instance container: how ServerList is maintained
The load balancer manages service instances uniformly through ServerList:

The base interface is very simple:
/**
* Interface that defines the methods used to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
// the initial list of servers
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
* returns the refreshed list of servers
*/
public List<T> getUpdatedListOfServers();
}
In Ribbon's implementation, ServerList maintains the Server instances and hands the latest List<Server> to the LoadBalancer:

What ServerList<Server> does:
maintain the service instances, and use a ServerListFilter to narrow them down to the qualifying List<Server>
3.4.2 The instance filter: ServerListFilter
The job of a ServerListFilter is simple:
given a list of service instances, return the subset that satisfies the filter's condition
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
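A minimal, hypothetical implementation of this contract shows how little a filter has to do; plain `String` servers and a `Predicate` stand in for Ribbon's `Server` type here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Invented example of the ServerListFilter shape: take a list in, return the
// subset that satisfies a condition. Ribbon's real filters in the next
// sections follow the same pattern with richer conditions.
public class PredicateServerListFilter<T> {

    private final Predicate<T> condition;

    public PredicateServerListFilter(Predicate<T> condition) {
        this.condition = condition;
    }

    public List<T> getFilteredListOfServers(List<T> servers) {
        List<T> out = new ArrayList<>();
        for (T server : servers) {
            if (condition.test(server)) {
                out.add(server);
            }
        }
        return out;
    }
}
```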
3.4.2.1 Ribbon's default ServerListFilter: ZoneAffinityServerListFilter
By default, Ribbon filters zone-first: from the Server list, it keeps the servers whose zone matches the zone (zone) of the current instance.
The related Ribbon configuration parameters:
| Parameter | Description | Default |
|---|---|---|
| <service-name>.ribbon.EnableZoneAffinity | Whether to enable zone affinity | false |
| <service-name>.ribbon.EnableZoneExclusivity | Whether to be zone-exclusive, i.e. only ever return instances in the current zone | false |
| <service-name>.ribbon.zoneAffinity.maxLoadPerServer | Maximum average active-request load per server | 0.6 |
| <service-name>.ribbon.zoneAffinity.maxBlackOutServesrPercentage | Maximum fraction of circuit-tripped servers | 0.8 |
| <service-name>.ribbon.zoneAffinity.minAvailableServers | Minimum number of available server instances | 2 |
Its core implementation:
public class ZoneAffinityServerListFilter<T extends Server> extends
AbstractServerListFilter<T> implements IClientConfigAware {
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
// zone is set, zone affinity or exclusivity is enabled, and the list is non-empty
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
// filter the list with the zone predicate
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
// if zone affinity should stay enabled, return the filtered list
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
// decide whether zone affinity should remain in effect
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
// fetch load-balancer statistics
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
// take a snapshot of the zone's servers, including their statistics
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
// average load: the mean number of active requests per server across the zone
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
// Fall back to the full list if:
// 1. the fraction of circuit-tripped servers exceeds the limit (default 0.8), or
// 2. the average load per server exceeds the limit (default 0.6), or
// 3. fewer than the minimum available servers remain (default 2)
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
}
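The three-way threshold check at the heart of `shouldEnableZoneAffinity` can be isolated into a tiny standalone sketch, using the default thresholds from the table above (the class and method names here are invented):

```java
// Invented restatement of shouldEnableZoneAffinity's decision: zone affinity
// is abandoned (so requests may spill to other zones) as soon as the zone
// looks unhealthy on any one of three axes.
public class ZoneAffinityCheck {

    static final double MAX_BLACKOUT_RATIO = 0.8;  // maxBlackOutServesrPercentage
    static final double MAX_LOAD_PER_SERVER = 0.6; // maxLoadPerServer
    static final int MIN_AVAILABLE_SERVERS = 2;    // minAvailableServers

    public static boolean keepZoneAffinity(int instanceCount,
                                           int circuitTrippedCount,
                                           double loadPerServer) {
        double blackoutRatio = (double) circuitTrippedCount / instanceCount;
        int available = instanceCount - circuitTrippedCount;
        // mirror of the condition in getFilteredListOfServers
        return blackoutRatio < MAX_BLACKOUT_RATIO
                && loadPerServer < MAX_LOAD_PER_SERVER
                && available >= MIN_AVAILABLE_SERVERS;
    }
}
```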
The decision flow is as follows:

3.4.2.2 A second ServerListFilter: ZonePreferenceServerListFilter
ZonePreferenceServerListFilter inherits from ZoneAffinityServerListFilter and extends it: on top of the parent's result, it filters once more for servers in the same zone as the local service.
Core logic: when does it take effect?
When the current service's zone is configured and ZoneAffinityServerListFilter did not actually filter anything out, ZonePreferenceServerListFilter returns only the servers of the current zone.
public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {
private String zone;
@Override
public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
super.initWithNiwsConfig(niwsClientConfig);
if (ConfigurationManager.getDeploymentContext() != null) {
this.zone = ConfigurationManager.getDeploymentContext().getValue(
ContextKey.zone);
}
}
@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
// start from the parent class's filtered result
List<Server> output = super.getFilteredListOfServers(servers);
// if the parent filter had no effect, apply zone preference here
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
public String getZone() {
return zone;
}
public void setZone(String zone) {
this.zone = zone;
}
}
3.4.2.3 A third ServerListFilter: ServerListSubsetFilter
This filter targets very large server lists (say, hundreds of instances), where holding HTTP connections to every server is wasteful; it retains only a subset of the servers, releasing the unnecessary connections.
It also inherits from ZoneAffinityServerListFilter, but it is rarely used in practice, so we leave a detailed walkthrough for another time.
3.4.3 How the LoadBalancer Selects an Instance
The LoadBalancer's core job is to pick the most suitable instance from the server list according to load. Internally it relies on the components shown below:

How the LoadBalancer selects a service instance:
1. Obtain the list of currently available instances through ServerList;
2. Pass the list from step 1 through the ServerListFilter, keeping only the instances that satisfy the filter;
3. Apply the IRule, which combines its rule with each instance's statistics and returns a single instance.
As this flow shows, instances are effectively filtered twice: first by the ServerListFilter, then by the IRule's selection logic.
The ServerListFilter strategies were covered in detail above; next we look at how a Rule picks one server from the list.
Before introducing Rule, one more concept is needed: Server statistics.
After the LoadBalancer hands a Server to the application, the application sends its requests to that Server and, along the way, records statistics such as response time and connection behavior. On later selections, the LoadBalancer combines each Server's statistics with the Rule to decide which Server is the most suitable.
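To see the shape of the whole pipeline, here is a condensed sketch of the selection flow: obtain a list, filter it, then let a stats-driven rule pick one instance. Everything here is invented for illustration; in particular, the "fewest active requests" rule is just one plausible stand-in for Ribbon's IRule implementations:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Invented end-to-end sketch: ServerList -> ServerListFilter -> rule.
public class SelectionPipelineSketch {

    public static String choose(List<String> servers,
                                Set<String> sameZone,
                                Map<String, Integer> activeRequests) {
        // Steps 1-2: take the current list and filter it (zone filter stand-in).
        List<String> filtered = new ArrayList<>();
        for (String s : servers) {
            if (sameZone.contains(s)) {
                filtered.add(s);
            }
        }
        if (filtered.isEmpty()) {
            filtered = servers; // fall back to the full list, like Ribbon's filters
        }
        // Step 3: a stats-based rule - the least-loaded instance wins.
        String best = null;
        int bestLoad = Integer.MAX_VALUE;
        for (String s : filtered) {
            int load = activeRequests.getOrDefault(s, 0);
            if (load < bestLoad) {
                bestLoad = load;
                best = s;
            }
        }
        return best;
    }
}
```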
3.4.3.1 What Statistics Does the LoadBalancer Keep for Each Server Instance?
| ServerStats | 說(shuō)明 | 類型 | 默認(rèn)值 |
|---|---|---|---|
| zone | 當(dāng)前服務(wù)所屬的可用區(qū) | 配置 | 可通過(guò) eureka.instance.meta.zone 指定 |
| totalRequests | 總請(qǐng)求數(shù)量,client每次調(diào)用,數(shù)量會(huì)遞增 | 實(shí)時(shí) | 0 |
| activeRequestsCountTimeout | 活動(dòng)請(qǐng)求計(jì)數(shù)時(shí)間窗 niws.loadbalancer.serverStats.activeRequestsCount.effectiveWindowSeconds;如果時(shí)間窗范圍之內(nèi)沒(méi)有activeRequestsCount值的改變,則activeRequestsCount重置為0 | 配置 | 60*10(seconds) |
| successiveConnectionFailureCount | 連續(xù)連接失敗計(jì)數(shù) | 實(shí)時(shí) | |
| connectionFailureThreshold | 連接失敗閾值,通過(guò)屬性 niws.loadbalancer.default.connectionFailureCountThreshold 進(jìn)行配置 | 配置 | 3 |
| circuitTrippedTimeoutFactor | 斷路器超時(shí)因子,niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | 配置 | 10(seconds) |
| maxCircuitTrippedTimeout | 最大斷路器超時(shí)秒數(shù),niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | 配置 | 30(seconds) |
| totalCircuitBreakerBlackOutPeriod | 累計(jì)斷路器中斷時(shí)間區(qū)間 | 實(shí)時(shí) | milliseconds |
| lastAccessedTimestamp | 最后連接時(shí)間 | 實(shí)時(shí) | |
| lastConnectionFailedTimestamp | 最后連接失敗時(shí)間 | 實(shí)時(shí) | |
| firstConnectionTimestamp | 首次連接時(shí)間 | 實(shí)時(shí) | |
| activeRequestsCount | 當(dāng)前活躍的連接數(shù) | 實(shí)時(shí) | |
| failureCountSlidingWindowInterval | 失敗次數(shù)統(tǒng)計(jì)時(shí)間窗 | 配置 | 1000(ms) |
| serverFailureCounts | 當(dāng)前時(shí)間窗內(nèi)連接失敗的數(shù)量 | 實(shí)時(shí) | |
| responseTimeDist.mean | 請(qǐng)求平均響應(yīng)時(shí)間 | 實(shí)時(shí) | (ms) |
| responseTimeDist.max | 請(qǐng)求最大響應(yīng)時(shí)間 | 實(shí)時(shí) | (ms) |
| responseTimeDist.minimum | 請(qǐng)求最小響應(yīng)時(shí)間 | 實(shí)時(shí) | (ms) |
| responseTimeDist.stddev | 請(qǐng)求響應(yīng)時(shí)間標(biāo)準(zhǔn)差 | 實(shí)時(shí) | (ms) |
| dataDist.sampleSize | QoS服務(wù)質(zhì)量采集點(diǎn)大小 | 實(shí)時(shí) | |
| dataDist.timestamp | QoS服務(wù)質(zhì)量最后計(jì)算時(shí)間點(diǎn) | 實(shí)時(shí) | |
| dataDist.timestampMillis | QoS服務(wù)質(zhì)量最后計(jì)算時(shí)間點(diǎn)毫秒數(shù),自1970.1.1開始 | 實(shí)時(shí) | |
| dataDist.mean | QoS 最近的時(shí)間窗內(nèi)的請(qǐng)求平均響應(yīng)時(shí)間 | 實(shí)時(shí) | |
| dataDist.10thPercentile | QoS 10% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.25thPercentile | QoS 25% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.50thPercentile | QoS 50% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.75thPercentile | QoS 75% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.95thPercentile | QoS 95% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.99thPercentile | QoS 99% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
| dataDist.99.5thPercentile | QoS 99.5% 處理請(qǐng)求的時(shí)間 | 實(shí)時(shí) | ms |
3.4.3.2 服務(wù)斷路器的工作原理
當(dāng)某個(gè)服務(wù)存在多個(gè)實(shí)例時(shí),在請(qǐng)求的過(guò)程中,負(fù)載均衡器會(huì)統(tǒng)計(jì)每次請(qǐng)求的情況(請(qǐng)求響應(yīng)時(shí)間、是否發(fā)生網(wǎng)絡(luò)異常等)。當(dāng)某個(gè)服務(wù)實(shí)例出現(xiàn)連續(xù)的連接失敗時(shí),負(fù)載均衡器會(huì)標(biāo)識(shí)當(dāng)前
服務(wù)實(shí)例,設(shè)置當(dāng)前服務(wù)實(shí)例的斷路時(shí)間區(qū)間;在此區(qū)間內(nèi),當(dāng)請(qǐng)求到來(lái)時(shí),負(fù)載均衡器會(huì)將此服務(wù)實(shí)例從可用服務(wù)實(shí)例列表中暫時(shí)剔除,優(yōu)先選擇其他服務(wù)實(shí)例。
相關(guān)統(tǒng)計(jì)信息如下:
| ServerStats | 說(shuō)明 | 類型 | 默認(rèn)值 |
|---|---|---|---|
| successiveConnectionFailureCount | 連續(xù)連接失敗計(jì)數(shù) | 實(shí)時(shí) | |
| connectionFailureThreshold | 連接失敗閾值,通過(guò)屬性 niws.loadbalancer.default.connectionFailureCountThreshold 進(jìn)行配置;當(dāng)successiveConnectionFailureCount超過(guò)了此限制時(shí),將計(jì)算熔斷時(shí)間 | 配置 | 3 |
| circuitTrippedTimeoutFactor | 斷路器超時(shí)因子,niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | 配置 | 10(seconds) |
| maxCircuitTrippedTimeout | 最大斷路器超時(shí)秒數(shù),niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | 配置 | 30(seconds) |
| totalCircuitBreakerBlackOutPeriod | 累計(jì)斷路器中斷時(shí)間區(qū)間 | 實(shí)時(shí) | milliseconds |
| lastAccessedTimestamp | 最后連接時(shí)間 | 實(shí)時(shí) | |
| lastConnectionFailedTimestamp | 最后連接失敗時(shí)間 | 實(shí)時(shí) | |
| firstConnectionTimestamp | 首次連接時(shí)間 | 實(shí)時(shí) | |
@Monitor(name="CircuitBreakerTripped", type = DataSourceType.INFORMATIONAL)
public boolean isCircuitBreakerTripped() {
return isCircuitBreakerTripped(System.currentTimeMillis());
}
public boolean isCircuitBreakerTripped(long currentTime) {
//斷路器熔斷的時(shí)間戳
long circuitBreakerTimeout = getCircuitBreakerTimeout();
if (circuitBreakerTimeout <= 0) {
return false;
}
return circuitBreakerTimeout > currentTime;//還在熔斷區(qū)間內(nèi),則返回熔斷結(jié)果
}
//獲取熔斷超時(shí)時(shí)間
private long getCircuitBreakerTimeout() {
long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
if (blackOutPeriod <= 0) {
return 0;
}
return lastConnectionFailedTimestamp + blackOutPeriod;
}
//返回中斷毫秒數(shù)
private long getCircuitBreakerBlackoutPeriod() {
int failureCount = successiveConnectionFailureCount.get();
int threshold = connectionFailureThreshold.get();
// 連續(xù)失敗,但是尚未超過(guò)上限,則服務(wù)中斷周期為 0 ,表示可用
if (failureCount < threshold) {
return 0;
}
//當(dāng)鏈接失敗超過(guò)閾值時(shí),將進(jìn)行熔斷,熔斷的時(shí)間間隔為:
int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
blackOutSeconds = maxCircuitTrippedTimeout.get();
}
return blackOutSeconds * 1000L;
}
熔斷時(shí)間的計(jì)算:

- 判斷連續(xù)連接失敗計(jì)數(shù)`successiveConnectionFailureCount`是否超過(guò)連接失敗閾值`connectionFailureThreshold`。如果`successiveConnectionFailureCount < connectionFailureThreshold`,即尚未超過(guò)限額,則熔斷時(shí)間為 0;反之,如果超過(guò)限額,則進(jìn)行步驟2的計(jì)算;
- 計(jì)算失敗基數(shù),最大不得超過(guò) 16:`diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold)`;
- 根據(jù)超時(shí)因子`circuitTrippedTimeoutFactor`計(jì)算超時(shí)秒數(shù):`int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();`;
- 超時(shí)時(shí)間不得超過(guò)最大超時(shí)時(shí)間`maxCircuitTrippedTimeout`的上限。

按默認(rèn)配置(超時(shí)因子10s、最大超時(shí)30s),當(dāng)連續(xù)失敗剛達(dá)到閾值時(shí)(diff為0),熔斷時(shí)間為 (1<<0) * 10 = 10s;隨著失敗次數(shù)增加,熔斷時(shí)間按2的冪指數(shù)增長(zhǎng),但最終會(huì)被maxCircuitTrippedTimeout截?cái)唷<矗?熔斷時(shí)間最小值:(1<<0) * 10 = 10s
熔斷時(shí)間最大值:maxCircuitTrippedTimeout = 30s
在此期間內(nèi),此Server將會(huì)被忽略。
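按照上面`getCircuitBreakerBlackoutPeriod`的邏輯,可以用一個(gè)獨(dú)立的小例子復(fù)算熔斷秒數(shù)(僅為示意,非Ribbon源碼;默認(rèn)值取自上文:閾值3、因子10s、上限30s):

```java
public class BlackoutSketch {
    // 按文中公式復(fù)算熔斷秒數(shù):失敗未達(dá)閾值則為 0,否則 (1<<diff)*factor 并按上限截?cái)?    static long blackoutSeconds(int failureCount, int threshold, int factor, int maxTimeout) {
        if (failureCount < threshold) {
            return 0; // 尚未超過(guò)限額,服務(wù)可用
        }
        int diff = Math.min(failureCount - threshold, 16); // 失敗基數(shù)不超過(guò) 16
        return Math.min((long) (1 << diff) * factor, maxTimeout);
    }

    public static void main(String[] args) {
        System.out.println(blackoutSeconds(2, 3, 10, 30)); // 未達(dá)閾值 -> 0
        System.out.println(blackoutSeconds(3, 3, 10, 30)); // (1<<0)*10 = 10
        System.out.println(blackoutSeconds(5, 3, 10, 30)); // (1<<2)*10 = 40,截?cái)酁?30
    }
}
```

可以看到,默認(rèn)配置下指數(shù)退避很快就會(huì)被30s的上限截?cái)啵徽嬲龥Q定長(zhǎng)期表現(xiàn)的是上限參數(shù)而非失敗次數(shù)。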
熔斷統(tǒng)計(jì)何時(shí)清空?
熔斷的統(tǒng)計(jì)有自己的清除策略,當(dāng)出現(xiàn)如下幾種場(chǎng)景時(shí),熔斷統(tǒng)計(jì)會(huì)清空歸零:
- 請(qǐng)求時(shí)發(fā)生的異常不是斷路攔截類的異常時(shí)(至于如何判斷一個(gè)異常是否屬于斷路攔截類異常,可以自定義);
- 請(qǐng)求未發(fā)生異常,并且有結(jié)果返回時(shí)。
3.4.3.3 定義IRule,從服務(wù)實(shí)例列表中,選擇最合適的Server實(shí)例

由上圖可見,IRule會(huì)從服務(wù)列表中,根據(jù)自身定義的規(guī)則,返回最合適的Server實(shí)例,其接口定義如下:
public interface IRule{
/*
* choose one alive server from lb.allServers or
* lb.upServers according to key
*
* @return chosen Server object. NULL is returned if no
* server is available
*/
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
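作為對(duì)照,下面給出一個(gè)按上述接口形狀自定義規(guī)則的極簡(jiǎn)示意(非真實(shí)Ribbon類,Server與統(tǒng)計(jì)信息均為本地簡(jiǎn)化定義):挑選當(dāng)前活躍連接數(shù)最少的實(shí)例,思路接近下文的BestAvailableRule。

```java
import java.util.*;

public class LeastActiveRuleSketch {

    // 簡(jiǎn)化的 Server:名稱 + 當(dāng)前活躍連接數(shù)(真實(shí) Ribbon 中該計(jì)數(shù)來(lái)自 ServerStats)
    static class Server {
        final String name;
        final int activeRequests;
        Server(String name, int activeRequests) {
            this.name = name;
            this.activeRequests = activeRequests;
        }
    }

    // 仿照 IRule.choose 的語(yǔ)義:從列表中挑選最合適的 Server,空列表時(shí)返回 null
    static Server choose(List<Server> servers) {
        Server best = null;
        for (Server s : servers) {
            if (best == null || s.activeRequests < best.activeRequests) {
                best = s; // 保留活躍連接數(shù)最少的實(shí)例
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(
                new Server("a", 5), new Server("b", 2), new Server("c", 9));
        System.out.println(choose(servers).name); // b
    }
}
```

真實(shí)實(shí)現(xiàn)中,自定義規(guī)則需要實(shí)現(xiàn)IRule并通過(guò)setLoadBalancer拿到ILoadBalancer,再?gòu)闹凶x取服務(wù)列表與統(tǒng)計(jì)信息。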
Ribbon定義了一些常見的規(guī)則
| 實(shí)現(xiàn) | 描述 | 備注 |
|---|---|---|
| RoundRobinRule | 通過(guò)輪詢的方式,選擇過(guò)程會(huì)有最多10次的重試機(jī)制 | |
| RandomRule | 隨機(jī)方式,從列表中隨機(jī)挑選一個(gè)服務(wù) | |
| ZoneAvoidanceRule | 基于ZoneAvoidancePredicate斷言和AvailabilityPredicate斷言的規(guī)則。ZoneAvoidancePredicate計(jì)算出哪個(gè)Zone的服務(wù)最差,然后將此Zone的服務(wù)從服務(wù)列表中剔除;而AvailabilityPredicate過(guò)濾掉正處于熔斷狀態(tài)的服務(wù);經(jīng)上述兩個(gè)斷言過(guò)濾之后,再通過(guò)RoundRobin輪詢的方式從剩余列表中挑選一個(gè)服務(wù) | |
| BestAvailableRule | 最優(yōu)匹配規(guī)則:從服務(wù)列表中給挑選出并發(fā)數(shù)最少的Server | |
| RetryRule | 采用了裝飾模式,為Rule提供了重試機(jī)制 | |
| WeightedResponseTimeRule | 基于請(qǐng)求響應(yīng)時(shí)間加權(quán)計(jì)算的規(guī)則;如果此規(guī)則沒(méi)有生效,將采用RoundRobinRule的策略選擇服務(wù)實(shí)例 | |

3.4.3.3.1 RoundRobinRule 的實(shí)現(xiàn)
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
//10次重試機(jī)制
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// 生成輪詢數(shù)據(jù)
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
}
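上面的`incrementAndGetModulo`借助CAS自旋實(shí)現(xiàn)了一個(gè)按模循環(huán)且線程安全的計(jì)數(shù)器。可以用如下獨(dú)立示例驗(yàn)證其回繞行為(示意代碼,僅復(fù)刻該方法,非完整的RoundRobinRule):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ModuloCounterDemo {
    private final AtomicInteger counter = new AtomicInteger(0);

    // 與 RoundRobinRule 中同樣的 CAS 自旋:計(jì)數(shù)值始終落在 [0, modulo) 區(qū)間內(nèi)
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next; // CAS 成功才返回,失敗則重讀重試
            }
        }
    }

    public static void main(String[] args) {
        ModuloCounterDemo d = new ModuloCounterDemo();
        for (int i = 0; i < 5; i++) {
            System.out.print(d.incrementAndGetModulo(3) + " "); // 1 2 0 1 2
        }
        System.out.println();
    }
}
```

相比`synchronized`,CAS自旋在低競(jìng)爭(zhēng)場(chǎng)景下開銷更小,這也是Ribbon在輪詢?nèi)∧r(shí)選擇它的原因之一。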
3.4.3.3.2 ZoneAvoidanceRule的實(shí)現(xiàn)
ZoneAvoidanceRule的處理思路:
- ZoneAvoidancePredicate 計(jì)算出哪個(gè)Zone的服務(wù)最差,然后將此Zone的服務(wù)從服務(wù)列表中剔除掉;
- AvailabilityPredicate 將處于熔斷狀態(tài)的服務(wù)剔除掉;
- 將上述兩步驟過(guò)濾后的服務(wù)通過(guò)RoundRobinRule挑選一個(gè)服務(wù)實(shí)例返回
ZoneAvoidancePredicate 剔除最差的Zone的過(guò)程:
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
//如果熔斷實(shí)例占比達(dá)到閾值,或負(fù)載值異常(loadPerServer < 0),則將此Zone從可用區(qū)中剔除
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
//計(jì)算最差的Zone區(qū)域
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
// 如果最大負(fù)載沒(méi)有超過(guò)上限,則返回所有可用分區(qū)
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
// 從最差的可用分區(qū)中隨機(jī)挑選一個(gè)剔除,這么做是保證服務(wù)的高可用
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
四. Ribbon的配置參數(shù)
| 控制參數(shù) | 說(shuō)明 | 默認(rèn)值 |
|---|---|---|
| <service-name>.ribbon.NFLoadBalancerPingInterval | Ping定時(shí)任務(wù)周期 | 30s |
| <service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping超時(shí)時(shí)間 | 2s |
| <service-name>.ribbon.NFLoadBalancerRuleClassName | IRule實(shí)現(xiàn)類 | RoundRobinRule,基于輪詢調(diào)度算法規(guī)則選擇服務(wù)實(shí)例 |
| <service-name>.ribbon.NFLoadBalancerPingClassName | IPing實(shí)現(xiàn)類 | DummyPing,直接返回true |
| <service-name>.ribbon.NFLoadBalancerClassName | 自定義負(fù)載均衡器實(shí)現(xiàn)類 | ZoneAwareLoadBalancer |
| <service-name>.ribbon.NIWSServerListClassName | ServerList實(shí)現(xiàn)類 | ConfigurationBasedServerList,基于配置的服務(wù)列表 |
| <service-name>.ribbon.ServerListUpdaterClassName | 服務(wù)列表更新類 | PollingServerListUpdater,定時(shí)輪詢更新 |
| <service-name>.ribbon.NIWSServerListFilterClassName | 服務(wù)實(shí)例過(guò)濾器 | ZonePreferenceServerListFilter |
| <service-name>.ribbon.ServerListRefreshInterval | 服務(wù)列表刷新頻率 | 30s |
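以上參數(shù)在Spring Cloud工程的配置文件中可以按服務(wù)名直接指定,例如(示例中的服務(wù)名`user`、類名與服務(wù)地址均為假設(shè),僅展示屬性的書寫格式):

```
# 針對(duì) user 服務(wù)的 Ribbon 配置示例
user.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule
user.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
user.ribbon.listOfServers=10.0.0.1:8080,10.0.0.2:8080
user.ribbon.ServerListRefreshInterval=15000
```

其中`listOfServers`配合ConfigurationBasedServerList使用,適合在沒(méi)有注冊(cè)中心的場(chǎng)景下以靜態(tài)列表方式提供服務(wù)實(shí)例。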
五. 結(jié)語(yǔ)
Ribbon是Spring Cloud框架中相當(dāng)核心的模塊,負(fù)責(zé)服務(wù)調(diào)用的負(fù)載均衡;Ribbon也可以脫離Spring Cloud單獨(dú)使用。
另外,Ribbon是客戶端側(cè)的負(fù)載均衡框架,每個(gè)客戶端上獨(dú)立維護(hù)著自身的調(diào)用統(tǒng)計(jì)信息,相互隔離;也就是說(shuō),Ribbon的負(fù)載均衡在各個(gè)機(jī)器上的表現(xiàn)并不完全一致。
Ribbon也是整個(gè)組件框架中最復(fù)雜的一環(huán)。控制流程上為了保證服務(wù)的高可用性,有很多比較細(xì)節(jié)的參數(shù)控制。在使用的過(guò)程中,需要深入理清每個(gè)環(huán)節(jié)的處理機(jī)制,這樣在問(wèn)題定位上會(huì)高效很多。
