Sentinel源碼分析----流控規(guī)則與FlowSlot

FlowSlot主要是用來(lái)進(jìn)行流控規(guī)則的處理,直接看下代碼

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        // 獲取流控規(guī)則
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
       //通過(guò)資源名稱來(lái)獲取規(guī)則列表
        List<FlowRule> rules = flowRules.get(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 遍歷規(guī)則進(jìn)行處理
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    // 如果規(guī)則校驗(yàn)不通過(guò),那么拋出FlowException異常
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
        // 交由FlowRuleChecker進(jìn)行邏輯處理
        return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
    }

這里的flowRules是一個(gè)全量的規(guī)則列表,例如我在控制臺(tái)配置了如下的規(guī)則:


image.png

那么flowRules中就有兩個(gè)元素,key分別是test和hello,對(duì)應(yīng)的值是一個(gè)集合,集合里只有一個(gè)元素,就是實(shí)際的規(guī)則實(shí)體FlowRule,具體值與我們配置相關(guān),看下FlowRule中有哪些字段

public class FlowRule extends AbstractRule {

    public FlowRule() {
        super();
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    public FlowRule(String resourceName) {
        super();
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    private double count;
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    private String refResource;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    private int warmUpPeriodSec = 10;

    private int maxQueueingTimeMs = 500;

    private boolean clusterMode;

    private ClusterFlowConfig clusterConfig;

    private TrafficShapingController controller;
}
  • limitApp:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的來(lái)源應(yīng)用
  • resource:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的資源名
  • grade:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的閾值類型
  • count:如果頁(yè)面配置的是qps類型,字段則代表qps的值;如果配置的是線程數(shù)類型,字段則代表線程數(shù)
  • strategy:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的流控模式
  • refResource:對(duì)應(yīng)流控策略為關(guān)聯(lián)情況下,出現(xiàn)的關(guān)聯(lián)資源 或 對(duì)應(yīng)流控策略為鏈路情況下,出現(xiàn)的入口資源
  • controlBehavior:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的流控效果
  • warmUpPeriodSec:對(duì)應(yīng)流控效果為Warm Up情況下,出現(xiàn)的預(yù)熱時(shí)長(zhǎng)
  • maxQueueingTimeMs:對(duì)應(yīng)流控效果為排隊(duì)等待情況下,出現(xiàn)的超時(shí)時(shí)間
  • clusterMode:對(duì)應(yīng)新增流控規(guī)則頁(yè)面的是否集群
  • ClusterFlowConfig:集群流控的相關(guān)配置
  • TrafficShapingController:流量整形的實(shí)現(xiàn),不同流控效果有不同算法

FlowRule和頁(yè)面配置的規(guī)則一一對(duì)應(yīng),通過(guò)控制臺(tái)配置后可以將這些值推送到機(jī)器上生成對(duì)應(yīng)的FlowRule

接下來(lái)看下FlowRuleChecker.passCheck對(duì)具體規(guī)則的處理

    static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {// 1
            return true;
        }

        if (rule.isClusterMode()) {//2
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);//3
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);//4
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount);//5
    }
  • 標(biāo)記1:limitApp是頁(yè)面上的來(lái)源應(yīng)用,默認(rèn)是default,表示代表所有的應(yīng)用,這里如果為空則默認(rèn)通過(guò),因?yàn)榇a中約定了default是代表所有應(yīng)用,所以空值為非法值,這里想了一下為什么不把default或者空值當(dāng)做代表所有應(yīng)用的限流,可能是因?yàn)榭罩颠€包括規(guī)則字段丟失的情況,應(yīng)該算作異常情況
  • 標(biāo)記2:集群模式特殊處理,這里暫不考慮,后續(xù)分析
  • 標(biāo)記3:本地限流邏輯實(shí)現(xiàn)
  • 標(biāo)記4:根據(jù)不同情況選擇不同Node(這里會(huì)涉及上篇文章的知識(shí)點(diǎn))
  • 標(biāo)記5:根據(jù)不同情況調(diào)用不同TrafficShapingController實(shí)現(xiàn)進(jìn)行判斷

節(jié)點(diǎn)選擇

