84 redis實現分布式鎖的原理

1,Redis使用setnx 實現
2,Redisson 分布式鎖;
Redis基于 setnx 實現分布式鎖原理:
Redis Setnx 實現分布式鎖:
Setnx key value
Redis Setnx(SET if Not eXists) 命令在指定的key 不存在時,為key設置指定的值。
設置成功,返回1, 不成功返回0.
Redis具有先天性,能夠保證線程安全問題,多個redis客戶端最終只有一個Redis客戶端設置成功。

Setnx: key=mayiktRedisLock value=1;該key 如果不存在的時候 執(zhí)行結果返回1 java層面返回true.

Setnx: key=mayiktRedisLock value=1;該key 如果存在的時候 執(zhí)行結果返回0 java層面返回false.

set之間的區(qū)別:
如果該key 不存在的時候,直接創(chuàng)建,如果存在的時候覆蓋。
原理:
獲取鎖原理:
多個redis客戶端執(zhí)行setnx指令,設置一個相同的Rediskey,誰能創(chuàng)建key成功,誰能獲取鎖。
如果該key已經存在的情況下,在創(chuàng)建的時候就會返回false。
釋放原理:
就是刪除key.
Redis實現分布式鎖如何避=避免死鎖的問題?
如果Redis客戶端(獲取鎖的jvm)宕機的話,如何避免死鎖的問題?
zk如何避免該問題?先天性解決了該問題。
可以設置過期時間,過期后該key自動刪除。

獲取到鎖的jvm 業(yè)務執(zhí)行時間>過期key的時間如何處理?
續(xù)命:開啟一個定時任務實現續(xù)命,當我們的業(yè)務邏輯沒有執(zhí)行完畢的時候,就會延長過期key的時間。
一直不斷續(xù)命的情況下,也會發(fā)生死鎖的問題。
設定續(xù)命的次數,續(xù)命多次如果還沒有執(zhí)行完業(yè)務邏輯的情況下,就應該回滾業(yè)務,主動釋放鎖。
如果當前線程已經獲取到鎖的情況下,不需要重復獲取鎖,而是直接復用。

如何考慮避免死鎖的問題。
對我們的key 設置 設置鎖的過期時間,避免死鎖的問題。
如何確保該鎖是自己創(chuàng)建,被自己刪除。
當我們在執(zhí)行set的時候value為uuid,如果刪除的uuid與該uuid值保持一致,則是自己獲取的鎖,可以被自己刪除。
Redis key 過期了,但是業(yè)務還沒有執(zhí)行完畢如何處理;
當redis的過期了,應該采取續(xù)命設計,繼續(xù)延長時間,如果續(xù)命多次還是失敗的情況下,為了避免死鎖的問題,應該主動釋放鎖和當前的事務操作。
相關核心代碼:


import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName RedisLockImpl
 * @Author 螞蟻課堂余勝軍 QQ644064779 www.mayikt.com
 * @Version V1.0
 **/
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
    private String redisLockKey = "mayiktLock";
    private Long timeout = 3000L;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();

    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    @Override
    public boolean tryLock() {
        Thread cuThread = Thread.currentThread();
        RedisLockInfo redisLockInfo = lockCacheMap.get(cuThread);
        if (redisLockInfo != null && redisLockInfo.isState()) {
            log.info("<<重入鎖,直接從新獲取鎖成功>>");
            return true;
        }
        Long startTime = System.currentTimeMillis();
        for (; ; ) {

            // 1.創(chuàng)建setnx
            String lockId = UUID.randomUUID().toString();
            Long expire = 30L;
            Boolean getLock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, lockId, expire, TimeUnit.SECONDS);
            if (getLock) {
                log.info("<<獲取鎖成功>>");
                // 將該鎖緩存到Map集合中 實現重入鎖
                lockCacheMap.put(cuThread, new RedisLockInfo(lockId, cuThread, expire));
                return true;
            }
            // 2.繼續(xù)循環(huán)重試獲取 判斷是否已經超時重試
            long endTime = System.currentTimeMillis();
            if (endTime - startTime > timeout) {
                return false;
            }
            //3.避免頻繁重試 調用阻塞方法等待
            try {
                Thread.sleep(100);
            } catch (Exception e) {

            }
        }
    }

    public RedisLockImpl() {
        //開始定時任務實現續(xù)命
//        this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 5, TimeUnit.SECONDS);
    }

    @Override
    public boolean releaseLock() {
        log.info("<<釋放鎖成功>>");
        RedisLockInfo redisLockInfo = lockCacheMap.get(Thread.currentThread());
        if (redisLockInfo == null) {
            return false;
        }
        boolean state = redisLockInfo.isState();
        if (!state) {
            return false;
        }
        String redisLockId = stringRedisTemplate.opsForValue().get(redisLockKey);
        if (StringUtils.isEmpty(redisLockId)) {
            return false;
        }
        if (!redisLockId.equals(redisLockInfo.getLockId())) {
            log.info("<<非本線程自己的鎖,無法刪除>>");
            return false;
        }
        Boolean delete = stringRedisTemplate.delete(redisLockKey);
        if (!delete) {
            return false;
        }
        return lockCacheMap.remove(redisLockKey) != null ? true : false;
    }


    /**
     * 續(xù)命次數設計
     */
    class LifeExtensionThread implements Runnable {

        @Override
        public void run() {

            lockCacheMap.forEach((k, lockInfo) -> {
                // 判斷線程是否為終止狀態(tài),如果是為終止狀態(tài) 則開始對key實現續(xù)命
                Thread lockThread = lockInfo.getLockThread();
                if (!lockInfo.isState() && lockThread.isInterrupted()) {
                    log.info("獲取鎖失敗或者當前獲取鎖線程已經成功執(zhí)行完方法");
                    return;
                }
                Integer lifeCount = lockInfo.getLifeCount();
                //開始實現續(xù)命 為了避免續(xù)命為了避免續(xù)命多次還是無法釋放鎖 則應該回滾業(yè)務 主動釋放鎖
                if (lifeCount > 3) {
                    // 移除不在繼續(xù)續(xù)命
                    lockCacheMap.remove(lockThread);
                    // 回滾當前線程事務
                    // 停止該線程
                    return;
                }
                // 開始延長時間
                stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
            });
        }
    }
}



