基于redis和lua的分布式限流器設(shè)計(jì)與實(shí)現(xiàn)

前言

之前這篇文章中,我大致介紹了一下google guava庫中的RateLimiter的實(shí)現(xiàn)以及它背后的令牌桶算法原理。但是也有新的問題,在分布式的環(huán)境中,我們?nèi)绾吾槍Χ鄼C(jī)環(huán)境做限流呢?在查閱了一些資料和其他人的博客之后,我采用了redis來作為限流器的實(shí)現(xiàn)基礎(chǔ)。
原因主要有以下幾點(diǎn):

  • redis作為高性能緩存系統(tǒng),性能上能夠滿足多機(jī)之間高并發(fā)訪問的要求
  • redis有比較好的api來支持限流器令牌桶算法的實(shí)現(xiàn)
  • 對于我們的系統(tǒng)來說,通過spring data redis來操作比較簡單和常見,避免了引入新的中間件帶來的風(fēng)險(xiǎn)

但是我們也知道,限流器在每次請求令牌和放入令牌操作中,存在一個(gè)協(xié)同的問題,即獲取令牌操作要盡可能保證原子性,否則無法保證限流器是否能正常工作。在RateLimiter的實(shí)現(xiàn)中使用了mutex作為互斥鎖來保證操作的原子性,那么在redis中就需要一個(gè)類似于事務(wù)的機(jī)制來保證獲取令牌中多重操作的原子性。
面對這樣的需求,我們有幾個(gè)選擇:

  • 用redis實(shí)現(xiàn)分布式鎖來保證操作的原子性,這個(gè)方案實(shí)現(xiàn)起來應(yīng)該比較簡單,分布式鎖有現(xiàn)成的例子,然后就是把Rate Limiter的代碼套用分布式鎖就行了,但是這樣的話效率會顯得不太高,特別是在大量訪問的情況下。
  • 用redis的transaction,在我查閱redis官方文檔和stackoverflow之后發(fā)現(xiàn)redis的transaction官方并不推薦,并且有可能在未來取消事務(wù),因此不可取。
  • 通過redis分布式鎖和本地鎖組成一個(gè)雙層結(jié)構(gòu),每次分布式獲取鎖之后可以預(yù)支一部分令牌量,然后放到本地通過本地的鎖來分配這些令牌,消耗完之后再到請求redis。這樣的好處是相比第一個(gè)方案,網(wǎng)絡(luò)訪問延遲開銷會比較好,但是實(shí)現(xiàn)難度和復(fù)雜程度比較難估量,而且這樣的做法如果在多機(jī)不能保證均勻分配流量的情況下并不理想
  • 通過將獲取鎖封裝到lua腳本中,提交給redis進(jìn)行eval和evalsha操作來完成lua腳本的執(zhí)行,由于lua腳本在redis中天然的原子性,我們的需求能夠比較好的滿足,問題是將業(yè)務(wù)邏輯封裝在lua中,對于開發(fā)人員自身的能力和調(diào)試存在一定的問題。

經(jīng)過權(quán)衡,我采用了第四種方式,通過redis和lua來編寫令牌桶算法來完成分布式限流的需求。

lua腳本

話不多說,先貼出lua代碼


-- 返回碼 1:操作成功 0:未配置 -1: 獲取失敗 -2:修改錯(cuò)誤,建議重新初始化 -500:不支持的操作
-- redis hashmap 中存放的內(nèi)容:
-- last_mill_second 上次放入令牌或者初始化的時(shí)間
-- stored_permits 目前令牌桶中的令牌數(shù)量
-- max_permits 令牌桶容量
-- interval 放令牌間隔
-- app 一個(gè)標(biāo)志位,表示對于當(dāng)前key有沒有限流存在

local SUCCESS = 1
local NO_LIMIT = 0
local ACQUIRE_FAIL = -1
local MODIFY_ERROR = -2
local UNSUPPORT_METHOD = -500

local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app")
local last_mill_second = ratelimit_info[1]
local stored_permits = tonumber(ratelimit_info[2])
local max_permits = tonumber(ratelimit_info[3])
local interval = tonumber(ratelimit_info[4])
local app = ratelimit_info[5]

local method = ARGV[1]

--獲取當(dāng)前毫秒
--考慮主從策略和腳本回放機(jī)制,這個(gè)time由客戶端獲取傳入
--local curr_time_arr = redis.call('TIME')
--local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000
local curr_timestamp = tonumber(ARGV[2])


