使用Redisson實(shí)現(xiàn)可重入分布式鎖

前言

主流的分布式鎖一般有三種實(shí)現(xiàn)方式:

  1. 數(shù)據(jù)庫樂觀鎖

  2. 基于Redis的分布式鎖

  3. 基于ZooKeeper的分布式鎖

之前我在博客上寫過關(guān)于mysql和redis實(shí)現(xiàn)分布式鎖的具體方案: https://www.cnblogs.com/wang-meng/p/10226618.html 里面主要是從實(shí)現(xiàn)原理出發(fā)。

這次【分布式鎖】系列文章主要是深入redis客戶端reddision源碼和zk 這兩種分布式鎖的實(shí)現(xiàn)原理。

可靠性

首先,為了確保分布式鎖可用,我們至少要確保鎖的實(shí)現(xiàn)同時(shí)滿足以下四個(gè)條件:

  1. 互斥性。在任意時(shí)刻,只有一個(gè)客戶端能持有鎖。

  2. 不會(huì)發(fā)生死鎖。即使有一個(gè)客戶端在持有鎖的期間崩潰而沒有主動(dòng)解鎖,也能保證后續(xù)其他客戶端能加鎖。

  3. 具有容錯(cuò)性。只要大部分的Redis節(jié)點(diǎn)正常運(yùn)行,客戶端就可以加鎖和解鎖。

  4. 解鈴還須系鈴人。加鎖和解鎖必須是同一個(gè)客戶端,客戶端自己不能把別人加的鎖給解了。

Redisson加鎖原理

redisson是一個(gè)非常強(qiáng)大的開源的redis客戶端框架, 官方地址: https://redisson.org/

使用起來很簡(jiǎn)單,配置好maven和連接信息,這里直接看代碼實(shí)現(xiàn):

RLock lock = redisson.getLock("anyLock");

lock.lock();
lock.unlock();

redisson具體的執(zhí)行加鎖邏輯都是通過lua腳本來完成的,lua腳本能夠保證原子性。

先看下RLock初始化的代碼:

public class Redisson implements RedissonClient {

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
}

public class RedissonLock extends RedissonExpirable implements RLock {
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}

首先看下RedissonLock 的id返回的是一個(gè)UUID對(duì)象,每個(gè)機(jī)器都對(duì)應(yīng)一個(gè)自己的id屬性,id 值就類似于:"8743c9c0-0795-4907-87fd-6c719a6b4586"

接著往后看lock()的代碼實(shí)現(xiàn):

public class RedissonLock extends RedissonExpirable implements RLock {
    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 獲取當(dāng)前線程id
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
}

這里省略了一些中間代碼,這里主要看tryAcquire() 方法,這里傳遞的過期時(shí)間為-1,然后就是當(dāng)前的線程id,接著就是核心的lua腳本執(zhí)行流程,我們來一步步看看是如何執(zhí)行的:

"if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +

KEYS[1] 參數(shù)是:“anyLock”
ARGV[2] 是:“id + ":" + threadId”

首先用的exists 判斷redis中是否存在當(dāng)前key,如果不存在就等于0,然后執(zhí)行hset指令,將“anyLock id:threadId 1”存儲(chǔ)到redis中,最終redis存儲(chǔ)的數(shù)據(jù)類似于:

{
  "8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}

偷偷說一句,最后面的一個(gè)1 是為了后面可重入做的計(jì)數(shù)統(tǒng)計(jì),后面會(huì)有講解到。

接著往下看,然后使用pexpire設(shè)置過期時(shí)間,默認(rèn)使用internalLockLeaseTime為30s。最后返回為null,即時(shí)加鎖成功。

Redisson 可重入原理

我們看下鎖key存在的情況下,同一個(gè)機(jī)器同一個(gè)線程如何加鎖的?

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",

ARGV[2] 是:“id + ":" + threadId”
如果同一個(gè)機(jī)器同一個(gè)線程再次來請(qǐng)求,這里就會(huì)是1,然后執(zhí)行hincrby, hset設(shè)置的value+1 變成了2,然后繼續(xù)設(shè)置過期時(shí)間。

同理,一個(gè)線程重入后,解鎖時(shí)value - 1

Redisson watchDog原理

如果一個(gè)場(chǎng)景:現(xiàn)在有A,B在執(zhí)行業(yè)務(wù),A加了分布式鎖,但是生產(chǎn)環(huán)境是各種變化的,如果萬一A鎖超時(shí)了,但是A的業(yè)務(wù)還在跑。而這時(shí)由于A鎖超時(shí)釋放,B拿到鎖,B執(zhí)行業(yè)務(wù)邏輯。這樣分布式鎖就失去了意義?

所以Redisson 引入了watch dog的概念,當(dāng)A獲取到鎖執(zhí)行后,如果鎖沒過期,有個(gè)后臺(tái)線程會(huì)自動(dòng)延長(zhǎng)鎖的過期時(shí)間,防止因?yàn)闃I(yè)務(wù)沒有執(zhí)行完而鎖過期的情況。

我們接著來看看具體實(shí)現(xiàn):

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

當(dāng)我們tryLockInnerAsync執(zhí)行完之后,會(huì)添加一個(gè)監(jiān)聽器,看看監(jiān)聽器中的具體實(shí)現(xiàn):

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

這里面調(diào)度任務(wù)每隔10s鐘執(zhí)行一次,lua腳本中是續(xù)約過期時(shí)間,使得當(dāng)前線程持有的鎖不會(huì)因?yàn)檫^期時(shí)間到了而失效

image

Redisson 互斥性原理
還是看上面執(zhí)行加鎖的lua腳本,最后會(huì)執(zhí)行到:

"return redis.call('pttl', KEYS[1]);",

返回鎖還有多久時(shí)間過期,我們繼續(xù)接著看代碼:

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 返回ttl說明加鎖成功,不為空則是加鎖失敗
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        // 死循環(huán)去嘗試獲取鎖
        while (true) {
            // 再次嘗試加鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 如果ttl=null說明搶占鎖成功
            if (ttl == null) {
                break;
            }

            // ttl 大于0,搶占鎖失敗,這個(gè)里面涉及到Semaphore,后續(xù)會(huì)講解
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

Redisson鎖釋放原理
直接看lua代碼:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        // 判斷鎖key值是否存在
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end;" +
        // 判斷當(dāng)前機(jī)器、當(dāng)前線程id對(duì)應(yīng)的key是否存在
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        // 計(jì)數(shù)器數(shù)量-1 可重入鎖
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        // 如果計(jì)數(shù)器大于0,說明還在持有鎖
        "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
            // 使用del指令刪除key
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
        "end; " +
        "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

總結(jié)
一圖總結(jié):

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容