Alibaba Sentinel 限流與滑動(dòng)時(shí)間窗口

之前發(fā)過一篇文章,介紹了alibaba Sentinel限流功能。Alibaba Sentinel限流功能
限流依賴的基礎(chǔ)就是一個(gè)基于滑動(dòng)時(shí)間窗口的計(jì)數(shù)器。

固定時(shí)間窗口

介紹滑動(dòng)時(shí)間窗口前,先簡單介紹下固定時(shí)間窗口,見下圖

固定時(shí)間窗口

以統(tǒng)計(jì)QPS為例,我們可以將時(shí)間按照固定間隔進(jìn)行切分,比如1000ms(一秒),統(tǒng)計(jì)每一個(gè)時(shí)間窗口內(nèi)的計(jì)數(shù),然后得出QPS,這是一種最簡單的統(tǒng)計(jì)方式。
那么這種方式的缺點(diǎn)是什么呢?

臨界流量問題

比如我們定義了規(guī)則,QPS不能超過一萬,如果在900ms和1100 ms分別進(jìn)來一萬流量,顯然是滿足流控限制的,但實(shí)際上,這個(gè)流量已經(jīng)是兩萬QPS了。我們的流控限制在這種固定時(shí)間窗口下,起不到應(yīng)有的限流作用,會導(dǎo)致服務(wù)過載。

滑動(dòng)時(shí)間窗口

為了規(guī)避這個(gè)問題,滑動(dòng)時(shí)間窗口將時(shí)間切分為多個(gè)窗口(一般是兩個(gè)),窗口指針隨著時(shí)間往后滑動(dòng),見下圖

滑動(dòng)時(shí)間窗口

如圖,滑動(dòng)時(shí)間窗口將每秒鐘時(shí)間(1000ms)切分為2個(gè)窗口,W1永遠(yuǎn)指向前半秒(前500ms),w2永遠(yuǎn)指向后半秒(后500ms),初始時(shí),W1指向[0,500)ms區(qū)間,W2指向[500,1000)ms區(qū)間,當(dāng)時(shí)間往后走到1001ms時(shí),w1會去指向[1000,1500)ms區(qū)間。W2同理,會不斷的替換為新的區(qū)間,以實(shí)現(xiàn)窗口滑動(dòng)。用W1+W2兩個(gè)區(qū)間的計(jì)數(shù)之和進(jìn)行QPS計(jì)算,這樣就解決了固定時(shí)間窗口下的臨界流量問題。

Alibaba Sentinel代碼實(shí)現(xiàn)

Sentinel實(shí)現(xiàn)滑動(dòng)時(shí)間窗口,基于的是類OccupiableBucketLeapArray,其特殊的點(diǎn)就是,這個(gè)數(shù)據(jù)結(jié)構(gòu)除了持有兩個(gè)正常的時(shí)間窗口之外,還持有一個(gè)完全相同結(jié)構(gòu)的borrowArray,其中包含兩個(gè)未來的時(shí)間窗口。后續(xù)將介紹這個(gè)特殊結(jié)構(gòu)的作用。
其主要邏輯在父類LeapArray中,實(shí)現(xiàn)時(shí)間窗口初始化,獲取,滑動(dòng)。
根據(jù)當(dāng)前系統(tǒng)時(shí)間戳,去獲取歸屬的時(shí)間窗口,主要邏輯包含下述注釋中的三步
1.新建時(shí)間窗口 2.命中時(shí)間窗口 3.時(shí)間窗口起始值更新

             WindowWrap<T> old = array.get(idx);
            //1.如果老的時(shí)間窗口不存在,則新建新的時(shí)間窗口,通過cas的方式進(jìn)行替換
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            //2.如果時(shí)間戳在窗口之內(nèi),則直接返回
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            //3.如果時(shí)間戳已經(jīng)大于老窗口,則將老窗口的時(shí)間指向新的起始值
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);

基于以上的滑動(dòng)時(shí)間窗口,限流的具體過程見注釋1,2,3

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //1.計(jì)算當(dāng)前窗口計(jì)數(shù)之和
        int curCount = avgUsedTokens(node);
        //2.比較當(dāng)前流量與規(guī)則限制
        if (curCount + acquireCount > count) {
            //3.即使超限,如果prioritized設(shè)為true,則認(rèn)為是重要業(yè)務(wù),可以嘗試讓業(yè)務(wù)線程sleep到下一個(gè)窗口,借用下一個(gè)窗口的計(jì)數(shù)
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

前兩個(gè)步驟都很好理解,如果定義了限流規(guī)則為一萬QPS,當(dāng)流量超限,不讓通過即可,不允許訪問服務(wù)。
可是如果是重要業(yè)務(wù),超限了直接失敗顯然不行,Sentinel除了上圖的兩個(gè)window,還特意引入了一個(gè)包含兩個(gè)未來時(shí)間窗口的borrowArray,先借用未來的計(jì)數(shù),給與業(yè)務(wù)通過,同時(shí)讓業(yè)務(wù)線程sleep一段時(shí)間,去落在新窗口上,而且當(dāng)時(shí)間滑動(dòng)到新的窗口時(shí),也不用新建一個(gè)空計(jì)數(shù)的window,直接使用這個(gè)borrowArray中window的計(jì)數(shù)。這也是前文提到OccupiableBucketLeapArray特殊數(shù)據(jù)結(jié)構(gòu)的作用。

 public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();

        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            newBucket.reset(borrowBucket);
        }

        return newBucket;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容