一、讀鎖
讀寫鎖的意義:
1, redis分布式鎖,主要就是在理解他里面的lua腳本的邏輯,邏輯全部都在lua腳本里,我們只能枚舉清楚各種情況下,lua腳本會執(zhí)行什么邏輯,其實就知道了這個分布式鎖的實現(xiàn)原理
- 多個客戶端同時加讀鎖,是不會互斥的,多個客戶端可以同時加這個讀鎖,讀鎖和讀鎖是不互斥的
- 如果有人加了讀鎖,此時就不能加寫鎖,任何人都不能加寫鎖了,讀鎖和寫鎖是互斥的
- 如果有人加了寫鎖,此時任何人都不能加寫鎖了,寫鎖和寫鎖也是互斥的
- RedissonReadLock是RedissonLock的子類,所以很多邏輯會直接復用父類RedissonLock中的邏輯
- 這里的核心邏輯主要有三塊、第一個是加讀鎖的lua腳本的邏輯;第二個是讀鎖的釋放的lua腳本的邏輯;第三個是讀鎖的wathdog刷新鎖key的生存時間的邏輯
代碼
代碼片段一、
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
// 自己本地的Redis集群,直接寫死了,主要是研究redisson的源碼,配置之類的可以弱化
config.useClusterServers()
.addNodeAddress("redis://192.168.0.107:7001")
.addNodeAddress("redis://192.168.0.107:7002")
.addNodeAddress("redis://192.168.0.110:7003")
.addNodeAddress("redis://192.168.0.110:7004")
.addNodeAddress("redis://192.168.0.111:7005")
.addNodeAddress("redis://192.168.0.111:7006");
RedissonClient redisson = Redisson.create(config);
RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
// 代碼片段
rwLock.readLock().lock();
rwLock.readLock().unlock();
rwLock.writeLock().lock();
rwLock.writeLock().unlock();
}
}
代碼片段二、
// RedissonLock類中
@Override
public void lock() {
try {
// 向下看
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 參數(shù)-1和null
lockInterruptibly(-1, null);
}
// leaseTime = -1,unit=null
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 當前加鎖客戶端的線程ID
long threadId = Thread.currentThread().getId();
// 實際的加鎖邏輯
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// tryLockInnerAsync會到RedissonReadLock類中,參數(shù)分別是30000毫秒,為超時時間,線程ID,代碼片段三、
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 這里其實就是會有一個監(jiān)聽器,watchdog
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
// 如果加鎖失敗,直接返回
if (!future.isSuccess()) {
return;
}
// 讀取鎖的剩余生存時間
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 代碼片段四、
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
代碼片段三、
這里其實還是老路子,我們先把lua腳本里面的參數(shù)給提取出來,方便在閱讀lua腳本的時候,腦海里對redis的命令執(zhí)行結(jié)果有一個快速的認識
KEYS[1] = “anyRWLock”
KEYS[2] = “{anyRWLock}:UUID_01:threadId_01:rwlock_timeout”
ARGV[1]=30000毫秒
ARGV[2] = UUID_01:threadId_01
ARGV[3]=UUID_01:threadId_01:write
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// hget anyRWLock mode 從anyRWLock這個hash里面獲取mode作為key的值,剛開始進來肯定是null
"local mode = redis.call('hget', KEYS[1], 'mode'); “ +
// if條件成立
"if (mode == false) then “ +
// hset anyRWLock mode read 此時anyRWLock的hash為 anyRWLock:{“mode”:”read"}
"redis.call('hset', KEYS[1], 'mode', 'read'); “ +
// hset anyRWLock UUID_01:threadId_01 1 此時anyRWLock:{“mode”:”read”,"UUID_01:threadId_01”:1}
"redis.call('hset', KEYS[1], ARGV[2], 1); “ +
// set {anyRWLock}:UUID_01:threadId_01:rwlock_timeout ..:1 3000,這里的..其實是字符串拼接的意思,
// 實際的結(jié)果是{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
"redis.call('set', KEYS[2] .. ':1', 1); “ +
//設置{anyLock}:UUID_01:threadId_01:rwlock_timeout:1的過期時間是30000毫秒
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); “ +
// 設置anyRWLock的過期時間30000毫秒
"redis.call('pexpire', KEYS[1], ARGV[1]); “ +
// 返回nil,其實這里上面的調(diào)用這段邏輯的函數(shù)里,就會開啟一個watchdog,會每隔10秒鐘去執(zhí)行一段lua腳本,
// 判斷一下當前這個線程是否還持有著這個鎖,如果還持有鎖,更新一下鎖key的生存時間為30000毫秒,watchdog主要是保持redis的鎖key和java代碼中持有的鎖是保持同步的
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
代碼片段四、
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 代碼片段五、這個線程延遲10秒后執(zhí)行
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
// 調(diào)用自己,不停的延長鎖的生存時間
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
代碼片段六、
釋放鎖的邏輯,RedissonReadLock類中
KEYS[1] = anyRWLock
KEYS[2] = {anyRWLock}
ARGV[1] = 30000毫秒
ARGV[2] = UUID_01:threadId_01
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// {anyRWLock}:UUID_01:threadId_01:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 這里判斷KEYS[1] mode是否存在以及KEYS[1], ARGV[2]是否存在,條件是不成立的
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
// anyRWLock UUID_01:threadId_01減一
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then “ +
// 如果等于0,等于所有的可重入鎖都已經(jīng)釋放,這里釋放最后一把鎖,那么直接刪除anyRWLock UUID_01:threadId_01
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;” +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
// hlen anyLock = 2 > 1,就是說,如果你的讀鎖,anyLock hash內(nèi)部的key-value對超過了1個,這里肯定是成立的
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do “ +
// 加讀鎖的時候,其實是每個線程都可以加多次這個讀鎖,讀鎖也是可重入的,每次同一個線程加多次讀鎖的時候,他的加鎖次數(shù)就會加1,counter = 1 ,也可能是 10 、20
// 就是遍歷counter -> 1,每次遞減1,假設counter = 10,10,9,8,7,6,5,4,3,2,1,
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
// 刪除anyRWLock
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.unlockMessage, getLockName(threadId));
}
二、寫鎖
其實寫鎖和讀鎖的Java代碼類似,只是lua腳本略有不同,所以,這里直接分析lua腳本了
KEYS[1] = anyRWLock
ARGV[1] = 30000
ARGV[2] = UUID_01:threadId_01:write
代碼片段一、RedissonWriteLock
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// hget anyRWLock mode此時肯定獲取的是空的
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then “ +
// hset anyRWLock mode write向anyRWLock model 的hash中寫入write ,即:anyRWLock:{“mode”:”write"}
"redis.call('hset', KEYS[1], 'mode', 'write'); “ +
// hset anyRWLock UUID_01:threadId_01:write,即:anyRWLock:{“mode”:”write”,“UUID_01:threadId_01”:“write”}
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// pexpire anyRWLock 30000設置anyRWLock的生存時間
"redis.call('pexpire', KEYS[1], ARGV[1]); “ +
// 返回nil,說明加鎖成功了,Java邏輯拿到加鎖成功標志后,watchdog其實就是用的RedissonLock中的邏輯
"return nil; " +
"end; " +
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

本地Redis-cluster