A look at Redisson's lockWatchdogTimeout

This article looks at how Redisson's lockWatchdogTimeout setting works.

lockWatchdogTimeout

redisson/src/main/java/org/redisson/config/Config.java


    private long lockWatchdogTimeout = 30 * 1000;

    /**
     * This parameter is only used if lock has been acquired without leaseTimeout parameter definition.
     * Lock expires after <code>lockWatchdogTimeout</code> if watchdog
     * didn't extend it to next <code>lockWatchdogTimeout</code> time interval.
     * <p>
     * This prevents against infinity locked locks due to Redisson client crush or
     * any other reason when lock can't be released in proper way.
     * <p>
     * Default is 30000 milliseconds
     *
     * @param lockWatchdogTimeout timeout in milliseconds
     * @return config
     */
    public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
        this.lockWatchdogTimeout = lockWatchdogTimeout;
        return this;
    }

    public long getLockWatchdogTimeout() {
        return lockWatchdogTimeout;
    }

Config defines the lockWatchdogTimeout property, which defaults to 30 seconds (30 * 1000 ms).

tryAcquireOnceAsync

redisson/src/main/java/org/redisson/RedissonLock.java


    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        CompletionStage<Boolean> acquiredFuture;
        if (leaseTime > 0) {
            acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }

        acquiredFuture = handleNoSync(threadId, acquiredFuture);

        CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
            // lock acquired
            if (acquired) {
                if (leaseTime > 0) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
            return acquired;
        });
        return new CompletableFutureWrapper<>(f);
    }

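The RedissonLock constructor initializes internalLockLeaseTime from lockWatchdogTimeout. When leaseTime is less than or equal to 0, tryAcquireOnceAsync passes this default internalLockLeaseTime to tryLockInnerAsync and, once the lock is acquired, calls scheduleExpirationRenewal. When leaseTime is greater than 0, it uses the caller's leaseTime and does not schedule a renewal.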
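The branch on leaseTime can be restated as a small stand-alone sketch. The class LeaseDecision and its fields are hypothetical names used only for illustration; they are not part of Redisson, and the 30 000 ms constant simply mirrors the default quoted above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the leaseTime branch; not Redisson code.
class LeaseDecision {
    // Assumed default, mirroring lockWatchdogTimeout = 30 * 1000.
    static final long LOCK_WATCHDOG_TIMEOUT_MS = 30_000L;

    final long ttlMillis;    // expiration that would be sent to Redis
    final boolean watchdog;  // whether automatic renewal would be scheduled

    private LeaseDecision(long ttlMillis, boolean watchdog) {
        this.ttlMillis = ttlMillis;
        this.watchdog = watchdog;
    }

    static LeaseDecision of(long leaseTime, TimeUnit unit) {
        if (leaseTime > 0) {
            // Explicit lease: use the caller's value, no renewal.
            return new LeaseDecision(unit.toMillis(leaseTime), false);
        }
        // No lease (or -1): fall back to the watchdog timeout and renew.
        return new LeaseDecision(LOCK_WATCHDOG_TIMEOUT_MS, true);
    }

    public static void main(String[] args) {
        LeaseDecision explicit = of(10, TimeUnit.SECONDS);
        LeaseDecision auto = of(-1, TimeUnit.SECONDS);
        System.out.println(explicit.ttlMillis + " ms, watchdog=" + explicit.watchdog);
        System.out.println(auto.ttlMillis + " ms, watchdog=" + auto.watchdog);
    }
}
```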

tryLockInnerAsync

redisson/src/main/java/org/redisson/RedissonLock.java

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

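The script sets the key's expiration with pexpire, passing leaseTime converted to milliseconds as ARGV[1]. It returns nil when the lock is acquired (the key did not exist, or the current holder is re-entering); otherwise it returns the lock's remaining time to live in milliseconds via pttl.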
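To make the script's return values easier to follow, here is a rough in-memory model of the same logic in plain Java. It is only a sketch: LockScriptModel is a hypothetical class, the clock is passed in explicitly so the behavior is deterministic, and nothing here talks to Redis.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the lock script; not Redisson code.
class LockScriptModel {
    // Models the Redis hash: field (lock name) -> reentrant hold count.
    private final Map<String, Integer> holders = new HashMap<>();
    private long expiresAtMillis;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    Long tryLock(String lockName, long leaseMillis, long nowMillis) {
        expireIfDue(nowMillis);
        boolean keyMissing = holders.isEmpty();                // exists == 0
        if (keyMissing || holders.containsKey(lockName)) {     // or hexists == 1
            holders.merge(lockName, 1, Integer::sum);          // hincrby ... 1
            expiresAtMillis = nowMillis + leaseMillis;         // pexpire
            return null;                                       // return nil
        }
        return expiresAtMillis - nowMillis;                    // pttl
    }

    int holdCount(String lockName) {
        return holders.getOrDefault(lockName, 0);
    }

    // Simulates Redis dropping the key once its expiration has passed.
    private void expireIfDue(long nowMillis) {
        if (!holders.isEmpty() && nowMillis >= expiresAtMillis) {
            holders.clear();
        }
    }

    public static void main(String[] args) {
        LockScriptModel lock = new LockScriptModel();
        System.out.println(lock.tryLock("client-a:1", 30_000, 0));     // acquired
        System.out.println(lock.tryLock("client-a:1", 30_000, 1_000)); // re-entered
        System.out.println(lock.tryLock("client-b:7", 30_000, 5_000)); // remaining TTL
    }
}
```

Note that a re-entrant acquire also resets the expiration, which is why the second call in main pushes the deadline out to 31 000 ms on the simulated clock.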

scheduleExpirationRenewal

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }

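scheduleExpirationRenewal calls renewExpiration only when its entry was newly added to EXPIRATION_RENEWAL_MAP; if an entry already exists, it just registers the threadId on that entry. If the current thread turns out to be interrupted after renewExpiration returns, the finally block calls cancelExpirationRenewal.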

renewExpiration

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = getServiceManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock {} expiration", getRawName(), e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

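renewExpiration uses getServiceManager().newTimeout to create a TimerTask with a delay of internalLockLeaseTime / 3 (10 s with the default settings). The task calls renewExpirationAsync. If that fails with an exception, the entry is removed from EXPIRATION_RENEWAL_MAP; if the renewal succeeds, renewExpiration is called again to schedule the next TimerTask; if the renewal does not succeed (the lock no longer exists), cancelExpirationRenewal is called.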
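The self-rescheduling pattern can be sketched with the JDK alone. This is not how Redisson does it internally: a ScheduledExecutorService is swapped in for the timer behind newTimeout, and the BooleanSupplier named renew is a hypothetical stand-in for renewExpirationAsync. The class and method names are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a one-shot task that reschedules itself; not Redisson code.
class SelfReschedulingRenewal {
    private final ScheduledExecutorService timer;
    private final long leaseMillis;
    private final BooleanSupplier renew; // stand-in for renewExpirationAsync
    private final CountDownLatch stopped = new CountDownLatch(1);

    SelfReschedulingRenewal(ScheduledExecutorService timer, long leaseMillis, BooleanSupplier renew) {
        this.timer = timer;
        this.leaseMillis = leaseMillis;
        this.renew = renew;
    }

    void schedule() {
        // One-shot task fired after a third of the lease, like internalLockLeaseTime / 3.
        timer.schedule(() -> {
            boolean renewed;
            try {
                renewed = renew.getAsBoolean();
            } catch (RuntimeException e) {
                stopped.countDown(); // error: stop without rescheduling
                return;
            }
            if (renewed) {
                schedule();          // success: reschedule itself
            } else {
                stopped.countDown(); // lock is gone: stop renewing
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    /** Renewal succeeds `successes` times, then fails; returns the total number of attempts. */
    static int demo(int successes) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        SelfReschedulingRenewal renewal =
                new SelfReschedulingRenewal(timer, 30, () -> attempts.incrementAndGet() <= successes);
        try {
            renewal.schedule();
            if (!renewal.stopped.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("renewal loop did not stop in time");
            }
            return attempts.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("renewal attempts: " + demo(3));
    }
}
```

Because each task is scheduled only after the previous renewal has succeeded, a failed or refused renewal ends the chain naturally, with no periodic task left to cancel.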

cancelExpirationRenewal

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    protected void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        if (threadId != null) {
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }

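cancelExpirationRenewal looks up the ExpirationEntry and, when a threadId is given, removes it from the entry. If threadId is null or no threads remain, it cancels the entry's Timeout and removes the entry from EXPIRATION_RENEWAL_MAP.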
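The interplay between scheduleExpirationRenewal and cancelExpirationRenewal is easier to see in a stripped-down model. The sketch below assumes the entry keeps a per-thread hold count; RenewalRegistry and its nested Entry are hypothetical simplifications, not the real ExpirationEntry, and the timer itself is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the renewal registry; not Redisson code.
class RenewalRegistry {
    /** Simplified stand-in for ExpirationEntry: thread id -> hold count (assumed). */
    static final class Entry {
        private final Map<Long, Integer> threadIds = new HashMap<>();

        synchronized void addThreadId(long threadId) {
            threadIds.merge(threadId, 1, Integer::sum);
        }

        synchronized void removeThreadId(long threadId) {
            Integer count = threadIds.get(threadId);
            if (count == null) {
                return;
            }
            if (count > 1) {
                threadIds.put(threadId, count - 1);
            } else {
                threadIds.remove(threadId);
            }
        }

        synchronized boolean hasNoThreads() {
            return threadIds.isEmpty();
        }
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

    /** Returns true only for the call that created the entry, i.e. the one that would start renewal. */
    boolean schedule(String entryName, long threadId) {
        Entry fresh = new Entry();
        Entry old = entries.putIfAbsent(entryName, fresh);
        if (old != null) {
            old.addThreadId(threadId);
            return false;
        }
        fresh.addThreadId(threadId);
        return true;
    }

    /** A null threadId forces removal. Returns true when the entry was removed. */
    boolean cancel(String entryName, Long threadId) {
        Entry entry = entries.get(entryName);
        if (entry == null) {
            return false;
        }
        if (threadId != null) {
            entry.removeThreadId(threadId);
        }
        if (threadId == null || entry.hasNoThreads()) {
            entries.remove(entryName); // the real code also cancels the Timeout here
            return true;
        }
        return false;
    }

    boolean isRenewing(String entryName) {
        return entries.containsKey(entryName);
    }

    public static void main(String[] args) {
        RenewalRegistry registry = new RenewalRegistry();
        System.out.println(registry.schedule("id:lock", 1L)); // creates the entry
        System.out.println(registry.schedule("id:lock", 1L)); // re-entrant, entry reused
        System.out.println(registry.cancel("id:lock", 1L));   // one hold still left
        System.out.println(registry.cancel("id:lock", 1L));   // last hold released
    }
}
```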

unlockAsync0

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    private RFuture<Void> unlockAsync0(long threadId) {
        CompletionStage<Boolean> future = unlockInnerAsync(threadId);
        CompletionStage<Void> f = future.handle((opStatus, e) -> {
            cancelExpirationRenewal(threadId);

            if (e != null) {
                if (e instanceof CompletionException) {
                    throw (CompletionException) e;
                }
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }

            return null;
        });

        return new CompletableFutureWrapper<>(f);
    }

unlockAsync0 calls cancelExpirationRenewal to stop the automatic renewal, whether or not unlockInnerAsync succeeded.

Summary

Redisson provides the lockWatchdogTimeout parameter, which defaults to 30 s. When a lock is acquired without a leaseTime, the default lockWatchdogTimeout is used as the expiration time, and once the lock is acquired scheduleExpirationRenewal starts a timer task that keeps extending it. If you need automatic renewal, use the methods that take no leaseTime parameter, or pass -1 as leaseTime.
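From the caller's side, the difference comes down to which lock method is used. The fragment below is a usage sketch only: it needs the Redisson dependency and a reachable Redis server, and the address redis://127.0.0.1:6379, the key demo:lock, and the 60 s timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class WatchdogUsage {
    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.setLockWatchdogTimeout(60_000L);                        // assumed value; default is 30 000 ms
        config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // assumed local Redis

        RedissonClient client = Redisson.create(config);
        try {
            RLock lock = client.getLock("demo:lock"); // hypothetical key

            // No leaseTime: expires after lockWatchdogTimeout unless renewed, so the watchdog runs.
            lock.lock();
            try {
                // critical section
            } finally {
                lock.unlock();
            }

            // leaseTime of -1: also renewed automatically.
            if (lock.tryLock(1, -1, TimeUnit.SECONDS)) {
                try {
                    // critical section
                } finally {
                    lock.unlock();
                }
            }

            // Explicit leaseTime: expires after 10 s, with no automatic renewal.
            lock.lock(10, TimeUnit.SECONDS);
            try {
                // critical section that must finish within the lease
            } finally {
                lock.unlock();
            }
        } finally {
            client.shutdown();
        }
    }
}
```

With an explicit leaseTime the work has to finish before the lease runs out; otherwise the lock can expire while the thread still believes it holds it.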

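cancelExpirationRenewal is called to stop automatic renewal in three cases: on unlock, when a renewal finds that the lock no longer exists, and when the finally block in scheduleExpirationRenewal (which wraps the call to renewExpiration) finds Thread.currentThread().isInterrupted() to be true. If Redis throws an exception during renewal, the entry is removed directly from EXPIRATION_RENEWAL_MAP, which indirectly stops the renewal.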
