在開發(fā)高并發(fā)系統(tǒng)時,有三把利器用來保護(hù)系統(tǒng):熔斷、延遲處理、緩存、降級和限流
限流
- 問題:遇到瞬時請求量激增時,會導(dǎo)致接口占用過多服務(wù)器資源,使得其他請求響應(yīng)速度降低或是超時,更有甚者可能導(dǎo)致服務(wù)器宕機(jī)。
- 目的:通過對并發(fā)訪問/請求進(jìn)行限速或者一個時間窗口內(nèi)的的請求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)(定向到錯誤頁或告知資源沒有了)、排隊或等待(比如秒殺、評論、下單)、降級(返回兜底數(shù)據(jù)或默認(rèn)數(shù)據(jù),如商品詳情頁庫存默認(rèn)有貨)。
- 方法:限制總并發(fā)數(shù)(比如數(shù)據(jù)庫連接池、線程池)、限制瞬時并發(fā)數(shù)(如nginx的limit_conn模塊,用來限制瞬時并發(fā)連接數(shù))、限制時間窗口內(nèi)的平均速率(如Guava的RateLimiter、nginx的limit_req模塊,限制每秒的平均速率);其他還有如限制遠(yuǎn)程接口調(diào)用速率、限制MQ的消費速率。另外還可以根據(jù)網(wǎng)絡(luò)連接數(shù)、網(wǎng)絡(luò)流量、CPU或內(nèi)存負(fù)載等來限流。
分布式限流
單點限流

分布式集群 單點限流的問題

節(jié)點擴(kuò)容、縮容時無法準(zhǔn)確控制整個服務(wù)的請求限制
分布式集群 分布式限流

限流算法
計數(shù)器
如限制1秒鐘內(nèi)請求數(shù)最多為10個,每當(dāng)進(jìn)來一個請求,則計數(shù)器+1
當(dāng)計數(shù)器達(dá)到上限時,則觸發(fā)限流
時間每經(jīng)過1秒,則重置計數(shù)器
不足:在第1秒的后半時間內(nèi),涌入了大量流量,然后到第2秒的前半時間,又涌入了大量流量。由于從第1秒到第2秒,請求計數(shù)是清零的,所以在這瞬間的qps有可能超過系統(tǒng)的承載。
public class RateLimiter {
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits) {
this(maxPermits, 1);
}
public RateLimiter(int maxPermits, int intervalSeconds) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + intervalMilliSeconds) {
if (storedPermits + permits <= maxPermits) {
storedPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
storedPermits = 0;
updateTimeStamp = now;
}
}
}
}
滑動窗口
滑動窗口本質(zhì)上也是一種計數(shù)器,只不過它的粒度更細(xì)。比如限制qps為1000,設(shè)定窗口大小為10,則每個窗口的時間間隔為100ms。每次窗口滑動時,重置的是前1s至900ms之間內(nèi)的計數(shù),而不是完整的1s。
public class RateLimiter {
private LinkedList<Integer> deque = new LinkedList<>();
private int windowSize;
private int windowIntervalMilliSeconds;
private int currentWindowPermits;
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits, int windowSize) {
this(maxPermits, 1, windowSize);
}
public RateLimiter(int maxPermits, int intervalSeconds, int windowSize) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
this.windowSize = windowSize;
this.windowIntervalMilliSeconds = intervalMilliSeconds / windowSize;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + windowIntervalMilliSeconds) {
if (storedPermits + permits + currentWindowPermits <= maxPermits) {
currentWindowPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
updateTimeStamp = now;
deque.offerLast(currentWindowPermits);
storedPermits += currentWindowPermits;
currentWindowPermits = 0;
while (deque.size() > windowSize) {
storedPermits -= deque.removeFirst();
}
}
}
}
}
漏桶算法
漏桶算法這個名字就很形象,算法內(nèi)部有一個容器,當(dāng)請求進(jìn)來時,相當(dāng)于水倒入漏斗,然后從下端小口慢慢勻速的流出。不管上面流量多大,下面流出的速度始終保持不變。 當(dāng)水流入速度過大會直接溢出(訪問頻率超過接口響應(yīng)速率),然后就拒絕請求。
- 一個固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,則不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
-
如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丟棄),而漏桶容量是不變的。
public class RateLimiter {
private int capacity;
private int rate;
private int water;
private int intervalMilliSeconds;
private Semaphore exit;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.exit = new Semaphore(1);
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public Boolean acquire(int permits) throws InterruptedException {
if (water + permits <= capacity) {
water += permits;
while (!exit.tryAcquire(permits, intervalMilliSeconds / rate, TimeUnit.MILLISECONDS)) {}
Thread.sleep(intervalMilliSeconds / rate);
water -= permits;
exit.release(permits);
return true;
} else {
return false;
}
}
}
令牌桶算法
在令牌桶算法中,存在一個桶,用來存放固定數(shù)量的令牌。算法中存在一種機(jī)制,以一定的速率往桶中放令牌。每次請求調(diào)用需要先獲取令牌,只有拿到令牌,才有機(jī)會繼續(xù)執(zhí)行,否則選擇選擇等待可用的令牌、或者直接拒絕。

public class RateLimiter {
private long updateTimeStamp;
private int capacity;
private int rate;
private int tokens;
private int intervalMilliSeconds;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
long now = System.currentTimeMillis();
int newTokens = (int) ((now - updateTimeStamp) / intervalMilliSeconds * rate);
if (newTokens > 0) {
this.tokens = Math.min(capacity, this.tokens + newTokens);
this.updateTimeStamp = now;
}
if (tokens - permits >= 0) {
tokens -= permits;
return true;
} else {
return false;
}
}
}
開源框架
ratelimiter4j
https://github.com/wangzheng0822/ratelimiter4j
Ref:
http://www.itdecent.cn/p/2596e559db5c
https://shengbao.org/609.html
https://blog.wangqi.love/articles/Java/%E9%99%90%E6%B5%81%E6%8A%80%E6%9C%AF%E6%80%BB%E7%BB%93.html
