序
本文主要研究一下redisson的lockWatchdogTimeout
lockWatchdogTimeout
redisson/src/main/java/org/redisson/config/Config.java
/** Lock watchdog timeout in milliseconds; the initial value corresponds to 30 seconds. */
private long lockWatchdogTimeout = 30_000L;
/**
 * Sets the lock watchdog timeout.
 * <p>
 * This parameter is only used if a lock has been acquired without a leaseTimeout parameter.
 * Such a lock expires after <code>lockWatchdogTimeout</code> unless the watchdog
 * extends it for another <code>lockWatchdogTimeout</code> interval.
 * <p>
 * This prevents locks from being held indefinitely when the Redisson client crashes or
 * the lock cannot be released properly for any other reason.
 * <p>
 * Default is 30000 milliseconds.
 *
 * @param lockWatchdogTimeout timeout in milliseconds
 * @return this config instance, so that calls can be chained
 */
public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
this.lockWatchdogTimeout = lockWatchdogTimeout;
return this;
}
/**
 * Returns the currently configured lock watchdog timeout.
 *
 * @return timeout in milliseconds
 */
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}
Config定義了lockWatchdogTimeout屬性,單位為毫秒,默認為30s
tryAcquireOnceAsync
redisson/src/main/java/org/redisson/RedissonLock.java
/**
 * Creates a lock with the given name on top of the supplied command executor.
 * The initial lease time is read from the configured lock watchdog timeout.
 *
 * @param commandExecutor executor used to run commands for this lock
 * @param name            name of the lock
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // Start from the configured watchdog timeout as the lease time.
    internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();
    // Pub/sub service for locks, obtained through the connection manager.
    pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
/**
 * Makes one asynchronous attempt to acquire the lock.
 * <p>
 * With a positive {@code leaseTime} the lock is requested with that lease and, on success,
 * the lease is stored in {@code internalLockLeaseTime}. Otherwise the lock is requested with
 * the current {@code internalLockLeaseTime} (in milliseconds) and, on success, expiration
 * renewal is scheduled for {@code threadId}.
 *
 * @param waitTime  wait time, passed through to {@code tryLockInnerAsync}
 * @param leaseTime requested lease; values {@code <= 0} select the default lease
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @return future completing with whether the lock was acquired
 */
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    boolean explicitLease = leaseTime > 0;
    long requestedLease = explicitLease ? leaseTime : internalLockLeaseTime;
    TimeUnit requestedUnit = explicitLease ? unit : TimeUnit.MILLISECONDS;

    CompletionStage<Boolean> attempt =
            tryLockInnerAsync(waitTime, requestedLease, requestedUnit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    CompletionStage<Boolean> synced = handleNoSync(threadId, attempt);

    CompletionStage<Boolean> result = synced.thenApply(acquired -> {
        if (!acquired) {
            return acquired;
        }
        // Lock acquired: either remember the explicit lease or start renewing the default one.
        if (explicitLease) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
        } else {
            scheduleExpirationRenewal(threadId);
        }
        return acquired;
    });
    return new CompletableFutureWrapper<>(result);
}
tryAcquireOnceAsync對于leaseTime小于等于0的,使用默認的internalLockLeaseTime(初始值即lockWatchdogTimeout),并在獲取到鎖之后執行scheduleExpirationRenewal;leaseTime大于0的則不會啟動自動續期
tryLockInnerAsync
redisson/src/main/java/org/redisson/RedissonLock.java
/**
 * Runs the lock-acquisition Lua script against the lock key.
 * <p>
 * If the key does not exist, or the hash already contains the field for this thread's lock
 * name, the script increments that field by 1, applies {@code pexpire} with the lease in
 * milliseconds and returns nil. Otherwise it returns the key's {@code pttl}.
 *
 * @param waitTime  not referenced by this method
 * @param leaseTime lease to apply to the key
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread, used to build the lock name
 * @param command   command that defines how the script result is decoded
 * @return future with the decoded script result
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    final String script =
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);";
    // ARGV[1] of the script: the lease expressed in milliseconds.
    final long leaseMillis = unit.toMillis(leaseTime);

    return commandExecutor.syncedEval(
            getRawName(),
            LongCodec.INSTANCE,
            command,
            script,
            Collections.singletonList(getRawName()),
            leaseMillis,
            getLockName(threadId));
}
這里使用pexpire命令,參數為leaseTime(毫秒),獲取到鎖的返回nil,獲取不到鎖的通過pttl返回該鎖的毫秒級的剩余存活時間
scheduleExpirationRenewal
redisson/src/main/java/org/redisson/RedissonBaseLock.java
/**
 * Registers {@code threadId} for expiration renewal of this lock.
 * <p>
 * If an entry for this lock is already present in {@code EXPIRATION_RENEWAL_MAP}, the thread
 * id is only added to it. Otherwise a new entry is registered and {@code renewExpiration()}
 * is called; if the current thread is interrupted at that point, the renewal is cancelled
 * again for {@code threadId}.
 *
 * @param threadId id of the thread holding the lock
 */
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry fresh = new ExpirationEntry();
    ExpirationEntry existing = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), fresh);

    if (existing != null) {
        // An entry is already registered for this lock: just record the thread.
        existing.addThreadId(threadId);
        return;
    }

    fresh.addThreadId(threadId);
    try {
        renewExpiration();
    } finally {
        if (Thread.currentThread().isInterrupted()) {
            cancelExpirationRenewal(threadId);
        }
    }
}
scheduleExpirationRenewal對于剛放進EXPIRATION_RENEWAL_MAP的執行renewExpiration,對于已存在的entry則只追加threadId
renewExpiration
redisson/src/main/java/org/redisson/RedissonBaseLock.java
/**
 * Schedules one renewal task for this lock, delayed by a third of
 * {@code internalLockLeaseTime} (milliseconds), and stores the resulting timeout in the
 * lock's expiration entry. Does nothing when no entry is registered for this lock.
 * <p>
 * When the task fires it calls {@code renewExpirationAsync} for the entry's first thread id:
 * on error the entry is removed from {@code EXPIRATION_RENEWAL_MAP}; on a {@code true}
 * result this method is called again; on {@code false} the renewal is cancelled.
 */
