最近業務中用到了Redisson限流的功能,順便研究一下底層實現
基于當前使用的版本<version>3.10.7</version>
目前用到的是acquire(),具體邏輯分析見代碼中的注釋
/**
 * Acquires the given number of permits, waiting synchronously for the
 * asynchronous acquisition to complete.
 *
 * @param permits number of permits to acquire
 */
@Override
public void acquire(long permits) {
    RFuture<Void> pending = acquireAsync(permits);
    // get(...) waits for the future and rethrows its failure, if any.
    get(pending);
}
/**
 * Waits for {@code future} to complete and returns its result.
 * (Per the original author's note, RFuture extends the JDK Future type.)
 *
 * @param future the asynchronous operation to wait for
 * @param <V> result type
 * @return the value of the future when it completed successfully
 */
@Override
public <V> V get(RFuture<V> future) {
    if (!future.isDone()) {
        CountDownLatch latch = new CountDownLatch(1);
        future.onComplete((res, e) -> latch.countDown());

        while (!future.isDone()) {
            try {
                // Unblocked by the completion listener registered above.
                latch.await();
            } catch (InterruptedException interruption) {
                // Stop waiting and restore the interrupt flag for the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    // commented out due to blocking issues up to 200 ms per minute for each thread
    // future.awaitUninterruptibly();
    if (!future.isSuccess()) {
        throw convertException(future);
    }
    return future.getNow();
}
/**
 * Asynchronously acquires the given number of permits without a timeout.
 *
 * @param permits number of permits to acquire (typically one per call)
 * @return a future that succeeds once the permits are acquired, or fails with
 *         the error raised by the underlying attempt
 */
@Override
public RFuture<Void> acquireAsync(long permits) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // A timeout of -1 means "no timeout"; with no timeout there is no unit, hence null.
    RFuture<Boolean> attempt = tryAcquireAsync(permits, -1, null);
    attempt.onComplete((acquired, failure) -> {
        if (failure == null) {
            result.trySuccess(null);
        } else {
            result.tryFailure(failure);
        }
    });
    return result;
}
/**
 * Asynchronously tries to acquire the given number of permits within a time budget.
 *
 * @param permits number of permits to acquire
 * @param timeout maximum time to wait; a negative value means wait without a timeout,
 *                zero means do not wait at all
 * @param unit    unit of {@code timeout}; only dereferenced when {@code timeout > 0}
 * @return a future completed with true when the permits were acquired, or false
 *         when the time budget ran out first
 */
@Override
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
    RPromise<Boolean> promise = new RedissonPromise<Boolean>();
    // -1 is the sentinel the retry logic treats as "no timeout".
    long timeoutInMillis = -1;
    if (timeout > 0) {
        timeoutInMillis = unit.toMillis(timeout);
    } else if (timeout == 0) {
        // A zero timeout must not fall through to the -1 sentinel, otherwise a
        // "don't wait" request would retry forever. A budget of 0 ms makes the
        // retry logic report false as soon as the first attempt fails.
        timeoutInMillis = 0;
    }
    tryAcquireAsync(permits, promise, timeoutInMillis);
    return promise;
}
/**
 * Runs one acquisition attempt and completes {@code promise} with the outcome,
 * rescheduling itself while the permits are not yet available.
 *
 * @param permits number of permits to acquire
 * @param promise completed with true on success, false when the time budget is
 *                exhausted, or failed with the error of the script call
 * @param timeoutInMillis remaining time budget in milliseconds; -1 means no timeout
 */
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
// Start of this attempt; used below to measure how long the script call took.
long s = System.currentTimeMillis();
// Execute the Lua script, which yields a Long; the script itself is built by the overload called here.
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
// delay is the value returned by the Lua script; e is the failure, if any.
future.onComplete((delay, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
// A null result means the permits were acquired (the script returns nil on success).
if (delay == null) {
// Report success to the caller.
promise.trySuccess(true);
return;
}
// Reaching this point means the acquisition failed and has to be retried.
// -1 selects the "no timeout" behavior.
// The script's return value is called delay because it is how long to wait before the next attempt.
if (timeoutInMillis == -1) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
tryAcquireAsync(permits, promise, timeoutInMillis);
}, delay, TimeUnit.MILLISECONDS);
return;
}
// The acquisition failed and a timeout was set.
// First work out how much time has already been spent.
long el = System.currentTimeMillis() - s;
long remains = timeoutInMillis - el;
// Time budget exhausted: report false to the caller.
if (remains <= 0) {
promise.trySuccess(false);
return;
}
// Not timed out yet, but the remaining budget is shorter than the wait the script asked for,
// so the attempt cannot succeed within the timeout: report false once the budget has elapsed.
if (remains < delay) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
promise.trySuccess(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
// Wait for delay milliseconds and then try again,
// but re-check the time budget right before retrying.
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
promise.trySuccess(false);
return;
}
// Recursive call with whatever is left of the time budget.
tryAcquireAsync(permits, promise, remains - elapsed);
}, delay, TimeUnit.MILLISECONDS);
}
});
}
/**
 * Evaluates the Lua script that performs a single acquisition attempt.
 * The script returns nil when the permits were taken, otherwise the remaining
 * TTL (in milliseconds) of the counter key.
 *
 * @param command the eval command that decides how the script result is decoded
 * @param value   number of permits to acquire (ARGV[1])
 * @param <T>     decoded result type
 */
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
// Three KEYS are passed; two ARGV values are passed, of which the script reads only ARGV[1].
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// Before acquiring, verify that the limiter's settings exist. KEYS[1] is the limiter's name.
// trySetRate (called when the limiter is created) shows how these fields are written.
// rate is the limit, e.g. 100 for "100 per second".
// interval is the window length in milliseconds, e.g. one second expressed in ms.
// type: 0 = overall limit, 1 = per-client limit.
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
// valueName is the redis key that holds the remaining permits.
+ "local valueName = KEYS[2];"
// '1' means each client is limited separately; otherwise the limit is shared.
// For per-client limiting the counter key is KEYS[3], i.e. getClientValueName() in the key list below.
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "end;"
+ "local currentValue = redis.call('get', valueName); "
// The counter exists.
+ "if currentValue ~= false then "
// Not enough permits left for this request: the limit is reached, so return the key's TTL (time until it expires).
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "return redis.call('pttl', valueName); "
+ "else "
// Enough permits: subtract the requested amount and return nil, which signals success.
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end; "
+ "else "
// The counter does not exist: this is the first request or the previous window has expired, so create it.
// Reject requests larger than the rate itself (e.g. asking for 101 with a limit of 100) since they can never succeed.
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
// Initialise the counter to rate and give it an expiry of interval milliseconds.
+ "redis.call('set', valueName, rate, 'px', interval); "
// Then subtract the permits requested now.
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
Arrays.<Object>asList(getName(), getValueName(), getClientValueName()),
value, commandExecutor.getConnectionManager().getId().toString());
}
/**
 * Synchronous variant of {@code trySetRateAsync}: stores the limiter settings and
 * waits for the result.
 *
 * @param type         limiting mode
 * @param rate         number of permits per interval
 * @param rateInterval length of the interval
 * @param unit         unit of {@code rateInterval}
 * @return the boolean result of the underlying script call
 */
