Redisson 分布式鎖實(shí)現(xiàn)分析

轉(zhuǎn)自:https://github.com/angryz/my-blog/issues/4

Why 分布式鎖

java.util.concurrent.locks 中包含了 JDK 提供的在多線程情況下對(duì)共享資源的訪問(wèn)控制的一系列工具,它們可以幫助我們解決進(jìn)程內(nèi)多線程并發(fā)時(shí)的數(shù)據(jù)一致性問(wèn)題。

但是在分布式系統(tǒng)中,JDK 原生的并發(fā)鎖工具在一些場(chǎng)景就無(wú)法滿(mǎn)足我們的要求了,這就是為什么要使用分布式鎖。我總結(jié)了一句話,分布式鎖是用于解決分布式系統(tǒng)中操作共享資源時(shí)的數(shù)據(jù)一致性問(wèn)題。

設(shè)計(jì)分布式鎖要注意的問(wèn)題

互斥

分布式系統(tǒng)中運(yùn)行著多個(gè)節(jié)點(diǎn),必須確保在同一時(shí)刻只能有一個(gè)節(jié)點(diǎn)的一個(gè)線程獲得鎖,這是最基本的一點(diǎn)。

死鎖

分布式系統(tǒng)中,可能產(chǎn)生死鎖的情況要相對(duì)復(fù)雜一些。分布式系統(tǒng)是處在復(fù)雜網(wǎng)絡(luò)環(huán)境中的,當(dāng)一個(gè)節(jié)點(diǎn)獲取到鎖,如果它在釋放鎖之前掛掉了,或者因網(wǎng)絡(luò)故障無(wú)法執(zhí)行釋放鎖的命令,都會(huì)導(dǎo)致其他節(jié)點(diǎn)無(wú)法申請(qǐng)到鎖。

因此分布式鎖有必要設(shè)置時(shí)效,確保在未來(lái)的一定時(shí)間內(nèi),無(wú)論獲得鎖的節(jié)點(diǎn)發(fā)生了什么問(wèn)題,最終鎖都能被釋放掉。

性能

對(duì)于訪問(wèn)量大的共享資源,如果針對(duì)其獲取鎖時(shí)造成長(zhǎng)時(shí)間的等待,導(dǎo)致大量節(jié)點(diǎn)阻塞,是絕對(duì)不能接受的。

所以設(shè)計(jì)分布式鎖時(shí)要能夠掌握鎖持有者的動(dòng)態(tài),若判斷鎖持有者處于不活動(dòng)狀態(tài),要能夠強(qiáng)制釋放其持有的鎖。
此外,排隊(duì)等待鎖的節(jié)點(diǎn)如果不知道鎖何時(shí)會(huì)被釋放,則只能隔一段時(shí)間嘗試獲取一次鎖,這樣無(wú)法保證資源的高效利用,因此當(dāng)鎖釋放時(shí),要能夠通知等待隊(duì)列,使一個(gè)等待節(jié)點(diǎn)能夠立刻獲得鎖。

重入

考慮到一些應(yīng)用場(chǎng)景和資源的高效利用,鎖要設(shè)計(jì)成可重入的,就像 JDK 中的 ReentrantLock 一樣,同一個(gè)線程可以重復(fù)拿到同一個(gè)資源的鎖。

RedissonLock 實(shí)現(xiàn)解讀

本文中 Redisson 的代碼版本為 2.2.17-SNAPSHOT。

這里以 lock() 方法為例,其他一系列方法與其核心實(shí)現(xiàn)基本一致。

先來(lái)看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.獲得鎖對(duì)象實(shí)例
lock.lock(); // 2.獲取分布式鎖
try {
    // do sth.
} finally {
    lock.unlock(); // 3.釋放鎖
}
  1. 通過(guò) RedissonClient 的 getLock() 方法取得一個(gè) RLock 實(shí)例。
  2. lock() 方法嘗試獲取鎖,如果成功獲得鎖,則繼續(xù)往下執(zhí)行,否則等待鎖被釋放,然后再繼續(xù)嘗試獲取鎖,直到成功獲得鎖。
  3. unlock() 方法釋放獲得的鎖,并通知等待的節(jié)點(diǎn)鎖已釋放。
