在前面一篇文章寫了默認的DefaultNode的實現(xiàn)方法,現(xiàn)在講解剩余的幾種方式。
RateLimiterController 勻速排隊
重要參數(shù)
public class RateLimiterController implements TrafficShapingController {
// 排隊等待的最大超時時間,如果等待時間超過該時間,就會拋出FlowException
private final int maxQueueingTimeMs;
//設置的閾值,以QPS為例,即1s中可以通過的數(shù)量
private final double count;
//上一次成功通過的時間戳
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 當需要的令牌數(shù)小于等于0,直接通過
if (acquireCount <= 0) {
return true;
}
//當閾值小于等于0時,拒絕
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 根據(jù)本次請求的令牌數(shù)計算兩個請求的間隔時間
// 1.0 / count * 10000 即一個令牌消耗的時間(毫秒), 再乘以所需的令牌數(shù),得到本次請求要間隔的時間
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 計算該請求所期望到達時間
long expectedTime = costTime + latestPassedTime.get();
//如果小于當前時間,沒有超出閾值,
if (expectedTime <= currentTime) {
// 此處可能存在爭用,但可以
//更新上次通過事件為當前時間
latestPassedTime.set(currentTime);
return true;
} else {
// 如果expectedTime大于當前時間,說明還沒到令牌發(fā)放時間,當前請求需等待, 因為每隔一個時間產生一個令牌,超過了當前時間,說明期間產生的令牌不夠
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
//計算的需要等待的時間超過了最大排隊等待時間,就拒絕通過,并拋出FlowException異常
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 更新時間,CAS操作,可能會出現(xiàn)爭搶,導致多次重試
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 所以還要判斷等待時間
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// 線程等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
WarmUpController 預熱
Sentinel的"warm-up"的實現(xiàn)是基于Guava的算法。
但是,Guava的實施著重于調整請求間隔,類似于漏斗。Sentinel更注重控制每秒的傳入請求數(shù),而不計算其間隔,這類似于令牌桶算法。
桶中的剩余令牌用于衡量系統(tǒng)的可用性。假設系統(tǒng)每秒可以處理b個請求。每秒鐘b個令牌將被添加到存儲桶中,直到存儲桶已滿。當系統(tǒng)處理請求時,它會從存儲桶中獲取令牌。存儲桶中剩余的令牌越多,系統(tǒng)的利用率就越低; 當令牌桶中的令牌超過某個閾值時,我們將其稱為“飽和”狀態(tài)。
根據(jù)Guava的理論, 有一個線性方程可以寫成y = m * x + b,其中y(aka y(x))或qps(q))是我們在飽和周期( 例如3分鐘)所希望的Qps,m是從冷(最小)速率到穩(wěn)定(最大)速率的變化速率,x(或q)是占用令牌。
tips: 以上是在官方注釋文檔翻譯的。
重要變量:
public class WarmUpController implements TrafficShapingController {
//流量規(guī)則設置的閾值
protected double count;
//冷卻因子,默認為3
private int coldFactor;
//警告token,與Guava中的RateLimiter的thresholdPermits對應,超過這個值后進入了預熱階段
protected int warningToken = 0;
//最大的令牌數(shù)和RateLimiter的maxPermits對應
private int maxToken;
//斜線斜率
protected double slope;
//當前存儲的令牌數(shù)
protected AtomicLong storedTokens = new AtomicLong(0);
//最后更新令牌的時間
protected AtomicLong lastFilledTime = new AtomicLong(0);
構造方法:
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
//其中 1 / count 就是 stableInterval
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// 其中 coldInterval = 3 * stableInterval ,所以 stableInterval + coldInterval = 4 * stableInterval 而 stableInterval = 1 / count, 所以 stableInterval + coldInterval = 4 * (1/count) ,就可以得到下面的公式
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
在進行流量控制時會觸發(fā)下面的方法:
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//獲取當前節(jié)點已通過的QPs(調用的是按秒級統(tǒng)計)
long passQps = (long) node.passQps();
// 當前時間窗口的前一個窗口記錄的通過的QPS(調用的是按分鐘統(tǒng)計的)
long previousQps = (long) node.previousPassQps();
//更新 storedTokens 與 lastFilledTime 的值(最后減去了上一秒通過的令牌數(shù)QPS)
syncToken(previousQps);
// 開始計算它的斜率
// 如果進入了警戒線,開始調整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
// int temp = aboveToken * slope + 1.0 / count 計算當前生成一個令牌的時間
// 1 / temp 即按照當前速率,在1s中可以產生的令牌數(shù)
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 當前通過的數(shù)加上本次需要的令牌小于warningQps可以通過
if (passQps + acquireCount <= warningQps) {
return true;
}
// 當當前處于穩(wěn)定狀態(tài)時
} else {
// 判斷當前通過數(shù)加上本次需要的數(shù)布不超過閾值,可以通過
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
注意:調用者是rollingCounterInMinute,它是統(tǒng)計分鐘級別的,有60個窗口,每個窗口代表1s的數(shù)據(jù)。
@Override
public double previousPassQps() {
return this.rollingCounterInMinute.previousWindowPass();
}
public long previousWindowPass() {
//更新時間窗口,去除過期時間
data.currentWindow();
WindowWrap<MetricBucket> wrap = data.getPreviousWindow();
if (wrap == null) {
return 0;
}
return wrap.value().pass();
}
public WindowWrap<T> getPreviousWindow() {
return getPreviousWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> getPreviousWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// windowLengthInMs為一個時間窗口的長度,因為是分鐘級別統(tǒng)計的,所以是1s
// 計算當前時間前1s,即前一個時間窗口的下標位置
int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
timeMillis = timeMillis - windowLengthInMs;
WindowWrap<T> wrap = array.get(idx);
// 檢查是否不建議使用存儲桶,這意味著該存儲桶至少在整個窗口時間段(在這里是1分鐘)內都已落后
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
return null;
}
return wrap;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// 計算出當前時間秒的最開始時間
currentTime = currentTime - currentTime % 1000;
// 最后更新令牌的時間
long oldLastFillTime = lastFilledTime.get();
// 如果當前時間小于等于上次發(fā)放許可的時間,則跳過,無法發(fā)放令牌,即每秒發(fā)放一次令牌
if (currentTime <= oldLastFillTime) {
return;
}
//當前存放的令牌數(shù)
long oldValue = storedTokens.get();
//計算新的存放的令牌數(shù)
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 更細剩余令牌數(shù)量,即減去上1秒通過的令牌
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
// 當前存放的令牌數(shù)
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判斷前提條件:
// 當令牌的消耗程度遠遠低于警戒線的時候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
// 當超過警戒線后,進入了預熱階段
} else if (oldValue > warningToken) {
//這里不是很理解,歡迎大家交流
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
WarmUpRateLimiterController
設計上是上面介紹的RateLimiterController和WarmUpController結合起來
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
主要時下面與RateLimiterController不同
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
主要時上面與RateLimiterController不同
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
實際上,就是RateLimiterController中的代碼,然后加入了預熱的內容。
在RateLimiterController中,單個請求的costTime是固定的,就是1/count。
但是在這里,加入了WarmUp的內容,通過令牌數(shù)量,來判斷當前系統(tǒng)的QPS應該是多少,如果當前令牌書超過了warnTokens,那么系統(tǒng)的最大QPS容量已經低于我們預設的QPS了,相應的,costTime就會延長。
至此,關于Sentinel中的限流部分就講解完了。后面將會介紹降級部分。
參考文章:
RateLimiter 源碼分析(Guava 和 Sentinel 實現(xiàn))
Sentienl 流控效果之勻速排隊與預熱實現(xiàn)原理與實戰(zhàn)建議