@Override
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
    RFuture<Boolean> pending = trySetRateAsync(type, rate, rateInterval, unit);
    return get(pending);
}
/**
 * Stores the limiter settings in the hash named after this limiter, writing each
 * field only if it is not set yet ({@code hsetnx}).
 *
 * @param type         limiting mode; its ordinal is stored in the 'type' field
 * @param rate         number of permits per interval, stored in the 'rate' field
 * @param rateInterval length of the interval
 * @param unit         unit of {@code rateInterval}; the interval is stored in milliseconds
 * @return a future holding the result of the final {@code hsetnx} on the 'type' field
 */
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
    String script =
            "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
            + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
            + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);";
    return commandExecutor.evalWriteAsync(
            getName(),
            LongCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            script,
            Collections.<Object>singletonList(getName()),
            rate,
            unit.toMillis(rateInterval),
            type.ordinal());
}
總結下來就是,往redis設置一個限流的數值,超時時間就是限流的時間區間
然后就去查詢這個限流的數值,如果沒有查到,肯定可以獲取許可;如果查到了,要看看有沒有超過許可數
如果獲取許可成功了就返回nil,上層就知道成功;如果獲取失敗,那就要等超時時間過去再請求,並返回一個delay時間。
但是,這個執行一次沒有獲取到許可的話,還要重試,所以上一層方法增加了重試的邏輯,重試是靠io.netty.util.concurrent.EventExecutorGroup#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)來實現,EventExecutorGroup實現了java.util.concurrent.ScheduledExecutorService。
如果我們熟悉Redis的話,應該會覺得限流可以用其他的數據結構來實現,比如zset,用zrangebyscore來獲取某一時間窗口內的請求數,然后判斷有沒有達到限流閾值。
我們升級一下redisson,看看高版本有沒有優化。
繼續看看版本<version>3.34.0</version>
/**
 * Runs one acquisition attempt and, while permits are unavailable, schedules
 * further attempts on the service manager's timer.
 *
 * @param permits number of permits to acquire
 * @param timeoutInMillis remaining time budget in milliseconds; -1 means no timeout
 * @return a future completed with true once the permits are acquired, or false
 *         when the time budget is exhausted
 */