下面來(lái)看看 RedissonLock 的具體實(shí)現(xiàn)
org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
  return new RedissonLock(commandExecutor, name, id);
}

這里的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個(gè) interface,getLock 返回的實(shí)際上是其實(shí)現(xiàn)類(lèi) RedissonLock 的實(shí)例。

來(lái)看看構(gòu)造 RedissonLock 的參數(shù)

  • commandExecutor: 與 Redis 節(jié)點(diǎn)通信并發(fā)送指令的真正實(shí)現(xiàn)。需要說(shuō)明一下,Redisson 缺省的 CommandExecutor 實(shí)現(xiàn)是通過(guò) eval 命令來(lái)執(zhí)行 Lua 腳本,所以要求 Redis 的版本必須為 2.6 或以上,否則你可能要自己來(lái)實(shí)現(xiàn) CommandExecutor。關(guān)于 Redisson 的 CommandExecutor 以后會(huì)專(zhuān)門(mén)解讀,所以本次就不多說(shuō)了。
  • name: 鎖的全局名稱(chēng),例如上面代碼中的 "foobar",具體業(yè)務(wù)中通??赡苁褂霉蚕碣Y源的唯一標(biāo)識(shí)作為該名稱(chēng)。
  • id: Redisson 客戶(hù)端唯一標(biāo)識(shí),實(shí)際上就是一個(gè) UUID.randomUUID()
org.redisson.RedissonLock#lock()

此處略過(guò)前面幾個(gè)方法的層層調(diào)用,直接看最核心部分的方法 lockInterruptibly(),該方法在 RLock 中聲明,支持對(duì)獲取鎖的線程進(jìn)行中斷操作。在直接使用 lock() 方法獲取鎖時(shí),最后實(shí)際執(zhí)行的是 lockInterruptibly(-1, null)。

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 1.嘗試獲取鎖
    Long ttl = tryAcquire(leaseTime, unit);
    // 2.獲得鎖成功
    if (ttl == null) {
        return;
    }
    // 3.等待鎖釋放,并訂閱鎖
    long threadId = Thread.currentThread().getId();
    Future<RedissonLockEntry> future = subscribe(threadId);
    get(future);

    try {
        while (true) {
            // 4.重試獲取鎖
            ttl = tryAcquire(leaseTime, unit);
            // 5.成功獲得鎖
            if (ttl == null) {
                break;
            }
            // 6.等待鎖釋放
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 7.取消訂閱
        unsubscribe(future, threadId);
    }
}
  1. 首先嘗試獲取鎖,具體代碼下面再看,返回結(jié)果是已存在的鎖的剩余存活時(shí)間,為 null 則說(shuō)明沒(méi)有已存在的鎖并成功獲得鎖。
  2. 如果獲得鎖則結(jié)束流程,回去執(zhí)行業(yè)務(wù)邏輯。
  3. 如果沒(méi)有獲得鎖,則需等待鎖被釋放,并通過(guò) Redis 的 channel 訂閱鎖釋放的消息,這里的具體實(shí)現(xiàn)本文也不深入,只是簡(jiǎn)單提一下 Redisson 在執(zhí)行 Redis 命令時(shí)提供了同步異步的兩種實(shí)現(xiàn),但實(shí)際上同步的實(shí)現(xiàn)都是基于異步的,具體做法是使用 Netty 中的異步工具 FutureFutureListener 結(jié)合 JDK 中的 CountDownLatch 一起實(shí)現(xiàn)。
  4. 訂閱鎖的釋放消息成功后,進(jìn)入一個(gè)不斷重試獲取鎖的循環(huán),循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時(shí)間。
  5. 如果在重試中拿到了鎖,則結(jié)束循環(huán),跳過(guò)第 6 步。
  6. 如果鎖當(dāng)前是被占用的,那么等待釋放鎖的消息,具體實(shí)現(xiàn)使用了 JDK 并發(fā)的信號(hào)量工具 Semaphore 來(lái)阻塞線程,當(dāng)鎖釋放并發(fā)布釋放鎖的消息后,信號(hào)量的 release() 方法會(huì)被調(diào)用,此時(shí)被信號(hào)量阻塞的等待隊(duì)列中的一個(gè)線程就可以繼續(xù)嘗試獲取鎖了。
  7. 在成功獲得鎖后,就沒(méi)必要繼續(xù)訂閱鎖的釋放消息了,因此要取消對(duì) Redis 上相應(yīng) channel 的訂閱。

