Redis分布式鎖使用總結(jié)
前言
最近因?yàn)轫?xiàng)目需要進(jìn)行多實(shí)例的協(xié)調(diào),使用到了分布式鎖,所以對分布式鎖的原理、使用等做了一番調(diào)查、學(xué)習(xí),順便將其記錄下來,供需要的同學(xué)學(xué)習(xí)交流。
項(xiàng)目中使用的是基于Redis的分布式鎖,所以這篇文件的內(nèi)容都是是基于Redis分布式鎖。
分布式鎖簡介
談起編程語言中的鎖,開發(fā)者應(yīng)該是相當(dāng)熟悉的,當(dāng)系統(tǒng)中存在多線程并且多線程之間存在競態(tài)條件或者需要協(xié)作的時(shí)候,我們就會(huì)使用到鎖,如Java中的Lock、Synchronized等,但是編程語言中提供的鎖,基本上都只適用于在同一個(gè)機(jī)器上運(yùn)行的情況,在分布式環(huán)境下并不適用。
而在某些情況下,我們是需要在多個(gè)機(jī)器實(shí)例/節(jié)點(diǎn)之間進(jìn)行協(xié)作的,這個(gè)時(shí)候,就需要使用到分布式鎖了。
顧名思義,分布式鎖就是應(yīng)用于在分布式環(huán)境下多個(gè)節(jié)點(diǎn)之間進(jìn)行同步或者協(xié)作的鎖
分布式鎖同普通的鎖一樣,具有以下幾個(gè)重要特性
- 互斥性,保證只有持有鎖的實(shí)例中的某個(gè)線程才能進(jìn)行操作
- 可重入性,同一個(gè)實(shí)例的同一個(gè)線程可以多次獲取鎖
- 鎖超時(shí),支持超時(shí)自動(dòng)釋放鎖,避免死鎖的產(chǎn)生
- 誰加的鎖只能由誰釋放
Redis分布式鎖原理
由于Redis的命令本身是原子性的,所以,非常適合于作為分布式鎖的協(xié)調(diào)者。
一般情況下,為了保證鎖的釋放只能由加鎖者或者超時(shí)釋放,一般我們會(huì)將對應(yīng)鍵的值設(shè)置為一個(gè)線程唯一標(biāo)志,如為每個(gè)線程生成一個(gè)UUID,只有當(dāng)線程的UUID與鎖的值一致時(shí),才能釋放鎖。
利用Redis來實(shí)現(xiàn)分布式的原理非常簡單,加鎖的時(shí)候?yàn)槟硞€(gè)鍵設(shè)置值,釋放的時(shí)候?qū)?yīng)的鍵刪除即可。
不過在使用的時(shí)候,有一些需要注意的地方,下面我們詳細(xì)看下基于Redis不同命令來實(shí)現(xiàn)分布式鎖的操作
setnx命令
在Redis2.6之前,常用于分布式鎖的命令是:setnx key val,該命令在對應(yīng)的鍵沒有值的時(shí)候設(shè)置成功,存在值的時(shí)候設(shè)置失敗,保證了同時(shí)只會(huì)有一個(gè)連接者設(shè)置成功,也即保證同時(shí)只會(huì)有一個(gè)實(shí)例的一個(gè)線程獲取成功。
但是該命令存在一個(gè)缺陷,不支持超時(shí)機(jī)制,所以需要額外的命令來保證能夠在超時(shí)的情況下釋放鎖,也就是刪除鍵,可以配合expire命令來實(shí)現(xiàn)。
由于上述操作涉及到兩個(gè)命令,所以最好的方式是通過lua腳本來實(shí)現(xiàn)加鎖的操作,如下所示
# KEYS[1]是鎖的名稱,KEYS[2]是鎖的值,KEYS[3]是鎖的超時(shí)時(shí)間
local c = redis.call('setnx', KEYS[1], KEYS[2])
if(c == 1) then
redis.call('expire', KEYS[1], KEYS[3])
end
return c
釋放鎖的時(shí)候,需要驗(yàn)證釋放鎖的是不是鎖的持有者,具體代碼如下
# KEYS[1]是鎖的名稱,KEYS[2]是鎖的值
if redis.call('get', KEYS[1]) == KEYS[2] then
return redis.call('del', KEYS[1])
else return 0
end
set命令
從上面的setnx命令可以看到,加鎖的操作還是比較麻煩的,所以,在Redis2.6之后,redis的set命令進(jìn)行了增強(qiáng),設(shè)置值的時(shí)候,同時(shí)支持設(shè)置過期時(shí)間
# nx表示不存在的時(shí)候設(shè)置,ex表示設(shè)置過期時(shí)間,單位是秒
set LOCK VAL nx ex 15
可以看到,通過該命令,進(jìn)行加鎖就方便很多了
釋放鎖的操作同setnx里提到的釋放操作
Redis分布式鎖實(shí)現(xiàn)
上面我們提到的是Redis分布式鎖的實(shí)現(xiàn)原理,不過,每次需要用到鎖的時(shí)候都需要自己手動(dòng)實(shí)現(xiàn)一次,雖然代碼本身沒有多少,其實(shí)也不是很方便。
正因?yàn)槿绱耍型Χ嗟捻?xiàng)目都實(shí)現(xiàn)了分布式,并且提供了更加豐富的功能,如下面討論到的RedisLockRegistry
RedisLockRegistry
Spring-integration項(xiàng)目是Spring官方提供了集成各種工具的項(xiàng)目,通過integration-redis子項(xiàng)目,提供了非常豐富的功能,關(guān)于該項(xiàng)目,后面有時(shí)間再寫篇文章具體分析一下,這里我們用到其中的一個(gè)組件RedisLockRegistry
導(dǎo)入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
配置RedisLockRegistry
@Configuration
public class RedisLockConfiguration {
@Bean
public RedisLockRegistry redisLockRegistry(
RedisConnectionFactory redisConnectionFactory) {
// 注意這里的時(shí)間單位是毫秒
return new RedisLockRegistry(redisConnectionFactory, "registryKey", TIME);
}
}
RedisLockRegistry相當(dāng)于一個(gè)鎖的管理倉庫,所有的鎖都可以從該倉庫獲取,所有鎖的鍵名為:registryKey:LOCK_NAME,默認(rèn)時(shí)間為60s
配置完鎖的倉庫之后,只需要注入倉庫,當(dāng)需要使用到鎖的時(shí)候,從倉庫中獲取一個(gè)鎖就可以了,如下所示
Lock lock = redisLockRegistry.obtain("redis-lock");
該操作返回一個(gè)Lock對象,該對象其實(shí)是Spring實(shí)現(xiàn)的基于Redis的鎖,該鎖支持了豐富的功能,如tryLock等
但使用的時(shí)候,只需要跟普通的鎖一樣操作即可
// lock.tryLock(10, TimeUnit.SECONDS);
lock.lock();
try {
// ops
}catch(Exception e) {
}finally {
// 釋放鎖
lock.unlock();
}
可以看到,通過RedisLockRegistry,我們可以更加方便地使用Redis分布式鎖了
RedisLockRegistry源碼分析
上面學(xué)習(xí)了RedisLockRegistry的使用之后,接下來我們來具體看下RedisLockRegistry的具體實(shí)現(xiàn)

