Redis的分布式鎖

經(jīng)常聽到分布式鎖這個(gè)名詞,覺得高大上又離我很遙遠(yuǎn),于是也沒有進(jìn)行什么深究。直到面試被反復(fù)問道,我覺得有必要了解一下,以跟上這個(gè)時(shí)代的節(jié)奏。
常見的分布式鎖實(shí)現(xiàn),一個(gè)是ZooKeeper,另一個(gè)就是Redis了。Redis這么熟悉,當(dāng)然是拿它先開刀了。在進(jìn)行鎖的實(shí)現(xiàn)的時(shí)候,有幾個(gè)基本概念,需要先說明一下。
第一個(gè)就是鎖的概念,這個(gè)鎖不像java或者jvm中那種復(fù)雜又難懂的鎖,你可以把鎖理解為一個(gè)值,根據(jù)這個(gè)值的存在與否,來決定是否獲取到鎖。
在Redis中,有一種稱為樂觀鎖的東西,什么叫樂觀鎖,意思就是這種策略是樂觀的,總樂觀的認(rèn)為別的客戶端連接不會(huì)修改該值,因此除了本客戶端連接可以讀取到某種資源(存儲(chǔ)在Redis上的鍵值對(duì)信息),其它客戶端連接也可以讀取到。那萬一其余客戶端連接修改了資源呢?這個(gè)時(shí)候Redis Server會(huì)給加了樂觀鎖的客戶端連接發(fā)送消息,該訴你該數(shù)據(jù)已經(jīng)修改過了,這個(gè)時(shí)候本客戶端連接可以選擇循環(huán)重試或者直接退出。這個(gè)聽起來是不是像CAS的操作呢? 我的感覺是像極了。 具體涉及的命令就是watch unwatch multi exec,這些命令什么意思,下文詳細(xì)說明。
另一種鎖稱之為悲觀鎖。什么是悲觀鎖呢?就是客戶端連接總是認(rèn)為數(shù)據(jù)隨時(shí)都可能被其余客戶端連接修改掉,所以加了悲觀鎖,其余的客戶端連接在此刻是無法讀取到資源,自然也無法進(jìn)行操作。當(dāng)本客戶端連接釋放了鎖,其余的客戶端連接才可以獲取鎖,進(jìn)而進(jìn)行操作。涉及的命令不僅包括watch unwatch multi exec,也包括setnx ttl expire等命令,同樣后文細(xì)說。

不像java中已經(jīng)提供了各種鎖以及同步容器,并發(fā)工具,Redis本身它不提供任何鎖的實(shí)現(xiàn),也沒有樂觀鎖、悲觀鎖,超時(shí)鎖等東西,這需要我們組合使用Redis的各種原子命令以及不同的命令特性,來自己實(shí)現(xiàn)鎖,同時(shí)在代碼層面一致的使用和釋放鎖。

再來說一下Redis客戶端。Redis是用c語言寫的,要么使用自帶的redis-cli進(jìn)行連接和交互,要么使用根據(jù)通信協(xié)議封裝好的不同語言的客戶端庫(kù)。Redis的java客戶端庫(kù)很多,常用的有Jedis、Lettuce 。并且Spring SpringBoot默認(rèn)的就是支持這兩種客戶端,所以學(xué)習(xí)了這兩個(gè)客戶端對(duì)以后也很有幫助。Lettuce 的功能更加強(qiáng)大,它基于netty,支持Redis的哨兵模式和集群模式。但是由于我知識(shí)淺薄,只能拿Jedis來作為演示啦。

由于樂觀鎖會(huì)導(dǎo)致無謂的修改循環(huán)重試,導(dǎo)致很少能夠修改成功,耗費(fèi)資源。而悲觀鎖,雖然在獲取鎖時(shí)不斷重試,但對(duì)于修改資源,卻是一次就成功了,在資源競(jìng)爭(zhēng)嚴(yán)重的時(shí)候,悲觀鎖策略性能更好,因此這里主要選擇悲觀鎖這種思想來進(jìn)行代碼演示。

首先我們?cè)O(shè)定一個(gè)鍵為鎖(Redis就是Key Value型存儲(chǔ)),并使用setnx命令,該命令的特點(diǎn)是,如果鍵存在,則設(shè)置定指定的值,并返回1,如果鍵存在,則什么都不做,并返回0。對(duì)應(yīng)到代碼中來就是,如果執(zhí)行setnx命令返回了1,則說明獲取到了鎖,代碼可以繼續(xù)往下執(zhí)行,如果執(zhí)行setnx命令返回0,則說明沒有獲取到鎖,當(dāng)前線程等待或者重試。

獲取到鎖,執(zhí)行代碼完畢,需要釋放鎖。怎么釋放呢,就是很簡(jiǎn)單的執(zhí)行del命令即可,即把鎖的key給刪除,這樣其余的連接就可以獲取鎖了。
偽代碼如下:

