限流令牌桶實(shí)現(xiàn)

限流令牌桶實(shí)現(xiàn)

我們r(jià)edis最開(kāi)始的限流只是用Semaphore信號(hào)量來(lái)限流,一個(gè)請(qǐng)求先acquire 然后在release

但是這樣的方法沒(méi)有時(shí)間的概念,限流情況并不好。

比如Semaphore容量為1000,一個(gè)請(qǐng)求耗時(shí)100ms,那么理論的1s的最大流量應(yīng)該是

1000/10 * 1000 **一秒一個(gè)并發(fā)可以走10個(gè)請(qǐng)求,最多1000并發(fā) **

具體的qps是和請(qǐng)求執(zhí)行時(shí)間有關(guān)的。

令牌桶

令牌桶可以保證最大qps為固定值,原來(lái)為先有一個(gè)固定容量的桶來(lái)存令牌。每個(gè)請(qǐng)求要先從桶中拿到令牌才能進(jìn)行。然后拿不到就堵塞。

一個(gè)單獨(dú)的線程以一定頻率向桶中放令牌,這個(gè)發(fā)入得流量就是限流的最大流量

實(shí)現(xiàn)

先定義一個(gè)接口

public interface Limit {

    void acquire() throws InterruptedException;

    void release();
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class TokenBucket implements Limit {

    private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucket.class);
    //線程index
    private static final AtomicInteger INTEGER = new AtomicInteger();
    //信號(hào)量
    private Semaphore semaphore;

    private int putSize;
    private long time;
    private TimeUnit timeUnit;
    //名稱
    private String name;
    //put令牌的線程
    private Thread putThread;
    //標(biāo)志位
    private volatile boolean isStop = true;

    public TokenBucket(int initSize, int putSize, long time, TimeUnit timeUnit) {
        this(TokenBucket.class.getName(), initSize, putSize, time, timeUnit);
    }

    public TokenBucket(String name, int initSize, int putSize, long time, TimeUnit timeUnit) {
        this.name = name;
        this.semaphore = new Semaphore(initSize);
        this.putSize = putSize;
        this.time = time;
        this.timeUnit = timeUnit;
        start();
    }

    //獲取令牌
    @Override
    public void acquire() throws InterruptedException {
        if (!isStop) {
            semaphore.acquire(1);
        }
    }
    //釋放令牌 (什么也不做)
    @Override
    public void release() {
        //do nothing
    }

    private void start() {
        isStop = false;
        Thread thread = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    if (isStop) {
                        LOGGER.info("TokenBucket " + name + " shutdown ");
                        return;
                    }
                }
                put(putSize, time, timeUnit);
            }
        }, "TokenBucket_thread_" + name + "_" + INTEGER.getAndIncrement());
        this.putThread = thread;
        thread.start();
    }

    private void put(int putSize, long time, TimeUnit timeUnit) {
        semaphore.release(putSize);
        //掛起線程,可響應(yīng)中斷
        LockSupport.parkNanos(name, timeUnit.toNanos(time));
    }

    //停止限流
    public void shutdown() {
        isStop = true;
        //中斷 put線程
        putThread.interrupt();
    }

}

上面的代碼可以嚴(yán)格保證最大qps。

如果想讓流量的曲線更平滑可以增加put的頻率,減小每次put的大小

問(wèn)題

如果下游系統(tǒng)出了問(wèn)題,響應(yīng)時(shí)間非常長(zhǎng),但是令牌桶在發(fā)令牌時(shí)是不去考慮下游系統(tǒng)的。

最好可以配合熔斷和快速失敗來(lái)做。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容