1 Sentinel flow control
Sentinel's flow control logic is handled by the FlowSlot node and depends on the configured flow rules. Below is an example of initializing a flow rule.
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
// set limit qps to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
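1.1 FlowSlot logic
FlowSlot is also entered through the entry method. It then iterates over the flow rules configured for the resource one by one and throws a FlowException as soon as one of them does not pass.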
// FlowSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
// FlowRuleChecker
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// Fetch the flow rules for this resource and check them one by one
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// Pick the statistics node to check against: originNode when limiting by origin, usually the clusterNode when limitApp is "default"
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
The concrete flow control logic is implemented through the TrafficShapingController interface; the implementation class is chosen according to controlBehavior.

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
return new DefaultController(rule.getCount(), rule.getGrade());
}
1.2 Controller types and implementation logic
1.2.1 DefaultController
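The default flow controller. Its logic is simple: depending on grade, it reads either the thread count or the passed QPS from the node and compares it with the configured count.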
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
// Prioritized requests limited by QPS may borrow quota from the next time window
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// Thread count or QPS, depending on grade
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
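The fast-reject comparison above can be sketched without Sentinel on the classpath. This is a simplified illustration, not the Sentinel class: FastRejectSketch, its Grade enum, and the currentThreads and passQps parameters are hypothetical stand-ins for DefaultController, RuleConstant, node.curThreadNum() and node.passQps(), and the priority-borrowing branch is left out.

```java
/** Simplified fast-reject check; a sketch, not Sentinel's DefaultController. */
final class FastRejectSketch {
    enum Grade { THREAD, QPS }

    private final double count;
    private final Grade grade;

    FastRejectSketch(double count, Grade grade) {
        this.count = count;
        this.grade = grade;
    }

    /**
     * currentThreads and passQps stand in for node.curThreadNum() and node.passQps().
     * Returns true when the request fits under the configured threshold.
     */
    boolean canPass(int currentThreads, double passQps, int acquireCount) {
        // Same truncation as the original: the QPS is cast to int before comparing.
        int used = grade == Grade.THREAD ? currentThreads : (int) passQps;
        return used + acquireCount <= count;
    }
}
```

With count = 20 in QPS mode, a request arriving when 19 requests have already passed in the current second is admitted, and the next one is rejected.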
1.2.2 WarmUpController
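Warm-up mode uses a token bucket algorithm. The bucket starts at maxToken. A large number of tokens in the bucket means the system is still in warm-up mode and needs further warming up; once the number of tokens drops below warningToken, the system has entered normal mode and no longer needs warm-up.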
WarmUpController#construct
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
// The configured QPS limit
this.count = count;
// Defaults to 3
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100; number of warm-up tokens
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200; maximum number of tokens
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits); warm-up slope: the interval per request is coldFactor / count when cold and 1 / count when stable.
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
WarmUpController#canPass
Performs the check. During the warm-up phase the maximum QPS allowed through is warningQps; in the normal phase it is count.
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
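// Start computing from the slope.
// Once the token count is at or above the warning line, adjust the allowed QPS.
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// Tokens must be consumed faster than at the warning line (the rest of the original comment is truncated)
// current interval = aboveToken * slope + 1 / count; with a full bucket the resulting QPS is count / coldFactor
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// Below the QPS allowed during warm-up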
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
WarmUpController#syncToken
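syncToken recomputes the number of tokens. If the bucket holds fewer than warningToken tokens, tokens are added at the normal rate count. If the bucket is above warningToken and the QPS that passed in the previous window was very low, below count/coldFactor, tokens are added as well. This makes the system cool down faster and delays the end of warm-up, because the traffic is too small to complete it.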
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
// Compute how many tokens the bucket should hold now
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// Precondition for adding tokens:
// the bucket is well below the warning line
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
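Two things to keep in mind when configuring this: the initial QPS is count/coldFactor, and the warm-up period must be long enough that warningToken is greater than count. Otherwise every refill pushes the bucket back into the warm-up state and the normal state is never reached.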
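To make the numbers concrete, the arithmetic from construct and canPass can be replayed in isolation. The sketch below copies only the formulas shown above; WarmUpMath and allowedQps are hypothetical names, and the values count = 20, warmUpPeriodInSec = 10 and coldFactor = 3 are assumed because they reproduce the warningToken = 100 and maxToken = 200 mentioned in the code comments.

```java
/** Replays the warm-up arithmetic shown above; a sketch, not Sentinel's WarmUpController. */
final class WarmUpMath {
    final double count;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpMath(double count, int warmUpPeriodInSec, int coldFactor) {
        this.count = count;
        // Same expressions as in WarmUpController#construct.
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    /** QPS allowed for a given number of tokens left in the bucket. */
    double allowedQps(long restToken) {
        if (restToken < warningToken) {
            return count; // normal phase: the full configured rate
        }
        long aboveToken = restToken - warningToken;
        return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
    }
}
```

With these assumed values a full bucket (200 tokens) allows roughly 6.67 QPS, which is count/coldFactor, and the allowed rate climbs back to 20 as the bucket drains to the warning line.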
1.2.3 RateLimiterController
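The rate limiter controller lets requests queue. It computes the time cost of the current request (costTime) and from that the expected pass time, expectedTime. If expectedTime is later than the current time, it checks whether the difference exceeds the maximum queueing time: if it does, the request is rejected; otherwise the thread sleeps for the difference and the request passes once it wakes up.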
RateLimiterController#canPass
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
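The queueing decision is easier to follow when the clock is passed in and the sleep is replaced by a return value. The following is a sketch under those assumptions, not the Sentinel implementation: PacingSketch and tryAcquire are hypothetical names, nowMs replaces TimeUtil.currentTimeMillis(), and the method returns the number of milliseconds the caller would have slept, or -1 when the request would be rejected.

```java
/** Leaky-bucket style pacing with an injected clock; a sketch, not Sentinel's RateLimiterController. */
final class PacingSketch {
    private final double count;
    private final int maxQueueingTimeMs;
    private final java.util.concurrent.atomic.AtomicLong latestPassedTime =
            new java.util.concurrent.atomic.AtomicLong(-1);

    PacingSketch(double count, int maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    /** Returns the wait in milliseconds (0 means pass immediately) or -1 if rejected. */
    long tryAcquire(long nowMs, int acquireCount) {
        if (acquireCount <= 0) {
            return 0;
        }
        if (count <= 0) {
            return -1;
        }
        // Interval that acquireCount requests occupy at the configured rate.
        long costTime = Math.round(1.0 * acquireCount / count * 1000);
        long expectedTime = costTime + latestPassedTime.get();
        if (expectedTime <= nowMs) {
            latestPassedTime.set(nowMs);
            return 0;
        }
        if (expectedTime - nowMs > maxQueueingTimeMs) {
            return -1;
        }
        // Reserve a slot, then re-check because another thread may have reserved first.
        long scheduled = latestPassedTime.addAndGet(costTime);
        long waitTime = scheduled - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            latestPassedTime.addAndGet(-costTime);
            return -1;
        }
        return Math.max(waitTime, 0);
    }
}
```

With count = 10 and a 500 ms queueing limit, a burst arriving at the same instant is spread out at 100 ms intervals, and the seventh request of the burst is rejected because it would have to wait 600 ms.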
1.2.4 WarmUpRateLimiterController
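The warm-up queueing mode combines the modes from 1.2.2 and 1.2.3. The class extends WarmUpController and its code merges both approaches: costTime is computed from the QPS that can currently be accepted, which during warm-up is the warm-up QPS (warmingQps in the code).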
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
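1.3 Flow control summary
Flow control can limit by QPS or by thread count. In the default mode a request is rejected when the current statistic would exceed count.
When limiting by QPS, the warm-up, queueing, and warm-up queueing modes are also available. For requests without strict latency requirements, queueing is worth considering: it follows the leaky bucket algorithm, processes requests at a constant rate, and smooths out peaks.
Warm-up mode suits cold starts. Traffic is scaled by coldFactor, so the initial allowed traffic is small, at count/coldFactor.
2 Sentinel circuit breaking
Circuit breaking is handled by DegradeSlot. There are two kinds of strategy: slow call ratio, and exceptions, the latter by exception count or exception ratio. The corresponding interface is CircuitBreaker, with AbstractCircuitBreaker as the template base class and ResponseTimeCircuitBreaker and ExceptionCircuitBreaker as the implementations covered in this section.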

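Once the corresponding condition is met, the breaker trips and its state becomes open. After the circuit-breaking window has passed it enters the half-open state and probes: if the next request is not a slow call, or raises no exception, the breaker moves to the closed state and circuit breaking ends.
The corresponding test code is as follows: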
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<>();
DegradeRule rule = new DegradeRule(KEY)
.setGrade(CircuitBreakerStrategy.SLOW_REQUEST_RATIO.getType())
// Max allowed response time
.setCount(50)
// Retry timeout (in second)
.setTimeWindow(10)
// Circuit breaker opens when slow request ratio > 60%
.setSlowRatioThreshold(0.6)
.setMinRequestAmount(100)
.setStatIntervalMs(20000);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
System.out.println("Degrade rule loaded: " + rules);
}
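The open, half-open and closed transitions described above form a small state machine, which can be sketched on its own before reading the Sentinel code. This is an illustration under stated assumptions rather than the library's implementation: BreakerStateSketch, trip and onProbeComplete are hypothetical names, the clock is passed in as nowMs, and recoveryTimeoutMs plays the role of the rule's time window.

```java
/** Minimal CLOSED / OPEN / HALF_OPEN state machine; a sketch, not Sentinel's AbstractCircuitBreaker. */
final class BreakerStateSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final java.util.concurrent.atomic.AtomicReference<State> state =
            new java.util.concurrent.atomic.AtomicReference<>(State.CLOSED);
    private final long recoveryTimeoutMs;
    private volatile long nextRetryTimestamp;

    BreakerStateSketch(long recoveryTimeoutMs) {
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    State state() {
        return state.get();
    }

    /** Called when a threshold is exceeded while the breaker is closed. */
    void trip(long nowMs) {
        if (state.compareAndSet(State.CLOSED, State.OPEN)) {
            nextRetryTimestamp = nowMs + recoveryTimeoutMs;
        }
    }

    /** Closed always passes; open lets exactly one probe through after the timeout. */
    boolean tryPass(long nowMs) {
        if (state.get() == State.CLOSED) {
            return true;
        }
        if (state.get() == State.OPEN) {
            return nowMs >= nextRetryTimestamp && state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // HALF_OPEN: a probe is already in flight
    }

    /** Result of the probe request decides between closing and reopening. */
    void onProbeComplete(long nowMs, boolean healthy) {
        if (healthy) {
            state.compareAndSet(State.HALF_OPEN, State.CLOSED);
        } else if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            nextRetryTimestamp = nowMs + recoveryTimeoutMs;
        }
    }
}
```

Using compareAndSet for every transition means that when several threads race at the end of the window, only one of them wins the move to half-open and becomes the probe.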
2.1 Degrade code implementation
DegradeSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
AbstractCircuitBreaker#tryPass
@Override
public boolean tryPass(Context context) {
// Template implementation. When the breaker is closed, pass directly.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// Check whether the conditions for moving to half-open are met
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
protected boolean retryTimeoutArrived() {
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
// Notify the registered state observers
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
// Prevents the breaker from staying half-open forever when the probe request is blocked by another rule.
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
DegradeSlot#exit
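Compared with other slots, the exit method of DegradeSlot does considerably more work, because data such as the RT and any exception is only known once the business logic has finished.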
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
// Fetch the circuit breakers for this resource
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
if (curEntry.getBlockError() == null) {
// passed request: iterate over the configured circuit breakers
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
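The main circuit-breaking logic lives in circuitBreaker.onRequestComplete. The concrete circuit breaker implementations are analyzed below.
2.2 CircuitBreaker implementations
2.2.1 ResponseTimeCircuitBreaker implementation
Breaks the circuit based on the ratio of slow calls. The implementation is as follows:
ResponseTimeCircuitBreaker#onRequestComplete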
@Override
public void onRequestComplete(Context context) {
// Slow request counter for the current window
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
// Compute how long the request took
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
counter.totalCount.add(1);
handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}
// In half-open state, the probe result decides between open and closed
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
// Sum over all windows
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
// Open the circuit breaker
transformToOpen(currentRatio);
}
if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
transformToOpen(currentRatio);
}
}
AbstractCircuitBreaker#transformToOpen
protected void transformToOpen(double triggerValue) {
State cs = currentState.get();
switch (cs) {
case CLOSED:
fromCloseToOpen(triggerValue);
break;
case HALF_OPEN:
fromHalfOpenToOpen(triggerValue);
break;
default:
break;
}
}
protected boolean fromCloseToOpen(double snapshotValue) {
State prev = State.CLOSED;
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(prev, State.OPEN, snapshotValue);
return true;
}
return false;
}
protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
}
// Notify the state change observers
private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}
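The threshold decision made in the closed state reduces to a few lines of arithmetic over the sliding-window counters. The sketch below isolates it under some assumptions: SlowRatioSketch and shouldOpen are hypothetical names, the two arrays stand in for the per-window slowCount and totalCount values and are assumed to have equal length, and SLOW_REQUEST_RATIO_MAX_VALUE is assumed to be 1.0.

```java
/** Slow-call ratio decision for the closed state; a sketch, not Sentinel's ResponseTimeCircuitBreaker. */
final class SlowRatioSketch {
    /** Assumed value of SLOW_REQUEST_RATIO_MAX_VALUE. */
    static final double RATIO_MAX = 1.0d;

    static boolean shouldOpen(long[] slowCounts, long[] totalCounts,
                              int minRequestAmount, double maxSlowRequestRatio) {
        long slow = 0;
        long total = 0;
        // Sum the counters of every window in the statistics interval.
        for (int i = 0; i < totalCounts.length; i++) {
            slow += slowCounts[i];
            total += totalCounts[i];
        }
        // Too few requests to draw a conclusion.
        if (total < minRequestAmount) {
            return false;
        }
        double ratio = slow * 1.0d / total;
        if (ratio > maxSlowRequestRatio) {
            return true;
        }
        // A threshold of 100% can never be exceeded, so equality also trips the breaker.
        return Double.compare(ratio, maxSlowRequestRatio) == 0
                && Double.compare(maxSlowRequestRatio, RATIO_MAX) == 0;
    }
}
```

With the rule from the test code above (threshold 0.6, minRequestAmount 100), 70 slow calls out of 100 trip the breaker, while exactly 60 out of 100 do not because the comparison is strict.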
2.2.2 ExceptionCircuitBreaker implementation
Decides whether to break the circuit based on the exception count or the exception ratio.
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
// Probe handling
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio (exception ratio)
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
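2.3 Circuit breaking summary
Circuit breaking relies on CircuitBreaker implementations and supports two strategies, slow call ratio and exceptions, the latter split into exception count and exception ratio. Each circuit breaker has its own counter and one of three states: open, half-open, and closed. Circuit breaking is useful when calling other subsystems, so that a failure in one of them does not drag down the whole system, and it fits calls to third-party interfaces well.
3 Overall summary
Sentinel's main use cases are flow control and circuit breaking. Mapped to our own business, fast rejection protects high-traffic systems from being overwhelmed, and warm-up addresses cold starts. Warm-up does mean that part of the traffic gets rejected, which makes it somewhat worse than distributing traffic at the load balancing level. Request queueing suits scenarios where response time requirements are not strict.
Limiting by thread count protects the business as a whole by preventing a single interface with excessive traffic from exhausting the thread pool.
Circuit breaking mainly keeps a failing dependency from dragging down the whole system. We previously considered degrading to asynchronous requests based on exception count when the outreach system produces too many errors, but then no traffic gets in at all, so the cost outweighs the benefit. It is, however, a good fit for calls to third-party interfaces, where it keeps our own interface from slowing down because the third-party interface responds slowly.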