Preface
Mainstream distributed locks are generally implemented in one of three ways:
Database optimistic locking
Redis-based distributed locks
ZooKeeper-based distributed locks
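I previously wrote a blog post on concrete approaches to distributed locking with MySQL and Redis: https://www.cnblogs.com/wang-meng/p/10226618.html That post mainly starts from the underlying principles.
This [Distributed Locks] series digs into the source code of the Redis client Redisson and into ZooKeeper (zk), covering how these two kinds of distributed lock are implemented.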
Reliability
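First, for a distributed lock to be usable, its implementation must satisfy at least the following four conditions at the same time:
Mutual exclusion. At any moment, only one client can hold the lock.
No deadlock. Even if a client crashes while holding the lock and never releases it, other clients must still be able to acquire the lock afterwards.
Fault tolerance. As long as the majority of Redis nodes are running normally, clients can lock and unlock.
Only the owner may unlock. Locking and unlocking must be done by the same client; a client must not release a lock that another client holds.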
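How Redisson acquires a lock
Redisson is a powerful open-source Redis client framework. Official site: https://redisson.org/
It is simple to use: once the Maven dependency and connection settings are configured, the code looks like this: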
RLock lock = redisson.getLock("anyLock");
lock.lock();
lock.unlock();
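In practice the work done between lock() and unlock() can throw, so the unlock call usually goes in a finally block. The following is a minimal sketch of that pattern, not code from Redisson. RLock extends java.util.concurrent.locks.Lock, so a JDK ReentrantLock stands in for redisson.getLock("anyLock") here to keep the example runnable without a Redis server; the class and method names (LockUsageSketch, runLocked) are hypothetical.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class LockUsageSketch {
    // Runs the task while holding the lock. The finally block guarantees
    // the lock is released even if the task throws.
    static void runLocked(Lock lock, Runnable task) {
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Assumption: ReentrantLock is only a local stand-in for an RLock.
        Lock lock = new ReentrantLock();
        runLocked(lock, () -> System.out.println("critical section"));
    }
}
```

With a real Redisson client, the same helper should accept the RLock returned by getLock, since it only relies on the Lock interface.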
Redisson performs the actual locking logic in Lua scripts, which Redis executes atomically.
First, the code that initializes RLock:
public class Redisson implements RedissonClient {
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
}
public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
}
First, note that the id in RedissonLock is a UUID object. Each machine (more precisely, each Redisson client instance) has its own id, with a value like "8743c9c0-0795-4907-87fd-6c719a6b4586".
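To make the role of this id concrete: the lock script identifies a holder by the client id joined with the thread id (the value passed as ARGV[2]). Below is a small sketch of how such a holder name could be built. LockNameSketch and lockName are hypothetical names used for illustration, not Redisson's actual classes.

```java
import java.util.UUID;

final class LockNameSketch {
    // One id per client instance, created once, like the id field above.
    private final UUID id = UUID.randomUUID();

    // Holder name in the form "<client id>:<thread id>".
    String lockName(long threadId) {
        return id + ":" + threadId;
    }

    public static void main(String[] args) {
        LockNameSketch client = new LockNameSketch();
        System.out.println(client.lockName(Thread.currentThread().getId()));
    }
}
```

Because the UUID differs per client and the thread id differs per thread, two threads on different machines that happen to share a thread id still produce different holder names.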
Next, look at the implementation of lock():
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// get the current thread id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
}
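Some intermediate code is omitted here. The focus is tryAcquire(): the lease time passed in is -1, together with the current thread id. After that comes the core Lua script, so let's walk through it step by step: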
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
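KEYS[1] is: “anyLock”
ARGV[2] is: “id + ":" + threadId”
The script first uses exists to check whether the key is present in Redis. If it is absent, exists returns 0 and the script runs hset, storing “anyLock id:threadId 1” in Redis. The data stored under the key anyLock ends up looking like this: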
{
"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}
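As an aside, the trailing 1 is a counter that supports reentrancy, which is explained later.
Next, pexpire sets the expiry. With the default configuration, internalLockLeaseTime is 30 s. The script then returns nil, which the client receives as null, meaning the lock was acquired.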
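The script's decision logic can be modeled in plain Java to see its return values without a Redis server. This is an in-memory sketch under stated assumptions: a HashMap stands in for the Redis hash, time is passed in explicitly instead of relying on Redis expiry, and the names AcquireScriptSketch, tryAcquire and holdCount are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class AcquireScriptSketch {
    private final Map<String, Integer> holds = new HashMap<>(); // fields of the lock hash
    private long expiresAtMillis;                               // absolute expiry of the key

    // Returns null when the lock is acquired, otherwise the remaining TTL in ms.
    Long tryAcquire(String holder, long leaseMillis, long nowMillis) {
        if (!holds.isEmpty() && nowMillis >= expiresAtMillis) {
            holds.clear(); // the key has expired, as Redis would have removed it
        }
        if (holds.isEmpty() || holds.containsKey(holder)) {
            holds.merge(holder, 1, Integer::sum);      // hset ... 1, or hincrby ... 1
            expiresAtMillis = nowMillis + leaseMillis; // pexpire
            return null;
        }
        return expiresAtMillis - nowMillis;            // pttl
    }

    int holdCount(String holder) {
        return holds.getOrDefault(holder, 0);
    }
}
```

For example, if holder "a:1" acquires at time 0 with a 30 000 ms lease, a call by "b:1" at time 10 000 returns 20 000, the time left before the key would expire.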
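How Redisson supports reentrancy
Now consider the case where the lock key already exists: how does the same thread on the same machine acquire the lock again?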
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
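ARGV[2] is: “id + ":" + threadId”
If the same thread on the same machine requests the lock again, hexists returns 1, so the script runs hincrby, which raises the value originally written by hset from 1 to 2, and then resets the expiry.
Likewise, each unlock by a thread that re-entered the lock decrements the value by 1.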
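The same counting idea exists in the JDK's ReentrantLock, which makes for a quick local analogy. The sketch below is only an analogy and does not use Redisson: getHoldCount() plays the role of the hash value that hincrby increments, and ReentrancySketch and nestedHoldCount are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancySketch {
    // Acquires the lock `depth` times on the current thread and returns
    // the hold count seen at the deepest level, then releases every hold.
    static int nestedHoldCount(ReentrantLock lock, int depth) {
        for (int i = 0; i < depth; i++) {
            lock.lock();
        }
        try {
            return lock.getHoldCount();
        } finally {
            for (int i = 0; i < depth; i++) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedHoldCount(new ReentrantLock(), 2)); // prints 2
    }
}
```

The lock only becomes available to other threads once the number of unlock calls matches the number of lock calls, which is the behavior the counter in the Redis hash provides across machines.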
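How the Redisson watchdog works
Consider this scenario: clients A and B are both running business logic, and A holds the distributed lock. Production conditions vary, so suppose A's lock expires while A's work is still running. Because the expired lock is released, B acquires it and starts its own work, and the distributed lock no longer protects anything.
To handle this, Redisson introduces the watchdog. Once A has acquired the lock, and as long as the lock still exists, a background task keeps extending the lock's expiry so that it does not expire merely because the work has not finished. As the code shows, this applies only when no explicit lease time is given (leaseTime == -1).
Here is the implementation: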
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
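After tryLockInnerAsync completes, a listener is attached. If the lock was acquired (ttlRemaining == null), the listener calls scheduleExpirationRenewal, whose scheduled task runs the following renewal script: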
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
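The scheduled task runs every 10 s, which is one third of the default 30 s internalLockLeaseTime. The Lua script renews the expiry, so the lock held by the current thread does not lapse just because the timeout was reached.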
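The effect of renewing every 10 s against a 30 s lease can be checked with a small simulation. This is a sketch on virtual time, assuming the 30 s lease and the one-third renewal interval described above; a real implementation would drive the tick from a scheduler rather than a loop. WatchdogSketch and its methods are hypothetical names.

```java
final class WatchdogSketch {
    static final long LEASE_MILLIS = 30_000;                 // default lease from the text
    static final long RENEW_EVERY_MILLIS = LEASE_MILLIS / 3; // 10 s renewal interval

    private long expiresAtMillis;
    private boolean holderAlive = true;

    WatchdogSketch(long nowMillis) {
        expiresAtMillis = nowMillis + LEASE_MILLIS;
    }

    // Simulates the holder's process dying, which also stops its watchdog.
    void holderCrashed() {
        holderAlive = false;
    }

    boolean isLocked(long nowMillis) {
        return nowMillis < expiresAtMillis;
    }

    // One watchdog tick: extend the lease only if the holder is alive
    // and the key still exists (the hexists check in the script).
    void tick(long nowMillis) {
        if (holderAlive && isLocked(nowMillis)) {
            expiresAtMillis = nowMillis + LEASE_MILLIS;
        }
    }

    // Advances virtual time from `from` to `to`, ticking at each interval.
    void run(long from, long to) {
        for (long t = from + RENEW_EVERY_MILLIS; t <= to; t += RENEW_EVERY_MILLIS) {
            tick(t);
        }
    }
}
```

While the holder is alive the lock never expires, however long the work takes. Once the holder crashes, renewals stop and the key expires at most one lease period later, which is what keeps the no-deadlock condition intact.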

How Redisson enforces mutual exclusion
Look again at the locking Lua script above. When neither branch matches, execution reaches:
"return redis.call('pttl', KEYS[1]);",
This returns how long the lock has left before it expires. Continuing with the code:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// ttl == null means the lock was acquired; a non-null ttl means acquisition failed
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// loop until the lock is acquired
while (true) {
// try to acquire the lock again
ttl = tryAcquire(leaseTime, unit, threadId);
// ttl == null means this thread won the lock
if (ttl == null) {
break;
}
// ttl >= 0: the lock is still held elsewhere; wait on the entry's latch (a Semaphore, covered later) for up to ttl ms
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
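The retry loop above can be reproduced locally to see how the latch avoids busy polling. The following is a single-process sketch, not Redisson code: an AtomicReference stands in for the Redis key, a Semaphore stands in for the latch that the unlock message releases, and the 50 ms TTL hint is an arbitrary assumption. Unlike the real lock, this sketch is not reentrant. WaitLoopSketch is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WaitLoopSketch {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final Semaphore latch = new Semaphore(0); // released on every unlock

    // Stand-in for tryAcquire(): null on success, otherwise a TTL hint in ms.
    private Long tryAcquire() {
        return owner.compareAndSet(null, Thread.currentThread()) ? null : 50L;
    }

    void lock() throws InterruptedException {
        while (true) {
            Long ttl = tryAcquire();
            if (ttl == null) {
                return; // lock acquired
            }
            // Sleep until an unlock notification arrives or the TTL hint
            // elapses, then loop and try again.
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    }

    void unlock() {
        // Only the owning thread may release the lock.
        if (owner.compareAndSet(Thread.currentThread(), null)) {
            latch.release(); // wake a waiter, like the published unlock message
        }
    }
}
```

A woken waiter is not guaranteed to win: it simply retries, and if another thread got there first it goes back to waiting. That matches the structure of the loop above, where waking up and acquiring are separate steps.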
How Redisson releases a lock
Go straight to the Lua code:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check whether the lock key exists
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// check whether the field for the current machine and thread id exists
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// decrement the counter by 1 (reentrant lock)
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// a counter greater than 0 means the lock is still held
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// delete the key with the del command
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
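The three possible outcomes of the release script are easy to mix up, so here is an in-memory model of its branches. This is a sketch under assumptions: a HashMap stands in for the Redis hash, the publish step is omitted, and the Boolean return value mirrors 1, 0 and nil as true, false and null. ReleaseScriptSketch, hold and unlock are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class ReleaseScriptSketch {
    private final Map<String, Integer> holds = new HashMap<>();

    // Test helper: records that `holder` currently holds the lock `count` times.
    void hold(String holder, int count) {
        holds.put(holder, count);
    }

    // null  = the caller is not the holder, nothing is changed
    // false = counter decremented, the lock is still held
    // true  = the lock is fully released (or the key was already gone)
    Boolean unlock(String holder) {
        if (holds.isEmpty()) {
            return true;               // key missing: the script only notifies waiters
        }
        if (!holds.containsKey(holder)) {
            return null;               // someone else's lock: refuse to touch it
        }
        int counter = holds.merge(holder, -1, Integer::sum); // hincrby ... -1
        if (counter > 0) {
            return false;              // a reentrant hold remains
        }
        holds.remove(holder);          // del
        return true;
    }
}
```

The null branch is what enforces the "only the owner may unlock" condition from the reliability list: a client whose id and thread id do not match the stored field cannot delete the key.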
Summary
Summary in one diagram:
