談?wù)劶軜?gòu)中的限流與計(jì)數(shù)器的實(shí)現(xiàn)方式

1. 先談?wù)凬ginx分流

對(duì),要考慮限流先得假設(shè)訪問(wèn)量達(dá)到了一定得程度,再高并發(fā)得前提下,請(qǐng)求過(guò)多很有可能導(dǎo)致某天服務(wù)器承受不了導(dǎo)致死機(jī)。

在這個(gè)前提下,我相信你最先想到的一定是nginx,使用nginx分流讓所有的請(qǐng)求不要直接到達(dá)某一個(gè)服務(wù)器,當(dāng)并發(fā)量繼續(xù)上升的時(shí)候,我提供更多的服務(wù)器視乎就能解決問(wèn)題了,這想法看上去很對(duì),而且很多行業(yè)就是這么做的,比如下面的nginx配置文件,很簡(jiǎn)單,我相信你一看就能懂

#user  nobody;

worker_processes  1;

events {
    worker_connections  1024;
}

http {

    upstream enjoy{
        server 127.0.0.1:8080;        
        server 127.0.0.1:8081;      

    }
    server {
            listen 80;
            location / {
                    proxy_pass http://enjoy;
            }
    }

}

在上面案例中我配置了個(gè)代理,當(dāng)高并發(fā)的請(qǐng)求過(guò)來(lái)的時(shí)候,會(huì)把請(qǐng)求分發(fā)給8080與8081兩個(gè)端口的服務(wù)器。

但現(xiàn)在我們假設(shè)一個(gè)極端的情況,這個(gè)時(shí)候由于業(yè)務(wù)有個(gè)秒殺要求,請(qǐng)求過(guò)大一下就讓這兩個(gè)服務(wù)器爆了,這個(gè)時(shí)候在我繼續(xù)增加服務(wù)器之前這整個(gè)秒殺業(yè)務(wù)幾乎都處在癱瘓狀態(tài),而這突然的訪問(wèn)量是你始料未及的,也就是說(shuō)你根本就沒(méi)法事先準(zhǔn)備好足夠的服務(wù)器來(lái)解決這種情況。

這個(gè)時(shí)候我相信你已經(jīng)看出了如果僅僅做分流依然會(huì)出現(xiàn)很嚴(yán)重的問(wèn)題,怎么辦呢?這個(gè)時(shí)候你就需要限流了。

2. 再說(shuō)下限流

限流依然是再高并發(fā)的前提下,如果某個(gè)服務(wù)器承受不了數(shù)目過(guò)多的請(qǐng)求量的一種限制機(jī)制,不同的是分流是讓請(qǐng)求分發(fā)的其他服務(wù)器,而限流是達(dá)到某個(gè)閾值后直接不讓你訪問(wèn)了

如果想完成限流的功能其實(shí)是有一些解決方案的(算法),比如來(lái)說(shuō),基于令牌桶的,程序計(jì)數(shù)器以及漏桶算法;

今天挑一個(gè)最簡(jiǎn)單的計(jì)數(shù)器方式來(lái)講講限流

3. 解決方案

計(jì)數(shù)器的解決方式是最簡(jiǎn)單最容易實(shí)現(xiàn)的一種解決方案,假設(shè)有一個(gè)接口,要求1分鐘的訪問(wèn)量不能超過(guò)10次

這樣當(dāng)有任何請(qǐng)求過(guò)來(lái),我可以讓計(jì)數(shù)器+1;如果這個(gè)計(jì)數(shù)器的值大于10,而且和第一次的請(qǐng)求相比,時(shí)間間隔在1分鐘以內(nèi),那么久能說(shuō)明該請(qǐng)求訪問(wèn)過(guò)多。

如果這個(gè)請(qǐng)求與第一次請(qǐng)求的訪問(wèn)時(shí)間之間的間隔超過(guò)了1分鐘,那么該計(jì)數(shù)器的值久還是限流范圍之內(nèi),接下來(lái)久只要重置計(jì)數(shù)器就好;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.atomic.AtomicInteger;

public class EnjoyCountLimit {

    private int limtCount = 60;// 限制最大訪問(wèn)的容量

    AtomicInteger atomicInteger = new AtomicInteger(0); // 每秒鐘 實(shí)際請(qǐng)求的數(shù)量

    private long start = System.currentTimeMillis();// 獲取當(dāng)前系統(tǒng)時(shí)間

    private int interval = 60*1000;// 間隔時(shí)間60秒

    public boolean acquire() {

        long newTime = System.currentTimeMillis();

        if (newTime > (start + interval)) {

            // 判斷是否是一個(gè)周期

            start = newTime;

            atomicInteger.set(0); // 清理為0

            return true;

        }

        atomicInteger.incrementAndGet();// i++;

        return atomicInteger.get() <= limtCount;

    }

    static EnjoyCountLimit limitService = new EnjoyCountLimit();

    public static void main(String[] args) {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

        for (int i = 1; i < 100; i++) {

            final int tempI = i;

            newCachedThreadPool.execute(new Runnable() {

                public void run() {

                    if (limitService.acquire()) {

                        System.out.println("你沒(méi)有被限流,可以正常訪問(wèn)邏輯 i:" + tempI);

                    } else {

                        System.out.println("你已經(jīng)被限流呢  i:" + tempI);

                    }

                }

            });

        }

    }

}

這個(gè)計(jì)數(shù)器的限流方式很簡(jiǎn)單吧,但這樣問(wèn)題嗎?好好想想……

還是以60運(yùn)行訪問(wèn)10次請(qǐng)求為例,在第一次0-58秒之內(nèi),沒(méi)有訪問(wèn)請(qǐng)求,在59秒之內(nèi)突然來(lái)了10次請(qǐng)求,這個(gè)時(shí)候會(huì)做什么,由于已經(jīng)到了1分鐘計(jì)數(shù)器會(huì)重置。

這個(gè)時(shí)候第二次的1秒內(nèi)(1分0秒)又了10請(qǐng)求,這個(gè)時(shí)候是不是就在2秒之內(nèi)有20個(gè)請(qǐng)求被放行了呢?(59秒,1分0秒),如果某個(gè)服務(wù)器的訪問(wèn)量只能是10次請(qǐng)求,那這種限流方式已經(jīng)導(dǎo)致服務(wù)器掛了;

4. 滑動(dòng)窗口計(jì)數(shù)器

前面已經(jīng)知道簡(jiǎn)單的計(jì)數(shù)器的實(shí)現(xiàn)方式,也知道他會(huì)出現(xiàn)的一些問(wèn)題,雖然這些問(wèn)題舉得有些極端,但還是有更好得解決方案,這方案就是使用滑動(dòng)窗口計(jì)數(shù)器

滑動(dòng)窗口計(jì)數(shù)器得原理是在沒(méi)錯(cuò)請(qǐng)求過(guò)來(lái)得時(shí)候,先判斷前面N個(gè)單位內(nèi)得總訪問(wèn)量是否操過(guò)得閾值,并且在當(dāng)前得時(shí)間單位得請(qǐng)求數(shù)上+1

舉例來(lái)說(shuō),要求1分鐘的訪問(wèn)量不能超過(guò)10次

可以把1分鐘看成是6個(gè)10秒鐘的時(shí)間,0-9秒的訪問(wèn)數(shù)記錄到第一個(gè)格子,10-19秒的訪問(wèn)數(shù)記錄數(shù)記錄到第二個(gè)格子以此內(nèi)推,每次統(tǒng)計(jì)將6個(gè)格子里面的數(shù)據(jù)求和,如果超過(guò)了10次就不允許訪問(wèn)。

import java.util.concurrent.atomic.AtomicInteger;

public class EnjoySlidingWindow {

    private AtomicInteger[] timeSlices;

    /* 隊(duì)列的總長(zhǎng)度  */

    private final int timeSliceSize;

    /* 每個(gè)時(shí)間片的時(shí)長(zhǎng) */

