9. Redisson源碼剖析-讀寫鎖

一、讀鎖

讀寫鎖的意義:

1, redis分布式鎖,主要就是在理解他里面的lua腳本的邏輯,邏輯全部都在lua腳本里,我們只能枚舉清楚各種情況下,lua腳本會執(zhí)行什么邏輯,其實就知道了這個分布式鎖的實現(xiàn)原理

  1. 多個客戶端同時加讀鎖,是不會互斥的,多個客戶端可以同時加這個讀鎖,讀鎖和讀鎖是不互斥的
  2. 如果有人加了讀鎖,此時就不能加寫鎖,任何人都不能加寫鎖了,讀鎖和寫鎖是互斥的
  3. 如果有人加了寫鎖,此時任何人都不能加寫鎖了,寫鎖和寫鎖也是互斥的
  4. RedissonReadLock是RedissonLock的子類,所以很多邏輯會直接復用父類RedissonLock中的邏輯
  5. 這里的核心邏輯主要有三塊、第一個是加讀鎖的lua腳本的邏輯;第二個是讀鎖的釋放的lua腳本的邏輯;第三個是讀鎖的wathdog刷新鎖key的生存時間的邏輯

代碼

代碼片段一、


public class Application {


    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // 自己本地的Redis集群,直接寫死了,主要是研究redisson的源碼,配置之類的可以弱化
        config.useClusterServers()
                .addNodeAddress("redis://192.168.0.107:7001")
                .addNodeAddress("redis://192.168.0.107:7002")
                .addNodeAddress("redis://192.168.0.110:7003")
                .addNodeAddress("redis://192.168.0.110:7004")
                .addNodeAddress("redis://192.168.0.111:7005")
                .addNodeAddress("redis://192.168.0.111:7006");

        RedissonClient redisson = Redisson.create(config);

        RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
        // 代碼片段
        rwLock.readLock().lock();
        rwLock.readLock().unlock();
        rwLock.writeLock().lock();
        rwLock.writeLock().unlock();

    }


}

代碼片段二、


// RedissonLock類中
@Override
public void lock() {
    try {
        // 向下看
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}


@Override
public void lockInterruptibly() throws InterruptedException {
    // 參數(shù)-1和null
    lockInterruptibly(-1, null);
}

// leaseTime = -1,unit=null
 @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 當前加鎖客戶端的線程ID
        long threadId = Thread.currentThread().getId();
        // 實際的加鎖邏輯
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }


        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);


        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }


                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }


private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}


private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // tryLockInnerAsync會到RedissonReadLock類中,參數(shù)分別是30000毫秒,為超時時間,線程ID,代碼片段三、
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // 這里其實就是會有一個監(jiān)聽器,watchdog
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // 如果加鎖失敗,直接返回
            if (!future.isSuccess()) {
                return;
            }

            // 讀取鎖的剩余生存時間
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 代碼片段四、
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

代碼片段三、

這里其實還是老路子,我們先把lua腳本里面的參數(shù)給提取出來,方便在閱讀lua腳本的時候,腦海里對redis的命令執(zhí)行結(jié)果有一個快速的認識



KEYS[1] = “anyRWLock”

KEYS[2] = “{anyRWLock}:UUID_01:threadId_01:rwlock_timeout”

ARGV[1]=30000毫秒

ARGV[2] = UUID_01:threadId_01

ARGV[3]=UUID_01:threadId_01:write 

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            // hget anyRWLock mode 從anyRWLock這個hash里面獲取mode作為key的值,剛開始進來肯定是null
                            "local mode = redis.call('hget', KEYS[1], 'mode'); “ +
                            // if條件成立
                            "if (mode == false) then “ +
                              // hset anyRWLock mode read 此時anyRWLock的hash為 anyRWLock:{“mode”:”read"}
                              "redis.call('hset', KEYS[1], 'mode', 'read'); “ +
                              // hset anyRWLock UUID_01:threadId_01 1 此時anyRWLock:{“mode”:”read”,"UUID_01:threadId_01”:1}
                              "redis.call('hset', KEYS[1], ARGV[2], 1); “ +

                              // set {anyRWLock}:UUID_01:threadId_01:rwlock_timeout ..:1 3000,這里的..其實是字符串拼接的意思,
                              // 實際的結(jié)果是{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
                              "redis.call('set', KEYS[2] .. ':1', 1); “ +
                                //設置{anyLock}:UUID_01:threadId_01:rwlock_timeout:1的過期時間是30000毫秒
                              "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); “ +
                              // 設置anyRWLock的過期時間30000毫秒
                              "redis.call('pexpire', KEYS[1], ARGV[1]); “ +
                              // 返回nil,其實這里上面的調(diào)用這段邏輯的函數(shù)里,就會開啟一個watchdog,會每隔10秒鐘去執(zhí)行一段lua腳本,
                              // 判斷一下當前這個線程是否還持有著這個鎖,如果還持有鎖,更新一下鎖key的生存時間為30000毫秒,watchdog主要是保持redis的鎖key和java代碼中持有的鎖是保持同步的
                              "return nil; " +
                            "end; " +
                            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                              "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                              "local key = KEYS[2] .. ':' .. ind;" +
                              "redis.call('set', key, 1); " +
                              "redis.call('pexpire', key, ARGV[1]); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                            "end;" +
                            "return redis.call('pttl', KEYS[1]);",
                    Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                    internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}

