Redis分布式鎖使用總結(jié)

Redis分布式鎖使用總結(jié)

前言

最近因?yàn)轫?xiàng)目需要進(jìn)行多實(shí)例的協(xié)調(diào),使用到了分布式鎖,所以對分布式鎖的原理、使用等做了一番調(diào)查、學(xué)習(xí),順便將其記錄下來,供需要的同學(xué)學(xué)習(xí)交流。

項(xiàng)目中使用的是基于Redis的分布式鎖,所以這篇文件的內(nèi)容都是是基于Redis分布式鎖。

分布式鎖簡介

談起編程語言中的鎖,開發(fā)者應(yīng)該是相當(dāng)熟悉的,當(dāng)系統(tǒng)中存在多線程并且多線程之間存在競態(tài)條件或者需要協(xié)作的時(shí)候,我們就會(huì)使用到鎖,如Java中的Lock、Synchronized等,但是編程語言中提供的鎖,基本上都只適用于在同一個(gè)機(jī)器上運(yùn)行的情況,在分布式環(huán)境下并不適用。

而在某些情況下,我們是需要在多個(gè)機(jī)器實(shí)例/節(jié)點(diǎn)之間進(jìn)行協(xié)作的,這個(gè)時(shí)候,就需要使用到分布式鎖了。

顧名思義,分布式鎖就是應(yīng)用于在分布式環(huán)境下多個(gè)節(jié)點(diǎn)之間進(jìn)行同步或者協(xié)作的鎖

分布式鎖同普通的鎖一樣,具有以下幾個(gè)重要特性

  • 互斥性,保證只有持有鎖的實(shí)例中的某個(gè)線程才能進(jìn)行操作
  • 可重入性,同一個(gè)實(shí)例的同一個(gè)線程可以多次獲取鎖
  • 鎖超時(shí),支持超時(shí)自動(dòng)釋放鎖,避免死鎖的產(chǎn)生
  • 誰加的鎖只能由誰釋放

Redis分布式鎖原理

由于Redis的命令本身是原子性的,所以,非常適合于作為分布式鎖的協(xié)調(diào)者。

一般情況下,為了保證鎖的釋放只能由加鎖者或者超時(shí)釋放,一般我們會(huì)將對應(yīng)鍵的值設(shè)置為一個(gè)線程唯一標(biāo)志,如為每個(gè)線程生成一個(gè)UUID,只有當(dāng)線程的UUID與鎖的值一致時(shí),才能釋放鎖。

利用Redis來實(shí)現(xiàn)分布式的原理非常簡單,加鎖的時(shí)候?yàn)槟硞€(gè)鍵設(shè)置值,釋放的時(shí)候?qū)?yīng)的鍵刪除即可。

不過在使用的時(shí)候,有一些需要注意的地方,下面我們詳細(xì)看下基于Redis不同命令來實(shí)現(xiàn)分布式鎖的操作

setnx命令

在Redis2.6之前,常用于分布式鎖的命令是:setnx key val,該命令在對應(yīng)的鍵沒有值的時(shí)候設(shè)置成功,存在值的時(shí)候設(shè)置失敗,保證了同時(shí)只會(huì)有一個(gè)連接者設(shè)置成功,也即保證同時(shí)只會(huì)有一個(gè)實(shí)例的一個(gè)線程獲取成功。

但是該命令存在一個(gè)缺陷,不支持超時(shí)機(jī)制,所以需要額外的命令來保證能夠在超時(shí)的情況下釋放鎖,也就是刪除鍵,可以配合expire命令來實(shí)現(xiàn)。

由于上述操作涉及到兩個(gè)命令,所以最好的方式是通過lua腳本來實(shí)現(xiàn)加鎖的操作,如下所示

# KEYS[1]是鎖的名稱,KEYS[2]是鎖的值,KEYS[3]是鎖的超時(shí)時(shí)間
local c = redis.call('setnx', KEYS[1], KEYS[2])
if(c == 1) then
   redis.call('expire', KEYS[1], KEYS[3])
end
return c

釋放鎖的時(shí)候,需要驗(yàn)證釋放鎖的是不是鎖的持有者,具體代碼如下

# KEYS[1]是鎖的名稱,KEYS[2]是鎖的值
if redis.call('get', KEYS[1]) == KEYS[2] then 
    return redis.call('del', KEYS[1]) 
else return 0 
end

set命令

從上面的setnx命令可以看到,加鎖的操作還是比較麻煩的,所以,在Redis2.6之后,redis的set命令進(jìn)行了增強(qiáng),設(shè)置值的時(shí)候,同時(shí)支持設(shè)置過期時(shí)間

# nx表示不存在的時(shí)候設(shè)置,ex表示設(shè)置過期時(shí)間,單位是秒
set LOCK VAL nx ex 15

可以看到,通過該命令,進(jìn)行加鎖就方便很多了

釋放鎖的操作同setnx里提到的釋放操作

Redis分布式鎖實(shí)現(xiàn)

上面我們提到的是Redis分布式鎖的實(shí)現(xiàn)原理,不過,每次需要用到鎖的時(shí)候都需要自己手動(dòng)實(shí)現(xiàn)一次,雖然代碼本身沒有多少,其實(shí)也不是很方便。

正因?yàn)槿绱耍型Χ嗟捻?xiàng)目都實(shí)現(xiàn)了分布式,并且提供了更加豐富的功能,如下面討論到的RedisLockRegistry

RedisLockRegistry

Spring-integration項(xiàng)目是Spring官方提供了集成各種工具的項(xiàng)目,通過integration-redis子項(xiàng)目,提供了非常豐富的功能,關(guān)于該項(xiàng)目,后面有時(shí)間再寫篇文章具體分析一下,這里我們用到其中的一個(gè)組件RedisLockRegistry

導(dǎo)入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>

配置RedisLockRegistry

@Configuration
public class RedisLockConfiguration {

    @Bean
    public RedisLockRegistry redisLockRegistry(
        RedisConnectionFactory redisConnectionFactory) {
        // 注意這里的時(shí)間單位是毫秒
        return new RedisLockRegistry(redisConnectionFactory, "registryKey", TIME);
    }
}

RedisLockRegistry相當(dāng)于一個(gè)鎖的管理倉庫,所有的鎖都可以從該倉庫獲取,所有鎖的鍵名為:registryKey:LOCK_NAME,默認(rèn)時(shí)間為60s

配置完鎖的倉庫之后,只需要注入倉庫,當(dāng)需要使用到鎖的時(shí)候,從倉庫中獲取一個(gè)鎖就可以了,如下所示

Lock lock = redisLockRegistry.obtain("redis-lock");

該操作返回一個(gè)Lock對象,該對象其實(shí)是Spring實(shí)現(xiàn)的基于Redis的鎖,該鎖支持了豐富的功能,如tryLock

但使用的時(shí)候,只需要跟普通的鎖一樣操作即可

// lock.tryLock(10, TimeUnit.SECONDS);
lock.lock();
try {
 // ops   
}catch(Exception e) {
    
}finally {
    // 釋放鎖
    lock.unlock();
}

可以看到,通過RedisLockRegistry,我們可以更加方便地使用Redis分布式鎖了

RedisLockRegistry源碼分析

上面學(xué)習(xí)了RedisLockRegistry的使用之后,接下來我們來具體看下RedisLockRegistry的具體實(shí)現(xiàn)

RedisLockRegistry結(jié)構(gòu)

從上面的繼承結(jié)構(gòu)可以清晰地看出RedisLockRegistry的繼承情況,而上面的幾個(gè)接口基本上都只提供了基本的定義,這里就不展開分析了。直接看RedisLockRegistry的實(shí)現(xiàn)

構(gòu)造函數(shù)

首先是構(gòu)造函數(shù),有兩個(gè)構(gòu)造函數(shù),如下

private static final long DEFAULT_EXPIRE_AFTER = 60000L;

// 提供了默認(rèn)的的過期時(shí)間,默認(rèn)過期時(shí)間為60s
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
    this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}

public RedisLockRegistry(RedisConnectionFactory connectionFactory, 
                         String registryKey, 
                         long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = 
        new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
    this.unlinkAvailable = RedisUtils.isUnlinkAvailable(this.redisTemplate);
}

上面第二個(gè)構(gòu)造函數(shù)中,有兩個(gè)沒見過的屬性,分別是obtainLockScript以及unlinkAvailable,分析如下

obtainLockScript

private final RedisScript<Boolean> obtainLockScript;

obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);

可以看到obtainLockScript是一個(gè)DefaultRedisScript實(shí)例,該實(shí)例的對象用于執(zhí)行Lua腳本,具體的看下DefaultRedisScript的源碼

上面的OBTAIN_LOCK_SCRIPT內(nèi)容如下

private static final String OBTAIN_LOCK_SCRIPT =
            "local lockClientId = redis.call('GET', KEYS[1])\n" +
                    "if lockClientId == ARGV[1] then\n" +
                    "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                    "  return true\n" +
                    "elseif not lockClientId then\n" +
                    "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                    "  return true\n" +
                    "end\n" +
                    "return false";

可以看到,其實(shí)就是一段簡單的Lua腳本,腳本邏輯如下

  1. 調(diào)用get命令獲取對應(yīng)的key,如果存在在走2,不存在,則走3
  2. 判斷key的值是否是入?yún)?,如果是,則調(diào)用pexire設(shè)置過期時(shí)間,返回true表示加鎖成功
  3. 如果不存在,則調(diào)用set命令進(jìn)行加鎖,并且設(shè)置過期時(shí)間,返回true表示加鎖成功,從命令中可以看到,使用的參是px,所以構(gòu)造函數(shù)傳入的單位是毫秒而不是秒
  4. 如果沒有執(zhí)行2、3操作,則返回false,表示加鎖失敗

isUnlinkAvailable

該函數(shù)檢查對應(yīng)的redis是否支持UNLINK命令,該命令用于異步刪除某個(gè)鍵,功能等同于del命令,但非阻塞,只有在redis4及以上版本才支持

函數(shù)內(nèi)容如下:

public static boolean isUnlinkAvailable(RedisOperations<?, ?> redisOperations) {
        return unlinkAvailable.computeIfAbsent(
            redisOperations, key -> {
                Properties info = redisOperations.execute(
                    (RedisCallback<Properties>) connection -> 
                    connection.serverCommands().info(SECTION));
            if (info != null) {
                String version = info.getProperty(VERSION_PROPERTY);
                if (StringUtils.hasText(version)) {
                    int majorVersion = Integer.parseInt(version.split("\\.")[0]);
                    return majorVersion >= 4;
                }
                else {
                    return false;
                }
            }
            else {
                throw new IllegalStateException("The INFO command cannot be used in pipeline/transaction.");
            }
        });
    }

核心

RedisLockRegistry的核心方法其實(shí)只有一個(gè),就是obtainLock,具體實(shí)現(xiàn)如下

private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();

@Override
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);
    String path = (String) lockKey;
    return this.locks.computeIfAbsent(path, RedisLock::new);
}

可以看到,每一個(gè)LockRegistry自己維護(hù)了一個(gè)LOCK-KEY-LOCK的map,這也表明,同一個(gè)Registry中,相同的鍵只會(huì)對應(yīng)一個(gè)Lock對象

RedisLock

從上面的分析中可以看到,LockRegistry維護(hù)了一個(gè)RedisLock對象的Map,鍵是鎖的名稱,值是對應(yīng)的Lock對象,該對象是Spring實(shí)現(xiàn)的一個(gè)內(nèi)部類,具體實(shí)現(xiàn)如下所示