上篇文章中分析了Sentinel的各種Node的含義,為什么要設(shè)計(jì)那么多種類型呢?下面就會(huì)看到,對(duì)于不同的流控規(guī)則而言,需要去拿不同的Node來(lái)獲取統(tǒng)計(jì)的數(shù)據(jù),具體看代碼(對(duì)于各種Node的知識(shí)點(diǎn)這里不再詳細(xì)分析,具體看下上篇文章)

    static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {//1
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();// 2
            }

            return selectReferenceNode(rule, context, node);//3
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//4
            if (strategy == RuleConstant.STRATEGY_DIRECT) {//5
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);//6
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//7
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();//8
            }

            return selectReferenceNode(rule, context, node);//9
        }

        return null;
    }
  • 標(biāo)記1:如果流控規(guī)則配置了來(lái)源應(yīng)用且不是"default"或者"other"這種特殊值,那么這種時(shí)候該規(guī)則就只對(duì)配置的來(lái)源應(yīng)用生效,例如:配置了A應(yīng)用對(duì)test資源qps為10,那么就要先取到當(dāng)前A應(yīng)用的qps看看是否超過(guò)10
  • 標(biāo)記2:如果是直接限流類型,那么也就是上面舉的栗子,獲取A應(yīng)用的統(tǒng)計(jì)數(shù)據(jù),即A應(yīng)用對(duì)應(yīng)的OriginNode進(jìn)行判斷
  • 標(biāo)記:3/6/9:selectReferenceNode方法是對(duì)流控模式為關(guān)聯(lián)或者鏈路的處理
  • 標(biāo)記4:這種情況limitApp是"default",代表針對(duì)所有應(yīng)用
  • 標(biāo)記5:如果是直接限流類型,因?yàn)椴皇轻槍?duì)某個(gè)應(yīng)用進(jìn)行限流,所以就需要取當(dāng)前資源的ClusterNode節(jié)點(diǎn),因?yàn)镃lusterNode表示所有應(yīng)用對(duì)該資源的所有請(qǐng)求情況
  • 標(biāo)記7:這個(gè)是"other"值的處理,假設(shè)當(dāng)前請(qǐng)求來(lái)源不在當(dāng)前規(guī)則的limitApp中,則進(jìn)行下面的處理
  • 標(biāo)記8:如果是直接限流類型,則返回OriginNode

關(guān)于7的應(yīng)用,具體栗子,假設(shè)一個(gè)資源有如下規(guī)則,屬性如下

ruleName limitApp
rule1 A
rule2 default
rule3 C
rule4 other

那么rule4只會(huì)處理來(lái)源應(yīng)用非A、C、default的應(yīng)用,例如D,E等統(tǒng)一使用rule4這個(gè)規(guī)則,這種情況實(shí)際應(yīng)用場(chǎng)景是:假設(shè)有非常多的來(lái)源應(yīng)用,但是又不能統(tǒng)一使用某個(gè)規(guī)則,因?yàn)榭赡苣硞€(gè)來(lái)源應(yīng)用的請(qǐng)求量很大,統(tǒng)一使用某個(gè)規(guī)則會(huì)導(dǎo)致請(qǐng)求量小的應(yīng)用被影響;又不能每個(gè)來(lái)源應(yīng)用配置一個(gè)規(guī)則,那這樣會(huì)配到手抖,那么可以為ABC分別配置一個(gè)規(guī)則(假設(shè)ABC是請(qǐng)求量非常大的,和其他的差別很大),然后再配置一個(gè)other,這樣其他請(qǐng)求量小的就可以使用這個(gè)規(guī)則了

流控模式

關(guān)聯(lián)與鏈路這兩種模式在wiki的介紹中,統(tǒng)一被稱為基于調(diào)用關(guān)系的流量控制

流控模式:關(guān)聯(lián)

