Redisson 限流源碼學(xué)習(xí)

最近業(yè)務(wù)中用到了Redisson限流的功能,順便研究一下底層實(shí)現(xiàn)
基于當(dāng)前使用的版本<version>3.10.7</version>
目前用到的是accqure(),具體邏輯分析見(jiàn)代碼中的注釋

    @Override
    public void acquire(long permits) {
        // get 同步獲取
        get(acquireAsync(permits));
    }

    // RFuture是繼承jdk的Future類(lèi)
    @Override
    public <V> V get(RFuture<V> future) {
        if (!future.isDone()) {
            CountDownLatch l = new CountDownLatch(1);
            future.onComplete((res, e) -> {
                l.countDown();
            });

            boolean interrupted = false;
            while (!future.isDone()) {
                try {
                    // future complete以后解除阻塞
                    l.await();
                } catch (InterruptedException e) {
                    interrupted = true;
                    break;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        // commented out due to blocking issues up to 200 ms per minute for each thread
        // future.awaitUninterruptibly();
        if (future.isSuccess()) {
            return future.getNow();
        }

        throw convertException(future);
    }

    @Override
    public RFuture<Void> acquireAsync(long permits) {
        RPromise<Void> promise = new RedissonPromise<Void>();
        // permits 代表要獲取的許可數(shù)量,一般一次獲取一個(gè)
        // -1代表不設(shè)置超時(shí)   可以看看tryAcquire帶超時(shí)設(shè)置的重載方法了解此參數(shù)
        // null代表時(shí)間單位   沒(méi)有設(shè)置時(shí)間所以單位為空
        tryAcquireAsync(permits, -1, null).onComplete((res, e) -> {
            if (e != null) {
                promise.tryFailure(e);
                return;
            }
            
            promise.trySuccess(null);
        });
        return promise;
    }

    @Override
    public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
        RPromise<Boolean> promise = new RedissonPromise<Boolean>();
        long timeoutInMillis = -1;
        // 如果有設(shè)置超時(shí)時(shí)間 轉(zhuǎn)換為毫秒 調(diào)用真正的執(zhí)行邏輯
        if (timeout > 0) {
            timeoutInMillis = unit.toMillis(timeout);
        }
        tryAcquireAsync(permits, promise, timeoutInMillis);
        return promise;
    }

    private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
        long s = System.currentTimeMillis();
        // 執(zhí)行l(wèi)ua腳本,并返回Long類(lèi)型。具體腳本內(nèi)容往下看
        RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
        // delay 代表lua腳本執(zhí)行返回的值   e代表異常
        future.onComplete((delay, e) -> {
            if (e != null) {
                promise.tryFailure(e);
                return;
            }
            // 返回空就代表 獲取許可成功了,為啥空代表成功需要看后面lua腳本
            if (delay == null) {
                // 給上層返回true
                promise.trySuccess(true);
                return;
            }
            // 走到這里表示獲取許可失敗了,但是獲取許可失敗了 要繼續(xù)嘗試
            // -1 代表不超時(shí)的邏輯
            // 獲取許可失敗的時(shí)候  返回的值賦給了delay  為啥取名delay因?yàn)榉祷氐氖堑榷嗑貌拍芟乱淮潍@取
            if (timeoutInMillis == -1) {
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    tryAcquireAsync(permits, promise, timeoutInMillis);
                }, delay, TimeUnit.MILLISECONDS);
                return;
            }
            // 走到這里表示或許許可失敗,但是設(shè)置了超時(shí)時(shí)間
            // 先看看已經(jīng)花了多久
            long el = System.currentTimeMillis() - s;
            long remains = timeoutInMillis - el;
            // 超時(shí)了  給上層false
            if (remains <= 0) {
                promise.trySuccess(false);
                return;
            }
            // 暫時(shí)還沒(méi)有超時(shí),但是剩余的時(shí)間  比lua返回的要等待的時(shí)間還要短,那在超時(shí)時(shí)間內(nèi)也不會(huì)成功
            // 給上層false
            if (remains < delay) {
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    promise.trySuccess(false);
                }, remains, TimeUnit.MILLISECONDS);
            } else {
                long start = System.currentTimeMillis();
                // 等待delay時(shí)間后再次嘗試獲取許可
                // 但是嘗試之前再做一次超時(shí)判斷
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    long elapsed = System.currentTimeMillis() - start;
                    if (remains <= elapsed) {
                        promise.trySuccess(false);
                        return;
                    }
                    // 遞歸調(diào)用
                    tryAcquireAsync(permits, promise, remains - elapsed);
                }, delay, TimeUnit.MILLISECONDS);
            }
        });
    }


    private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        // KEYS 有3個(gè)   ARGS只有1個(gè)
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 在或許許可之前 校驗(yàn)有沒(méi)有創(chuàng)建對(duì)應(yīng)的 限流器基本信息  KEYS[1] 對(duì)應(yīng)的是限流器的name
              // 往下查看trySetRate(創(chuàng)建限流器的時(shí)候會(huì)調(diào)用)可以看到是怎么設(shè)置進(jìn)去的
              // rate是限速比如 每秒100 這里就是100  
              // interval是時(shí)間間隔 按每秒那這里就是1秒對(duì)應(yīng)的毫秒數(shù)
              // type  0 全局限流  1 按客戶端限流
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              // valueName  即存放限流值對(duì)應(yīng)的redis key
              + "local valueName = KEYS[2];"
              // 1 代表是按客戶端分別限流   0 代表的是全局限流
              // 按客戶端限流的話  redis key還要加上 客戶端的id信息,具體看后面KEYS數(shù)組中第三個(gè)的值
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
              + "end;"
              
              + "local currentValue = redis.call('get', valueName); "
              // 如果限流值存在
              + "if currentValue ~= false then "
                     // 比較當(dāng)前值夠不夠要申請(qǐng)的許可數(shù)   不夠說(shuō)明達(dá)到限流上限了  然后返回ttl 也就是還有多久到期
                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "return redis.call('pttl', valueName); "
                     + "else "
                     // 如果夠,那就減去本次申請(qǐng)的許可數(shù)  然后返回空,空就代表成功了
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "return nil; "
                     + "end; "
              + "else "
                     // 如果限流值在redis不存在  那說(shuō)明是第一次申請(qǐng)?jiān)S可或者又到了新的1秒 之前的過(guò)期了,所以要?jiǎng)?chuàng)建redis值
                     // 判斷申請(qǐng)的許可數(shù)是否太大,比如每秒限流100  你傳進(jìn)來(lái)101  那肯定申請(qǐng)不下來(lái)
                     + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
                     // 初始化限流的值  并設(shè)置過(guò)期時(shí)間
                     + "redis.call('set', valueName, rate, 'px', interval); "
                     // 然后扣減本次申請(qǐng)的許可數(shù)
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "return nil; "
              + "end;",
                Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), 
                value, commandExecutor.getConnectionManager().getId().toString());
    }

    @Override
    public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return get(trySetRateAsync(type, rate, rateInterval, unit));
    }

    @Override
    public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
              + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
              + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
                Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
    }