Redis過期了,但是業(yè)務還沒有執(zhí)行完畢如何處理:
采用續(xù)命設計:
看門狗線程--續(xù)命線程。
獲取鎖成功后,應該提前開啟一個續(xù)命的線程,
檢測如果當前業(yè)務邏輯還沒有執(zhí)行完畢的情況下,應該不斷的延遲過期key的時間。
續(xù)命設計: 死鎖問題,限制次數。
如果續(xù)命多次的情況下,還沒有釋放鎖,則,
1,主動回滾當前線程對應的事務。
2,主動釋放鎖,
3,主動將該線程通知。
全局續(xù)命,
開啟一個全局的線程,續(xù)命所有的過期key,不合理。
局部續(xù)命(增量續(xù)命)
只要獲取鎖成功之后,就開啟一個定時任務線程續(xù)命。
定時任務每次續(xù)命間隔的時間至少小于Redis過期key的時間。
每隔10s續(xù)命一次
Redisson設計:
key過期的時候30s
每隔10s續(xù)命一次,
當redis 過期了,應該采用續(xù)命設計,繼續(xù)延長時間,如果續(xù)命多次還是失敗的情況下,為了避免死鎖的問題,應該主動釋放鎖和當前事務。
續(xù)命設計增量續(xù)命方式。

集群問題:
Redis集群,主節(jié)點宕機后如何處理?
Redis集群數據同步,采用異步的方式。
優(yōu)點: 效率比較高。
缺點: 寫的操作效率比較高,有可能存在數據不同步的問題。
zk集群數據同步,采用異步同步的方式。
優(yōu)點: 保證每個子節(jié)點的數據的同步。
缺點: 每次做些的操作的效率比較低。
產生背景:
jvm01 連接到主的redis 做setnx操作的時候,異步將數據同步給redis,意味著jvm01獲取鎖成功,正好主redis宕機了,redis集群自動開啟哨兵機制,就會選舉從節(jié)點中某個redis為主redis,就會出現2個jvm獲取鎖成功,違背了分布式鎖原子性特征。
思考如何解決:
1,redis集群數據同步改為同步的形式,效率偏低。
2,Redis紅鎖。
原理;、
1,構建Redis集群沒有主從之分,Redis節(jié)點都可能為主節(jié)點;
2,獲取鎖的時候,當客戶端(JVM)會向多個不同的redis服務端執(zhí)行setnx操作,只要有一半的redis服務器執(zhí)行成功,則表示鎖成功,和zk數據同步思想一樣。

zk數據同步是在zk領導節(jié)點實現
Redis是有客戶端實現。
考慮問題:
需要設置連接redis超時時間5-50毫秒,時間越短越好,能夠減少每個集群redis節(jié)點過期延遲。
考慮:
為了防止寫入某個redis一直阻塞,需要考慮設置一個超時時間,5-50毫秒
如果無法寫入的情況下,直接切換到下一個redis實例,為了防止客戶端一直阻塞,影響獲取鎖的成本。
注意:redis集群個數最好是基數3.

實際上就是zk集群方式。

Redis集群中數據同步,采用異步的形式,當我們連接的主角redis做寫的操作的時候,會異步的形式將數據同步給其他從redis,從而可以提高效率,使用ap模式
zk集群數據同步,采用同步模式,當我們連接到主的zk節(jié)點,做寫的操作的時候,會同步的形式將數據同步給其他的zk從節(jié)點。有可能會阻塞,效率比較低,但是可以嚴格保證數據一致性的問題,使用cp模式

在使用redis實現分布式鎖的時候,如果主的redis宕機后,有可能其他從的redis節(jié)點會選舉主redis節(jié)點,有可能會發(fā)生多個jvm都會獲取到該分布式鎖,產生問題。

image.png

如何解決該問題呢?
Redisson 采用紅鎖解決。
需要考慮的問題:

如何客戶端給多個redis服務器設置key,總耗時時間>過期key如何處理?

RedLock(紅鎖)實現原理》
redis的分布式鎖算法采用紅鎖機制,紅鎖需要至少三個以上Redis獨立節(jié)點,這些節(jié)點相互之間可以不需要存在主從之分,每個redis保證獨立即可。

腦裂:
獲取鎖:
客戶單會在每個redis 實例創(chuàng)建鎖,只需要滿足一半的redis節(jié)點能夠獲取鎖成功,就表示加鎖成功。
該方案: 導致獲取鎖的時間成本可能非常高。
原理:
1.客戶端使用相同的key,在從所有的Redis節(jié)點獲取鎖。
2,客戶端需要設置超時時間。連接redis設置不成功的情況下立即切換到下一個Redis實例,防止一直阻塞。
3,客戶端需要計算獲取鎖的總耗時,客戶端至少需要有N/2+1節(jié)點獲取鎖成功,且總耗時時間小于鎖的過期時間才能獲取鎖成功。
4,如果客戶端最終獲取鎖失敗,必須所有節(jié)點釋放鎖。

RedLock(紅鎖)環(huán)境搭建
構建Redis集群環(huán)境
不需要設置redis集群的主從關系。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容