正好最近研究了下Redisson的源碼,和大家分享一下
前言
首先我們先回顧一下 Java 中的 ReentrantLock 是如何實(shí)現(xiàn)的?
這里我先簡單介紹一下ReentrantLock 實(shí)現(xiàn)的思路
鎖標(biāo)識(shí):通過AQS的state變量作為鎖標(biāo)識(shí),利用Java的CAS保證多線程競(jìng)爭(zhēng)鎖時(shí)的線程安全問題
隊(duì)列:未競(jìng)爭(zhēng)到鎖的線程進(jìn)入AQS的隊(duì)列并掛起,等待解鎖時(shí)被喚醒(或者超時(shí))
如何設(shè)計(jì)分布式可重入鎖
首先鎖標(biāo)識(shí),這個(gè)在Redis中很容易實(shí)現(xiàn),可以用lock name 作為key,當(dāng)前線程生成一個(gè)uuid,作為value,加上Redis 單線程模型,實(shí)現(xiàn)線程安全的鎖競(jìng)爭(zhēng)
這種方式在之前的博客里也提到過,可以參考下 Redis分布式鎖的正確實(shí)現(xiàn)方式
但是如何基于Redis 做一個(gè)隊(duì)列,像Java那樣可以掛起喚醒線程呢?這點(diǎn)我在看源碼之前一直沒有想到...
那么Redisson 是如何做的呢?
答案:利用Redis的發(fā)布訂閱,加上Java的Semaphore(信號(hào)量,不了解Semaphore的小伙伴可以Google一下)
Redisson 分布式鎖實(shí)現(xiàn)思路
鎖標(biāo)識(shí):Hash 數(shù)據(jù)結(jié)構(gòu),key 為鎖的名字,filed 當(dāng)前競(jìng)爭(zhēng)鎖成功線程的"唯一標(biāo)識(shí)",value 重入次數(shù)
隊(duì)列:所有競(jìng)爭(zhēng)鎖失敗的線程,會(huì)訂閱當(dāng)前鎖的解鎖事件,利用 Semaphore 實(shí)現(xiàn)線程的掛起和喚醒
源碼分析
我們來看一下tryLock方法的源碼
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖,返回null 代表獲取鎖成功,當(dāng)獲取鎖失敗時(shí)返回當(dāng)前鎖的釋放時(shí)間
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 如果此時(shí)已經(jīng)超過等待時(shí)間則獲取鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 訂閱解鎖事件
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待訂閱成功,成功后喚醒當(dāng)前線程
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 再次判斷一下是否超時(shí)
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 嘗試獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 等待解鎖消息,此處利用Semaphore,鎖未釋放時(shí),permits=0,線程處于掛起狀態(tài)
// 當(dāng)發(fā)布解鎖消息時(shí),當(dāng)前的Semaphore對(duì)象的release() permits=1
// 所有的客戶端都會(huì)有一個(gè)線程被喚醒,去嘗試競(jìng)爭(zhēng)鎖
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
tryAcquire(leaseTime, unit, threadId); 這個(gè)方法我們下面會(huì)分析,現(xiàn)在我們只需要知道這個(gè)方法是用來獲取鎖就可以了
這個(gè)時(shí)候我們已經(jīng)可以理清Redisson可重入鎖的思路了
- 獲取鎖
- 如果獲取鎖失敗,訂閱解鎖事件
- 之后是一個(gè)無限循環(huán)
while(true) {
// 嘗試獲取鎖
// 判斷是否超時(shí)
// 等待解鎖消息釋放信號(hào)量
//(此時(shí)每個(gè)Java客戶端都可能會(huì)有多個(gè)線程被掛起,但是只有一個(gè)線程會(huì)被喚醒)
// 判斷是否超時(shí)
}
利用信號(hào)量,合理控制線程對(duì)鎖的競(jìng)爭(zhēng),合理利用系統(tǒng)資源,可以說做的灰常的奈斯了
需要注意:
!await(subscribeFuture, time, TimeUnit.MILLISECONDS),這里很多博客都解釋錯(cuò)了,這里并不是等待發(fā)布解鎖消息,只要訂閱事件成功后,就會(huì)往下執(zhí)行,真正等待解鎖消息的是getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);這里你可能不信,為什么我說的就對(duì)啊,debug一下你就知道
tryLockInnerAsync
tryAcquire 內(nèi)部依靠 tryLockInnerAsync 來實(shí)現(xiàn)獲取鎖的邏輯,我們來看下源碼
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 是否存在鎖
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 不存在則創(chuàng)建
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 設(shè)置過期時(shí)間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 競(jìng)爭(zhēng)鎖成功 返回null
"return nil; " +
"end; " +
// 如果鎖已經(jīng)被當(dāng)前線程獲取
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重入次數(shù)加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖被其他線程獲取,返回鎖的過期時(shí)間
"return redis.call('pttl', KEYS[1]);",
// 下面三個(gè)參數(shù)分別為 KEYS[1], ARGV[1], ARGV[2]
// 即鎖的name,鎖釋放時(shí)間,當(dāng)前線程唯一標(biāo)識(shí)
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
tryLockInnerAsync 中利用lua腳本 和 Redis 單線程的特點(diǎn)來實(shí)現(xiàn)鎖的競(jìng)爭(zhēng)
這里可以看到鎖的結(jié)構(gòu),和我們上文所說的一樣,Hash 數(shù)據(jù)結(jié)構(gòu),key 為鎖的name,filed 當(dāng)前競(jìng)爭(zhēng)鎖成功線程的"唯一標(biāo)識(shí)",value 重入次數(shù)
unlockInnerAsync
接下來我們?cè)賮砜唇怄i的核心代碼
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 用鎖的name和線程唯一標(biāo)識(shí)去判斷是否存在這樣的鍵值對(duì)
// 解鈴還須系鈴人,不存在則無權(quán)解鎖,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 解鎖邏輯
// 沖入次數(shù)-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果大于0 代表當(dāng)前線程重入鎖多次無法解鎖,更新鎖的有效時(shí)間
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 解鎖,刪除key
"redis.call('del', KEYS[1]); " +
// 發(fā)布解鎖消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// KEYS[1],KEYS[2]
// 鎖的name,發(fā)布訂閱的Channel
Arrays.<Object>asList(getName(), getChannelName()),
// ARGV[1] ~ ARGV[3]
// 解鎖消息,釋放時(shí)間,當(dāng)前線程唯一標(biāo)識(shí)
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
發(fā)布解鎖消息后,會(huì)調(diào)用到LockPubSub 的 onMessage,釋放信號(hào)量,喚醒等待鎖的線程
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 釋放信號(hào)量
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
參考
- 慢談 Redis 實(shí)現(xiàn)分布式鎖 以及 Redisson 源碼解析
- https://www.programcreek.com/java-api-examples/?code=rollenholt-SourceReading/redisson/redisson-master/src/main/java/org/redisson/RedissonLock.java
歡迎點(diǎn)贊、轉(zhuǎn)發(fā)。你的支持就是對(duì)我最大的幫助