總結(jié)下來(lái)就是,往redis設(shè)置一個(gè)限流的數(shù)值,超時(shí)時(shí)間就是限流的時(shí)間區(qū)間
然后就去查詢這個(gè)限流的數(shù)值,如果沒(méi)有查到,肯定可以獲取許可;如果查到了,要看看有沒(méi)有超過(guò)許可數(shù)
如果獲取許可成功了就返回nil,上層就知道成功;如果或許失敗那就要等超時(shí)時(shí)間過(guò)去再請(qǐng)求,據(jù)返回一個(gè)delay時(shí)間。

但是,這個(gè)執(zhí)行一次沒(méi)有獲取到許可的話,還要重試,所以上一層方法增加了重試的邏輯,重試是靠io.netty.util.concurrent.EventExecutorGroup#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)來(lái)實(shí)現(xiàn),EventExecutorGroup實(shí)現(xiàn)了java.util.concurrent.ScheduledExecutorService。

如果我們熟悉Redis的話,應(yīng)該會(huì)覺(jué)得限流可以用其他的數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn),比如zset,用zrangescore來(lái)獲取某一時(shí)間窗口內(nèi)的請(qǐng)求數(shù),然后判斷有沒(méi)有達(dá)到限流閾值。

我們升級(jí)一下redisson,看看高版本有沒(méi)有優(yōu)化。
繼續(xù)看看版本<version>3.34.0</version>

    private CompletableFuture<Boolean> tryAcquireAsync(long permits, long timeoutInMillis) {
        long s = System.currentTimeMillis();
        RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
        return future.thenCompose(delay -> {
            if (delay == null) {
                return CompletableFuture.completedFuture(true);
            }
            
            if (timeoutInMillis == -1) {
                CompletableFuture<Boolean> f = new CompletableFuture<>();
                getServiceManager().newTimeout(t -> {
                    CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);
                    commandExecutor.transfer(r, f);
                }, delay, TimeUnit.MILLISECONDS);
                return f;
            }
            
            long el = System.currentTimeMillis() - s;
            long remains = timeoutInMillis - el;
            if (remains <= 0) {
                return CompletableFuture.completedFuture(false);
            }

            CompletableFuture<Boolean> f = new CompletableFuture<>();
            if (remains < delay) {
                getServiceManager().newTimeout(t -> {
                    f.complete(false);
                }, remains, TimeUnit.MILLISECONDS);
            } else {
                long start = System.currentTimeMillis();
                getServiceManager().newTimeout(t -> {
                    long elapsed = System.currentTimeMillis() - start;
                    if (remains <= elapsed) {
                        f.complete(false);
                        return;
                    }

                    CompletableFuture<Boolean> r = tryAcquireAsync(permits, remains - elapsed);
                    commandExecutor.transfer(r, f);
                }, delay, TimeUnit.MILLISECONDS);
            }
            return f;
        }).toCompletableFuture();
    }

    private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        byte[] random = getServiceManager().generateIdArray();

        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
              // 存儲(chǔ)當(dāng)前剩余的許可數(shù)
              + "local currentValue = redis.call('get', valueName); "
              + "local res;"
              + "if currentValue ~= false then "
                     // 窗口滑動(dòng),查詢已過(guò)期的許可
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('Bc0I', v);"
                          // 所有已過(guò)期的許可數(shù)加起來(lái)
                          + "released = released + permits;"
                     + "end; "

                     + "if released > 0 then "
                          // 刪除已過(guò)期的許可  避免下一次range查詢又查出來(lái)了
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          // 如果當(dāng)前剩余許可數(shù) + 已過(guò)期 > 總限流數(shù)。這種一般不存在,除非重新設(shè)置了限流速率?
                         // 如果是這樣的話,重新計(jì)算一下剩余許可數(shù),用rate - 有效期內(nèi)已經(jīng)申請(qǐng)的許可數(shù),這里不是100%準(zhǔn)確   zcard是集合計(jì)數(shù)   一個(gè)條目申請(qǐng)的許可數(shù)可能是大于1的
                        // 當(dāng)前剩余許可數(shù) + 已過(guò)期 <= 總限流數(shù)   已過(guò)期的可以重新在新窗口使用
                          + "if tonumber(currentValue) + released > tonumber(rate) then "
                               + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
                          + "else "
                               + "currentValue = tonumber(currentValue) + released; "
                          + "end; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     // 如果當(dāng)前剩余許可數(shù) 小于 本次申請(qǐng)的許可數(shù),則申請(qǐng)失敗  最后會(huì)返回nil
                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
                         // 計(jì)算多長(zhǎng)時(shí)間后可以重新獲取許可  3是什么意思沒(méi)有看懂
                         + "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
                     + "else "
                         // 申請(qǐng)成功  zset寫(xiě)入許可申請(qǐng)記錄
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "res = nil; "
                     + "end; "
              + "else "
                     // 初始化限流數(shù)量
                     + "redis.call('set', valueName, rate); "
                     // 記錄本次申請(qǐng)的許可
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                     // 剩余許可數(shù)量 = 限流數(shù)量 - 本次申請(qǐng)的許可數(shù)量
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "res = nil; "
              + "end;"

              + "local ttl = redis.call('pttl', KEYS[1]); "
              + "if ttl > 0 then "
                  + "redis.call('pexpire', valueName, ttl); "
                  + "redis.call('pexpire', permitsName, ttl); "
              + "end; "
              + "return res;",
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                value, System.currentTimeMillis(), random);
    }

新版本的設(shè)計(jì)思路完全改變了,用k/v存剩余許可數(shù),zset存許可申請(qǐng)明細(xì)。
每一次許可申請(qǐng)使用zadd往zset增加一條,用毫秒時(shí)間戳做score 用struct.park 壓縮成二進(jìn)制存儲(chǔ)申請(qǐng)的許可數(shù)。
每一次請(qǐng)求用滑動(dòng)窗口動(dòng)態(tài)判斷當(dāng)前剩余許可數(shù)。
如果達(dá)到限流上限,返回ttl,外層采用延遲重試的方式繼續(xù)請(qǐng)求獲取許可,思路和之前版本類(lèi)似。不同的是,這里使用了netty時(shí)間輪
io.netty.util.HashedWheelTimer#newTimeout

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容