限流令牌桶實(shí)現(xiàn)
我們r(jià)edis最開(kāi)始的限流只是用Semaphore信號(hào)量來(lái)限流,一個(gè)請(qǐng)求先acquire 然后在release
但是這樣的方法沒(méi)有時(shí)間的概念,限流情況并不好。
比如Semaphore容量為1000,一個(gè)請(qǐng)求耗時(shí)100ms,那么理論的1s的最大流量應(yīng)該是
1000/10 * 1000 **一秒一個(gè)并發(fā)可以走10個(gè)請(qǐng)求,最多1000并發(fā) **
具體的qps是和請(qǐng)求執(zhí)行時(shí)間有關(guān)的。
令牌桶
令牌桶可以保證最大qps為固定值,原來(lái)為先有一個(gè)固定容量的桶來(lái)存令牌。每個(gè)請(qǐng)求要先從桶中拿到令牌才能進(jìn)行。然后拿不到就堵塞。
一個(gè)單獨(dú)的線程以一定頻率向桶中放令牌,這個(gè)發(fā)入得流量就是限流的最大流量
實(shí)現(xiàn)
先定義一個(gè)接口
public interface Limit {
void acquire() throws InterruptedException;
void release();
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
public class TokenBucket implements Limit {
private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucket.class);
//線程index
private static final AtomicInteger INTEGER = new AtomicInteger();
//信號(hào)量
private Semaphore semaphore;
private int putSize;
private long time;
private TimeUnit timeUnit;
//名稱
private String name;
//put令牌的線程
private Thread putThread;
//標(biāo)志位
private volatile boolean isStop = true;
public TokenBucket(int initSize, int putSize, long time, TimeUnit timeUnit) {
this(TokenBucket.class.getName(), initSize, putSize, time, timeUnit);
}
public TokenBucket(String name, int initSize, int putSize, long time, TimeUnit timeUnit) {
this.name = name;
this.semaphore = new Semaphore(initSize);
this.putSize = putSize;
this.time = time;
this.timeUnit = timeUnit;
start();
}
//獲取令牌
@Override
public void acquire() throws InterruptedException {
if (!isStop) {
semaphore.acquire(1);
}
}
//釋放令牌 (什么也不做)
@Override
public void release() {
//do nothing
}
private void start() {
isStop = false;
Thread thread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
if (isStop) {
LOGGER.info("TokenBucket " + name + " shutdown ");
return;
}
}
put(putSize, time, timeUnit);
}
}, "TokenBucket_thread_" + name + "_" + INTEGER.getAndIncrement());
this.putThread = thread;
thread.start();
}
private void put(int putSize, long time, TimeUnit timeUnit) {
semaphore.release(putSize);
//掛起線程,可響應(yīng)中斷
LockSupport.parkNanos(name, timeUnit.toNanos(time));
}
//停止限流
public void shutdown() {
isStop = true;
//中斷 put線程
putThread.interrupt();
}
}
上面的代碼可以嚴(yán)格保證最大qps。
如果想讓流量的曲線更平滑可以增加put的頻率,減小每次put的大小
問(wèn)題
如果下游系統(tǒng)出了問(wèn)題,響應(yīng)時(shí)間非常長(zhǎng),但是令牌桶在發(fā)令牌時(shí)是不去考慮下游系統(tǒng)的。
最好可以配合熔斷和快速失敗來(lái)做。