An Analysis of the Guava RateLimiter Implementation

Why limit at all?

When a system consumes a downstream resource, it has to respect the capacity that resource can actually provide. A resource that is constrained or short on processing power deserves protection, at least while the downstream cannot scale up its capacity, or cannot do so quickly. A rate limiter or a similar guard keeps a collapsing downstream service from taking the whole system down with it.

Common algorithms

Two rate-limiting algorithms are in common use: the leaky bucket and the token bucket.

Leaky bucket algorithm

(Figure: leaky bucket)

Leaky Bucket, from Wikipedia:

Picture a bucket of limited capacity with a hole in the bottom: whenever the bucket holds water, it drains at a fixed rate t (L/s). Water is poured in from the top. When the water poured in reaches the bucket's capacity and the inflow exceeds the outflow rate, the bucket is full. If water keeps arriving faster than t, there are two ways to handle it:

  1. stop pouring more water in
  2. let the excess overflow and be discarded

The algorithm involves two parameters:

  1. the bucket's capacity
  2. the outflow rate

The core of the algorithm:

  1. deciding when the bucket is full
  2. modeling the leak
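The two questions above can be answered with a lazy update: instead of draining the bucket continuously, recompute the water level from the elapsed time on each arrival. A minimal sketch (the class and field names here are ours, not from any library):

```java
class LeakyBucketSketch {
    private final double capacity;     // answers "when is the bucket full?"
    private final double leakPerMicro; // outflow rate, in units per microsecond
    private double water = 0;
    private long lastLeakMicros;

    LeakyBucketSketch(double capacity, double leakPerSecond, long nowMicros) {
        this.capacity = capacity;
        this.leakPerMicro = leakPerSecond / 1_000_000.0;
        this.lastLeakMicros = nowMicros;
    }

    // Returns false when the bucket would overflow (the arrival is discarded).
    boolean offer(double amount, long nowMicros) {
        // "leak" lazily: drain whatever flowed out since the last arrival
        water = Math.max(0, water - (nowMicros - lastLeakMicros) * leakPerMicro);
        lastLeakMicros = nowMicros;
        if (water + amount > capacity) {
            return false; // full: stop pouring / discard the excess
        }
        water += amount;
        return true;
    }
}
```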

Token bucket algorithm

The token bucket algorithm models the following scenario:
a bucket holds tokens, up to a fixed number, and tokens are added to it at a fixed rate. When a request arrives (or a packet, whose length determines how many tokens it needs), we check whether the bucket holds enough tokens; a single request may need several. If the tokens suffice, they are consumed; if not, the request is handled according to some policy (block the caller, borrow against future tokens, and so on).
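The scenario above translates almost line for line into code. A minimal refill-then-consume sketch (names are ours; this is not Guava's implementation, which is discussed below):

```java
class TokenBucketSketch {
    private final double maxTokens;
    private final double tokensPerMicro;
    private double tokens;
    private long lastRefillMicros;

    TokenBucketSketch(double maxTokens, double tokensPerSecond, long nowMicros) {
        this.maxTokens = maxTokens;
        this.tokensPerMicro = tokensPerSecond / 1_000_000.0;
        this.tokens = maxTokens; // this sketch starts with a full bucket
        this.lastRefillMicros = nowMicros;
    }

    // Refill for the elapsed time, then consume `needed` tokens if available.
    // For a packet, `needed` would be its length.
    boolean tryConsume(double needed, long nowMicros) {
        tokens = Math.min(maxTokens, tokens + (nowMicros - lastRefillMicros) * tokensPerMicro);
        lastRefillMicros = nowMicros;
        if (tokens < needed) {
            return false; // policy decision point: block, drop, or borrow ahead
        }
        tokens -= needed;
        return true;
    }
}
```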

The Guava RateLimiter implementation

Guava's implementation is closer to the token bucket: one second is divided into as many time slices as there are tokens, each slice standing for one token.

Key fields:

  • nextFreeTicketMicros: the next point in time at which permits may be replenished. The definition sounds convoluted; the flow below makes it clearer.
  • maxPermits: the maximum number of permits
  • storedPermits: the number of permits currently stored, never exceeding maxPermits

Implementation

A key method here is resync, the (re)synchronization routine, used both at initialization and during acquisition:

void resync(long nowMicros) {
  // if nextFreeTicket is in the past, resync to now
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}

If the current time (not a wall-clock instant, but the time elapsed since the limiter's creation, here and throughout) has passed the previously recorded nextFreeTicketMicros, the state is resynchronized:

  1. the newly accrued permits are computed from the interval between the old nextFreeTicketMicros and now;
  2. the available permits are recomputed: if the new permits plus the existing ones stay below maxPermits, storedPermits grows by the new amount, otherwise it is clamped to maxPermits;
  3. the next replenishment time nextFreeTicketMicros is advanced to the current time.
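The three steps can be checked against a standalone copy of resync. In this sketch the field names mirror Guava's, and we assume a rate of 100 permits per second, i.e. a cool-down interval of 10,000 μs per permit:

```java
class ResyncSketch {
    double maxPermits = 100;
    double storedPermits = 0;
    long nextFreeTicketMicros = 0;
    double coolDownIntervalMicros = 10_000; // 100 permits/s => one permit per 10,000 μs

    void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            // step 1: permits accrued since the last recorded time
            double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros;
            // step 2: add them, capped at maxPermits
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            // step 3: advance nextFreeTicketMicros to now
            nextFreeTicketMicros = nowMicros;
        }
    }
}
```

After 0.5 s of idleness this accrues 50 permits; a further 1.5 s would add 150 more but is clamped to the maximum of 100.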

Initialization

static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  rateLimiter.setRate(permitsPerSecond);
  return rateLimiter;
}

A Stopwatch is used for timekeeping, chiefly to measure the time elapsed since the limiter was created.
The key fields are then initialized (the main logic in fact runs through resync):
nextFreeTicketMicros becomes the current time; maxPermits the permits-per-second argument; and storedPermits 0.

(Figure: initialization)

Acquiring permits (acquire)

acquire obtains the requested number of permits, blocking for the required time if they cannot be granted immediately, and returns how long the call waited.

  1. attempt a resync
  2. the wait time to be returned: max(nextFreeTicketMicros - nowMicros, 0)
  3. permits actually taken from storage: min(requested permits, storedPermits)
  4. fresh permits still required (freshPermits): requested permits - permits taken from storage
  5. wait to schedule (waitMicros): freshPermits × the time needed per permit
  6. the next replenishment time is advanced: nextFreeTicketMicros += waitMicros
  7. the remaining stored permits are updated: storedPermits -= permits taken in this call
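Steps 1–7 condense into a small simulation. The field names follow Guava's SmoothRateLimiter, but this is a simplified sketch, not the real class; it assumes a rate of 10 permits per second:

```java
class AcquireSketch {
    double maxPermits = 10;
    double storedPermits = 0;
    long nextFreeTicketMicros = 0;
    double stableIntervalMicros = 100_000; // 10 permits/s => 100,000 μs per fresh permit

    void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }

    // Returns how long this caller must wait. Any shortfall is charged to
    // nextFreeTicketMicros, i.e. to the *next* caller.
    long acquire(double requested, long nowMicros) {
        resync(nowMicros);                                              // step 1
        long wait = Math.max(nextFreeTicketMicros - nowMicros, 0);      // step 2
        double consumed = Math.min(requested, storedPermits);           // step 3
        double freshPermits = requested - consumed;                     // step 4
        long waitMicros = (long) (freshPermits * stableIntervalMicros); // step 5
        nextFreeTicketMicros += waitMicros;                             // step 6
        storedPermits -= consumed;                                      // step 7
        return wait;
    }
}
```

Two back-to-back calls make the bookkeeping concrete: after 1 s of idleness an acquire(15) returns a wait of 0 even though only 10 permits are stored, and the 0.5 s owed for the 5 fresh permits is paid by the following call.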

From resync's guard, if (nowMicros > nextFreeTicketMicros), it is easy to see that when a request asks for more permits than are currently available, the updated nextFreeTicketMicros moves past nowMicros, yet the current request's wait time is 0. That is, for an oversized request (larger than what is currently on hand), the waiting only happens on the next request. Put bluntly: the previous caller digs the hole, and the one after falls into it.

When nextFreeTicketMicros is earlier than the current time and the stored permits suffice:

(Figure: nextFreeTicketMicros earlier than nowMicros, permits sufficient)

When nextFreeTicketMicros is earlier than the current time but the stored permits do not suffice:

(Figure: nextFreeTicketMicros earlier than nowMicros, permits insufficient)

When nextFreeTicketMicros is later than the current time, the difference lies mainly in how the blocking time is computed; permit accounting and time updates follow the two scenarios above.

Trying to acquire (tryAcquire)
If nextFreeTicketMicros - timeout > nowMicros, then not even one permit can be handed out within the timeout (recall from above that as long as any permit is available it can be allocated, regardless of how many are requested), so tryAcquire returns false immediately. Otherwise it obtains permits exactly as acquire does.
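Because nextFreeTicketMicros is always maintained, the feasibility test is a single comparison. A sketch of the check (in Guava it lives in a private canAcquire method on RateLimiter):

```java
class CanAcquireSketch {
    long nextFreeTicketMicros;

    CanAcquireSketch(long nextFreeTicketMicros) {
        this.nextFreeTicketMicros = nextFreeTicketMicros;
    }

    // A permit becomes available within the timeout iff the expected free time,
    // pulled back by the timeout, is not in the future.
    boolean canAcquire(long nowMicros, long timeoutMicros) {
        return nextFreeTicketMicros - timeoutMicros <= nowMicros;
    }
}
```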

Warm-up (cold start)
Create a limiter with a capacity of 100 permits per second, then have 20 threads concurrently acquire permits, each thread acquiring only once.
The test code:

public void testConcurrent() {
  RateLimiter rateLimiter = RateLimiter.create(100);
  ExecutorService executorService = Executors.newFixedThreadPool(100);
  Runnable runnable = () -> {
    // "F" marks a failed tryAcquire, "A" a successful one
    if (!rateLimiter.tryAcquire(1, 100, TimeUnit.MILLISECONDS)) {
      System.out.println("F " + Thread.currentThread().getName());
    } else {
      System.out.println("A " + Thread.currentThread().getName());
    }
  };
  for (int i = 0; i < 20; i++) {
    executorService.execute(runnable);
  }
  executorService.shutdown(); // stop accepting tasks so awaitTermination can complete
  try {
    executorService.awaitTermination(1, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

By the algorithm as described, no output line starting with F should ever appear; yet in practice a fair share, just under half, of the 20 attempts fail:

1489453467102 pool-1-thread-1
1489453467102 pool-1-thread-2
1489453467104 pool-1-thread-3
1489453467104 pool-1-thread-4
1489453467105 pool-1-thread-5
1489453467105 pool-1-thread-6
1489453467105 pool-1-thread-7
1489453467107 pool-1-thread-8
1489453467107 pool-1-thread-9
F 1489453467108 pool-1-thread-15
F 1489453467108 pool-1-thread-16
F 1489453467109 pool-1-thread-17
F 1489453467109 pool-1-thread-18
F 1489453467109 pool-1-thread-19
F 1489453467109 pool-1-thread-20
1489453467219 pool-1-thread-10
1489453467239 pool-1-thread-11
1489453467259 pool-1-thread-12
1489453467274 pool-1-thread-13
1489453467297 pool-1-thread-14

The problem lies in initialization: storedPermits starts at 0, and the first thread's acquisition happens almost immediately after creation, so once it takes a permit the stored permits are nowhere near the declared maximum. After a few more attempts the stored permits are exhausted, and subsequent tryAcquire calls fail.
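The cold-start effect is easy to reproduce deterministically. In the sketch below (our simulation, not Guava's code) storedPermits starts at 0 just as in the real limiter, every single-permit request arrives at time 0 with a 100 ms timeout, and each grant pushes nextFreeTicketMicros 10 ms further out, so only part of the burst gets through:

```java
class ColdStartSketch {
    double maxPermits = 100;
    double storedPermits = 0;             // key point: a fresh limiter stores no permits
    long nextFreeTicketMicros = 0;
    double stableIntervalMicros = 10_000; // 100 permits per second

    boolean tryAcquire(long nowMicros, long timeoutMicros) {
        // feasibility check: can a permit be freed within the timeout?
        if (nextFreeTicketMicros - timeoutMicros > nowMicros) return false;
        // resync: accrue permits for any idle time
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
        double consumed = Math.min(1.0, storedPermits); // serve from storage first
        nextFreeTicketMicros += (long) ((1.0 - consumed) * stableIntervalMicros);
        storedPermits -= consumed;
        return true;
    }
}
```

With storage empty, every grant is a fresh permit costing 10 ms of future time, so the 100 ms timeout covers only the first 11 of 20 simultaneous requests; the rest fail just as in the test output above.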

The design philosophy behind RateLimiter

Quoted from the Javadoc in com/google/common/util/concurrent/SmoothRateLimiter.java; it is well worth reading:

How is the RateLimiter designed, and why?
The primary feature of a RateLimiter is its "stable rate", the maximum rate that it should allow at normal conditions. This is enforced by "throttling" incoming requests as needed, i.e. compute, for an incoming request, the appropriate throttle time, and make the calling thread wait as much.
The simplest way to maintain a rate of QPS is to keep the timestamp of the last granted request, and ensure that (1/QPS) seconds have elapsed since then. For example, for a rate of QPS=5 (5 tokens per second), if we ensure that a request isn't granted earlier than 200ms after the last one, then we achieve the intended rate. If a request comes and the last request was granted only 100ms ago, then we wait for another 100ms. At this rate, serving 15 fresh permits (i.e. for an acquire(15) request) naturally takes 3 seconds.
It is important to realize that such a RateLimiter has a very superficial memory of the past: it only remembers the last request. What if the RateLimiter was unused for a long period of time, then a request arrived and was immediately granted? This RateLimiter would immediately forget about that past underutilization. This may result in either underutilization or overflow, depending on the real world consequences of not using the expected rate.
Past underutilization could mean that excess resources are available. Then, the RateLimiter should speed up for a while, to take advantage of these resources. This is important when the rate is applied to networking (limiting bandwidth), where past underutilization typically translates to "almost empty buffers", which can be filled immediately.
On the other hand, past underutilization could mean that "the server responsible for handling the request has become less ready for future requests", i.e. its caches become stale, and requests become more likely to trigger expensive operations (a more extreme case of this example is when a server has just booted, and it is mostly busy with getting itself up to speed).
To deal with such scenarios, we add an extra dimension, that of "past underutilization", modeled by the "storedPermits" variable. This variable is zero when there is no underutilization, and it can grow up to maxStoredPermits, for sufficiently large underutilization. So, the requested permits, by an invocation acquire(permits), are served from: stored permits (if available), and fresh permits (for any remaining permits).
How this works is best explained with an example:
For a RateLimiter that produces 1 token per second, every second that goes by with the RateLimiter being unused, we increase storedPermits by 1. Say we leave the RateLimiter unused for 10 seconds (i.e., we expected a request at time X, but we are at time X + 10 seconds before a request actually arrives; this is also related to the point made in the last paragraph), thus storedPermits becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a request of acquire(3) arrives. We serve this request out of storedPermits, and reduce that to 7.0 (how this is translated to throttling time is discussed later). Immediately after, assume that an acquire(10) request arriving. We serve the request partly from storedPermits, using all the remaining 7.0 permits, and the remaining 3.0, we serve them by fresh permits produced by the rate limiter.
We already know how much time it takes to serve 3 fresh permits: if the rate is "1 token per second", then this will take 3 seconds. But what does it mean to serve 7 stored permits? As explained above, there is no unique answer. If we are primarily interested to deal with underutilization, then we want stored permits to be given out /faster/ than fresh ones, because underutilization = free resources for the taking. If we are primarily interested to deal with overflow, then stored permits could be given out /slower/ than fresh ones. Thus, we require a (different in each case) function that translates storedPermits to throttling time.
This role is played by storedPermitsToWaitTime(double storedPermits, double permitsToTake). The underlying model is a continuous function mapping storedPermits (from 0.0 to maxStoredPermits) onto the 1/rate (i.e. intervals) that is effective at the given storedPermits. "storedPermits" essentially measure unused time; we spend unused time buying/storing permits. Rate is "permits / time", thus "1 / rate = time / permits". Thus, "1/rate" (time / permits) times "permits" gives time, i.e., integrals on this function (which is what storedPermitsToWaitTime() computes) correspond to minimum intervals between subsequent requests, for the specified number of requested permits.
Here is an example of storedPermitsToWaitTime: If storedPermits == 10.0, and we want 3 permits, we take them from storedPermits, reducing them to 7.0, and compute the throttling for these as a call to storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which will evaluate the integral of the function from 7.0 to 10.0.
Using integrals guarantees that the effect of a single acquire(3) is equivalent to { acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); }, etc, since the integral of the function in [7.0, 10.0] is equivalent to the sum of the integrals of [7.0, 8.0], [8.0, 9.0], [9.0, 10.0] (and so on), no matter what the function is. This guarantees that we handle correctly requests of varying weight (permits), /no matter/ what the actual function is - so we can tweak the latter freely. (The only requirement, obviously, is that we can compute its integrals).
Note well that if, for this function, we chose a horizontal line, at height of exactly (1/QPS), then the effect of the function is non-existent: we serve storedPermits at exactly the same cost as fresh ones (1/QPS is the cost for each). We use this trick later.
If we pick a function that goes /below/ that horizontal line, it means that we reduce the area of the function, thus time. Thus, the RateLimiter becomes /faster/ after a period of underutilization. If, on the other hand, we pick a function that goes /above/ that horizontal line, then it means that the area (time) is increased, thus storedPermits are more costly than fresh permits, thus the RateLimiter becomes /slower/ after a period of underutilization.
Last, but not least: consider a RateLimiter with rate of 1 permit per second, currently completely unused, and an expensive acquire(100) request comes. It would be nonsensical to just wait for 100 seconds, and /then/ start the actual task. Why wait without doing anything? A much better approach is to /allow/ the request right away (as if it was an acquire(1) request instead), and postpone /subsequent/ requests as needed. In this version, we allow starting the task immediately, and postpone by 100 seconds future requests, thus we allow for work to get done in the meantime instead of waiting idly.
This has important consequences: it means that the RateLimiter doesn't remember the time of the _last_ request, but it remembers the (expected) time of the next request. This also enables us to tell immediately (see tryAcquire(timeout)) whether a particular timeout is enough to get us to the point of the next scheduling time, since we always maintain that. And what we mean by "an unused RateLimiter" is also defined by that notion: when we observe that the "expected arrival time of the next request" is actually in the past, then the difference (now - past) is the amount of time that the RateLimiter was formally unused, and it is that amount of time which we translate to storedPermits. (We increase storedPermits with the amount of permits that would have been produced in that idle time). So, if rate == 1 permit per second, and arrivals come exactly one second after the previous, then storedPermits is _never_ increased -- we would only increase it for arrivals _later_ than the expected one second.