-- 當(dāng)前方法為初始化
if method == 'init' then
    --如果app不為null說明已經(jīng)初始化過,不要重復(fù)初始化
    if(type(app) ~='boolean' and app ~=nil) then
        return SUCCESS
    end

    redis.pcall("HMSET", KEYS[1],
        "last_mill_second", curr_timestamp,
        "stored_permits", ARGV[3],
        "max_permits", ARGV[4],
        "interval", ARGV[5],
        "app", ARGV[6])
    --始終返回成功
    return SUCCESS
end

-- 當(dāng)前方法為修改配置
if method == "modify" then
    if(type(app) =='boolean' or app ==nil) then
        return MODIFY_ERROR
    end
    --只能修改max_permits和interval
    redis.pcall("HMSET", KEYS[1],
        "max_permits", ARGV[3],
        "interval", ARGV[4])

    return SUCCESS

end

-- 當(dāng)前方法為刪除
if method == "delete" then
    --已經(jīng)清除完畢
    if(type(app) =='boolean' or app ==nil) then
        return SUCCESS
    end
    redis.pcall("DEL", KEYS[1])
    return SUCCESS
end

-- 嘗試獲取permits
if method == "acquire" then
    -- 如果app為null說明沒有對這個(gè)進(jìn)行任何配置,返回0代表不限流
    if(type(app) =='boolean' or app ==nil) then
        return NO_LIMIT
    end
    --需要獲取令牌數(shù)量
    local acquire_permits = tonumber(ARGV[3])
    --計(jì)算上一次放令牌到現(xiàn)在的時(shí)間間隔中,一共應(yīng)該放入多少令牌
    local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval))
    
    local new_permits = math.min(max_permits, stored_permits + reserve_permits)
    local result = ACQUIRE_FAIL
    --如果桶中令牌數(shù)量夠則放行
    if new_permits >= acquire_permits then
        result = SUCCESS
        new_permits = new_permits - acquire_permits
    end
    --更新當(dāng)前桶中的令牌數(shù)量 
    redis.pcall("HSET", KEYS[1], "stored_permits", new_permits)
    --如果這次有放入令牌,則更新時(shí)間
    if reserve_permits > 0 then
        redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp)
    end
    return result
end


return UNSUPPORT_METHOD

絕大部分邏輯在注釋里面都已經(jīng)寫清楚了(我java客戶端用的代碼刪掉了所有的注釋,因?yàn)樘峤簧先?bào)編譯錯(cuò)誤,但是redis-cli調(diào)試就沒問題,我也沒太關(guān)注原因)。
大致上,我在這個(gè)腳本中編寫了4種函數(shù):

  • init 初始化限流器
  • modify 修改限流器配置(主要針對限流器的桶大小和放令牌間隔,即1/QPS)
  • delete 刪除限流器配置
  • acquire 嘗試獲取制定數(shù)目的令牌

代碼基本上仿照了Guava RateLimiter的邏輯,實(shí)現(xiàn)了觸發(fā)式的放令牌策略。
由于我的需求中不需要像guava RateLimiter那樣的預(yù)支令牌的邏輯,因此如果當(dāng)前沒有令牌可供服務(wù),我就直接返回獲取失敗了。
還有一點(diǎn)需要注意的是,我本來在腳本中寫了獲取redis服務(wù)器當(dāng)前時(shí)間的代碼,但是我通過redis-cli執(zhí)行的時(shí)候報(bào)錯(cuò)了:

Write commands not allowed after non deterministic commands.

這個(gè)錯(cuò)誤的原因大家可以參見這篇文章,大致原因跟redis集群的重放和備份策略有關(guān),相當(dāng)于我調(diào)用TIME操作,會在主從各執(zhí)行一次,得到的結(jié)果肯定會存在差異,這個(gè)差異就給最終邏輯正確性帶來了不確定性。在redis 4.0之后引入了redis.replicate_commands()來放開限制。但我考慮了幾個(gè)因素之后,還是采用網(wǎng)上大部分人的做法,在執(zhí)行前先行獲取到redis的時(shí)間戳,然后當(dāng)做參數(shù)傳上去。

lua調(diào)試

對lua調(diào)試最開始花掉了我不少時(shí)間,主要對于redis-cli命令不太熟悉。大家有一樣問題的可以參見這篇文章。大致來說就是將寫好的腳本放到redis所在文件夾下(我是windows環(huán)境),然后在cmd下執(zhí)行 redis-cli.exe --eval rate_limit.lua test2(key,可重復(fù)) , (逗號分隔) init 10101 100 100 10 test2 (后跟參數(shù),空格隔開)。

java集成

