四種限流算法原理

限流這里總結(jié)了四個(gè)算法分別是 計(jì)數(shù)器固定窗口算法、計(jì)數(shù)器滑動窗口算法、漏斗算法、令牌桶算法

1. 計(jì)數(shù)器固定窗口算法

計(jì)數(shù)器固定窗口算法是最基礎(chǔ)也是最簡單的一種限流算法。原理就是對一段固定時(shí)間窗口內(nèi)的請求進(jìn)行計(jì)數(shù),如果請求數(shù)超過了閾值,則舍棄該請求;

如果沒有達(dá)到設(shè)定的閾值,則接受該請求,且計(jì)數(shù)加1。當(dāng)時(shí)間窗口結(jié)束時(shí),重置計(jì)數(shù)器為0。

優(yōu)點(diǎn):實(shí)現(xiàn)簡單容易理解

缺點(diǎn): 流量曲線可能不夠平滑,有"突刺現(xiàn)象"

  1. 一段時(shí)間內(nèi)(不超過時(shí)間窗口)系統(tǒng)服務(wù)不可用

    比如窗口大小為1s,限流大小為100,然后恰好在某個(gè)窗口的第1ms來了100個(gè)請求,然后第2ms-999ms的請求就都會被拒絕,這段時(shí)間用戶會感覺系統(tǒng)服務(wù)不可用。

  2. 窗口切換時(shí)可能會產(chǎn)生兩倍于閾值流量的請求

    比如窗口大小為1s,限流大小為100,然后恰好在某個(gè)窗口的第999ms來了100個(gè)請求,窗口前期沒有請求,所以這100個(gè)請求都會通過。
    再恰好,下一個(gè)窗口的第1ms有來了100個(gè)請求,也全部通過了,那也就是在2ms之內(nèi)通過了200個(gè)請求,而我們設(shè)定的閾值是100,通過的請求達(dá)到了閾值的兩倍。

package com.example.demo.util;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @ClassName CounterLimiter
 * @Description 計(jì)數(shù)器固定窗口算法
 * @Date 2021/8/11 3:46 下午
 * @Created by liyanyan
 */