當(dāng)兩個(gè)資源之間具有資源爭(zhēng)搶或者依賴關(guān)系的時(shí)候,這兩個(gè)資源便具有了關(guān)聯(lián)。比如對(duì)數(shù)據(jù)庫(kù)同一個(gè)字段的讀操作和寫(xiě)操作存在爭(zhēng)搶,讀的速度過(guò)高會(huì)影響寫(xiě)得速度,寫(xiě)的速度過(guò)高會(huì)影響讀的速度。如果放任讀寫(xiě)操作爭(zhēng)搶資源,則爭(zhēng)搶本身帶來(lái)的開(kāi)銷會(huì)降低整體的吞吐量。可使用關(guān)聯(lián)限流來(lái)避免具有關(guān)聯(lián)關(guān)系的資源之間過(guò)度的爭(zhēng)搶,舉例來(lái)說(shuō),read_db 和 write_db 這兩個(gè)資源分別代表數(shù)據(jù)庫(kù)讀寫(xiě),我們可以給 read_db 設(shè)置限流規(guī)則來(lái)達(dá)到寫(xiě)優(yōu)先的目的:設(shè)置 FlowRule.strategy 為 RuleConstant.RELATE 同時(shí)設(shè)置 FlowRule.ref_identity 為 write_db。這樣當(dāng)寫(xiě)庫(kù)操作過(guò)于頻繁時(shí),讀數(shù)據(jù)的請(qǐng)求會(huì)被限流。

gayhub的wiki上描述如上,也就是read_db的請(qǐng)求量會(huì)被write_db影響,假設(shè)read_db配置的規(guī)則如下:


image.png
  • 這種情況下,假設(shè)write_db沒(méi)有被執(zhí)行,那么read_db最大能到多少的qps?

看下代碼

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {// 1
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        //....鏈路模式的處理
        return null;
    }

看到標(biāo)記1的地方,關(guān)聯(lián)流控模式是使用關(guān)聯(lián)資源即refResource去獲取資源的ClusterNode,以write_db和read_db為例,當(dāng)read_db請(qǐng)求的時(shí)候,是把write_db的ClusterNode與規(guī)則進(jìn)行比較,那么上面的問(wèn)題就會(huì)有答案了,假設(shè)write_db一直沒(méi)有請(qǐng)求,那么read_db就沒(méi)有限制,因?yàn)閣rite_db的ClusterNode數(shù)據(jù)為空

流控模式:鏈路

                  machine-root
                    /       \
                   /         \
             Entrance1     Entrance2
                /             \
               /               \
      DefaultNode(nodeA)   DefaultNode(nodeA)

如上所示的Node分布情況,資源nodeA分別在兩個(gè)上下文Entrance1和Entrance2下進(jìn)行調(diào)用,假設(shè)在上下文Entrance1的調(diào)用量很大,而上下文Entrance2的調(diào)用量很小,我們想針對(duì)Entrance1上下文的nodeA調(diào)用進(jìn)行限流,那么可以使用鏈路限流模式,配置如下:

image.png

那么在上下文Entrance2下對(duì)nodeA的調(diào)用就沒(méi)有影響,看下代碼

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        //....關(guān)聯(lián)模式的處理

        if (strategy == RuleConstant.STRATEGY_CHAIN) {//2
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        return null;
    }

發(fā)現(xiàn)當(dāng)前上下文(context.getName())如果和配置(refResource)的不一樣,則返回null,外部如果返回的Node為null,則直接返回true了,那么Entrance2在這種情況下就直接通過(guò)了

流控效果

當(dāng)節(jié)點(diǎn)選擇完畢后,調(diào)用rule.getRater().canPass(selectedNode, acquireCount)開(kāi)始執(zhí)行判斷,getRater()返回的是TrafficShapingController的實(shí)現(xiàn)類,根據(jù)不同流控效果有不同的實(shí)現(xiàn)

    //com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#generateRater
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

快速失敗

快速失敗這種情況,使用的是DefaultController,也是最簡(jiǎn)單的一個(gè)

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }
        return true;
    }
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }

獲取當(dāng)前token數(shù),和當(dāng)前請(qǐng)求的數(shù)量相加,看看是否大于規(guī)則配置的值

排隊(duì)等待

當(dāng)頁(yè)面流控效果選擇排隊(duì)等待的時(shí)候,會(huì)出現(xiàn)超時(shí)時(shí)間的選項(xiàng),該效果是讓請(qǐng)求勻速的通過(guò),可用于消息隊(duì)列在消費(fèi)的時(shí)候?qū)α髁康目刂疲瑢?duì)應(yīng)的是漏桶算法,算法實(shí)現(xiàn)的代碼是RateLimiterController

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        if (acquireCount <= 0) {
            return true;
        }
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();

        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);//1

        long expectedTime = costTime + latestPassedTime.get();//2

        if (expectedTime <= currentTime) {//3
            latestPassedTime.set(currentTime);//4
            return true;
        } else {//5
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();//6
            if (waitTime > maxQueueingTimeMs) {//7
                return false;
            } else {//8
                long oldTime = latestPassedTime.addAndGet(costTime);//9
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();//10
                    if (waitTime > maxQueueingTimeMs) {//11
                        latestPassedTime.addAndGet(-costTime);//12
                        return false;
                    }
                    if (waitTime > 0) {//13
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

這里將上面幾個(gè)時(shí)間用變量表示

  1. 下一次能獲取到token通過(guò)的時(shí)間為t1
  2. 每次請(qǐng)求花費(fèi)時(shí)間為t2
  3. 上一次獲取到token通過(guò)的時(shí)間為t3

代碼每個(gè)標(biāo)記意義如下:

  • 1:即當(dāng)前請(qǐng)求token數(shù)(默認(rèn)是1)/qps1000=1/qps1000=一個(gè)請(qǐng)求需要花費(fèi)多少時(shí)間,假設(shè)設(shè)置的qps為10,即一秒允許有10個(gè)請(qǐng)求通過(guò),那么每個(gè)請(qǐng)求的時(shí)間就是1/10*1000=100毫秒
  • 2:t3+t2=t1
  • 3:如果當(dāng)前時(shí)間已經(jīng)在t1后面了,那么請(qǐng)求可以被通過(guò)
  • 4:通過(guò)的時(shí)候需要重設(shè)一下t3
  • 5:代表當(dāng)前請(qǐng)求到來(lái)的時(shí)候,還沒(méi)到達(dá)能夠t1
  • 6:t2+t3-當(dāng)前時(shí)間=代表離t1還差多少時(shí)間
  • 7:如果請(qǐng)求很多,每個(gè)都需要進(jìn)行排隊(duì),那么會(huì)導(dǎo)致越后面的請(qǐng)求等待的時(shí)候會(huì)更久,那么當(dāng)時(shí)間超過(guò)設(shè)置的最大間隔,則返回false直接失敗
  • 8:這里表示未到達(dá)最大的間隔
  • 9:更新t3并返回最新值(這個(gè)的變量命名感覺(jué)有點(diǎn)問(wèn)題,addAndGet返回的是最新值)
  • 10~11:在7已經(jīng)判斷過(guò)一次了,這里又判斷一次的原因是因?yàn)榭赡苡卸鄠€(gè)線程并發(fā)執(zhí)行的時(shí)候,在7的時(shí)候還未超過(guò)最大的間隔時(shí)間,而經(jīng)過(guò)latestPassedTime.addAndGet的處理之后,可能有某些線程已經(jīng)超過(guò)了這個(gè)時(shí)間,所以這里又判斷了一遍
  • 12:和7不一樣,7只是進(jìn)行運(yùn)算,這里是先更新了t3了,所以需要減回去
  • 13:得到等待時(shí)間后,使用Sleep是線程睡眠一定時(shí)間

Warm Up

還有一種流控效果是Warm Up,該算法類似于令牌桶算法,其代碼與Guava的RateLimiter原理類似,限于個(gè)人能力,沒(méi)能看懂其中原理。。。。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、引子 前面介紹了SystemSlot(系統(tǒng)規(guī)則檢查)和AuthoritySlot(授權(quán)規(guī)則檢查),下面接著分析...
    橘子_好多灰閱讀 2,594評(píng)論 1 5
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說(shuō)閱讀 12,300評(píng)論 6 13
  • ORA-00001: 違反唯一約束條件 (.) 錯(cuò)誤說(shuō)明:當(dāng)在唯一索引所對(duì)應(yīng)的列上鍵入重復(fù)值時(shí),會(huì)觸發(fā)此異常。 O...
    我想起個(gè)好名字閱讀 5,918評(píng)論 0 9
  • Getting Started Burp Suite 是用于攻擊web 應(yīng)用程序的集成平臺(tái)。它包含了許多工具,并為...
    Eva_chenx閱讀 29,231評(píng)論 0 14
  • 最近新出了一檔綜藝節(jié)目叫《國(guó)風(fēng)美少年》,已經(jīng)出了倆期,每一期我都認(rèn)認(rèn)真真的看完了,我覺(jué)得看這種節(jié)目不僅是是一種視覺(jué)...
    菊吖閱讀 1,056評(píng)論 0 0

友情鏈接更多精彩內(nèi)容