在完成了lua的調(diào)試工作之后,我們就開始java部分的集成代碼編寫,我們使用的是spring boot來完成開發(fā)。
第一部分是redis配置:

    @Bean("rateLimitLua")
    public DefaultRedisScript<Long> getRateLimitScript() {
        DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>();
        rateLimitLua.setLocation(new ClassPathResource("rate_limit.lua"));
        rateLimitLua.setResultType(Long.class);
        return rateLimitLua;
    }

然后是一些與lua適配的枚舉和一些bean:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 *
 * 限流的具體方法
 */
public enum RateLimitMethod {

    //initialize rate limiter
    init,

    //modify rate limiter parameter
    modify,

    //delete rate limiter
    delete,

    //acquire permits
    acquire;
}
/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 * rate limite result
 **/
public enum RateLimitResult {

    SUCCESS(1L),
    NO_LIMIT(0L),
    ACQUIRE_FAIL(-1L),
    MODIFY_ERROR(-2L),
    UNSUPPORT_METHOD(-500L),
    ERROR(-505L);

    private Long code;

    RateLimitResult(Long code){
        this.code = code;
    }

    public static RateLimitResult getResult(Long code){
        for(RateLimitResult enums: RateLimitResult.values()){
            if(enums.code.equals(code)){
                return enums;
            }
        }
        throw new IllegalArgumentException("unknown rate limit return code:" + code);
    }
}
/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@Getter
@Setter
public class RateLimitVo {

    private String url;

    private boolean isLimit;

    private Double interval;

    private Integer maxPermits;

    private Integer initialPermits;

}

第三部分就是限流器的調(diào)用組裝部分:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@Service
@Slf4j
public class RateLimitClient {

    private static final String RATE_LIMIT_PREFIX = "ratelimit:";

    @Autowired
    StringRedisTemplate redisTemplate;

    @Resource
    @Qualifier("rateLimitLua")
    RedisScript<Long> rateLimitScript;

    public RateLimitResult init(String key, RateLimitVo rateLimitInfo){
        return exec(key, RateLimitMethod.init,
                rateLimitInfo.getInitialPermits(),
                rateLimitInfo.getMaxPermits(),
                rateLimitInfo.getInterval(),
                key);
    }

    public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){
        return exec(key, RateLimitMethod.modify, key,
                rateLimitInfo.getMaxPermits(),
                rateLimitInfo.getInterval());
    }

    public RateLimitResult delete(String key){
        return exec(key, RateLimitMethod.delete);
    }

    public RateLimitResult acquire(String key){
        return acquire(key, 1);
    }

    public RateLimitResult acquire(String key, Integer permits){
        return exec(key, RateLimitMethod.acquire, permits);
    }

    /**
     * 執(zhí)行redis的具體方法,限制method,保證沒有其他的東西進(jìn)來
     * @param key
     * @param method
     * @param params
     * @return
     */
    private RateLimitResult exec(String key, RateLimitMethod method, Object... params){
        try {
            Long timestamp = getRedisTimestamp();
            String[] allParams = new String[params.length + 2];
            allParams[0] = method.name();
            allParams[1] = timestamp.toString();
            for(int index = 0;index < params.length; index++){
                allParams[2 + index] = params[index].toString();
            }
            Long result = redisTemplate.execute(rateLimitScript,
                    Collections.singletonList(getKey(key)),
                    allParams);
            return RateLimitResult.getResult(result);
        } catch (Exception e){
            log.error("execute redis script fail, key:{}, method:{}",
                    key, method.name(), e);
            return RateLimitResult.ERROR;
        }
    }

    private Long getRedisTimestamp(){
        Long currMillSecond = redisTemplate.execute(
                (RedisCallback<Long>) redisConnection -> redisConnection.time()
        );
        return currMillSecond;
    }
    private String getKey(String key){
        return RATE_LIMIT_PREFIX + key;
    }
}

java代碼這塊比較簡單了,基本就是封裝了之前l(fā)ua腳本中的4項(xiàng)操作。

第四部分就是測試代碼:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OpenApiGatewayApplication.class)
public class RateLimitTest {

    @Autowired
    private RateLimitClient rateLimitClient;

    @Test
    public void testInit(){
        RateLimitVo vo = new RateLimitVo();
        vo.setInitialPermits(500);
        vo.setMaxPermits(500);
        vo.setInterval(2.0);
        rateLimitClient.init("test", vo);
    }

