(9)redis實(shí)現(xiàn)分布式鎖

一、 分布式鎖的實(shí)現(xiàn)

關(guān)于鎖,其實(shí)我們或多或少都有接觸過(guò)一些,比如synchronized、 Lock這些,這類鎖的目的很簡(jiǎn)單,在多線程環(huán)境下,對(duì)共享資源的訪問(wèn)造成的線程安全問(wèn)題,通過(guò)鎖的機(jī)制來(lái)實(shí)現(xiàn)資源訪問(wèn)互斥。那么什么是分布式鎖呢?或者為什么我們需要通過(guò)Redis來(lái)構(gòu)建分布式鎖,其實(shí)最根本原因就是Score(范圍),因?yàn)樵诜植际郊軜?gòu)中,所有的應(yīng)用都是進(jìn)程隔離的,在多進(jìn)程訪問(wèn)共享資源的時(shí)候我們需要滿足互斥性,就需要設(shè)定一個(gè)所有進(jìn)程都能看得到的范圍,而這個(gè)范圍就是Redis本身。所以我們才需要把鎖構(gòu)建到Redis中。

Redis里面提供了一些比較具有能夠?qū)崿F(xiàn)鎖特性的命令,比如SETEX(在鍵不存在的情況下為鍵設(shè)置值),那么我們可以基于這個(gè)命令來(lái)去實(shí)現(xiàn)一些簡(jiǎn)單的鎖的操作

二、 分布式鎖的實(shí)戰(zhàn)

分布式鎖的實(shí)現(xiàn)會(huì)采用兩種方式來(lái)實(shí)現(xiàn),一種自己手動(dòng)來(lái)實(shí)現(xiàn),一種是基于Redission的客戶端來(lái)實(shí)現(xiàn)

  • 基于jedis手動(dòng)實(shí)現(xiàn)分布式鎖

代碼如下:

/**
 * @Project: redis
 * @description:   redis的分布式鎖的手動(dòng)實(shí)現(xiàn)
 * @author: sunkang
 * @create: 2019-01-12 16:54
 * @ModificationHistory who      when       What
 **/