從上面的繼承結(jié)構(gòu)可以清晰地看出RedisLockRegistry的繼承情況,而上面的幾個(gè)接口基本上都只提供了基本的定義,這里就不展開分析了。直接看RedisLockRegistry的實(shí)現(xiàn)
構(gòu)造函數(shù)
首先是構(gòu)造函數(shù),有兩個(gè)構(gòu)造函數(shù),如下
private static final long DEFAULT_EXPIRE_AFTER = 60000L;
// 提供了默認(rèn)的的過期時(shí)間,默認(rèn)過期時(shí)間為60s
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}
public RedisLockRegistry(RedisConnectionFactory connectionFactory,
String registryKey,
long expireAfter) {
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
Assert.notNull(registryKey, "'registryKey' cannot be null");
this.redisTemplate = new StringRedisTemplate(connectionFactory);
this.obtainLockScript =
new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
this.registryKey = registryKey;
this.expireAfter = expireAfter;
this.unlinkAvailable = RedisUtils.isUnlinkAvailable(this.redisTemplate);
}
上面第二個(gè)構(gòu)造函數(shù)中,有兩個(gè)沒見過的屬性,分別是obtainLockScript以及unlinkAvailable,分析如下
obtainLockScript
private final RedisScript<Boolean> obtainLockScript;
obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
可以看到obtainLockScript是一個(gè)DefaultRedisScript實(shí)例,該實(shí)例的對象用于執(zhí)行Lua腳本,具體的看下DefaultRedisScript的源碼
上面的OBTAIN_LOCK_SCRIPT內(nèi)容如下
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
可以看到,其實(shí)就是一段簡單的Lua腳本,腳本邏輯如下
- 調(diào)用get命令獲取對應(yīng)的key,如果存在在走2,不存在,則走3
- 判斷key的值是否是入?yún)?,如果是,則調(diào)用pexire設(shè)置過期時(shí)間,返回true表示加鎖成功
- 如果不存在,則調(diào)用set命令進(jìn)行加鎖,并且設(shè)置過期時(shí)間,返回true表示加鎖成功,從命令中可以看到,使用的參是px,所以構(gòu)造函數(shù)傳入的單位是毫秒而不是秒
- 如果沒有執(zhí)行2、3操作,則返回false,表示加鎖失敗
isUnlinkAvailable
該函數(shù)檢查對應(yīng)的redis是否支持UNLINK命令,該命令用于異步刪除某個(gè)鍵,功能等同于del命令,但非阻塞,只有在redis4及以上版本才支持
函數(shù)內(nèi)容如下:
public static boolean isUnlinkAvailable(RedisOperations<?, ?> redisOperations) {
return unlinkAvailable.computeIfAbsent(
redisOperations, key -> {
Properties info = redisOperations.execute(
(RedisCallback<Properties>) connection ->
connection.serverCommands().info(SECTION));
if (info != null) {
String version = info.getProperty(VERSION_PROPERTY);
if (StringUtils.hasText(version)) {
int majorVersion = Integer.parseInt(version.split("\\.")[0]);
return majorVersion >= 4;
}
else {
return false;
}
}
else {
throw new IllegalStateException("The INFO command cannot be used in pipeline/transaction.");
}
});
}
核心
RedisLockRegistry的核心方法其實(shí)只有一個(gè),就是obtainLock,具體實(shí)現(xiàn)如下
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
@Override
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String) lockKey;
return this.locks.computeIfAbsent(path, RedisLock::new);
}
可以看到,每一個(gè)LockRegistry自己維護(hù)了一個(gè)LOCK-KEY-LOCK的map,這也表明,同一個(gè)Registry中,相同的鍵只會(huì)對應(yīng)一個(gè)Lock對象
RedisLock
從上面的分析中可以看到,LockRegistry維護(hù)了一個(gè)RedisLock對象的Map,鍵是鎖的名稱,值是對應(yīng)的Lock對象,該對象是Spring實(shí)現(xiàn)的一個(gè)內(nèi)部類,具體實(shí)現(xiàn)如下所示
構(gòu)造方法
private RedisLock(String path) {
this.lockKey = constructLockKey(path);
}
RedisLock有且只有一個(gè)私有構(gòu)造方法,所以僅能在當(dāng)前類中進(jìn)行構(gòu)造,這也意味著我們無法自己實(shí)例化RedisLock實(shí)例
構(gòu)造的過程非常簡單,只是初始化了lockKey,lockKey的內(nèi)容如下
private String constructLockKey(String path) {
return RedisLockRegistry.this.registryKey + ":" + path;
}
可以看到,lockKey的值其實(shí)就是Registry的名稱 + : + 鎖的名稱
核心方法
對于一把鎖而言,最最核心的方法莫過于加鎖和解鎖了,RedisLock實(shí)現(xiàn)了Lock接口,提供了多樣的加鎖方式,分別如下所示
不可中斷鎖
private final ReentrantLock localLock = new ReentrantLock();
@Override
public void lock() {
this.localLock.lock();
while (true) {
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}
break;
}
catch (InterruptedException e) {
// 不可中斷,所以忽略中斷異常
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
從上面的代碼可以看到,lock方法首先嘗試獲取ReentrantLock,如果獲取成功,才嘗試去獲取分布式鎖,獲取localLock的目的在于,如果本地有多個(gè)線程在競爭該鎖,則只有獲取到本地的鎖的線程才能嘗試去獲取分布式鎖,好處在于,減少了不必要的網(wǎng)絡(luò)開銷,提高性能
由于lock方法明確規(guī)定,如果獲取不到鎖,則進(jìn)行阻塞,直至獲取到鎖或者出現(xiàn)異常,所以上面每隔100毫秒會(huì)去嘗試獲取鎖,直到獲取成功或者拋出異常為止
獲取鎖的代碼也非常簡單,如下所示
// 實(shí)例化Registry的時(shí)候進(jìn)行初始化
private final String clientId = UUID.randomUUID().toString();
private boolean obtainLock() {
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(
// 獲取鎖的lua腳本
RedisLockRegistry.this.obtainLockScript,
// 獲取的鎖名稱
Collections.singletonList(this.lockKey),
// 鎖的內(nèi)容
RedisLockRegistry.this.clientId,
// 鎖的過期時(shí)間
String.valueOf(RedisLockRegistry.this.expireAfter));
boolean result = Boolean.TRUE.equals(success);
// 如果獲取成功,則記錄鎖的時(shí)間
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}
從上面獲取鎖的代碼可以看到,每一個(gè)LockRegistry實(shí)例只會(huì)有一個(gè)值,該值在Registry實(shí)例化的時(shí)候通過UUID生成,一個(gè)實(shí)例內(nèi)的多個(gè)線程之間的競爭直接通過ReentrantLock進(jìn)行,不涉及到Redis相關(guān)的操作。
可中斷鎖
@Override
public void lockInterruptibly() throws InterruptedException {
this.localLock.lockInterruptibly();
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}
}
catch (InterruptedException ie) {
// 釋放鎖,并且響應(yīng)中斷信號
this.localLock.unlock();
Thread.currentThread().interrupt();
throw ie;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
看懂了lock的代碼,再來看lockInterruptibly就非常簡單了,lock不響應(yīng)中斷信號,則lockInterruptibly則相應(yīng)中斷信號,因此,獲取鎖的過程如果出現(xiàn)中斷,則結(jié)束獲取操作了
嘗試獲取鎖
嘗試獲取鎖以為著如果能獲取鎖,則獲取,如果不能獲取,則結(jié)束,當(dāng)然,可以附帶等待是時(shí)間,有兩個(gè)版本的tryLock,如下
@Override
public boolean tryLock() {
try {
// 調(diào)用另一個(gè)tryLock,并且將時(shí)間設(shè)置為0
return tryLock(0, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
// 先嘗試獲取本地鎖,如果在指定時(shí)間內(nèi)無法獲取到本地鎖,則放棄
if (!this.localLock.tryLock(time, unit)) {
return false;
}
try {
// 記錄獲取鎖到期時(shí)間
long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;
// 如果獲取不到鎖,并且時(shí)間還有剩余,則先休眠100毫秒,然后繼續(xù)嘗試獲取
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) {
Thread.sleep(100); //NOSONAR
}
// 到這里表示獲取鎖超時(shí)
// 如果無法獲取到分布式鎖,則釋放本地鎖
if (!acquired) {
this.localLock.unlock();
}
return acquired;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
return false;
}
具體的分析都詳細(xì)寫在注釋里了,補(bǔ)充一點(diǎn)就是,從tryLock的實(shí)現(xiàn)中可以看到,tryLock本身是響應(yīng)中斷的,與接口的定義一致
釋放鎖
// 判斷鎖的所有者是否是當(dāng)前實(shí)例
public boolean isAcquiredInThisProcess() {
return RedisLockRegistry.this.clientId.equals(
RedisLockRegistry.this.redisTemplate.boundValueOps(this.lockKey).get());
}
// 刪除對應(yīng)的鍵,也即釋放分布式鎖
private void removeLockKey() {
if (this.unlinkAvailable) {
RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
@Override
public void unlock() {
// 如果嘗試釋放的不是本線程加的鎖,則拋出異常
if (!this.localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("You do not own lock at " + this.lockKey);
}
// 當(dāng)前線程持有的鎖的數(shù)量,即重入的次數(shù)
// 如果此時(shí) > 1,表示當(dāng)前線程有多次獲取鎖,釋放的時(shí)候只減少本地鎖的次數(shù)
// 此時(shí)其他的方法還持有鎖,不能釋放分布式鎖
if (this.localLock.getHoldCount() > 1) {
this.localLock.unlock();
return;
}
try {
// 此時(shí)分布式鎖已經(jīng)由于超時(shí)被釋放了,拋出異常
if (!isAcquiredInThisProcess()) {
throw new IllegalStateException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
// 如果收到中斷信號,則異步釋放鎖
// 盡快響應(yīng)中斷...
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(this::removeLockKey);
}
else {
removeLockKey();
}
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this);
}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
this.localLock.unlock();
}
}
總結(jié)
本文主要簡單介紹了分布式鎖,在Redis中使用分布式鎖的原理,本質(zhì)就是set或者setnx命令的使用,以及對應(yīng)版本的加鎖以及解鎖操作。
最后分析了RedisLockRegistry的具體實(shí)現(xiàn),RedisLockRegistry是Spring提供的基于Redis的分布式鎖的實(shí)現(xiàn),主要包含兩部分,一部分是本地鎖,用于一個(gè)實(shí)例下多個(gè)線程的協(xié)調(diào),只有獲取到本地鎖的線程才去嘗試獲取分布式鎖,通過這種方式來提高獲取鎖的性能