Sentinel流量控制之勻速排隊和預熱

在前面一篇文章寫了默認的DefaultNode的實現(xiàn)方法,現(xiàn)在講解剩余的幾種方式。

RateLimiterController 勻速排隊

重要參數(shù)

public class RateLimiterController implements TrafficShapingController {
    // 排隊等待的最大超時時間,如果等待時間超過該時間,就會拋出FlowException
    private final int maxQueueingTimeMs;
    //設置的閾值,以QPS為例,即1s中可以通過的數(shù)量
    private final double count;
    //上一次成功通過的時間戳  
    private final AtomicLong latestPassedTime = new AtomicLong(-1);
 public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 當需要的令牌數(shù)小于等于0,直接通過
        if (acquireCount <= 0) {
            return true;
        }
        //當閾值小于等于0時,拒絕
        if (count <= 0) {
            return false;
        }
        long currentTime = TimeUtil.currentTimeMillis();
        // 根據(jù)本次請求的令牌數(shù)計算兩個請求的間隔時間  
        // 1.0 / count * 10000 即一個令牌消耗的時間(毫秒), 再乘以所需的令牌數(shù),得到本次請求要間隔的時間
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        // 計算該請求所期望到達時間
        long expectedTime = costTime + latestPassedTime.get();
        //如果小于當前時間,沒有超出閾值,
        if (expectedTime <= currentTime) {
            // 此處可能存在爭用,但可以
            //更新上次通過事件為當前時間
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 如果expectedTime大于當前時間,說明還沒到令牌發(fā)放時間,當前請求需等待, 因為每隔一個時間產生一個令牌,超過了當前時間,說明期間產生的令牌不夠
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            //計算的需要等待的時間超過了最大排隊等待時間,就拒絕通過,并拋出FlowException異常
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
               // 更新時間,CAS操作,可能會出現(xiàn)爭搶,導致多次重試
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                   // 所以還要判斷等待時間
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // 線程等待
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

WarmUpController 預熱

Sentinel的"warm-up"的實現(xiàn)是基于Guava的算法。

但是,Guava的實施著重于調整請求間隔,類似于漏斗。Sentinel更注重控制每秒的傳入請求數(shù),而不計算其間隔,這類似于令牌桶算法。

桶中的剩余令牌用于衡量系統(tǒng)的可用性。假設系統(tǒng)每秒可以處理b個請求。每秒鐘b個令牌將被添加到存儲桶中,直到存儲桶已滿。當系統(tǒng)處理請求時,它會從存儲桶中獲取令牌。存儲桶中剩余的令牌越多,系統(tǒng)的利用率就越低; 當令牌桶中的令牌超過某個閾值時,我們將其稱為“飽和”狀態(tài)。

根據(jù)Guava的理論, 有一個線性方程可以寫成y = m * x + b,其中y(aka y(x))或qps(q))是我們在飽和周期( 例如3分鐘)所希望的Qps,m是從冷(最小)速率到穩(wěn)定(最大)速率的變化速率,x(或q)是占用令牌。

tips: 以上是在官方注釋文檔翻譯的。

重要變量:

public class WarmUpController implements TrafficShapingController {
    //流量規(guī)則設置的閾值
    protected double count;
    //冷卻因子,默認為3
    private int coldFactor;
    //警告token,與Guava中的RateLimiter的thresholdPermits對應,超過這個值后進入了預熱階段
    protected int warningToken = 0;
    //最大的令牌數(shù)和RateLimiter的maxPermits對應
    private int maxToken;
    //斜線斜率
    protected double slope;
    //當前存儲的令牌數(shù)
    protected AtomicLong storedTokens = new AtomicLong(0);
    //最后更新令牌的時間
    protected AtomicLong lastFilledTime = new AtomicLong(0);

構造方法:


public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
}

private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        //其中 1 / count 就是 stableInterval
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
       // 其中 coldInterval = 3 *  stableInterval ,所以  stableInterval + coldInterval = 4 *  stableInterval 而 stableInterval =  1 / count, 所以 stableInterval + coldInterval = 4 * (1/count) ,就可以得到下面的公式
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

在進行流量控制時會觸發(fā)下面的方法:

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //獲取當前節(jié)點已通過的QPs(調用的是按秒級統(tǒng)計)
        long passQps = (long) node.passQps();
        // 當前時間窗口的前一個窗口記錄的通過的QPS(調用的是按分鐘統(tǒng)計的)
        long previousQps = (long) node.previousPassQps();
        //更新 storedTokens 與 lastFilledTime 的值(最后減去了上一秒通過的令牌數(shù)QPS)
        syncToken(previousQps);