代碼片段四、
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }


    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 代碼片段五、這個線程延遲10秒后執(zhí)行
            RFuture<Boolean> future = renewExpirationAsync(threadId);

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        // 調(diào)用自己,不停的延長鎖的生存時間
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }


    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);


    if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
        task.cancel();
    }
}

代碼片段六、

釋放鎖的邏輯,RedissonReadLock類中

KEYS[1] = anyRWLock

KEYS[2] = {anyRWLock} 

ARGV[1] = 30000毫秒

ARGV[2] = UUID_01:threadId_01

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // {anyRWLock}:UUID_01:threadId_01:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 這里判斷KEYS[1] mode是否存在以及KEYS[1], ARGV[2]是否存在,條件是不成立的
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
            "if (lockExists == 0) then " +
                "return nil;" +
            "end; " +

            // anyRWLock UUID_01:threadId_01減一
            "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
            "if (counter == 0) then “ +
                // 如果等于0,等于所有的可重入鎖都已經(jīng)釋放,這里釋放最后一把鎖,那么直接刪除anyRWLock UUID_01:threadId_01
                "redis.call('hdel', KEYS[1], ARGV[2]); " + 
            "end;” +
            "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

            // hlen anyLock = 2 > 1,就是說,如果你的讀鎖,anyLock hash內(nèi)部的key-value對超過了1個,這里肯定是成立的
            "if (redis.call('hlen', KEYS[1]) > 1) then " +
                "local maxRemainTime = -3; " + 
                "local keys = redis.call('hkeys', KEYS[1]); " + 
                "for n, key in ipairs(keys) do “ + 
                    // 加讀鎖的時候,其實是每個線程都可以加多次這個讀鎖,讀鎖也是可重入的,每次同一個線程加多次讀鎖的時候,他的加鎖次數(shù)就會加1,counter = 1 ,也可能是 10 、20
                    // 就是遍歷counter -> 1,每次遞減1,假設counter = 10,10,9,8,7,6,5,4,3,2,1,
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    "if type(counter) == 'number' then " + 
                        "for i=counter, 1, -1 do " + 
                            "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                            "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                        "end; " + 
                    "end; " + 
                "end; " +

                "if maxRemainTime > 0 then " +
                    "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                    "return 0; " +
                "end;" + 

                "if mode == 'write' then " + 
                    "return 0;" + 
                "end; " +
            "end; " +

            // 刪除anyRWLock
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; ",
            Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
            LockPubSub.unlockMessage, getLockName(threadId));
}

二、寫鎖

其實寫鎖和讀鎖的Java代碼類似,只是lua腳本略有不同,所以,這里直接分析lua腳本了

KEYS[1] = anyRWLock 

ARGV[1] = 30000

ARGV[2] = UUID_01:threadId_01:write

代碼片段一、RedissonWriteLock

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                        // hget anyRWLock mode此時肯定獲取的是空的
                        "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                        "if (mode == false) then “ +
                              // hset anyRWLock mode write向anyRWLock model 的hash中寫入write ,即:anyRWLock:{“mode”:”write"}
                              "redis.call('hset', KEYS[1], 'mode', 'write'); “ +
                              // hset anyRWLock  UUID_01:threadId_01:write,即:anyRWLock:{“mode”:”write”,“UUID_01:threadId_01”:“write”}
                              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                              // pexpire anyRWLock 30000設置anyRWLock的生存時間
                              "redis.call('pexpire', KEYS[1], ARGV[1]); “ +
                              // 返回nil,說明加鎖成功了,Java邏輯拿到加鎖成功標志后,watchdog其實就是用的RedissonLock中的邏輯
                              "return nil; " +
                          "end; " +
                          "if (mode == 'write') then " +
                              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                  "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                  "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                            "end;" +
                            "return redis.call('pttl', KEYS[1]);",
                    Arrays.<Object>asList(getName()), 
                    internalLockLeaseTime, getLockName(threadId));
}
本地Redis-cluster
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容