while(con.setnx(lockKey,lockValue)==0){
        //休眠  重試獲取鎖
}
// 執(zhí)行業(yè)務(wù)代碼
con.del(lockKey);

一切看起來很美好,不是嗎?但現(xiàn)實(shí)總是千變?nèi)f化的,一種可能是一段代碼的執(zhí)行,需要在不同地方獲取不同的鎖,導(dǎo)致死鎖的發(fā)生,又或者是網(wǎng)絡(luò)故障或者客戶進(jìn)程崩潰,造成鎖永遠(yuǎn)無法釋放。這就會(huì)導(dǎo)致其余的Redis連接一直無法獲取到鎖,因?yàn)橐慌_(tái)機(jī)器的代碼問題,網(wǎng)絡(luò)問題,機(jī)器故障等原因,導(dǎo)致所有的服務(wù)都變得不可用,這是讓人無法接受的,這違背了分布式的初衷。
怎么解決這個(gè)問題呢?我們可以指定一個(gè)鎖的過期時(shí)間,比如10s后這個(gè)鎖會(huì)過期,并且極限條件下業(yè)務(wù)執(zhí)行時(shí)間也不會(huì)超過10s。(在服務(wù)有互相依賴,復(fù)雜的服務(wù)調(diào)用中,調(diào)用鏈越長(zhǎng),超時(shí)時(shí)間越不好預(yù)估,但這個(gè)也是在解決問題和性能之間做一個(gè)平衡,超時(shí)時(shí)間設(shè)置太長(zhǎng),性能會(huì)大大降低,超時(shí)時(shí)間太短,會(huì)造成并發(fā)問題,因?yàn)橐粋€(gè)連接中代碼還沒有執(zhí)行完,鎖已經(jīng)被刪除,同時(shí)另一個(gè)連接獲取到了鎖,執(zhí)行業(yè)務(wù)代碼)。設(shè)置了鎖的過期時(shí)間,解決了鎖不釋放的問題,但是同時(shí)引入的新的問題,那就是可能會(huì)刪除其余連接的鎖。比如A連接獲取到鎖key1,執(zhí)行很長(zhǎng)時(shí)間,此時(shí)鎖過期,被刪除,另一個(gè)連接B獲取到鎖key1,并執(zhí)行對(duì)應(yīng)的代碼,此時(shí)A連接執(zhí)行結(jié)束,于是釋放鎖,但是此時(shí),其實(shí)它的鎖已經(jīng)被釋放了,在鎖過期的時(shí)候,現(xiàn)在它釋放的是B鏈接的鎖,那是不對(duì)的。假如此刻C連接進(jìn)來,是能夠獲取到鎖的,那么就意味著B C在同時(shí)執(zhí)行業(yè)務(wù)代碼,違背了鎖當(dāng)初設(shè)計(jì)的本意,因此絕對(duì)不能釋放其余連接的鎖,而只能釋放自己的。
那么如何解決這個(gè)問題?其實(shí)我們可以在鏈接獲取鎖的時(shí)候,設(shè)置一個(gè)只有當(dāng)前連接知道的唯一值,釋放的時(shí)候會(huì)先取出鎖的值,進(jìn)行比較,只有跟存入的值是一致的時(shí)候,才會(huì)釋放鎖,也就是刪除鍵,否則,什么也不做。

分析了這么多,我們可以看看獲取鎖的工具類代碼:

    /**
     * 在指定的等待時(shí)限內(nèi)獲取鎖
     * @param jedis 連接
     * @param lockName 鎖名稱
     * @param timeOutMillionSeconds  獲取鎖超時(shí)時(shí)間   -1:一直等待,直到獲取到鎖
     * @return 如果獲取到鎖,返回一個(gè)鎖標(biāo)識(shí)符,否則返回null
     */
    public static String acquireLock(Jedis jedis,String lockName,long timeOutMillionSeconds){
        // 略去各類校驗(yàn)
        String identifier = UUID.randomUUID().toString();
        long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
        while(System.currentTimeMillis()<=timeEnd){
            long result = jedis.setnx(lockName,identifier);
            if(result==1){// 等于1  說明沒有設(shè)置過,獲取鎖成功
                return identifier;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值  根據(jù)業(yè)務(wù)來定
            }catch (InterruptedException e){//假設(shè)在本機(jī)沒有多線程編程  不會(huì)通知另一方線程中斷 根據(jù)具體業(yè)務(wù)來
                throw new RuntimeException(e);
            }
        }
        return null;
    }

注意代碼中的jedis沒有close,根據(jù)需要,也可以在工具類中close掉。

