在SytemRule,DegradeRule等規(guī)則中,有針對(duì)保護(hù)系統(tǒng)資源做限流,有針對(duì)rt,異常等做降級(jí)限流的。但是如何針對(duì)具體資源限定,進(jìn)行限流。
在FlowRule中,需要獲取一個(gè)節(jié)點(diǎn),以這個(gè)節(jié)點(diǎn),做數(shù)值參考,例如有ClusterNode,有OriginNode,或者默認(rèn)的DefaultNode。在選取Node時(shí),F(xiàn)lowRule都有對(duì)應(yīng)接口TrafficShapingController控制工具做為限流判斷。
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// Flow rule map cannot be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
List<FlowRule> rules = flowRules.get(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
在FlowSlot中,通過(guò)resource中的name,資源命名從FlowRule管理器中得到他的所有規(guī)則,然后進(jìn)行遍歷,check是否通過(guò)。
在TrafficShapingController 控制器中具體有四種實(shí)現(xiàn)方式,默認(rèn)控制器,速率控制器,預(yù)熱控制器,速率與預(yù)熱結(jié)合的控制器。
? DefaultController 默認(rèn)控制器。
主要通過(guò)node中獲取的線程數(shù)量,或者檔期那qps與規(guī)則中的閾值比較,不符合就限制。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
在該段核心代碼中,avgUsedTokens方法中,其實(shí)時(shí)獲取當(dāng)前節(jié)點(diǎn)線程數(shù)或者當(dāng)前的qps,與當(dāng)前規(guī)則中count,閾值比較。其中,如果存在優(yōu)先級(jí)的資源申請(qǐng),不能因?yàn)楫?dāng)前的限制,將有優(yōu)先級(jí)請(qǐng)求直接否定,這邊如何解決的?有優(yōu)先級(jí)申請(qǐng)資源時(shí),雖然當(dāng)前狀況下資源已近消耗完,但是可以占用將來(lái)的一個(gè)令牌,并且node通過(guò)addOccupiedPass方法增加資源數(shù)量,然后sleep一段時(shí)間,時(shí)間到后,拋出PriortyWaite異常,該異常不會(huì)被記錄異常數(shù)值中。在StatisticSlot中體現(xiàn),對(duì)PriortyWaite異常抓取,并且只是增加線程數(shù)量增加,并沒(méi)有對(duì)pass增加,為什么?看一下如何實(shí)現(xiàn)。
StatisticNode類(lèi)中 tryOccupyNext方法:
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
int idx = 0;
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
// 距離idx+下一個(gè)窗口的時(shí)間
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
// 得到最近窗口的pass值
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}
return OccupyTimeoutProperty.getOccupyTimeout();
}
maxCount是指在周期內(nèi),最大的數(shù)值;
currentBorrow是指當(dāng)前已經(jīng)等待資源的數(shù)量(下一個(gè)周期成功借了多少資源,在未來(lái)時(shí)間內(nèi),占好坑的);
windowLength指一個(gè)窗口的時(shí)間長(zhǎng)度;
earliestTime指當(dāng)前時(shí)間窗口為周期的,開(kāi)始窗口的時(shí)間點(diǎn),例如一個(gè)周期是由4個(gè)窗口組成的,假設(shè)當(dāng)前窗口是處于第一個(gè)窗口(該窗口為距離當(dāng)前時(shí)間最近的窗口),那么earliestTime時(shí)間是這段周期的開(kāi)始時(shí)間,即為第二個(gè)窗口(該窗口與第一個(gè)窗口組成了一個(gè)周期);
currentPass指這段周期已經(jīng)pass的值;
整個(gè)方法最終結(jié)果是想得到一個(gè)等待的時(shí)間,但是該等待時(shí)間也是有最大限制的。在while循環(huán)內(nèi),earliestTime一定要小于currentTime的,currentTime比earliestTime大((SampleCountProperty.SAMPLE_COUNT-1)個(gè)窗口時(shí)間+currentTime % windowLength)這么多時(shí)間值,idx表示已經(jīng)遍歷次數(shù)。當(dāng)?shù)谝淮螘r(shí),waitInMs的時(shí)間其實(shí)時(shí)當(dāng)前時(shí)間窗口的下一個(gè)窗口開(kāi)始時(shí)間減去當(dāng)前時(shí)間,意思就是等待waitInMs,就是下一個(gè)窗口的開(kāi)始時(shí)間了。通過(guò)earliestTime獲取與他關(guān)聯(lián)的窗口pass值。那么在接下來(lái)的
(currentPass + currentBorrow + acquireCount - windowPass <= maxCount) 判斷,maxCount是在一個(gè)完整周期內(nèi)最大的閾值,減去windowPass是相當(dāng)于減去了一個(gè)窗口的統(tǒng)計(jì)值,剩余已經(jīng)pass的值與已經(jīng)占用的資源是否小于等于maxCount。在條件不滿(mǎn)足情況下,會(huì)將earliestTime增加一個(gè)窗口長(zhǎng)度,currentPass也會(huì)減去一個(gè)窗口的記錄pass值。
此地需要圖解釋
如何記錄未來(lái)已經(jīng)占用的資源。
在StatisticNode節(jié)點(diǎn)中rollingCounterInSecond屬性中,在構(gòu)造器中,默認(rèn)由OccupiableBucketLeapArray類(lèi)去控制,而該類(lèi)也是繼承了LeapArray的,但是他由內(nèi)置屬性FutureBucketLeapArray,其實(shí)就是通過(guò)該類(lèi)型記錄未來(lái)資源占用。這里有個(gè)思考,未來(lái)占用的資源,到達(dá)時(shí)間點(diǎn)后,這些資源也應(yīng)該記錄在當(dāng)前已經(jīng)申請(qǐng)的資源中,確實(shí)要記錄,在LeapArray獲取窗口時(shí),如果窗口過(guò)期都會(huì)去重置窗口,那么OccupiableBucketLeapArray也重寫(xiě)了窗口重置方法:
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
先重置窗口的開(kāi)始時(shí)間,然后通過(guò)time,在borrowArray中得到該窗口的統(tǒng)計(jì)槽,并且將borrowBucket中的pass值賦值給重置窗口中,這樣就將已經(jīng)出借的一個(gè)窗口下的pass值給了當(dāng)前窗口了。所以在拋出PriorityWaitException異常時(shí)StatiticSlot沒(méi)有對(duì)node進(jìn)行添加pass動(dòng)作,只是增加了線程動(dòng)作。
在得到滿(mǎn)足條件的一個(gè)waitInMs值時(shí),并且waitInMs其實(shí)是當(dāng)前時(shí)間距離下一個(gè)(或者多個(gè))窗口開(kāi)始時(shí)間的時(shí)間差,那么currentTime + waitInMs就是未來(lái)某一個(gè)窗口的開(kāi)始時(shí)間,并且對(duì)該窗口中增加wait資源,然后線程sleep掉waitInMs時(shí)間。這是有優(yōu)先級(jí)資源申請(qǐng)時(shí)的邏輯。
? RateLimiterController 限速控制器
其實(shí)他與本身node統(tǒng)計(jì)的數(shù)值沒(méi)有多大關(guān)系,而且與自身設(shè)置的最大等待時(shí)間maxQueueingTimeMs與qps設(shè)定的count有關(guān)。canPass方法
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
if (acquireCount <= 0) {
return true;
}
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
costTime:acquireCount個(gè)數(shù)資源在count下的預(yù)估所需耗時(shí);
expectedTime:預(yù)期的時(shí)間點(diǎn);
latestPassedTime:最近通過(guò)的時(shí)間點(diǎn);
在預(yù)期時(shí)間小于等于當(dāng)前時(shí)間,就會(huì)判斷通過(guò)。
waitTime表示預(yù)計(jì)耗時(shí)加上latestPassedTime時(shí)間,最后減去當(dāng)前時(shí)間,即為耗時(shí)。如果該waitTime大于了最大等待時(shí)間,則不能通過(guò)。在latestPassedTime嘗試增加耗時(shí),如果增加后得到的值減去當(dāng)前時(shí)間,任然大于最大等待時(shí)間,那么latestPassedTime需要減去已經(jīng)增加的costTime,如果通過(guò)判斷,則線程sleep時(shí)間waitTime。latestPassedTime是原子的,不用考慮線程安全問(wèn)題。那么在高并發(fā)下同一時(shí)間點(diǎn),最多能申請(qǐng)多少資源呢?
一個(gè)資源需要消耗Math.round(1.0 * 1/ count * 1000)時(shí)間,然后有n個(gè)請(qǐng)求進(jìn)來(lái),那么一個(gè)公式 n*Math.round(1.0 * 1/ count * 1000)<= maxQueueingTimeMs。因?yàn)閘atestPassedTime時(shí)間是通過(guò)currentTime增加n個(gè)申請(qǐng)資源耗時(shí)的時(shí)間累加得到的。所以可以想象一下,有多個(gè)請(qǐng)求進(jìn)來(lái)時(shí),他們并不會(huì)并發(fā)執(zhí)行,而是有時(shí)間順序的依次執(zhí)行。
? WarmUpController 預(yù)熱控制器
預(yù)熱是將請(qǐng)求隨著時(shí)間推移,慢慢增加通過(guò)量,防止系統(tǒng)在短時(shí)間內(nèi),收到比較大的流量沖擊,保護(hù)系統(tǒng)穩(wěn)定,防止出現(xiàn)過(guò)載等情形。
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
在構(gòu)造器中,初始化了最大令牌數(shù)maxToken,需要警告的令牌數(shù)warningToken,和需要算法指定的斜率slope。思考一下,什么是預(yù)熱,他是保證在一定的請(qǐng)求量進(jìn)來(lái)時(shí),順著時(shí)間的推移,越來(lái)約放寬,能夠申請(qǐng)資源的請(qǐng)求會(huì)越來(lái)越多的功能。其實(shí)sentinel的越熱思想,參考了guava中預(yù)熱的計(jì)算思想
到達(dá)脈沖的請(qǐng)求可能會(huì)拖累長(zhǎng)時(shí)間空閑的系統(tǒng),即使它在穩(wěn)定期間具有更大的處理能力。它通常發(fā)生在需要額外時(shí)間進(jìn)行初始化的場(chǎng)景中,例如,DB建立連接、連接到遠(yuǎn)程服務(wù)等。所以我們需要“熱身”。
Sentinel的“預(yù)熱”實(shí)現(xiàn)基于guava的算法。然而,guava的實(shí)現(xiàn)集中于調(diào)整請(qǐng)求間隔,這類(lèi)似于漏桶。Sentinel更注重在不計(jì)算其間隔的情況下控制每秒傳入請(qǐng)求的計(jì)數(shù),這類(lèi)似于令牌桶算法。
桶中剩余的令牌用于測(cè)量系統(tǒng)實(shí)用程序。假設(shè)一個(gè)系統(tǒng)可以每秒處理B個(gè)請(qǐng)求。每秒鐘B令牌將被添加到bucket中,直到bucket滿(mǎn)為止。當(dāng)系統(tǒng)處理一個(gè)請(qǐng)求時(shí),它從桶中獲取一個(gè)令牌。存儲(chǔ)桶中剩余的令牌越多,系統(tǒng)的利用率就越低;當(dāng)令牌存儲(chǔ)桶中的令牌高于某個(gè)閾值時(shí),我們稱(chēng)之為“飽和”狀態(tài)。
所以sentinel給出的策略是什么?即當(dāng)前令牌數(shù)獲取的越多,那么系統(tǒng)飽和度越高,如果系統(tǒng)令牌獲取少,則系統(tǒng)利用率低,這時(shí)候需要進(jìn)行更加嚴(yán)格的限流措施。
了解一下canPass方法:
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 開(kāi)始計(jì)算它的斜率
// 如果進(jìn)入了警戒線,開(kāi)始調(diào)整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 剩余的令牌數(shù)量較多,并且超過(guò)了警戒令牌數(shù)量,那么需要進(jìn)行嚴(yán)格限制
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
該方法中有兩個(gè)ops,passOps是當(dāng)前時(shí)間的qps,而previousOps可以認(rèn)為是上一秒的pqs,storeTokens是指當(dāng)前存儲(chǔ)的token令牌,當(dāng)storeTokens越小,則系統(tǒng)的使用率是越高的。但是系統(tǒng)是來(lái)預(yù)熱,所以在當(dāng)前令牌數(shù)越大,系統(tǒng)就需要進(jìn)行限制。所以是否通過(guò),就需要通過(guò)當(dāng)前剩余的令牌數(shù)量進(jìn)行相應(yīng)的公式,得到一個(gè)可以通過(guò)的warningQps,這個(gè)pqs可以看到公式,隨著aboveToken 越來(lái)越大,qps是越小的,就限制了pass的閾值的。當(dāng)然,storeTokens的令牌如何變化,例如如何減少,如何重置的。
同步syncToken()方法:
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// 將時(shí)間只為秒整數(shù),去除了毫秒
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
// 如果最近一次填充時(shí)間與當(dāng)前時(shí)間在同一秒內(nèi)
return;
}
// 不在同一秒
long oldValue = storedTokens.get();
// 在不同時(shí)間內(nèi),新累積的令牌數(shù)量
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 并且重置storedTokens令牌,并且扣減掉已經(jīng)使用的passQps
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
其中l(wèi)astFilledTime是記錄最近一次更新storeToken的時(shí)間的,但是這個(gè)有個(gè)限制,如果記錄的時(shí)間與當(dāng)前時(shí)間currentTime是同一秒的,就不會(huì)更新storeToken值的。所以簡(jiǎn)單的理解,storeToken值是按秒數(shù)更新的,并且在同一秒內(nèi),warningQps的值是一致的,對(duì)在同一秒內(nèi)請(qǐng)求通過(guò)的資源是同等對(duì)待的。那么,storeToken是如何隨著時(shí)間推移,慢慢去變化的?
通過(guò)coolDownTokens方法得到的新token值,然后storeTokens去重置,并且減去了上一秒的pqs(passQps)更新了lastFilledTime時(shí)間,storeTokens剩下部分是認(rèn)為沒(méi)有被獲取的token值。好,考慮一下,storeToken值的重置,肯定是當(dāng)前時(shí)間與lastFilledTime有關(guān)系的,隨著時(shí)間的推移,storeToken也可能會(huì)增多??匆幌耤oolDownTokens方法:
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判斷前提條件:
// 當(dāng)令牌的消耗程度遠(yuǎn)遠(yuǎn)低于警戒線的時(shí)候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
// qps 較低,系統(tǒng)利用率低,那么需要增加一些token令牌
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
首先當(dāng)storedToken剩余的數(shù)量已經(jīng)小于warningToken,那么,確實(shí)要增加點(diǎn)令牌進(jìn)去,他的公式是將剩余的令牌數(shù)+(當(dāng)前時(shí)間-lastFilledTime時(shí)間)*每秒內(nèi)的qps。
但是當(dāng)storedTokens大于了警戒位,并沒(méi)說(shuō)類(lèi)似之前的增加令牌公式一下。而且多了一層判斷,上一秒的passQps與設(shè)定的比例小,才加上去,如果passQps較大,那storedToken任然是舊值。為什么需要這樣?我們知道,storeToken值越小,那么準(zhǔn)許進(jìn)入的限制就會(huì)放開(kāi),該預(yù)熱設(shè)計(jì)也是一秒一秒,慢慢夸大流量進(jìn)入,那么count / coldFactor這個(gè)公式,是判斷申請(qǐng)資源增強(qiáng)的標(biāo)志,要不然會(huì)出現(xiàn)什么情況呢?預(yù)熱控制器會(huì)一直綁死在越熱階段,并不會(huì)隨著時(shí)間推移,流量增多而夸大pqs的閾值。
總結(jié)
每個(gè)限流控制器的策略不盡相同,但是目的都是在保護(hù)系統(tǒng)。