構(gòu)造方法

private RedisLock(String path) {
    this.lockKey = constructLockKey(path);
}

RedisLock有且只有一個(gè)私有構(gòu)造方法,所以僅能在當(dāng)前類中進(jìn)行構(gòu)造,這也意味著我們無法自己實(shí)例化RedisLock實(shí)例

構(gòu)造的過程非常簡單,只是初始化了lockKey,lockKey的內(nèi)容如下

private String constructLockKey(String path) {
    return RedisLockRegistry.this.registryKey + ":" + path;
}

可以看到,lockKey的值其實(shí)就是Registry的名稱 + : + 鎖的名稱

核心方法

對于一把鎖而言,最最核心的方法莫過于加鎖和解鎖了,RedisLock實(shí)現(xiàn)了Lock接口,提供了多樣的加鎖方式,分別如下所示

不可中斷鎖
private final ReentrantLock localLock = new ReentrantLock();

@Override
public void lock() {
    this.localLock.lock();
    while (true) {
        try {
            while (!obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
            // 不可中斷,所以忽略中斷異常
        }
        catch (Exception e) {
            this.localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}

從上面的代碼可以看到,lock方法首先嘗試獲取ReentrantLock,如果獲取成功,才嘗試去獲取分布式鎖,獲取localLock的目的在于,如果本地有多個(gè)線程在競爭該鎖,則只有獲取到本地的鎖的線程才能嘗試去獲取分布式鎖,好處在于,減少了不必要的網(wǎng)絡(luò)開銷,提高性能

由于lock方法明確規(guī)定,如果獲取不到鎖,則進(jìn)行阻塞,直至獲取到鎖或者出現(xiàn)異常,所以上面每隔100毫秒會(huì)去嘗試獲取鎖,直到獲取成功或者拋出異常為止

獲取鎖的代碼也非常簡單,如下所示

// 實(shí)例化Registry的時(shí)候進(jìn)行初始化
private final String clientId = UUID.randomUUID().toString();

private boolean obtainLock() {
    Boolean success =
      RedisLockRegistry.this.redisTemplate.execute(
        // 獲取鎖的lua腳本
        RedisLockRegistry.this.obtainLockScript,
        // 獲取的鎖名稱
        Collections.singletonList(this.lockKey), 
        // 鎖的內(nèi)容
        RedisLockRegistry.this.clientId,
        // 鎖的過期時(shí)間
        String.valueOf(RedisLockRegistry.this.expireAfter));

    boolean result = Boolean.TRUE.equals(success);
    
    // 如果獲取成功,則記錄鎖的時(shí)間
    if (result) {
        this.lockedAt = System.currentTimeMillis();
    }
    return result;
}

從上面獲取鎖的代碼可以看到,每一個(gè)LockRegistry實(shí)例只會(huì)有一個(gè)值,該值在Registry實(shí)例化的時(shí)候通過UUID生成,一個(gè)實(shí)例內(nèi)的多個(gè)線程之間的競爭直接通過ReentrantLock進(jìn)行,不涉及到Redis相關(guān)的操作。

可中斷鎖
@Override
public void lockInterruptibly() throws InterruptedException {
    this.localLock.lockInterruptibly();
    try {
        while (!obtainLock()) {
            Thread.sleep(100); //NOSONAR
        }
    }
    catch (InterruptedException ie) {
        // 釋放鎖,并且響應(yīng)中斷信號
        this.localLock.unlock();
        Thread.currentThread().interrupt();
        throw ie;
    }
    catch (Exception e) {
        this.localLock.unlock();
        rethrowAsLockException(e);
    }
}

看懂了lock的代碼,再來看lockInterruptibly就非常簡單了,lock不響應(yīng)中斷信號,則lockInterruptibly則相應(yīng)中斷信號,因此,獲取鎖的過程如果出現(xiàn)中斷,則結(jié)束獲取操作了

嘗試獲取鎖

嘗試獲取鎖以為著如果能獲取鎖,則獲取,如果不能獲取,則結(jié)束,當(dāng)然,可以附帶等待是時(shí)間,有兩個(gè)版本的tryLock,如下

@Override
public boolean tryLock() {
    try {
        // 調(diào)用另一個(gè)tryLock,并且將時(shí)間設(shè)置為0
        return tryLock(0, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
    }
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    long now = System.currentTimeMillis();
    // 先嘗試獲取本地鎖,如果在指定時(shí)間內(nèi)無法獲取到本地鎖,則放棄
    if (!this.localLock.tryLock(time, unit)) {
        return false;
    }
    try {
        
        // 記錄獲取鎖到期時(shí)間
        long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        
        // 如果獲取不到鎖,并且時(shí)間還有剩余,則先休眠100毫秒,然后繼續(xù)嘗試獲取
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { 
            Thread.sleep(100); //NOSONAR
        }
        // 到這里表示獲取鎖超時(shí)
        // 如果無法獲取到分布式鎖,則釋放本地鎖
        if (!acquired) {
            this.localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        this.localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}

具體的分析都詳細(xì)寫在注釋里了,補(bǔ)充一點(diǎn)就是,從tryLock的實(shí)現(xiàn)中可以看到,tryLock本身是響應(yīng)中斷的,與接口的定義一致

釋放鎖
// 判斷鎖的所有者是否是當(dāng)前實(shí)例
public boolean isAcquiredInThisProcess() {
    return RedisLockRegistry.this.clientId.equals(
        RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey).get());
}

// 刪除對應(yīng)的鍵,也即釋放分布式鎖
private void removeLockKey() {
    if (this.unlinkAvailable) {
        RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}

@Override
public void unlock() {
    // 如果嘗試釋放的不是本線程加的鎖,則拋出異常
    if (!this.localLock.isHeldByCurrentThread()) {
        throw new IllegalStateException("You do not own lock at " + this.lockKey);
    }
    // 當(dāng)前線程持有的鎖的數(shù)量,即重入的次數(shù)
    // 如果此時(shí) > 1,表示當(dāng)前線程有多次獲取鎖,釋放的時(shí)候只減少本地鎖的次數(shù)
    // 此時(shí)其他的方法還持有鎖,不能釋放分布式鎖
    if (this.localLock.getHoldCount() > 1) {
        this.localLock.unlock();
        return;
    }
    try {
        // 此時(shí)分布式鎖已經(jīng)由于超時(shí)被釋放了,拋出異常
        if (!isAcquiredInThisProcess()) {
            throw new IllegalStateException("Lock was released in the store due to expiration. " +
 "The integrity of data protected by this lock may have been compromised.");
        }
        
        // 如果收到中斷信號,則異步釋放鎖
        // 盡快響應(yīng)中斷...
        if (Thread.currentThread().isInterrupted()) {
            RedisLockRegistry.this.executor.execute(this::removeLockKey);
        }
        else {
            removeLockKey();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Released lock; " + this);
        }
    }
    catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    }
    finally {
        this.localLock.unlock();
    }
}

總結(jié)

本文主要簡單介紹了分布式鎖,在Redis中使用分布式鎖的原理,本質(zhì)就是set或者setnx命令的使用,以及對應(yīng)版本的加鎖以及解鎖操作。

最后分析了RedisLockRegistry的具體實(shí)現(xiàn),RedisLockRegistry是Spring提供的基于Redis的分布式鎖的實(shí)現(xiàn),主要包含兩部分,一部分是本地鎖,用于一個(gè)實(shí)例下多個(gè)線程的協(xié)調(diào),只有獲取到本地鎖的線程才去嘗試獲取分布式鎖,通過這種方式來提高獲取鎖的性能

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容