在本篇文章中將講解有關(guān)熔斷降級(jí)的原理。
熔斷降級(jí)策略是在DegradeSlot中實(shí)現(xiàn)的,會(huì)調(diào)用entry()方法。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
下面的邏輯主要是根據(jù)當(dāng)前的資源名從在DegradeRuleManager的map緩存中獲取所有有關(guān)他的熔斷規(guī)則。然后遍歷規(guī)則,依次判斷。 如果需要進(jìn)行熔斷,則拋出DegradeException異常。
private static final Map<String, Set<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
具體的判斷是調(diào)用DegradeRule下的passCheck()方法。
先講解下DegradeRule的有關(guān)變量。
public class DegradeRule extends AbstractRule {
// 大小是虛擬機(jī)可用的最大數(shù)量
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
// RT閾值或異常比率閾值計(jì)數(shù)
private double count;
// 發(fā)生降級(jí)時(shí)降級(jí)恢復(fù)超時(shí)(以秒為單位)
private int timeWindow;
// 降級(jí)策略(0:平均RT,1:異常比率,2:異常計(jì)數(shù))。
private int grade = RuleConstant.DEGRADE_GRADE_RT;
// 觸發(fā)RT響應(yīng)熔斷出現(xiàn)的最小連續(xù)慢響應(yīng)請(qǐng)求數(shù)量
// 默認(rèn)值為5次
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
// 觸發(fā)熔斷的最小的請(qǐng)求數(shù)
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
private AtomicLong passCount = new AtomicLong(0);
// 是否發(fā)生降級(jí)
private final AtomicBoolean cut = new AtomicBoolean(false);
下面開(kāi)始講解在一個(gè)降級(jí)規(guī)則DegradeRule怎么判斷是否降級(jí)的。
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
// 如果當(dāng)前正處于降級(jí)階段,則直接返回false
if (cut.get()) {
return false;
}
// 獲取該資源的ClusterNode
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
// 如果是根據(jù)請(qǐng)求響應(yīng)時(shí)間RT進(jìn)行判斷
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
// 獲取該節(jié)點(diǎn)的平均響應(yīng)時(shí)間
double rt = clusterNode.avgRt();
// 如果當(dāng)前平均響應(yīng)時(shí)間小于閾值,則可以通過(guò),并重置passcount為0
if (rt < this.count) {
passCount.set(0);
return true;
}
// 執(zhí)行到這里說(shuō)明當(dāng)前的平均響應(yīng)時(shí)間大于閾值了,此時(shí)進(jìn)入了準(zhǔn)降級(jí)階段,不會(huì)立即進(jìn)入降級(jí)
// 如果該狀態(tài)連續(xù)請(qǐng)求次數(shù)小于rtSlowRequestAmount,則放行
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 如果根據(jù)異常比例進(jìn)行判斷
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
// 當(dāng)前異常數(shù)
double exception = clusterNode.exceptionQps();
// 當(dāng)前請(qǐng)求成功數(shù)
double success = clusterNode.successQps();
// 請(qǐng)求總數(shù)
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
if (total < minRequestAmount) {
return true;
}
// 在相同的對(duì)齊統(tǒng)計(jì)時(shí)間窗口中,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
// 如果真實(shí)成功數(shù)小于0,并且異常數(shù)小于minRequestAmount
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
// 如果異常比例小于閾值
if (exception / success < count) {
return true;
}
// 如果根據(jù)異常數(shù)量
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
// 當(dāng)觸發(fā)熔斷降級(jí)時(shí),原子更新為true
// 并開(kāi)啟一個(gè)定時(shí)任務(wù),在熔斷時(shí)間過(guò)后,將熔斷狀態(tài)設(shè)置為false,并將通過(guò)計(jì)數(shù)重設(shè)為0
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
rule.passCount.set(0);
rule.cut.set(false);
}
}
比起流量控制,熔斷降級(jí)很簡(jiǎn)單。
參考文章: