sentinel限流控制器

在SytemRule,DegradeRule等規(guī)則中,有針對(duì)保護(hù)系統(tǒng)資源做限流,有針對(duì)rt,異常等做降級(jí)限流的。但是如何針對(duì)具體資源限定,進(jìn)行限流。
在FlowRule中,需要獲取一個(gè)節(jié)點(diǎn),以這個(gè)節(jié)點(diǎn),做數(shù)值參考,例如有ClusterNode,有OriginNode,或者默認(rèn)的DefaultNode。在選取Node時(shí),F(xiàn)lowRule都有對(duì)應(yīng)接口TrafficShapingController控制工具做為限流判斷。

  void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        // Flow rule map cannot be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();

        List<FlowRule> rules = flowRules.get(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

在FlowSlot中,通過(guò)resource中的name,資源命名從FlowRule管理器中得到他的所有規(guī)則,然后進(jìn)行遍歷,check是否通過(guò)。
在TrafficShapingController 控制器中具體有四種實(shí)現(xiàn)方式,默認(rèn)控制器,速率控制器,預(yù)熱控制器,速率與預(yù)熱結(jié)合的控制器。

? DefaultController 默認(rèn)控制器。
主要通過(guò)node中獲取的線程數(shù)量,或者檔期那qps與規(guī)則中的閾值比較,不符合就限制。

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

在該段核心代碼中,avgUsedTokens方法中,其實(shí)時(shí)獲取當(dāng)前節(jié)點(diǎn)線程數(shù)或者當(dāng)前的qps,與當(dāng)前規(guī)則中count,閾值比較。其中,如果存在優(yōu)先級(jí)的資源申請(qǐng),不能因?yàn)楫?dāng)前的限制,將有優(yōu)先級(jí)請(qǐng)求直接否定,這邊如何解決的?有優(yōu)先級(jí)申請(qǐng)資源時(shí),雖然當(dāng)前狀況下資源已近消耗完,但是可以占用將來(lái)的一個(gè)令牌,并且node通過(guò)addOccupiedPass方法增加資源數(shù)量,然后sleep一段時(shí)間,時(shí)間到后,拋出PriortyWaite異常,該異常不會(huì)被記錄異常數(shù)值中。在StatisticSlot中體現(xiàn),對(duì)PriortyWaite異常抓取,并且只是增加線程數(shù)量增加,并沒(méi)有對(duì)pass增加,為什么?看一下如何實(shí)現(xiàn)。
StatisticNode類(lèi)中 tryOccupyNext方法:

    public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
        double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
        long currentBorrow = rollingCounterInSecond.waiting();
        if (currentBorrow >= maxCount) {
            return OccupyTimeoutProperty.getOccupyTimeout();
        }
        int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
        long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
        int idx = 0;
        long currentPass = rollingCounterInSecond.pass();
        while (earliestTime < currentTime) {
            // 距離idx+下一個(gè)窗口的時(shí)間
            long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
            if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
                break;
            }
            // 得到最近窗口的pass值
            long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
            if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
                return waitInMs;
            }
            earliestTime += windowLength;
            currentPass -= windowPass;
            idx++;
        }
        return OccupyTimeoutProperty.getOccupyTimeout();
    }

maxCount是指在周期內(nèi),最大的數(shù)值;
currentBorrow是指當(dāng)前已經(jīng)等待資源的數(shù)量(下一個(gè)周期成功借了多少資源,在未來(lái)時(shí)間內(nèi),占好坑的);
windowLength指一個(gè)窗口的時(shí)間長(zhǎng)度;
earliestTime指當(dāng)前時(shí)間窗口為周期的,開(kāi)始窗口的時(shí)間點(diǎn),例如一個(gè)周期是由4個(gè)窗口組成的,假設(shè)當(dāng)前窗口是處于第一個(gè)窗口(該窗口為距離當(dāng)前時(shí)間最近的窗口),那么earliestTime時(shí)間是這段周期的開(kāi)始時(shí)間,即為第二個(gè)窗口(該窗口與第一個(gè)窗口組成了一個(gè)周期);
currentPass指這段周期已經(jīng)pass的值;
整個(gè)方法最終結(jié)果是想得到一個(gè)等待的時(shí)間,但是該等待時(shí)間也是有最大限制的。在while循環(huán)內(nèi),earliestTime一定要小于currentTime的,currentTime比earliestTime大((SampleCountProperty.SAMPLE_COUNT-1)個(gè)窗口時(shí)間+currentTime % windowLength)這么多時(shí)間值,idx表示已經(jīng)遍歷次數(shù)。當(dāng)?shù)谝淮螘r(shí),waitInMs的時(shí)間其實(shí)時(shí)當(dāng)前時(shí)間窗口的下一個(gè)窗口開(kāi)始時(shí)間減去當(dāng)前時(shí)間,意思就是等待waitInMs,就是下一個(gè)窗口的開(kāi)始時(shí)間了。通過(guò)earliestTime獲取與他關(guān)聯(lián)的窗口pass值。那么在接下來(lái)的
(currentPass + currentBorrow + acquireCount - windowPass <= maxCount) 判斷,maxCount是在一個(gè)完整周期內(nèi)最大的閾值,減去windowPass是相當(dāng)于減去了一個(gè)窗口的統(tǒng)計(jì)值,剩余已經(jīng)pass的值與已經(jīng)占用的資源是否小于等于maxCount。在條件不滿(mǎn)足情況下,會(huì)將earliestTime增加一個(gè)窗口長(zhǎng)度,currentPass也會(huì)減去一個(gè)窗口的記錄pass值。
此地需要圖解釋
如何記錄未來(lái)已經(jīng)占用的資源。
在StatisticNode節(jié)點(diǎn)中rollingCounterInSecond屬性中,在構(gòu)造器中,默認(rèn)由OccupiableBucketLeapArray類(lèi)去控制,而該類(lèi)也是繼承了LeapArray的,但是他由內(nèi)置屬性FutureBucketLeapArray,其實(shí)就是通過(guò)該類(lèi)型記錄未來(lái)資源占用。這里有個(gè)思考,未來(lái)占用的資源,到達(dá)時(shí)間點(diǎn)后,這些資源也應(yīng)該記錄在當(dāng)前已經(jīng)申請(qǐng)的資源中,確實(shí)要記錄,在LeapArray獲取窗口時(shí),如果窗口過(guò)期都會(huì)去重置窗口,那么OccupiableBucketLeapArray也重寫(xiě)了窗口重置方法:

  protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // Update the start time and reset value.
        w.resetTo(time);
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            w.value().reset();
            w.value().addPass((int)borrowBucket.pass());
        } else {
            w.value().reset();
        }

        return w;
    }

先重置窗口的開(kāi)始時(shí)間,然后通過(guò)time,在borrowArray中得到該窗口的統(tǒng)計(jì)槽,并且將borrowBucket中的pass值賦值給重置窗口中,這樣就將已經(jīng)出借的一個(gè)窗口下的pass值給了當(dāng)前窗口了。所以在拋出PriorityWaitException異常時(shí)StatiticSlot沒(méi)有對(duì)node進(jìn)行添加pass動(dòng)作,只是增加了線程動(dòng)作。
在得到滿(mǎn)足條件的一個(gè)waitInMs值時(shí),并且waitInMs其實(shí)是當(dāng)前時(shí)間距離下一個(gè)(或者多個(gè))窗口開(kāi)始時(shí)間的時(shí)間差,那么currentTime + waitInMs就是未來(lái)某一個(gè)窗口的開(kāi)始時(shí)間,并且對(duì)該窗口中增加wait資源,然后線程sleep掉waitInMs時(shí)間。這是有優(yōu)先級(jí)資源申請(qǐng)時(shí)的邏輯。