以及超時(shí)鎖,為了防止setnx和expire之間,程序奔潰,造成超時(shí)時(shí)間沒有設(shè)置上,因此其余的連接在獲取不到鎖的時(shí)候,會(huì)先判斷鎖有沒有過期時(shí)間,如果沒有,給鎖加上過期時(shí)間:

    /**
     *在指定的等待時(shí)限內(nèi)獲取鎖,該鎖自身帶有超時(shí)特性
     * @param jedis 連接
     * @param lockName 鎖名稱
     * @param timeOutMillionSeconds  獲取鎖超時(shí)時(shí)間   -1:一直等待,直到獲取到鎖
     * @param lockTimeOutSeconds 鎖的超時(shí)時(shí)間
     * @return 如果獲取到鎖,返回一個(gè)鎖標(biāo)識(shí)符,否則返回null
     */
    public static String acquireLockTimeOut(Jedis jedis,String lockName,long timeOutMillionSeconds,int lockTimeOutSeconds){
        // 略去各類校驗(yàn)
        String identifier = UUID.randomUUID().toString();
        long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
        while(System.currentTimeMillis()<=timeEnd){
            long result = jedis.setnx(lockName,identifier);
            if(result==1){// 等于1  說明沒有設(shè)置過,獲取鎖成功
                jedis.expire(lockName,lockTimeOutSeconds);
                return identifier;
            }else{
                if(jedis.ttl(lockName)==-1){
                    jedis.expire(lockName,lockTimeOutSeconds);
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值  根據(jù)業(yè)務(wù)來定
            }catch (InterruptedException e){//假設(shè)在本機(jī)沒有多線程編程  不會(huì)通知另一方線程中斷 根據(jù)具體業(yè)務(wù)來
                throw new RuntimeException(e);
            }
        }
        return null;
    }

釋放鎖的代碼:

    /**
     * 關(guān)于watch  multi exec等命令,參考https://redis.io/topics/transactions   才能深刻理解
     * @param jedis  連接
     * @param lockName  鎖名稱
     * @param identifier  鎖標(biāo)識(shí)符
     * @return 是否成功釋放鎖   僅作為參考
     */
    public static boolean releaseLock(Jedis jedis,String lockName,String identifier){
        // 略去各類校驗(yàn)
        boolean releaseNormal = false;// 鎖是否正常釋放
        jedis.watch(lockName);
        String identifierInRedis = jedis.get(lockName);
        if(identifier.equalsIgnoreCase(identifierInRedis)){// 如果標(biāo)識(shí)符沒有改動(dòng),則說明可以解鎖
            Transaction transaction = jedis.multi();
            transaction.del(lockName);
            transaction.exec();
            releaseNormal = true;
        }else{
            jedis.unwatch();
            releaseNormal = false;
        }
        return releaseNormal;
    }

這里重點(diǎn)解釋一下釋放鎖的操作:只有現(xiàn)在取出的跟當(dāng)時(shí)存入的值一致,才會(huì)進(jìn)行刪除操作。但為了防止get 和del之間的某個(gè)時(shí)候,另一個(gè)連接修改了鎖的值,(為什么會(huì)修改?是因?yàn)楫?dāng)前連接A在執(zhí)行完get之后,鎖過期了,因此另一個(gè)連接B可以獲取到鎖,現(xiàn)在A執(zhí)行刪除操作,就是刪除B連接獲取到的鎖),因此需要watch 操作,如果現(xiàn)在取出的值和當(dāng)初存入的不一致,那么直接執(zhí)行unwatch并返回。為什么要執(zhí)行unwatch呢?因?yàn)闉榱税踩尤氩粓?zhí)行unwatch就返回,在后續(xù)的代碼中執(zhí)行multi和exec,那就有很大的問題,當(dāng)鎖被刪除或者修改,就會(huì)打斷當(dāng)前的事務(wù),但是該事物跟鎖是沒有任何關(guān)系的,所以u(píng)nwatch是一個(gè)需要執(zhí)行的操作。另一個(gè)情況是假如當(dāng)前連接取出的鎖的值,跟存入的一直,就需要執(zhí)行刪除鎖的操作??赡苡型瑢W(xué)就會(huì)問了,Redis的所有操作都是原子操作,執(zhí)行del和包裹在multi exec中執(zhí)行del不是一樣的原子操作嗎?為何還要多此一舉,讓代碼變得不好理解。在這一點(diǎn)上,它們確實(shí)并無任何區(qū)別,但是重點(diǎn)是之前有一個(gè)watch命令。假如沒有執(zhí)行watch multi del exec這樣的順序,就會(huì)有釋放掉其余連接的鎖的風(fēng)險(xiǎn),為什么會(huì)這樣,上文已經(jīng)做了分析。在get和del之間發(fā)生的事情,當(dāng)前連接是不知道的,get del的執(zhí)行不是原子性的。有了watch multi del exec這個(gè)順序,當(dāng)前連接A get執(zhí)行之后,鎖失效,且被另一個(gè)連接B獲取到鎖,也就是修改了鎖,因?yàn)閣atch(key),所以當(dāng)前連接就知道了有人修改了。當(dāng)執(zhí)行exec的時(shí)候,就會(huì)丟棄掉del命令,因?yàn)閣atch的通知使得事務(wù)已經(jīng)失效了,這保證了其余連接的鎖不會(huì)被刪除。同時(shí),當(dāng)執(zhí)行exec的時(shí)候,不論事務(wù)成功與否,都unwatch了。最終呢,釋放鎖的代碼看起來就是這樣了。
寫好了工具類,我們應(yīng)該測(cè)試一下,看看是不是真的,我們可以使用線程池,放入200個(gè)任務(wù),每個(gè)任務(wù)都是執(zhí)行獲取Redis的某個(gè)鍵,并加一,再設(shè)置回Redis。在執(zhí)行代碼前,進(jìn)行鎖獲取,執(zhí)行完畢,進(jìn)行鎖釋放。為了等到所有線程執(zhí)行完畢,便于獲取最終執(zhí)行結(jié)果,使用CountDownLatch進(jìn)行等待線程池所有任務(wù)的執(zhí)行完畢。另外的一些部分是初始化動(dòng)作,防止鎖已經(jīng)設(shè)置了或者指定的鍵已經(jīng)有值了。代碼如下:

public class LockTest {
    private static final Log log = LogFactory.getLog(LockTest.class);
    public static void main(String[] args)throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
                TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(10);
        final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
        final CountDownLatch countDownLatch = new CountDownLatch(200);
        final String lockName = "lock_a";
        Jedis jedisInit = jedisPool.getResource();
        jedisInit.del(lockName);
        final String testResource = "test_str";
        jedisInit.set(testResource,"0");
        jedisInit.close();
        for(int i=0;i<200;i++){
            threadPoolExecutor.execute(new Runnable() {
                public void run() {
                    Jedis jedis = jedisPool.getResource();
                    String identifiler = RedisSetnxLock.acquireLock(jedis,lockName,-1);
                    if(identifiler==null) {
                        log.info("獲取鎖失敗");
                        countDownLatch.countDown();
                        jedis.close();
                        return;
                    }
                    try{
                        String value = jedis.get(testResource);
                        Thread.sleep(200);// 故意休眠
                        jedis.set(testResource,(Integer.parseInt(value)+1)+"");
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
                        jedis.close();
                        countDownLatch.countDown();
                    }

                }
            });
        }
        countDownLatch.await();
        log.info(jedisPool.getResource().get(testResource));
        threadPoolExecutor.shutdown();
    }
}

最后的執(zhí)行結(jié)果符合預(yù)期。
為了演示連接掛掉或者執(zhí)行超常任務(wù)的情形,可以執(zhí)行下面的測(cè)試:

public class LockTest {
    private static final Log log = LogFactory.getLog(LockTest.class);
    public static void main(String[] args)throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
                TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(10);
        final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
        final CountDownLatch countDownLatch = new CountDownLatch(200);
        final String lockName = "lock_a";
        Jedis jedisInit = jedisPool.getResource();
        jedisInit.del(lockName);
        final String testResource = "test_str";
        jedisInit.set(testResource,"0");
        jedisInit.close();
        for(int i=0;i<200;i++){
            threadPoolExecutor.execute(new Runnable() {
                public void run() {
                    Jedis jedis = jedisPool.getResource();
                    // 鎖的超時(shí)時(shí)間為1s
                    String identifiler = RedisSetnxLock.acquireLockTimeOur(jedis,lockName,-1,1);
                    if(identifiler==null) {
                        log.info("獲取鎖失敗");
                        countDownLatch.countDown();
                        jedis.close();
                        return;
                    }
                    try{
                        String value = jedis.get(testResource);
                        Thread.sleep(200);
                        jedis.set(testResource,(Integer.parseInt(value)+1)+"");
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        // 每次不釋放鎖,模擬執(zhí)行超常任務(wù)或者進(jìn)程掛掉的情形
                        //RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
                        jedis.close();
                        countDownLatch.countDown();
                    }

                }
            });
        }
        countDownLatch.await();
        log.info(jedisPool.getResource().get(testResource));
        threadPoolExecutor.shutdown();
    }
}

執(zhí)行結(jié)果同樣正確,只是執(zhí)行時(shí)間變長(zhǎng)了。

相關(guān)maven pom:

<!--jedis client-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <!--jedis連接池-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.13.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.13.1</version>
        </dependency>

參考書籍:《Redis實(shí)戰(zhàn)》,一本非常棒的書。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容