private CompletableFuture<Boolean> tryAcquireAsync(long permits, long timeoutInMillis) {
    long attemptStartedAt = System.currentTimeMillis();
    RFuture<Long> scriptResult = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
    return scriptResult.thenCompose(delay -> {
        // A null script result means the permits were granted.
        if (delay == null) {
            return CompletableFuture.completedFuture(true);
        }

        // No timeout: simply retry after the wait suggested by the script.
        if (timeoutInMillis == -1) {
            CompletableFuture<Boolean> retried = new CompletableFuture<>();
            getServiceManager().newTimeout(
                    timerHandle -> commandExecutor.transfer(tryAcquireAsync(permits, timeoutInMillis), retried),
                    delay, TimeUnit.MILLISECONDS);
            return retried;
        }

        // Deduct the time this attempt took from the budget.
        long remains = timeoutInMillis - (System.currentTimeMillis() - attemptStartedAt);
        if (remains <= 0) {
            return CompletableFuture.completedFuture(false);
        }

        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        if (remains >= delay) {
            // Enough budget to wait for the suggested delay and try again.
            long waitStartedAt = System.currentTimeMillis();
            getServiceManager().newTimeout(timerHandle -> {
                long waited = System.currentTimeMillis() - waitStartedAt;
                if (waited >= remains) {
                    outcome.complete(false);
                } else {
                    commandExecutor.transfer(tryAcquireAsync(permits, remains - waited), outcome);
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            // The budget ends before the suggested delay: report false when it runs out.
            getServiceManager().newTimeout(
                    timerHandle -> outcome.complete(false), remains, TimeUnit.MILLISECONDS);
        }
        return outcome;
    }).toCompletableFuture();
}
/**
 * Evaluates the Lua script that performs a single acquisition attempt.
 * The script keeps the remaining permits in a plain key and records each granted
 * request in a sorted set scored by the request timestamp.
 * It returns nil when the permits were granted, otherwise a wait time in milliseconds.
 *
 * @param command the eval command that decides how the script result is decoded
 * @param value   number of permits to acquire (ARGV[1])
 * @param <T>     decoded result type
 */
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
// Random id stored with each request (ARGV[3]).
byte[] random = getServiceManager().generateIdArray();
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
+ "local valueName = KEYS[2];"
+ "local permitsName = KEYS[4];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "permitsName = KEYS[5];"
+ "end;"
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
// valueName holds the number of permits currently remaining.
+ "local currentValue = redis.call('get', valueName); "
+ "local res;"
+ "if currentValue ~= false then "
// Slide the window: look up the recorded requests that are older than one interval (ARGV[2] is the current time).
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
// Each member is packed as: id length (B), id bytes (c0), permit count (I).
+ "local random, permits = struct.unpack('Bc0I', v);"
// Sum up the permits of all expired requests.
+ "released = released + permits;"
+ "end; "
+ "if released > 0 then "
// Remove the expired requests so the next range query does not return them again.
+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
// If remaining + released would exceed the rate (presumably only after the rate was changed — unconfirmed),
// recompute the remaining permits as rate minus the number of requests still in the window.
// This is not exact: zcard counts entries, and a single entry may have taken more than one permit.
// Otherwise the released permits simply become available again.
+ "if tonumber(currentValue) + released > tonumber(rate) then "
+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
+ "else "
+ "currentValue = tonumber(currentValue) + released; "
+ "end; "
+ "redis.call('set', valueName, currentValue);"
+ "end;"
// Fewer permits remain than requested: the attempt fails and res becomes the wait time returned at the end.
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
// Time until the oldest recorded request (firstValue[2] is its score, i.e. its timestamp) leaves the window.
// NOTE(review): the purpose of the extra 3 is not evident from this code — presumably a small safety margin; confirm.
+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
+ "else "
// Granted: record this request in the sorted set and deduct the permits.
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end; "
+ "else "
// No counter yet: initialise the remaining permits to rate.
+ "redis.call('set', valueName, rate); "
// Record this request.
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
// Remaining permits = rate - permits requested now.
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end;"
// If the settings key (KEYS[1]) has a TTL, apply the same TTL to the counter and the request set.
+ "local ttl = redis.call('pttl', KEYS[1]); "
+ "if ttl > 0 then "
+ "redis.call('pexpire', valueName, ttl); "
+ "redis.call('pexpire', permitsName, ttl); "
+ "end; "
+ "return res;",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), random);
}
新版本的設計思路完全改變了,用k/v存剩余許可數,zset存許可申請明細。
每一次許可申請使用zadd往zset增加一條,用毫秒時間戳做score,用struct.pack壓縮成二進制存儲申請的許可數。
每一次請求用滑動窗口動態判斷當前剩余許可數。
如果達到限流上限,返回需要等待的時間(類似之前版本的ttl),外層采用延遲重試的方式繼續請求獲取許可,思路和之前版本類似。不同的是,這里使用了netty時間輪
io.netty.util.HashedWheelTimer#newTimeout