? RateLimiterController 限速控制器
其實(shí)他與本身node統(tǒng)計(jì)的數(shù)值沒(méi)有多大關(guān)系,而且與自身設(shè)置的最大等待時(shí)間maxQueueingTimeMs與qps設(shè)定的count有關(guān)。canPass方法

  public boolean canPass(Node node, int acquireCount, boolean prioritized) {      
        if (acquireCount <= 0) {
            return true;
        }      
        if (count <= 0) {
            return false;
        }
        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

costTime:acquireCount個(gè)數(shù)資源在count下的預(yù)估所需耗時(shí);
expectedTime:預(yù)期的時(shí)間點(diǎn);
latestPassedTime:最近通過(guò)的時(shí)間點(diǎn);
在預(yù)期時(shí)間小于等于當(dāng)前時(shí)間,就會(huì)判斷通過(guò)。
waitTime表示預(yù)計(jì)耗時(shí)加上latestPassedTime時(shí)間,最后減去當(dāng)前時(shí)間,即為耗時(shí)。如果該waitTime大于了最大等待時(shí)間,則不能通過(guò)。在latestPassedTime嘗試增加耗時(shí),如果增加后得到的值減去當(dāng)前時(shí)間,任然大于最大等待時(shí)間,那么latestPassedTime需要減去已經(jīng)增加的costTime,如果通過(guò)判斷,則線程sleep時(shí)間waitTime。latestPassedTime是原子的,不用考慮線程安全問(wèn)題。那么在高并發(fā)下同一時(shí)間點(diǎn),最多能申請(qǐng)多少資源呢?
一個(gè)資源需要消耗Math.round(1.0 * 1/ count * 1000)時(shí)間,然后有n個(gè)請(qǐng)求進(jìn)來(lái),那么一個(gè)公式 n*Math.round(1.0 * 1/ count * 1000)<= maxQueueingTimeMs。因?yàn)閘atestPassedTime時(shí)間是通過(guò)currentTime增加n個(gè)申請(qǐng)資源耗時(shí)的時(shí)間累加得到的。所以可以想象一下,有多個(gè)請(qǐng)求進(jìn)來(lái)時(shí),他們并不會(huì)并發(fā)執(zhí)行,而是有時(shí)間順序的依次執(zhí)行。

? WarmUpController 預(yù)熱控制器
預(yù)熱是將請(qǐng)求隨著時(shí)間推移,慢慢增加通過(guò)量,防止系統(tǒng)在短時(shí)間內(nèi),收到比較大的流量沖擊,保護(hù)系統(tǒng)穩(wěn)定,防止出現(xiàn)過(guò)載等情形。

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }     
        this.count = count;
        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

在構(gòu)造器中,初始化了最大令牌數(shù)maxToken,需要警告的令牌數(shù)warningToken,和需要算法指定的斜率slope。思考一下,什么是預(yù)熱,他是保證在一定的請(qǐng)求量進(jìn)來(lái)時(shí),順著時(shí)間的推移,越來(lái)約放寬,能夠申請(qǐng)資源的請(qǐng)求會(huì)越來(lái)越多的功能。其實(shí)sentinel的越熱思想,參考了guava中預(yù)熱的計(jì)算思想

到達(dá)脈沖的請(qǐng)求可能會(huì)拖累長(zhǎng)時(shí)間空閑的系統(tǒng),即使它在穩(wěn)定期間具有更大的處理能力。它通常發(fā)生在需要額外時(shí)間進(jìn)行初始化的場(chǎng)景中,例如,DB建立連接、連接到遠(yuǎn)程服務(wù)等。所以我們需要“熱身”。
Sentinel的“預(yù)熱”實(shí)現(xiàn)基于guava的算法。然而,guava的實(shí)現(xiàn)集中于調(diào)整請(qǐng)求間隔,這類(lèi)似于漏桶。Sentinel更注重在不計(jì)算其間隔的情況下控制每秒傳入請(qǐng)求的計(jì)數(shù),這類(lèi)似于令牌桶算法。
桶中剩余的令牌用于測(cè)量系統(tǒng)實(shí)用程序。假設(shè)一個(gè)系統(tǒng)可以每秒處理B個(gè)請(qǐng)求。每秒鐘B令牌將被添加到bucket中,直到bucket滿(mǎn)為止。當(dāng)系統(tǒng)處理一個(gè)請(qǐng)求時(shí),它從桶中獲取一個(gè)令牌。存儲(chǔ)桶中剩余的令牌越多,系統(tǒng)的利用率就越低;當(dāng)令牌存儲(chǔ)桶中的令牌高于某個(gè)閾值時(shí),我們稱(chēng)之為“飽和”狀態(tài)。

所以sentinel給出的策略是什么?即當(dāng)前令牌數(shù)獲取的越多,那么系統(tǒng)飽和度越高,如果系統(tǒng)令牌獲取少,則系統(tǒng)利用率低,這時(shí)候需要進(jìn)行更加嚴(yán)格的限流措施。
了解一下canPass方法:

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);
        // 開(kāi)始計(jì)算它的斜率
        // 如果進(jìn)入了警戒線,開(kāi)始調(diào)整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            // 剩余的令牌數(shù)量較多,并且超過(guò)了警戒令牌數(shù)量,那么需要進(jìn)行嚴(yán)格限制
            long aboveToken = restToken - warningToken;        

            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));          
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }
        return false;
    }

