限流及RateLimiter實現(xiàn)

限流算法

令牌桶算法

令牌桶算法最初來源于計算機網(wǎng)絡(luò)。在網(wǎng)絡(luò)傳輸數(shù)據(jù)時,為了防止網(wǎng)絡(luò)擁塞,需限制流出網(wǎng)絡(luò)的流量,使流量以比較均勻的速度向外發(fā)送。

令牌桶算法是網(wǎng)絡(luò)流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一種算法。典型情況下,令牌桶算法用來控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。

原理:大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會超過桶的大小。

算法過程

算法描述:

  • 假如用戶配置的平均發(fā)送速率為r,則每隔1/r秒一個令牌被加入到桶中(每秒會有r個令牌放入桶中);
  • 假設(shè)桶中最多可以存放b個令牌。如果令牌到達(dá)時令牌桶已經(jīng)滿了,那么這個令牌會被丟棄;
  • 當(dāng)一個n個字節(jié)的數(shù)據(jù)包到達(dá)時,就從令牌桶中刪除n個令牌(不同大小的數(shù)據(jù)包,消耗的令牌數(shù)量不一樣),并且數(shù)據(jù)包被發(fā)送到網(wǎng)絡(luò);
  • 如果令牌桶中少于n個令牌,那么不會刪除令牌,并且認(rèn)為這個數(shù)據(jù)包在流量限制之外(n個字節(jié),需要n個令牌。該數(shù)據(jù)包將被緩存或丟棄);
  • 算法允許最長b個字節(jié)的突發(fā),但從長期運行結(jié)果看,數(shù)據(jù)包的速率被限制成常量r。對于在流量限制外的數(shù)據(jù)包可以以不同的方式處理:(1)它們可以被丟棄;(2)它們可以排放在隊列中以便當(dāng)令牌桶中累積了足夠多的令牌時再傳輸;(3)它們可以繼續(xù)發(fā)送,但需要做特殊標(biāo)記,網(wǎng)絡(luò)過載的時候?qū)⑦@些特殊標(biāo)記的包丟棄。

注意:

令牌桶算法不能與另外一種常見算法漏桶算法相混淆。這兩種算法的主要區(qū)別在于漏桶算法能夠強行限制數(shù)據(jù)的傳輸速率,而令牌桶算法在能夠限制數(shù)據(jù)的平均傳輸速率外,還允許某種程度的突發(fā)傳輸。在令牌桶算法中,只要令牌桶中存在令牌,那么就允許突發(fā)地傳輸數(shù)據(jù)直到達(dá)到用戶配置的門限,因此它適合于具有突發(fā)特性的流量。

RateLimiter實現(xiàn)

SmoothRateLimiter核心參數(shù)

  /**
   * The currently stored permits. 當(dāng)前存儲的permits
   */
  double storedPermits;

  /**
   * The maximum number of stored permits. 最大可存儲permits
   */
  double maxPermits;

  /**
   * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits per second has a stable interval of 200ms.
   * 兩個請求單元的的請求間隔,e.g 當(dāng)permitsPerSecond=5時,那么請求間隔為200ms
   */
  double stableIntervalMicros;

  /**
   * The time when the next request (no matter its size) will be granted. After granting a
   * request, this is pushed further in the future. Large requests push this further than small requests.
   * 下一次請求將能獲取的時間;允許某次請求拿走超出剩余令牌數(shù)的令牌,但是下一次請求將為此付出代價,一直等到令牌虧空補上,并且桶中有足夠本次請求使用的令牌為止
   */
  private long nextFreeTicketMicros = 0L; // could be either in the past or future

創(chuàng)建過程

// SleepingStopwatch.createFromSystemTimer()默認(rèn)的計時器 納秒級別
// permitsPerSecond為1秒內(nèi)的令牌數(shù)
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);

/************** method: RateLimiter.create **************/
// SmoothBursty:令牌桶實現(xiàn)  maxBurstSeconds:當(dāng)限流器未使用時最大可存放的令牌數(shù)
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
// 設(shè)置速率
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;

/************** method: RateLimiter.setRate **************/
// 校驗參數(shù),速率必須大于0,且不能為NaN= 0.0/0.0
checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
// 全局同步鎖,Object對象
synchronized (mutex()) {
   // 子類設(shè)置rate
   doSetRate(permitsPerSecond, stopwatch.readMicros());
}

/************** method: SmoothRateLimiter.doSetRate **************/
// 更新storedPermits和nextFreeTicketMicros時間
// 初始時storedPermits為0.0 ;nextFreeTicketMicros為當(dāng)前過去的毫秒數(shù)
resync(nowMicros);
// 請求間隔毫秒數(shù)
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
// 見SmoothBursty.doSetRate
doSetRate(permitsPerSecond, stableIntervalMicros);

/************** method: SmoothRateLimiter.resync **************/
// if nextFreeTicket is in the past, resync to now
// 如果nextFreeTicketMicros的時間已經(jīng)被獲取,則同步到當(dāng)前時間,
// 以coolDownIntervalMicros速率進行添加令牌
if (nowMicros > nextFreeTicketMicros) {
    // coolDownIntervalMicros當(dāng)SmoothBursty時返回stableIntervalMicros
    storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
    nextFreeTicketMicros = nowMicros;
}

/************** method: SmoothBursty.doSetRate **************/
// maxPermits初始時為0.0
double oldMaxPermits = this.maxPermits;
// 最大令牌數(shù) = 每秒可存放的最大令牌數(shù) * 速率
maxPermits = maxBurstSeconds * permitsPerSecond;
// storedPermits 為當(dāng)前存放的令牌數(shù)
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
} else {
    // 計算重新設(shè)置速率時的storedPermits 為什么這樣計算暫時不是很明白
    storedPermits = (oldMaxPermits == 0.0) 
        ? 0.0 // initial state
        : storedPermits * maxPermits / oldMaxPermits;
}

acquire過程

// 默認(rèn)獲取一個permits
/************** method: RateLimiter.acquire **************/
// 保留permits下獲取等待的時長
//reserve操作:permits須大于0,加鎖synchronized (mutex())后調(diào)用reserveAndGetWaitLength
long microsToWait = reserve(permits);   
// 當(dāng)microsToWait>0時執(zhí)行sleep方法
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回sleep的時間--即獲取令牌所消耗的時間
return 1.0 * microsToWait / SECONDS.toMicros(1L);

/************** method: RateLimiter.reserveAndGetWaitLength **************/
// 核心操作,獲取permits可使用的時間 實現(xiàn)見SmoothRateLimiter
// nowMicros為從RateLimiter#start方法調(diào)用時間到當(dāng)前所過去的毫秒數(shù)
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);

/************** method: SmoothRateLimiter.reserveEarliestAvailable **************/
// 同步更新令牌數(shù)
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
// 從請求的permits和當(dāng)前存放的permits選中最小值
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
// storedPermitsToWaitTime方法當(dāng)為SmoothBursty實現(xiàn)時默認(rèn)為0
// 當(dāng)令牌不足時計算需要等待的時間
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);
try {
    // 計算下一次請求獲取的時間,這里是做了權(quán)衡,當(dāng)令牌不足被提前使用時,下一次請求獲取令牌的時間將被延長
    this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
    this.nextFreeTicketMicros = Long.MAX_VALUE;
}
// 減去消耗的令牌數(shù)
this.storedPermits -= storedPermitsToSpend;
return returnValue;

FYI

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容