        // 開始計算它的斜率
        // 如果進入了警戒線,開始調整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            // int temp = aboveToken * slope + 1.0 / count 計算當前生成一個令牌的時間
            // 1  / temp 即按照當前速率,在1s中可以產生的令牌數(shù)
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            // 當前通過的數(shù)加上本次需要的令牌小于warningQps可以通過
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        // 當當前處于穩(wěn)定狀態(tài)時
        } else {
            // 判斷當前通過數(shù)加上本次需要的數(shù)布不超過閾值,可以通過
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

注意:調用者是rollingCounterInMinute,它是統(tǒng)計分鐘級別的,有60個窗口,每個窗口代表1s的數(shù)據(jù)。

@Override
    public double previousPassQps() {
        return this.rollingCounterInMinute.previousWindowPass();
}

public long previousWindowPass() {
        //更新時間窗口,去除過期時間
        data.currentWindow();
        WindowWrap<MetricBucket> wrap = data.getPreviousWindow();
        if (wrap == null) {
            return 0;
        }
        return wrap.value().pass();
}

public WindowWrap<T> getPreviousWindow() {
        return getPreviousWindow(TimeUtil.currentTimeMillis());
}

public WindowWrap<T> getPreviousWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // windowLengthInMs為一個時間窗口的長度,因為是分鐘級別統(tǒng)計的,所以是1s
        // 計算當前時間前1s,即前一個時間窗口的下標位置
        int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
        timeMillis = timeMillis - windowLengthInMs;
        WindowWrap<T> wrap = array.get(idx);
        // 檢查是否不建議使用存儲桶,這意味著該存儲桶至少在整個窗口時間段(在這里是1分鐘)內都已落后
        if (wrap == null || isWindowDeprecated(wrap)) {
            return null;
        }

        if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
            return null;
        }
        return wrap;
}
protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        // 計算出當前時間秒的最開始時間
        currentTime = currentTime - currentTime % 1000; 
        // 最后更新令牌的時間
        long oldLastFillTime = lastFilledTime.get();
        // 如果當前時間小于等于上次發(fā)放許可的時間,則跳過,無法發(fā)放令牌,即每秒發(fā)放一次令牌
        if (currentTime <= oldLastFillTime) {
            return;
        }
        //當前存放的令牌數(shù)
        long oldValue = storedTokens.get();
        //計算新的存放的令牌數(shù)
        long newValue = coolDownTokens(currentTime, passQps);
        
        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 更細剩余令牌數(shù)量,即減去上1秒通過的令牌
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }
private long coolDownTokens(long currentTime, long passQps) {
        // 當前存放的令牌數(shù)
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 添加令牌的判斷前提條件:
        // 當令牌的消耗程度遠遠低于警戒線的時候
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        // 當超過警戒線后,進入了預熱階段
        } else if (oldValue > warningToken) {
            //這里不是很理解,歡迎大家交流
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

WarmUpRateLimiterController

設計上是上面介紹的RateLimiterController和WarmUpController結合起來

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime = 0;
        long expectedTime = 0;
    
        主要時下面與RateLimiterController不同
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;

            // current interval = restToken*slope+1/count
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        主要時上面與RateLimiterController不同
        expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
}

實際上,就是RateLimiterController中的代碼,然后加入了預熱的內容。

在RateLimiterController中,單個請求的costTime是固定的,就是1/count。

但是在這里,加入了WarmUp的內容,通過令牌數(shù)量,來判斷當前系統(tǒng)的QPS應該是多少,如果當前令牌書超過了warnTokens,那么系統(tǒng)的最大QPS容量已經低于我們預設的QPS了,相應的,costTime就會延長。

至此,關于Sentinel中的限流部分就講解完了。后面將會介紹降級部分。

參考文章:

RateLimiter 源碼分析(Guava 和 Sentinel 實現(xiàn))

Sentienl 流控效果之勻速排隊與預熱實現(xiàn)原理與實戰(zhàn)建議

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容