FlowSlot則用于根據(jù)預(yù)設(shè)的限流規(guī)則,以及前面 slot 統(tǒng)計的狀態(tài),來進(jìn)行限流。
官方文檔:如何使用Sentinel

流控規(guī)則
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//流控檢查
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// 獲取該Resource對應(yīng)配置的流控規(guī)則 一個resource可以指定多個規(guī)則
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
List<FlowRule> rules = flowRules.get(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
//規(guī)則逐一校驗
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
//檢查是否通過
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
在流控檢查的時候,判斷是否對該Resource有配置規(guī)則,如果配置了流控規(guī)則,則逐一檢查配置規(guī)則。在檢查規(guī)則的時候,又分為集群控流或者單機(jī)控流,集群控流或者單機(jī)控流具體文檔參考:http://www.itdecent.cn/p/a52bf4073873
在流控的時候,首先要根據(jù)請求的Resource和請求策略(詳細(xì)見上圖),選擇流控節(jié)點(diǎn),再根據(jù)不同的流控策略,選擇不同的Controller去判斷是否通過。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//判斷是否通過
return rule.getRater().canPass(selectedNode, acquireCount);
}
//根據(jù)請求選擇節(jié)點(diǎn)
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}

Controller.png
根據(jù)配置
FlowRule配置的controlBehavior選擇不同的Controller對應(yīng)關(guān)系:
- default(reject directly)
DefaultController
- default(reject directly)
- warm up
WarmUpController
- warm up
- rate limiter
RateLimiterController
- rate limiter
- warm up + rate limiter
WarmUpRateLimiter
- warm up + rate limiter
在默認(rèn)的Controller中,首先獲取當(dāng)前的線程數(shù),或者QPS數(shù),當(dāng)當(dāng)前的線程數(shù)或者QPS+申請的數(shù)量>配置的總數(shù),則不通過,如果當(dāng)前線程數(shù)或者QPS+申請的數(shù)量<=配置的總數(shù),則直接通過。
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//獲取當(dāng)前node節(jié)點(diǎn)的線程數(shù)或者請求的qps總數(shù)
int curCount = avgUsedTokens(node);
//當(dāng)前請求數(shù)+申請總數(shù)是否>該資源配置的總數(shù)
if (curCount + acquireCount > count) {
return false;
}
return true;
}
//獲取當(dāng)前node節(jié)點(diǎn)的線程數(shù)或者請求的qps總數(shù)
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int) node.passQps();
}
private void sleep(int timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {
// Ignore.
}
}
}
和其他Slot一樣,F(xiàn)lowRule的配置在FlowRuleManager#loadRules (List<FlowRule> rules)的,最終更新由FlowPropertyListener來完成。
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
@Override
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
}
}