    private final long timeMillisPerSlice;

    /* 窗口長(zhǎng)度 */

    private final int windowSize;

    /* 當(dāng)前所使用的時(shí)間片位置 */

    private AtomicInteger cursor = new AtomicInteger(0);

    public static enum Time {

        MILLISECONDS(1),

        SECONDS(1000),

        MINUTES(SECONDS.getMillis() * 60),

        HOURS(MINUTES.getMillis() * 60),

        DAYS(HOURS.getMillis() * 24),

        WEEKS(DAYS.getMillis() * 7);

        private long millis;

        Time(long millis) {

            this.millis = millis;

        }

        public long getMillis() {

            return millis;

        }

    }

    public EnjoySlidingWindow(int windowSize, Time timeSlice) {

        this.timeMillisPerSlice = timeSlice.millis;

        this.windowSize = windowSize;

        // 保證存儲(chǔ)在至少兩個(gè)window

        this.timeSliceSize = windowSize * 2 + 1;

        init();

    }

    /**

     * 初始化

     */

    private void init() {

        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];

        for (int i = 0; i < timeSliceSize; i++) {

            localTimeSlices[i] = new AtomicInteger(0);

        }

        timeSlices = localTimeSlices;

    }

    private int locationIndex() {

        long time = System.currentTimeMillis();

        return (int) ((time / timeMillisPerSlice) % timeSliceSize);

    }

    /**

     * <p>對(duì)時(shí)間片計(jì)數(shù)+1,并返回窗口中所有的計(jì)數(shù)總和

     * <p>該方法只要調(diào)用就一定會(huì)對(duì)某個(gè)時(shí)間片進(jìn)行+1

     * @return

     */

    public int incrementAndSum() {

        int index = locationIndex();

        int sum = 0;

        // cursor等于index,返回true

        // cursor不等于index,返回false,并會(huì)將cursor設(shè)置為index

        int oldCursor = cursor.getAndSet(index);

        if (oldCursor == index) {

            // 在當(dāng)前時(shí)間片里繼續(xù)+1

            sum += timeSlices[index].incrementAndGet();

        } else {

            //輪到新的時(shí)間片,置0,可能有其它線程也置了該值,容許

            timeSlices[index].set(0);

            // 清零,訪問(wèn)量不大時(shí)會(huì)有時(shí)間片跳躍的情況

            clearBetween(oldCursor, index);

            sum += timeSlices[index].incrementAndGet();

        }

        for (int i = 1; i < windowSize; i++) {

            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();

        }

        return sum;

    }

    /**

     * 判斷是否允許進(jìn)行訪問(wèn),未超過(guò)閾值的話才會(huì)對(duì)某個(gè)時(shí)間片+1

     * @param threshold

     * @return

     */

    public boolean allow(int threshold) {

        int index = locationIndex();

        int sum = 0;

        int oldCursor = cursor.getAndSet(index);

        if (oldCursor != index) {

            timeSlices[index].set(0);

            clearBetween(oldCursor, index);

        }

        for (int i = 0; i < windowSize; i++) {

            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();

        }

        // 閾值判斷

        if (sum < threshold) {

            // 未超過(guò)閾值才+1

            timeSlices[index].incrementAndGet();

            return true;

        }

        return false;

    }

    /**

     * <p>將fromIndex~toIndex之間的時(shí)間片計(jì)數(shù)都清零

     * <p>極端情況下,當(dāng)循環(huán)隊(duì)列已經(jīng)走了超過(guò)1個(gè)timeSliceSize以上,這里的清零并不能如期望的進(jìn)行

     * @param fromIndex 不包含

     * @param toIndex 不包含

     */

    private void clearBetween(int fromIndex, int toIndex) {

        for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {

            timeSlices[index].set(0);

        }

    }

    public static void main(String[] args) {

        EnjoySlidingWindow window = new EnjoySlidingWindow(5, Time.MILLISECONDS);

        for (int i = 0; i < 10; i++) {

            System.out.println(window.allow(7));

        }

    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容