該方法中有兩個(gè)ops,passOps是當(dāng)前時(shí)間的qps,而previousOps可以認(rèn)為是上一秒的pqs,storeTokens是指當(dāng)前存儲(chǔ)的token令牌,當(dāng)storeTokens越小,則系統(tǒng)的使用率是越高的。但是系統(tǒng)是來(lái)預(yù)熱,所以在當(dāng)前令牌數(shù)越大,系統(tǒng)就需要進(jìn)行限制。所以是否通過(guò),就需要通過(guò)當(dāng)前剩余的令牌數(shù)量進(jìn)行相應(yīng)的公式,得到一個(gè)可以通過(guò)的warningQps,這個(gè)pqs可以看到公式,隨著aboveToken 越來(lái)越大,qps是越小的,就限制了pass的閾值的。當(dāng)然,storeTokens的令牌如何變化,例如如何減少,如何重置的。
同步syncToken()方法:

    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        // 將時(shí)間只為秒整數(shù),去除了毫秒
        currentTime = currentTime - currentTime % 1000;

        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            // 如果最近一次填充時(shí)間與當(dāng)前時(shí)間在同一秒內(nèi)
            return;
        }
        // 不在同一秒
        long oldValue = storedTokens.get();
        // 在不同時(shí)間內(nèi),新累積的令牌數(shù)量
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 并且重置storedTokens令牌,并且扣減掉已經(jīng)使用的passQps
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

其中l(wèi)astFilledTime是記錄最近一次更新storeToken的時(shí)間的,但是這個(gè)有個(gè)限制,如果記錄的時(shí)間與當(dāng)前時(shí)間currentTime是同一秒的,就不會(huì)更新storeToken值的。所以簡(jiǎn)單的理解,storeToken值是按秒數(shù)更新的,并且在同一秒內(nèi),warningQps的值是一致的,對(duì)在同一秒內(nèi)請(qǐng)求通過(guò)的資源是同等對(duì)待的。那么,storeToken是如何隨著時(shí)間推移,慢慢去變化的?
通過(guò)coolDownTokens方法得到的新token值,然后storeTokens去重置,并且減去了上一秒的pqs(passQps)更新了lastFilledTime時(shí)間,storeTokens剩下部分是認(rèn)為沒(méi)有被獲取的token值。好,考慮一下,storeToken值的重置,肯定是當(dāng)前時(shí)間與lastFilledTime有關(guān)系的,隨著時(shí)間的推移,storeToken也可能會(huì)增多??匆幌耤oolDownTokens方法:

  private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 添加令牌的判斷前提條件:
        // 當(dāng)令牌的消耗程度遠(yuǎn)遠(yuǎn)低于警戒線的時(shí)候
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                // qps 較低,系統(tǒng)利用率低,那么需要增加一些token令牌
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

首先當(dāng)storedToken剩余的數(shù)量已經(jīng)小于warningToken,那么,確實(shí)要增加點(diǎn)令牌進(jìn)去,他的公式是將剩余的令牌數(shù)+(當(dāng)前時(shí)間-lastFilledTime時(shí)間)*每秒內(nèi)的qps。
但是當(dāng)storedTokens大于了警戒位,并沒(méi)說(shuō)類(lèi)似之前的增加令牌公式一下。而且多了一層判斷,上一秒的passQps與設(shè)定的比例小,才加上去,如果passQps較大,那storedToken任然是舊值。為什么需要這樣?我們知道,storeToken值越小,那么準(zhǔn)許進(jìn)入的限制就會(huì)放開(kāi),該預(yù)熱設(shè)計(jì)也是一秒一秒,慢慢夸大流量進(jìn)入,那么count / coldFactor這個(gè)公式,是判斷申請(qǐng)資源增強(qiáng)的標(biāo)志,要不然會(huì)出現(xiàn)什么情況呢?預(yù)熱控制器會(huì)一直綁死在越熱階段,并不會(huì)隨著時(shí)間推移,流量增多而夸大pqs的閾值。

總結(jié)

每個(gè)限流控制器的策略不盡相同,但是目的都是在保護(hù)系統(tǒng)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容