public class CounterLimiter {
    private int windowSize; //窗口大小,毫秒單位
    private int limit; //窗口內(nèi)限流大小
    private AtomicInteger count; //當(dāng)前窗口的計(jì)數(shù)器
    private CounterLimiter() {
    }
    public CounterLimiter(int windowSize, int limit) {
        this.limit = limit;
        this.windowSize = windowSize;
        count = new AtomicInteger(0);
        //開啟一個(gè)線程,達(dá)到窗口結(jié)束時(shí)清空count
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    count.set(0);
                    try {
                        Thread.sleep(windowSize);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    //請求到達(dá)后先調(diào)用本方法,若返回true,則請求通過,否則限流
    public boolean tryAcquire() {
        int newCount = count.addAndGet(1);
        if (newCount > limit) {
            return false;
        } else {
            return true;
        }
    }
    //測試 Client
    public static void main(String[] args) throws InterruptedException {
        //每秒20個(gè)請求
        CounterLimiter counterLimiter = new CounterLimiter(1000, 20);
        int count = 0;
        //模擬50次請求,看多少能通過
        for (int i = 0; i < 50; i++) {
            if (counterLimiter.tryAcquire()) {
                count++;
            }
        }
        System.out.println("第一波50個(gè)請求中通過: " + count + ",限流: " + (50 - count));
        //過一秒再請求
        Thread.sleep(1000);
        count = 0;
        //模擬50次請求,看多少能通過
        for (int i = 0; i < 50; i++) {
            if (counterLimiter.tryAcquire()) {
                count++;
            }
        }
        System.out.println("第二波50個(gè)請求中通過: " + count + ",限流: " + (50 - count));
    }
}

2. 計(jì)數(shù)器滑動窗口算法

計(jì)數(shù)器滑動窗口算法是計(jì)數(shù)器固定窗口算法的改進(jìn),解決了固定窗口切換時(shí)可能會產(chǎn)生兩倍于閾值流量請求的缺點(diǎn)

滑動窗口算法在固定窗口的基礎(chǔ)上,將一個(gè)計(jì)時(shí)窗口分成了若干個(gè)小窗口,然后每個(gè)小窗口維護(hù)一個(gè)獨(dú)立的計(jì)數(shù)器,當(dāng)請求的時(shí)間大于當(dāng)前窗口的最大時(shí)間時(shí),則將計(jì)時(shí)窗口向前平移一個(gè)小窗口。

平移時(shí),將第一個(gè)小窗口的數(shù)據(jù)丟棄,然后將第二個(gè)小窗口設(shè)置為第一個(gè)小窗口,同時(shí)在最后新增一個(gè)小窗口,將新的請求放在新增的小窗口中。同時(shí)要保證整個(gè)窗口中所有小窗口的請求數(shù)目之后不能超過設(shè)定的閾值。

滑動窗口其實(shí)就是固定窗口的升級版。將計(jì)時(shí)窗口劃分成一個(gè)小窗口,滑動窗口算法就退化成了固定窗口算法。而滑動窗口算法其實(shí)就是對請求數(shù)進(jìn)行了更細(xì)粒度的限流。

窗口劃分的越多,則限流越精準(zhǔn)

package com.example.demo.util;
/**
 * @ClassName CounterSlideWindowLimiter
 * @Description 計(jì)數(shù)器滑動窗口算法
 * @Date 2021/8/11 5:41 下午
 * @Created by liyanyan
 */
public class CounterSlideWindowLimiter {
    private int windowSize; //窗口大小,毫秒為單位
    private int limit; //窗口內(nèi)限流大小
    private int splitNum; //切分小窗口的數(shù)目大小
    private int[] counters; //當(dāng)前小窗口的計(jì)數(shù)數(shù)組
    private int index; //當(dāng)前小窗口計(jì)數(shù)器的索引
    private long startTime; //窗口開始時(shí)間
    private CounterSlideWindowLimiter() {
    }
    public CounterSlideWindowLimiter(int windowSize, int limit, int splitNum) {
        this.windowSize = windowSize;
        this.splitNum = splitNum;
        this.limit = limit;
        counters = new int[splitNum];
        index = 0;
        startTime = System.currentTimeMillis();
    }
    //請求達(dá)到后先調(diào)用本方法,若返回true,則請求通過,否則限流
    public synchronized boolean tryAcquire() {
        long curTime = System.currentTimeMillis();
        long windowsNum = Math.max(curTime - windowSize - startTime, 0) / (windowSize / splitNum); //計(jì)算滑動小窗口的數(shù)量
        slideWindow(windowsNum); //滑動窗口
        int count = 0;
        for (int i = 0; i < splitNum; i++) {
            count += counters[i];
        }
        if (count >= limit) {
            return false;
        } else {
            counters[index]++;
            return true;
        }
    }
    private synchronized void slideWindow(long windowsNum) {
        if (windowsNum == 0) {
            return;
        }
        long slideNum = Math.min(windowsNum, splitNum);
        for (int i = 0; i < slideNum; i++) {
            index = (index + 1) % splitNum;
            counters[index] = 0;
        }
        startTime = startTime + windowsNum * (windowSize / splitNum); //更新滑動窗口時(shí)間
    }
    /**
     * 測試 Client
     * 測試時(shí),取滑動窗口大小為 1000/10=100ms。然后模擬100 組間隔150ms的50次請求,計(jì)數(shù)器滑動窗口算法與計(jì)數(shù)器固定窗口算法進(jìn)行對別
     *
     * 固定窗口算法在窗口切換時(shí)產(chǎn)生了兩倍于閾值流量的請求的問題,而滑動你個(gè)窗口算法避免了這個(gè)問題
     * 和漏斗算法相比,心來的請求也能夠被處理到,避免了漏斗算法的饑餓問題。
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        //每秒20個(gè)
        int limit = 20;
        CounterSlideWindowLimiter counterSlideWindowLimiter = new CounterSlideWindowLimiter(1000, limit, 20);
        int count = 0;
//        Thread.sleep(3000);
        //計(jì)數(shù)器滑動窗口算法模擬100組間隔30ms的50次請求
        System.out.println("計(jì)數(shù)器滑動窗口算法測試開始...");
        System.out.println("開始模擬100組間隔150ms的50次請求...");
        int failCount = 0;
        for (int j = 0; j < 100; j++) {
            count = 0;
            for (int i = 0; i < 50; i++) {
                if (counterSlideWindowLimiter.tryAcquire()) {
                    count++;
                }
            }
            Thread.sleep(150);
            //模擬50次請求,看多少能通過
            for (int i = 0; i < 50; i++) {
                if (counterSlideWindowLimiter.tryAcquire()) {
                    count++;
                }
            }
            if (count > limit) {
                System.out.println("時(shí)間窗口內(nèi)放過的請求超過閾值,放過的請求數(shù)" + count + ",限流:" + limit);
                failCount++;
            }
            Thread.sleep((int) (Math.random() * 100));
        }
        System.out.println("計(jì)數(shù)器滑動窗口算法測試結(jié)束,100組間隔150ms的50次請求模擬完成,限流失敗組數(shù): " + failCount);
        System.out.println("==========================================================================");
        //計(jì)數(shù)器固定窗口算法模擬100組間隔30ms的50次請求
        System.out.println("計(jì)數(shù)器固定窗口算法測試開始...");
        //模擬100組間隔30ms的50次請求
        CounterLimiter counterLimiter = new CounterLimiter(1000, 20);
        System.out.println("開始模擬100組間隔150ms的50次請求...");
        failCount = 0;
        for (int j = 0; j < 100; j++) {
            count = 0;
            for (int i = 0; i < 50; i++) {
                if (counterLimiter.tryAcquire()) {
                    count++;
                }
            }
            Thread.sleep(150);
            //模擬50次請求,看多少能通過
            for (int i = 0; i < 50; i++) {
                if (counterLimiter.tryAcquire()) {
                    count++;
                }
            }
            if (count > limit) {
                System.out.println("時(shí)間窗口內(nèi)放過的請求超過閾值,放過的請求數(shù)" + count + ",限流:" + limit);
                failCount++;
            }
            Thread.sleep((int) (Math.random() * 100));
        }
        System.out.println("計(jì)數(shù)器固定窗口算法測試結(jié)束,100組間隔150ms的50次請求模擬完成,限流失敗組數(shù):" + failCount);
    }
}

3. 漏斗算法

請求來了之后會首先進(jìn)到漏斗里,然后漏斗以恒定的速率將請求流出進(jìn)行處理,從而起到平滑流量的作用。當(dāng)請求的流量過大時(shí),漏斗達(dá)到最大容量時(shí)會溢出,此時(shí)請求被丟棄。

從系統(tǒng)的角度來看,我們不知道什么時(shí)候會有請求來,也不知道請求會以多大的速率來,這就給系統(tǒng)的安全性埋下隱患。但是如果加了一層漏斗算法限流之后,就能夠保證請求以恒定的速率流出。

在系統(tǒng)看來,請求永遠(yuǎn)是以平滑的傳輸速率過來,從而起到來保護(hù)系統(tǒng)的作用。

漏斗特點(diǎn):

  1. 漏斗的漏出速率是固定的,可以起到整流的作用。即雖然請求的流量可能具有隨機(jī)性,忽大忽小。但是經(jīng)過漏斗算法之后,變成來固有速率的穩(wěn)定流量,從而對下游的系統(tǒng)起到保護(hù)作用。
  2. 不能解決流量突發(fā)的問題。測試?yán)永?我們設(shè)定的漏斗速率是2個(gè)/秒 然后突然來了10個(gè)請求,受限于漏斗的容量,只有5個(gè)請求被接受,另外5個(gè)被拒絕。
    漏斗速率是2個(gè)/秒,然后瞬間接受了5個(gè)請求。但是沒有馬上處理這5個(gè)請求,處理的速度仍然是我們設(shè)定的2個(gè)/秒 令牌桶算法能夠在一定程度上解決流量突發(fā)的問題
package com.example.demo.util;
import java.util.Date;
import java.util.LinkedList;
/**
 * @ClassName LeakyBucketLimiter
 * @Description 漏斗算法
 * @Date 2021/8/12 8:00 下午
 * @Created by liyanyan
 */
public class LeakyBucketLimiter {
    private int capacity; //漏斗容量
    private int rate; //漏斗速率
    private int left; //剩余容量
    private LinkedList<Request> requestList;
    private LeakyBucketLimiter(){}
    public LeakyBucketLimiter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.left = capacity;
        requestList = new LinkedList<>();
        //開啟一個(gè)定時(shí)線程,以固定的速率將漏斗中的請求流出,進(jìn)行處理
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    if(!requestList.isEmpty()) {
                        Request request = requestList.removeFirst();
                        handleRequest(request);
                    }
                    try {
                        Thread.sleep(1000 / rate); //睡眠
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    /**
     * 處理請求
     * @param request
     */
    private void handleRequest(Request request) {
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "號請求被處理,請求發(fā)起時(shí)間: "
                    + request.getLaunchTime() + ",請求處理時(shí)間: " + request.getHandleTime() + ",處理耗時(shí): "
                    + (request.getHandleTime().getTime() - request.getLaunchTime().getTime() + "ms"));
    }
    private synchronized boolean tryAcquire(Request request) {
        if(left <= 0) {
            return false;
        }else {
            left--;
            requestList.addLast(request);
            return true;
        }
    }
    /**
     * 請求類,屬性包含編號字符串,請求達(dá)到時(shí)間和請求處理時(shí)間
     */
    static class Request {
        private int code;
        private Date launchTime;
        private Date handleTime;
        private Request(){}
        public Request(int code, Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }
        public int getCode() {
            return code;
        }
        public void setCode(int code) {
            this.code = code;
        }
        public Date getLaunchTime() {
            return launchTime;
        }
        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }
        public Date getHandleTime() {
            return handleTime;
        }
        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }
    public static void main(String[] args) {
        LeakyBucketLimiter leakyBucketLimiter = new LeakyBucketLimiter(5, 2);
        for(int i=1; i<=10; i++) {
            Request request = new Request(i, new Date());
            if(leakyBucketLimiter.tryAcquire(request)) {
                System.out.println(i + "號請求被接受");
            }else {
                System.out.println(i + "號請求被拒絕");
            }
        }
    }
}

4. 令牌桶算法

令牌桶算法是對漏桶算法的一種改進(jìn),除了能夠在限制調(diào)用的平均速率的同時(shí)還允許一定程度的流量突發(fā)。

package com.example.demo.util;
import java.util.Date;
/**
 * @ClassName TokenBucketLimiter
 * @Description 令牌桶算法
 * @Date 2021/8/12 8:55 下午
 * @Created by liyanyan
 */
public class TokenBucketLimiter {
    private int capacity; //令牌桶容量
    private int rate; //令牌產(chǎn)生速率
    private int tokenAmount; //令牌數(shù)量
    public TokenBucketLimiter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        tokenAmount = capacity;
        new Thread(new Runnable() {
            @Override
            public void run() {
                //以恒定的速率放令牌
                while (true) {
                    synchronized (this) {
                        tokenAmount++;
                        if (tokenAmount > capacity) {
                            tokenAmount = capacity;
                        }
                    }
                    try {
                        Thread.sleep(1000 / rate);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    public synchronized boolean tryAcquire(Request request) {
        if (tokenAmount > 0) {
            tokenAmount--;
            handleRequest(request);
            return true;
        } else {
            return false;
        }
    }
    /**
     * 處理請求
     *
     * @param request
     */
    private void handleRequest(Request request) {
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "號請求被處理,請求發(fā)起時(shí)間: "
                + request.getLaunchTime() + ",請求處理時(shí)間: " + request.getHandleTime() + ",處理耗時(shí): "
                + (request.getHandleTime().getTime() - request.getLaunchTime().getTime() + "ms"));
    }
    /**
     * 請求類,屬性包含編號字符串,請求達(dá)到時(shí)間和請求處理時(shí)間
     */
    static class Request {
        private int code;
        private Date launchTime;
        private Date handleTime;
        private Request() {
        }
        public Request(int code, Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }
        public int getCode() {
            return code;
        }
        public void setCode(int code) {
            this.code = code;
        }
        public Date getLaunchTime() {
            return launchTime;
        }
        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }
        public Date getHandleTime() {
            return handleTime;
        }
        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }
    public static void main(String[] args) {
        TokenBucketLimiter tokenBucketLimiter = new TokenBucketLimiter(5, 2);
        for (int i = 1; i <= 10; i++) {
            Request request = new Request(i, new Date());
            if (tokenBucketLimiter.tryAcquire(request)) {
                System.out.println(i + "號請求被接受");
            }else {
                System.out.println(i + "號請求被拒絕");
            }
        }
    }
}

小結(jié):

計(jì)數(shù)器固定窗口算法實(shí)現(xiàn)簡單,容易理解。和漏斗算法相比,新來的請求也能夠被馬上處理到。但是流量曲線可能不夠平滑,有"突刺現(xiàn)象",在窗口切換時(shí)可能會產(chǎn)生兩倍于閾值流量的請求。

而計(jì)數(shù)器滑動窗口算法作為計(jì)數(shù)器固定窗口算法的一種改進(jìn),有效解決了窗口切換時(shí)可能會產(chǎn)生兩倍于閾值流量請求的問題

漏斗算法能夠?qū)α髁科鸬秸鞯淖饔?,讓隨機(jī)不穩(wěn)定的流量以固定的速率流出,但是不能解決流量突發(fā)的問題。

令牌桶算法作為漏斗算法的一種改進(jìn),除了能夠起到平滑流量的作用,還允許一定程度的流量突發(fā)。

沒有最好的算法,只有最合適的算法

比如令牌桶算法一般用于保護(hù)自身的系統(tǒng),對調(diào)用者進(jìn)行限流,保護(hù)自身的系統(tǒng)不被突發(fā)的流量打垮。如果自身的系統(tǒng)實(shí)際的處理能里強(qiáng)于配置的流量限制時(shí),可以允許一定程度的流量突發(fā),使得實(shí)際的處理速率,充分利用系統(tǒng)資源。

而漏斗算法一般用于保護(hù)第三方的系統(tǒng),比如自身的系統(tǒng)需要調(diào)用第三方的接口,為了保護(hù)第三方的系統(tǒng)不被自身的調(diào)用打垮,便可以通過漏斗算法進(jìn)行限流,保證自身的流量平穩(wěn)的達(dá)到第三方的接口上。

算法是死的,場景是活的。沒有最好的,只有最合適的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容