下面著重看看 tryAcquire() 方法的實(shí)現(xiàn),

private Long tryAcquire(long leaseTime, TimeUnit unit) {
    // 1.將異步執(zhí)行的結(jié)果以同步的形式返回
    return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}

private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.用默認(rèn)的鎖超時(shí)時(shí)間去獲取鎖
    Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
                TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // 成功獲得鎖
            if (ttlRemaining == null) {
                // 3.鎖過(guò)期時(shí)間刷新任務(wù)調(diào)度
                scheduleExpirationRenewal();
            }
        }
    });
    return ttlRemainingFuture;
}

<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
                RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 4.使用 EVAL 命令執(zhí)行 Lua 腳本獲取鎖
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime,
                        getLockName(threadId));
}
  1. 上面說(shuō)過(guò) Redisson 實(shí)現(xiàn)的執(zhí)行 Redis 命令都是異步的,但是它在異步的基礎(chǔ)上提供了以同步的方式獲得執(zhí)行結(jié)果的封裝
  2. 前面提到分布式鎖要確保未來(lái)的一段時(shí)間內(nèi)鎖一定能夠被釋放,因此要對(duì)鎖設(shè)置超時(shí)釋放的時(shí)間,在我們沒(méi)有指定該時(shí)間的情況下,Redisson 默認(rèn)指定為30秒。
  3. 在成功獲取到鎖的情況下,為了避免業(yè)務(wù)中對(duì)共享資源的操作還未完成,鎖就被釋放掉了,需要定期(鎖失效時(shí)間的三分之一)刷新鎖失效的時(shí)間,這里 Redisson 使用了 Netty 的 TimerTask、Timeout 工具來(lái)實(shí)現(xiàn)該任務(wù)調(diào)度。
  4. 獲取鎖真正執(zhí)行的命令,Redisson 使用 EVAL 命令執(zhí)行上面的 Lua 腳本來(lái)完成獲取鎖的操作:
    1. 如果通過(guò) exists 命令發(fā)現(xiàn)當(dāng)前 key 不存在,即鎖沒(méi)被占用,則執(zhí)行 hset 寫(xiě)入 Hash 類(lèi)型數(shù)據(jù) key:全局鎖名稱(chēng)(例如共享資源ID), field:鎖實(shí)例名稱(chēng)(Redisson客戶(hù)端ID:線程ID), value:1,并執(zhí)行 pexpire 對(duì)該 key 設(shè)置失效時(shí)間,返回空值 nil,至此獲取鎖成功。
    2. 如果通過(guò) hexists 命令發(fā)現(xiàn) Redis 中已經(jīng)存在當(dāng)前 key 和 field 的 Hash 數(shù)據(jù),說(shuō)明當(dāng)前線程之前已經(jīng)獲取到鎖,因?yàn)檫@里的鎖是可重入的,則執(zhí)行 hincrby 對(duì)當(dāng)前 key field 的值加一,并重新設(shè)置失效時(shí)間,返回空值,至此重入獲取鎖成功。
    3. 最后是鎖已被占用的情況,即當(dāng)前 key 已經(jīng)存在,但是 Hash 中的 Field 與當(dāng)前值不同,則執(zhí)行 pttl 獲取鎖的剩余存活時(shí)間并返回,至此獲取鎖失敗。

以上就是對(duì) lock() 的解讀,不過(guò)在實(shí)際業(yè)務(wù)中我們可能還會(huì)經(jīng)常使用 tryLock(),雖然兩者有一定差別,但核心部分的實(shí)現(xiàn)都是相同的,另外還有其他一些方法可以支持更多自定義參數(shù),本文中就不一一詳述了。

org.redisson.RedissonLock#unlock()

最后來(lái)看鎖的釋放,

@Override
public void unlock() {
    // 1.通過(guò) EVAL 和 Lua 腳本執(zhí)行 Redis 命令釋放鎖
    Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                    RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), 
                            LockPubSub.unlockMessage, internalLockLeaseTime, 
                            getLockName(Thread.currentThread().getId()));
    // 2.非鎖的持有者釋放鎖時(shí)拋出異常
    if (opStatus == null) {
        throw new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    // 3.釋放鎖后取消刷新鎖失效時(shí)間的調(diào)度任務(wù)
    if (opStatus) {
        cancelExpirationRenewal();
    }
}
  1. 使用 EVAL 命令執(zhí)行 Lua 腳本來(lái)釋放鎖:
    1. key 不存在,說(shuō)明鎖已釋放,直接執(zhí)行 publish 命令發(fā)布釋放鎖消息并返回 1
    2. key 存在,但是 field 在 Hash 中不存在,說(shuō)明自己不是鎖持有者,無(wú)權(quán)釋放鎖,返回 nil。
    3. 因?yàn)殒i可重入,所以釋放鎖時(shí)不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執(zhí)行 hincrby 對(duì)鎖的值減一
    4. 釋放一把鎖后,如果還有剩余的鎖,則刷新鎖的失效時(shí)間并返回 0;如果剛才釋放的已經(jīng)是最后一把鎖,則執(zhí)行 del 命令刪除鎖的 key,并發(fā)布鎖釋放消息,返回 1。
  2. 上面執(zhí)行結(jié)果返回 nil 的情況(即第2中情況),因?yàn)樽约翰皇擎i的持有者,不允許釋放別人的鎖,故拋出異常。
  3. 執(zhí)行結(jié)果返回 1 的情況,該鎖的所有實(shí)例都已全部釋放,所以不需要再刷新鎖的失效時(shí)間。

總結(jié)

寫(xiě)了這么多,其實(shí)最主要的就是上面的兩段 Lua 腳本,基于 Redis 的分布式鎖的設(shè)計(jì)完全體現(xiàn)在其中,看完這兩段腳本,再回顧一下前面的 設(shè)計(jì)分布式鎖要注意的問(wèn)題 就豁然開(kāi)朗了。


redisson是redis官網(wǎng)推薦的java語(yǔ)言實(shí)現(xiàn)分布式鎖的項(xiàng)目。當(dāng)然,redisson遠(yuǎn)不止分布式鎖,還包括其他一些分布式結(jié)構(gòu)。詳情請(qǐng)移步:https://github.com/mrniko/redisson/wiki

redisson支持4種鏈接redis的方式:

Cluster(集群)

Sentinel servers(哨兵)

Master/Slave servers(主從)

Single server(單機(jī))

分布式鎖之redisson

redisson是redis官網(wǎng)推薦的java語(yǔ)言實(shí)現(xiàn)分布式鎖的項(xiàng)目。當(dāng)然,redisson遠(yuǎn)不止分布式鎖,還包括其他一些分布式結(jié)構(gòu)。詳情請(qǐng)移步:https://github.com/mrniko/redisson/wiki

redisson支持4種鏈接redis的方式:

Cluster(集群)

Sentinel servers(哨兵)

Master/Slave servers(主從)

Single server(單機(jī))

下面通過(guò)簡(jiǎn)單的案例使用redisson的lock。

1、RedissonManager類(lèi),管理redisson的初始化等操作。

public class RedissonManager {

    private static final String RAtomicName = "genId_";

    private static Config config = new Config();
    private static Redisson redisson = null;

    public static void init(){
        try {
            config.useClusterServers() //這是用的集群server
                    .setScanInterval(2000) //設(shè)置集群狀態(tài)掃描時(shí)間
                    .setMasterConnectionPoolSize(10000) //設(shè)置連接數(shù)
                    .setSlaveConnectionPoolSize(10000)
                    .addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
            redisson = Redisson.create(config);
            //清空自增的ID數(shù)字
            RAtomicLong atomicLong = redisson.getAtomicLong(RAtomicName);
            atomicLong.set(1);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static Redisson getRedisson(){
        return redisson;
    }

    /** 獲取redis中的原子ID */
    public static Long nextID(){
        RAtomicLong atomicLong = getRedisson().getAtomicLong(RAtomicName);
        atomicLong.incrementAndGet();
        return atomicLong.get();
    }
}

2、DistributedRedisLock類(lèi),提供鎖和解鎖方法

public class DistributedRedisLock {
    private static Redisson redisson = RedissonManager.getRedisson();
    private static final String LOCK_TITLE = "redisLock_";

    public static void acquire(String lockName){
        String key = LOCK_TITLE + lockName;
        RLock mylock = redisson.getLock(key);
        mylock.lock(2, TimeUnit.MINUTES); //lock提供帶timeout參數(shù),timeout結(jié)束強(qiáng)制解鎖,防止死鎖
        System.err.println("======lock======"+Thread.currentThread().getName());
    }

    public static void release(String lockName){
        String key = LOCK_TITLE + lockName;
        RLock mylock = redisson.getLock(key);
        mylock.unlock();
        System.err.println("======unlock======"+Thread.currentThread().getName());
    }
}

3、測(cè)試

    private static void redisLock(){
        RedissonManager.init(); //初始化
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        String key = "test123";
                        DistributedRedisLock.acquire(key);
                        Thread.sleep(1000); //獲得鎖之后可以進(jìn)行相應(yīng)的處理
                        System.err.println("======獲得鎖后進(jìn)行相應(yīng)的操作======");
                        DistributedRedisLock.release(key);
                        System.err.println("=============================");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }

4、測(cè)試結(jié)果

======lock======Thread-91
======獲得鎖后進(jìn)行相應(yīng)的操作======
======unlock======Thread-91
=============================
======lock======Thread-63
======獲得鎖后進(jìn)行相應(yīng)的操作======
======unlock======Thread-63
=============================
======lock======Thread-31
======獲得鎖后進(jìn)行相應(yīng)的操作======
======unlock======Thread-31
=============================
======lock======Thread-97
======獲得鎖后進(jìn)行相應(yīng)的操作======
======unlock======Thread-97
=============================
======lock======Thread-8
======獲得鎖后進(jìn)行相應(yīng)的操作======
======unlock======Thread-8
=============================

從測(cè)試結(jié)果可以看出,結(jié)果還是達(dá)到了預(yù)期,在服務(wù)器跑一萬(wàn)個(gè)線程還是能很好運(yùn)行,感覺(jué)還不錯(cuò)。

題外話:

在初始化數(shù)據(jù)時(shí)候,最好不要使用static{} 即靜態(tài)塊。因?yàn)樵诙嗪藱C(jī)器的情況下讀取配置文件,會(huì)拋出java.lang.NoClassDefFoundError: Could not initialize class XXX。

所以最好還是使用init的方式,在啟動(dòng)程序的時(shí)候手動(dòng)執(zhí)行下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 最近碰到幾個(gè)業(yè)務(wù)場(chǎng)景,會(huì)遇到并發(fā)的問(wèn)題。在單實(shí)例情況下,我們會(huì)通過(guò)java.util.concurrent包...
    菜鳥(niǎo)小玄閱讀 2,313評(píng)論 0 5
  • 實(shí)現(xiàn)分布式鎖目前有三種流行方案,分別為基于數(shù)據(jù)庫(kù)、Redis、Zookeeper的方案,本文主要闡述基于Zooke...
    程序員技術(shù)圈閱讀 956評(píng)論 0 5
  • [TOC] 分布式鎖實(shí)現(xiàn)匯總 很多時(shí)候我們需要保證同一時(shí)間一個(gè)方法只能被同一個(gè)線程調(diào)用,在單機(jī)環(huán)境中,Java中其...
    Wang_Coder閱讀 1,458評(píng)論 0 49
  • 這是我初次接觸“冥想”。葉老師表示,冥想跟宗教無(wú)關(guān)。但作為基督徒,謹(jǐn)慎起見(jiàn),我還是咨詢(xún)了一下教會(huì)的師母,也特地谷歌...
    熙沐2017閱讀 1,859評(píng)論 0 0
  • 夜深人靜心難安, 心思牽掛萬(wàn)物靜。 風(fēng)起云涌惹人想, 萬(wàn)花叢中情相連。
    簡(jiǎn)愛(ài)力仁閱讀 57評(píng)論 0 1

友情鏈接更多精彩內(nèi)容