計(jì)數(shù)器限流
最原始的代碼
public class CounterTest {
public long timeStamp = getNowTime();
public int reqCount = 0;
public final int limit = 100; // 時(shí)間窗口內(nèi)最大請(qǐng)求數(shù)
public final long interval = 1000; // 時(shí)間窗口ms
public boolean grant() {
long now = getNowTime();
if (now < timeStamp + interval) {
// 在時(shí)間窗口內(nèi)
reqCount++;
// 判斷當(dāng)前時(shí)間窗口內(nèi)是否超過(guò)最大請(qǐng)求控制數(shù)
return reqCount <= limit;
} else {
timeStamp = now;
// 超時(shí)后重置
reqCount = 1;
return true;
}
}
public long getNowTime() {
return System.currentTimeMillis();
}
}
但是計(jì)數(shù)器限流無(wú)法對(duì)相鄰兩秒都是高qps進(jìn)行限流,比如1:29:29.999有100qps,1:30:30.001也有100qps,其實(shí)一秒內(nèi)qps已經(jīng)超過(guò)100qps,但是無(wú)法觸發(fā)限流。
計(jì)數(shù)器+時(shí)間窗口結(jié)合
可以對(duì)1s進(jìn)行拆分10個(gè)小格,一格允許10qps,每次請(qǐng)求進(jìn)來(lái),都只統(tǒng)計(jì)當(dāng)前時(shí)間所在小格和往前推9個(gè)小格的qps統(tǒng)計(jì),超過(guò)100則促發(fā)限流,否則請(qǐng)求可以通過(guò)
漏桶算法
有一個(gè)固定容量的桶,有水流進(jìn)來(lái),也有水流出去。對(duì)于流進(jìn)來(lái)的水來(lái)說(shuō),我們無(wú)法預(yù)計(jì)一共有多少水會(huì)流進(jìn)來(lái),也無(wú)法預(yù)計(jì)水流的速度。但是對(duì)于流出去的水來(lái)說(shuō),這個(gè)桶可以固定水流出的速率。而且,當(dāng)桶滿了之后,多余的水將會(huì)溢出。
這個(gè)可以看作是一個(gè)生產(chǎn)者消費(fèi)者模型,消費(fèi)者以恒定速率消費(fèi)。
缺點(diǎn):無(wú)法應(yīng)對(duì)突發(fā)流量,突發(fā)高qps流量會(huì)被攔截
令牌桶算法
我們有一個(gè)固定容量的桶,桶里存放著令牌(token)。桶一開始是空的,token以 一個(gè)固定的速率r往桶里填充,直到達(dá)到桶的容量,多余的令牌將會(huì)被丟棄。每當(dāng)一個(gè)請(qǐng)求過(guò)來(lái)時(shí),就會(huì)嘗試從桶里移除一個(gè)令牌,如果沒有令牌的話,請(qǐng)求無(wú)法通過(guò)。可以應(yīng)對(duì)突發(fā)流量
具體實(shí)現(xiàn)代碼 guava RateLimiter
RateLimiter源碼解析
變量解析
- storedPermits 目前存儲(chǔ)的可用的令牌
- maxPermits 可以存儲(chǔ)的最多令牌數(shù)
- stableIntervalMicros 請(qǐng)求的間隔,5permits 1s,該值就是200ms
- nextFreeTicketMicros 下一個(gè)請(qǐng)求可以授權(quán)通過(guò)的時(shí)間,不管該請(qǐng)求需要多少permits,但是請(qǐng)求的permits越多,請(qǐng)求通過(guò)后,nextFreeTicketMicros值越大
RateLimiter創(chuàng)建
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
默認(rèn)創(chuàng)建SmoothBursty方式的rateLimiter,它支持某一刻的突發(fā)流量,stopwatch封裝了讀取當(dāng)前時(shí)間(毫秒)的函數(shù)
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
計(jì)算應(yīng)該需要等待的毫秒,在sleep,最后返回等待的秒數(shù)
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();//如果now比nextFreeTicketMicros大,計(jì)算到now時(shí)間應(yīng)該新生成多少newPermits
storedPermits = min(maxPermits, storedPermits + newPermits);//求最小值,因?yàn)槿绻鹡ow比nextFreeTicketMicros大很多,新生成的newPermits必然也會(huì)很多,需要有一個(gè)maxPermits限制最大值,maxPermits默認(rèn)就是你設(shè)置的速率(比如1s5個(gè)permits,maxPermits就是5)
nextFreeTicketMicros = nowMicros;//更新nextFreeTicketMicros為nowMicros
}
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); //精髓所在,每次請(qǐng)求才更新storedPermits和nextFreeTicketMicros
long returnValue = nextFreeTicketMicros; //直接返回上面計(jì)算出來(lái)的nextFreeTicketMicros,這也是能應(yīng)對(duì)突發(fā)高qps的原因,只會(huì)把nextFreeTicketMicros延后,影響下一次請(qǐng)求
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); //請(qǐng)求超過(guò)存儲(chǔ)的permits,計(jì)算nextFreeTicketMicros延后的時(shí)間
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend; //storedPermits減去當(dāng)前請(qǐng)求消耗的permits,如果當(dāng)前請(qǐng)求的permits大于storedPermits也不管,減到0為止,同時(shí)延后nextFreeTicketMicros即可
return returnValue;
}
tryAcquire原理
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return nextFreeTicketMicros - timeoutMicros <= nowMicros;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
//只是計(jì)算nextFreeTicketMicro和timeoutMicros的差值,如果大于nowMicros直接返回false;否則計(jì)算出
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
//調(diào)用reserveEarliestAvailable(設(shè)置新的nextFreeTicketMicros和storedPermits),返回nextFreeTicketMicros,計(jì)算需要等待的時(shí)間
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
下面我們看看SmoothWarmingUp創(chuàng)建的ratelimiter,它是帶有預(yù)熱期的平滑限流,它啟動(dòng)后會(huì)有一段預(yù)熱期,逐步將分發(fā)頻率提升到配置的速率,

創(chuàng)建一個(gè)平均分發(fā)令牌速率為5,預(yù)熱期為3秒。由于設(shè)置了預(yù)熱時(shí)間是3秒,令牌桶一開始并不會(huì)0.2秒發(fā)一個(gè)令牌,而是形成一個(gè)平滑線性下降的坡度,頻率越來(lái)越高,在3秒鐘之內(nèi)達(dá)到原本設(shè)置的頻率,以后就以固定的頻率輸出。這種功能適合系統(tǒng)剛啟動(dòng)需要一點(diǎn)時(shí)間來(lái)“熱身”的場(chǎng)景。