private void renewExpiration() {
    ExpirationEntry scheduled = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (scheduled == null) {
        return;
    }

    TimerTask renewalTask = new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Look the entry up again: it may have been removed since scheduling.
            ExpirationEntry current = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            Long ownerThreadId = current == null ? null : current.getFirstThreadId();
            if (ownerThreadId == null) {
                return;
            }

            CompletionStage<Boolean> renewal = renewExpirationAsync(ownerThreadId);
            renewal.whenComplete((renewed, error) -> {
                if (error != null) {
                    log.error("Can't update lock {} expiration", getRawName(), error);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                } else if (renewed) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    };

    Timeout pending = getServiceManager().newTimeout(renewalTask, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    scheduled.setTimeout(pending);
}
renewExpiration通過getServiceManager().newTimeout創建一個timerTask,delay為internalLockLeaseTime/3,該task執行renewExpirationAsync,若有異常則從EXPIRATION_RENEWAL_MAP移除,若續期成功則再次執行renewExpiration調度timerTask,續期不成功(鎖不存在)則執行cancelExpirationRenewal
cancelExpirationRenewal
redisson/src/main/java/org/redisson/RedissonBaseLock.java
/**
 * Stops expiration renewal for this lock, either for one thread or entirely.
 * <p>
 * A non-null {@code threadId} is removed from the lock's expiration entry; the pending
 * timeout is cancelled and the entry removed from {@code EXPIRATION_RENEWAL_MAP} only if the
 * entry then reports no threads. A {@code null} {@code threadId} cancels and removes
 * unconditionally. Does nothing when no entry is registered for this lock.
 *
 * @param threadId thread id to unregister, or {@code null} to cancel renewal outright
 */
protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    if (threadId != null) {
        entry.removeThreadId(threadId);
        if (!entry.hasNoThreads()) {
            // Other threads are still registered: keep the renewal running.
            return;
        }
    }

    Timeout pending = entry.getTimeout();
    if (pending != null) {
        pending.cancel();
    }
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
cancelExpirationRenewal則取出ExpirationEntry,移除指定的threadId,若threadId為null或沒有其他threads的話再取出Timeout執行cancel,最后從EXPIRATION_RENEWAL_MAP移除
unlockAsync0
redisson/src/main/java/org/redisson/RedissonBaseLock.java
/**
 * Releases the lock asynchronously for {@code threadId}.
 * <p>
 * Once {@code unlockInnerAsync} completes, expiration renewal is cancelled for the thread
 * whatever the outcome. A failure is rethrown as a {@link CompletionException} (wrapped if it
 * is not one already). A {@code null} status is reported as an
 * {@link IllegalMonitorStateException} wrapped in a {@link CompletionException}.
 *
 * @param threadId id of the thread releasing the lock
 * @return future that completes when the unlock has been processed
 */
private RFuture<Void> unlockAsync0(long threadId) {
    CompletionStage<Boolean> released = unlockInnerAsync(threadId);
    CompletionStage<Void> outcome = released.handle((status, failure) -> {
        // Stop renewing regardless of how the unlock call ended.
        cancelExpirationRenewal(threadId);

        if (failure instanceof CompletionException) {
            throw (CompletionException) failure;
        }
        if (failure != null) {
            throw new CompletionException(failure);
        }
        if (status != null) {
            return null;
        }

        IllegalMonitorStateException notOwner = new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
        throw new CompletionException(notOwner);
    });

    return new CompletableFutureWrapper<>(outcome);
}
unlockAsync0會執行cancelExpirationRenewal去取消自動續期
小結
redisson提供了lockWatchdogTimeout參數,默認為30s,對于加鎖時沒有指定leaseTime的,會使用默認的lockWatchdogTimeout作為過期時間,并且獲取到鎖之后會啟動定時任務scheduleExpirationRenewal去給鎖續期。需要自動續期的,可以使用沒有leaseTime參數的方法,或者leaseTime傳-1。
在unlock、續期時發現鎖不存在、scheduleExpirationRenewal的finally中發現Thread.currentThread().isInterrupted()這幾種情況會執行cancelExpirationRenewal去取消自動續期,若續期時redis異常則直接從EXPIRATION_RENEWAL_MAP中移除間接取消自動續期