public class DistributedLock {
        /**
     * 獲取鎖
     * @param lockName  鎖的名稱
     * @param accquireTimeout   獲取鎖的超時(shí)時(shí)間
     * @param lockTimeout  鎖本身過(guò)期時(shí)間
     * @return
     */
    public String acquireLock(String lockName ,long accquireTimeout,long  lockTimeout){
        String identify  = UUID.randomUUID().toString();
        String lockKey = "lock:"+lockName;
        //獲取redis客戶端
        long endTime = System.currentTimeMillis()+ accquireTimeout;
        Jedis redis = JedisPoolUtils.getRedis();
        int expireTime = (int) (lockTimeout/1000);

        try{
            while (System.currentTimeMillis()<endTime){
                //setnx表示如果鍵不存在則設(shè)置,返回1證明key設(shè)置成功了
                if( redis.setnx(lockKey,identify) ==1 ){
                    redis.expire(lockKey, expireTime);
                    return identify;
                }
                //說(shuō)明key的過(guò)期時(shí)間沒(méi)有設(shè)置,但是key存在,設(shè)置過(guò)期時(shí)間
                if(redis.ttl(lockKey) == -1){
                    redis.expire(lockKey,expireTime);
                }
                try {
                    //等待片刻后進(jìn)行獲取鎖的重試
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }finally {
            //最終關(guān)閉連接
            redis.close();
        }
        return null;
    }
    /**
     * 釋放鎖
     * @param lockName
     * @param identifer
     */
    public boolean releaseLock(String lockName,String identifer){
        System.out.println(lockName+"開(kāi)始釋放鎖:"+identifer);
        boolean release = false;
        String lockKey  = "lock:"+ lockName;
        Jedis jedis  = JedisPoolUtils.getRedis();
        try {
            while (true){
                //命令用于監(jiān)視一個(gè)(或多個(gè)) key ,如果在事務(wù)執(zhí)行之前這個(gè)key,被其他命令所改動(dòng),那么事務(wù)將被打斷
                //保證了開(kāi)啟監(jiān)控到執(zhí)行對(duì)該key執(zhí)行事務(wù)提交之間,不能修改,保證了這段期間key的數(shù)據(jù)原子性。
                jedis.watch(lockKey);
                //如果是同一把鎖
                if(jedis.get(lockKey).equals(identifer)){
                    Transaction transaction  = jedis.multi();
                    //主動(dòng)刪除鎖
                    transaction.del(lockKey);
                    //執(zhí)行結(jié)果為空,說(shuō)明執(zhí)行有問(wèn)題,在輪訓(xùn)一次
                    if(transaction.exec().isEmpty()){
                        release = false;
                        continue;
                    }
                    release =true;
                }
                jedis.unwatch();
                break;
            }
        }finally {
            jedis.close();
        }
        return release;
    }
    
    /**
     * 通過(guò)lua腳本來(lái)釋放鎖
     * lua腳本執(zhí)行保證了原子性
     * @param lockName
     * @param identifer
     * @return
     */
    public boolean  releaseLockWithLua(String lockName,String identifer){

        System.out.println(lockName+"開(kāi)始釋放鎖:"+identifer);

        Jedis jedis  =     JedisPoolUtils.getRedis();
        String lockKey = "lock:"+ lockName;

        String lua ="if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1])" +
                "else return 0 end ";
        Long rs=(Long)jedis.eval(lua,1,lockKey,identifer);

        jedis.close();

        if(rs.intValue()> 0){
            return true;
        }
        return  false;
    }
}
  • 測(cè)試代碼
/**
 * @Project: redis
 * @description: 分布式測(cè)試 
 * 開(kāi)啟10個(gè)線程,模擬十個(gè)客戶端請(qǐng)求鎖
 * @author: sunkang
 * @create: 2019-01-12 17:45
 * @ModificationHistory who      when       What
 **/
public class LockTest extends Thread{
    @Override
    public void run() {
        while(true){
            DistributedLock distributedLock=new DistributedLock();
            String rs=distributedLock.acquireLock("updateOrder",
                    2000,5000);
            if(rs!=null){
                System.out.println(Thread.currentThread().getName()+"-> 成功獲得鎖:"+rs);
                try {
                    Thread.sleep(1000);
                    //釋放鎖
                    distributedLock.releaseLockWithLua("updateOrder",rs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }
    public static void main(String[] args) {
        LockTest lockTest=new LockTest();
        for(int i=0;i<10;i++){
            new Thread(lockTest,"tName:"+i).start();
        }
    }
}
  • 輸出結(jié)果 : 基本是鎖釋放了之后,其他的線程才得到了鎖
tName:7-> 成功獲得鎖:38c683cb-1797-4abb-aa57-200e770f7445
updateOrder開(kāi)始釋放鎖:38c683cb-1797-4abb-aa57-200e770f7445
tName:6-> 成功獲得鎖:873dfe4e-00ee-4d2b-9bc5-bb16e83f8879
updateOrder開(kāi)始釋放鎖:873dfe4e-00ee-4d2b-9bc5-bb16e83f8879
tName:8-> 成功獲得鎖:e6c48dea-87d9-4fd5-ab04-09ed2f725307
updateOrder開(kāi)始釋放鎖:e6c48dea-87d9-4fd5-ab04-09ed2f725307
tName:0-> 成功獲得鎖:e0d764a5-30b5-4b6b-8490-73ffd78727e0
...

總結(jié):
acquireLock的方法主要是競(jìng)爭(zhēng)鎖的時(shí)間內(nèi),不斷輪訓(xùn)去查看lockKey的是否存在,不存在則設(shè)置鎖,返回鎖,存在會(huì)一直嘗試獲取鎖,直到獲取鎖的超時(shí)時(shí)間結(jié)束,然后釋放redis連接

在釋放鎖的方式,有兩種方式,一種是給鎖本身設(shè)置超時(shí)時(shí)間,時(shí)間到了,鎖本身釋放了,一種是自動(dòng)方式來(lái)刪除鎖,刪除鎖為了鎖在釋放過(guò)程中的原子性,一個(gè)使用了lua腳本,一個(gè)使用redis的watch機(jī)制和事務(wù)刪除的方式

  • 基于redisson實(shí)現(xiàn)分布式鎖

maven依賴:

   <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>2.15.0</version>
    </dependency>

redisson的分布式代碼如下

/**
 * @Project: redis
 * @description:  基于redission的api實(shí)現(xiàn)分布式鎖
 * @author: sunkang
 * @create: 2019-01-12 18:08
 * @ModificationHistory who      when       What
 **/
public class RedissionLock {
    public static void main(String[] args) {
        Config config = new Config();
        //使用單個(gè)節(jié)點(diǎn)連接,并且設(shè)置連接地址
        config.useSingleServer().setAddress("redis://192.168.44.129:6379");
        RedissonClient redissonClient = Redisson.create(config);
        RLock lock =  redissonClient.getLock("updateOrder");
        try {
            //嘗試獲取鎖100秒,鎖釋放時(shí)間為10秒
            lock.tryLock(100,10,TimeUnit.SECONDS);
            System.out.println("得到鎖");
            Thread.sleep(100);
            //釋放鎖
            lock.unlock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

基于redission 的實(shí)現(xiàn)分布式鎖的方式,獲取鎖的關(guān)鍵代碼在org.redisson.RedissonLock#tryLockInnerAsync這個(gè)方法上:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

通過(guò)lua腳本來(lái)實(shí)現(xiàn)加鎖的操作

  1. 判斷l(xiāng)ock鍵是否存在,不存在直接調(diào)用hset存儲(chǔ)當(dāng)前線程信息并且設(shè)置過(guò)期時(shí)間,返回nil,告訴客戶端直接獲取到鎖。
  2. 判斷l(xiāng)ock鍵是否存在,存在則將重入次數(shù)加1,并重新設(shè)置過(guò)期時(shí)間,返回nil,告訴客戶端直接獲取到鎖。
  3. 被其它線程已經(jīng)鎖定,返回鎖有效期的剩余時(shí)間,告訴客戶端需要等待。

釋放鎖的關(guān)鍵代碼在方法: org.redisson.RedissonLock#unlockInnerAsync

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }
  1. 如果lock鍵不存在,發(fā)消息說(shuō)鎖已經(jīng)可用,發(fā)送一個(gè)消息
  2. 如果鎖不是被當(dāng)前線程鎖定,則返回nil
  3. 由于支持可重入,在解鎖時(shí)將重入次數(shù)需要減1
  4. 如果計(jì)算后的重入次數(shù)>0,則重新設(shè)置過(guò)期時(shí)間
  5. 如果計(jì)算后的重入次數(shù)<=0,則發(fā)消息說(shuō)鎖已經(jīng)可用
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容