    @Test
    public void testAcquire() throws InterruptedException {
        //10個(gè)線程
        ExecutorService executorService = Executors.newFixedThreadPool(20);

        Subject<RateLimitSummary, RateLimitSummary> writeSubject = new SerializedSubject<RateLimitSummary, RateLimitSummary>(PublishSubject.<RateLimitSummary>create());
        Observable<RateLimitSummary> readSubject = writeSubject.share();
        Observable<RateLimitSummary> bucketStream = Observable.defer(()->{
            return readSubject.window(200, TimeUnit.MILLISECONDS)
                    .flatMap(
                            observable->
                                    observable.reduce(new RateLimitSummary(0,0,0),
                                            (a, b)-> a.reduce(b))
                    );
        });
        Observable<RateLimitSummary> rollingBucketStream = bucketStream.window(5, 1)
                .flatMap(observable->observable.reduce(new RateLimitSummary(0, 0, 0),
                        (a, b)-> a.reduce(b)));

        Runnable acquire = () -> {
            Random random = new Random();
            while(true){
                try {
                    Thread.sleep(30);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                RateLimitResult result = rateLimitClient.acquire("test");
                writeSubject.onNext(new RateLimitSummary(result));
            }
        };
        //初始時(shí)間
        final long currentMillis = System.currentTimeMillis();
        rollingBucketStream.subscribe(summary->{
            double timestamp = (System.currentTimeMillis() - currentMillis)/1000.0;
            System.out.println("time:"+ timestamp + ", acquired:" + summary.acquire +
                    ", reject " + summary.reject + ", error: " + summary.error);
        });
        for(int i=0;i<20;i++){
            executorService.submit(acquire);
        }
        while(true){
            Thread.sleep(5000);
        }
    }

    private static class RateLimitSummary{
        public int acquire;
        public int reject;
        public int error;

        public RateLimitSummary(RateLimitResult result){
            this.acquire = result == RateLimitResult.SUCCESS?1:0;
            this.reject = result == RateLimitResult.ACQUIRE_FAIL?1:0;
            this.error = result == RateLimitResult.ERROR?1:0;
        }

        public RateLimitSummary(int acquire, int reject, int error){
            this.acquire = acquire;
            this.reject = reject;
            this.error = error;
        }

        public RateLimitSummary reduce(RateLimitSummary toAdd){
            return new RateLimitSummary(this.acquire + toAdd.acquire,
                    this.reject + toAdd.reject,
                    this.error + toAdd.error);

        }
    }

}

這一段代碼我仿照了Hystrix中的熔斷統(tǒng)計(jì)的代碼,通過一個(gè)subject來存放獲取令牌結(jié)果,然后通過第一層bucketStream來將令牌結(jié)果按照200ms來分組并且reduce成一個(gè)結(jié)果。接著通過rollingBucketStream來將200ms的分組組合成一個(gè)一秒的時(shí)間窗(即5個(gè)為一組),并且以200ms為步長滾動。最后統(tǒng)計(jì)出來的結(jié)果通過subscribe來打印結(jié)果。之前的init代碼我們看已經(jīng)初始化了一個(gè)大小為500的令牌桶,存放令牌的時(shí)間間隔為2.0ms,所以支持的QPS為500。接著我們執(zhí)行這段代碼,并截取一部分輸出:

time:75.857, acquired:460, reject 8, error: 0
time:76.056, acquired:483, reject 36, error: 0
time:76.268, acquired:506, reject 52, error: 0
time:76.454, acquired:503, reject 59, error: 0
time:76.707, acquired:457, reject 69, error: 0
time:76.854, acquired:417, reject 66, error: 0
time:77.054, acquired:454, reject 36, error: 0
time:77.255, acquired:459, reject 54, error: 0
time:77.453, acquired:458, reject 77, error: 0
time:77.658, acquired:474, reject 103, error: 0
time:77.858, acquired:490, reject 132, error: 0

可以看到,這個(gè)結(jié)果基本每200ms輸出一次,然后一秒鐘內(nèi)的獲取了令牌數(shù)目最大值跟500接近,并且能夠很好地處理reject。有一部分結(jié)果一秒鐘獲取的令牌數(shù)與500差距較大,我分析的原因是因?yàn)檎埱笾貜?fù)時(shí)間段比較多,很多請求發(fā)生在前一個(gè)獲取了令牌之后的2ms內(nèi),產(chǎn)生了reject。

結(jié)語

通過redis和lua,我實(shí)現(xiàn)了一個(gè)簡單的分布式限流器。通過上述代碼,大家能看到一個(gè)大致的實(shí)現(xiàn)框架,并且通過測試代碼完成了驗(yàn)證。如果各位看官有什么問題歡迎留言,希望能跟大家共同學(xué)習(xí)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容