Redis實現(xiàn)分布式鎖(利用分布式鎖,實現(xiàn)分布式定時任務(wù))

簡述

利用Redis的Setnx命令,來實現(xiàn)一個分布式的加鎖方案。利用注解,在擁有該注解的方法上,進(jìn)行切面處理,在方法執(zhí)行前,進(jìn)行加鎖,執(zhí)行結(jié)束后,根據(jù)是否自動釋放鎖,進(jìn)行解鎖。
將該注解用在定時任務(wù)的方法上,即可實現(xiàn)分布式定時任務(wù),即獲取到鎖的方法,才會執(zhí)行。

1 redis命令

  • 1.1 setnx命令
    Redis setnx(SET if Not eXists) 命令在指定的 key 不存在時,為 key 設(shè)置指定的值。(該命令無法設(shè)置過期時間)
    Redis為單進(jìn)程單線程模式,采用隊列模式將并發(fā)訪問變成串行訪問,且多客戶端對Redis的連接并不存在競爭關(guān)系。redis的SETNX命令可以方便的實現(xiàn)分布式鎖。
    當(dāng)某一個客戶端將key的值設(shè)置成功后,其他的客戶端再進(jìn)行設(shè)置,將返回失敗,保證同一時間,只有一個客戶端能夠設(shè)置成功。
  • Redis事務(wù)
    watch key1 key2 ... : 監(jiān)視一或多個key,如果在事務(wù)執(zhí)行之前,被監(jiān)視的key被其他命令改動,則事務(wù)被打斷 ( 類似樂觀鎖 )
    multi : 標(biāo)記一個事務(wù)塊的開始( queued )
    exec : 執(zhí)行所有事務(wù)塊的命令 ( 一旦執(zhí)行exec后,之前加的監(jiān)控鎖都會被取消掉 ) 
    discard : 取消事務(wù),放棄事務(wù)塊中的所有命令
    unwatch : 取消watch對所有key的監(jiān)控

事務(wù)正常使用

127.0.0.1:6379> multi
127.0.0.1:6379> set name jack
127.0.0.1:6379> exec

取消事務(wù)

127.0.0.1:6379> multi
127.0.0.1:6379> set name jack
127.0.0.1:6379> discard

watch使用

number初始為10

127.0.0.1:6379> watch number
127.0.0.1:6379> multi
127.0.0.1:6379> set number 11
127.0.0.1:6379> exec

如果在執(zhí)行exec時,number沒有被其他客戶端修改,還是10,則事務(wù)執(zhí)行成功;

如果被其他客戶端修改了,number不是10了,則事務(wù)執(zhí)行失敗,這時候就需求程序自行處理,進(jìn)行再次提交或者其他操作

在spring boot 中,我們用StringRedisTemplate來操作Redis,它的方法:stringRedisTemplate.opsForValue().setIfAbsent()方法即對應(yīng)setnx命令,這個方法有兩個重載的方法:
1、Boolean setIfAbsent(K key, V value); 設(shè)置key value,返回成功/失敗
2、Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit); 設(shè)置key value,返回成功/失敗,同時設(shè)置過期時間,redisTemplate 會調(diào)用 EXPIRE進(jìn)行過期時間的設(shè)定,同時在設(shè)置值和過期時間時,會開啟事務(wù),保存全部成功。
```// org.springframework.data.redis.core 中實現(xiàn)的方法
@Override
public Boolean setIfAbsent(K key, V value) {

    byte[] rawKey = rawKey(key);
    byte[] rawValue = rawValue(value);
    return execute(connection -> connection.setNX(rawKey, rawValue), true);
}
@Override
public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) {

    byte[] rawKey = rawKey(key);
    byte[] rawValue = rawValue(value);

    Expiration expiration = Expiration.from(timeout, unit);
    return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true);
    }

1.2 DEL命令、lua腳本
在加鎖之后,解鎖時,需要判斷鎖,是否是當(dāng)前線程所擁有的,如果是當(dāng)前線程擁有的,則刪除該key,刪除key,用del命令。

del key_name
我們會先取出key對應(yīng)的值,然后判斷是否和當(dāng)前線程的定義的值一致。如果一致,則說明是該線程擁有的key。如果我們在代碼中取出key的值,然后判斷通過后,調(diào)用redis del 刪除key,這就不是一個原子操作了。如果在我們?nèi)〕鰇ey的值后,然后在刪除前,其他線程獲取了鎖,當(dāng)前線程刪除的動作,就會導(dǎo)致刪除其他線程擁有的鎖。所以釋放鎖,需要利用lua腳本進(jìn)行,將判斷和刪除,這兩個動作,合為一個原子性的操作。
所以我們會利用代碼去執(zhí)行下面的lua腳本,保證判斷和刪除的原子性。

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

一般教程中,利用RedisTemplate來執(zhí)行l(wèi)ua腳本時,會將lua腳本放到靜態(tài)資源目錄中。而在下面的代碼中,利用ByteArrayResource直接從String字符串中讀取了lua腳本內(nèi)容:

    /*
     * 保存lua腳本
     */
    private DefaultRedisScript<List> getRedisScript;

    @PostConstruct
    public void init(){
        // 定義lua腳本資源
        // 也可以放到文件中,加載進(jìn)來: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
        String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return   redis.call('del',KEYS[1])  else return 0 end";
        ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());

        getRedisScript = new DefaultRedisScript<>();
        getRedisScript.setResultType(List.class);
        getRedisScript.setScriptSource(new ResourceScriptSource(resource));
    }

2 分布式鎖實現(xiàn)

下面是實現(xiàn)的核心類:

RedisLock: reids分布式鎖工具類
EmLock: 分布式鎖注解
LockRangeEnum: 分布式鎖的范圍枚舉
EmLockAspect: 分布式鎖切面
2.1 RedisLock,reids分布式鎖工具類
代碼如下:

package com.emdata.lowvis.common.redislock;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.concurrent.TimeUnit;

/**
 * reids分布式鎖工具類
 *
 * @version 1.0
 * @date 2020/12/8 14:37
 */
@Slf4j
@Component
public class RedisLock {

    private static final String SPLIT = "_";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 加鎖解鎖工具類
     * @param lockKey 加鎖的key
     * @param uuid 線程的標(biāo)志
     * @param timeout 超時時間
     * @param timeUnit 超時時間粒度
     * @return true:獲取成功
     */
    public boolean lock(String lockKey, String uuid, long timeout, TimeUnit timeUnit) {
        // 根據(jù)key獲取值
        String currentLock = stringRedisTemplate.opsForValue().get(lockKey);

        // 值為:uuid_時間
        String value = uuid + SPLIT + (timeUnit.toMillis(timeout) + System.currentTimeMillis());

        // 如果為空,則設(shè)置值
        if (StringUtils.isEmpty(currentLock)) {
            if (stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, timeUnit)) {
                // 對應(yīng)setnx命令,可以成功設(shè)置,也就是key不存在,獲得鎖成功
                return true;
            } else {
                return false;
            }
        } else {
            // 可重入鎖,如果是這個uuid持有的鎖,則更新時間
            if (currentLock.startsWith(uuid)) {
                stringRedisTemplate.opsForValue().set(lockKey, value, timeout, timeUnit);
                return true;
            } else {
                return false;
            }
        }
    }
    
    /*
     * 保存lua腳本
     */
    private DefaultRedisScript<List> getRedisScript;

    @PostConstruct
    public void init(){
        // 定義lua腳本資源
        // 也可以放到文件中,加載進(jìn)來: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
        String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return   redis.call('del',KEYS[1])  else return 0 end";
        ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());

        getRedisScript = new DefaultRedisScript<>();
        getRedisScript.setResultType(List.class);
        getRedisScript.setScriptSource(new ResourceScriptSource(resource));
    }

    /**
     * 釋放鎖
     *
     * @param lockKey 加鎖的key
     * @param uuid 線程的標(biāo)志
     */
    public void release(String lockKey, String uuid) {
        try {
            List<Integer> execute = stringRedisTemplate.execute(getRedisScript, Collections.singletonList(lockKey), uuid);
            log.debug("解鎖結(jié)果: {}", execute.get(0) == 0);
        } catch (Exception e) {
            log.error("解鎖異常, key: {}, uuid: {}", lockKey, uuid);
            log.error("", e);
        }
    }

}

2.2 EmLock,分布式鎖注解

package com.emdata.lowvis.common.redislock;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * 分布式鎖注解
 *
 * @version 1.0
 * @date 2020/12/8 17:59
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface EmLock {

    /**
     * 鎖的范圍,默認(rèn)應(yīng)用級別
     * @return 鎖的范圍
     */
    LockRangeEnum lockRange() default LockRangeEnum.APPLICATION;

    /**
     * 鎖對應(yīng)的key
     * @return key
     */
    String key();

    /**
     * 鎖超時時間
     * @return 時間
     */
    int timeout() default 5;

    /**
     * 鎖超時時間粒度
     * @return 粒度
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * 是否自動釋放鎖
     * @return true: 方法完成后,自動釋放
     */
    boolean autoRelease() default true;
}

2.3 LockRangeEnum, 分布式鎖的范圍枚舉

package com.emdata.lowvis.common.redislock;

/**
 * 分布式鎖的范圍枚舉
 *
 * @author pupengfei
 * @version 1.0
 * @date 2020/12/10 13:46
 */
public enum LockRangeEnum {

    /**
     * 應(yīng)用級別,鎖的級別在整個應(yīng)用容器內(nèi)
     */
    APPLICATION,

    /**
     * 線程級別,鎖的級別在每個線程
     */
    THREAD

}

2.4 EmLockAspect,分布式鎖切面

package com.emdata.lowvis.common.redislock;

import com.emdata.lowvis.common.utils.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
 * 分布式鎖切面
 *
 * @version 1.0
 * @date 2020/12/8 17:59
 */
@Slf4j
@Component
@Aspect
@Configuration
public class EmLockAspect {

    @Autowired
    private RedisLock redisLock;

    /**
     * 應(yīng)用級別的容器的id
     */
    private final String appUUID = UUIDUtils.get();

    /**
     * 線程級別的線程的id
     */
    private final ThreadLocal<String> threadUUID = ThreadLocal.withInitial(UUIDUtils::get);

    /**
     * 定義切點
     */
    @Pointcut("@annotation(com.emdata.lowvis.common.redislock.EmLock)")
    public void lockAop() {

    }

    @Around("lockAop()")
    public Object doAround(ProceedingJoinPoint point) throws Throwable {
        // 獲取方法
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();

        // 看有沒有日志注解
        EmLock emLock = method.getAnnotation(EmLock.class);
        if (emLock == null) {
            return point.proceed();
        }

        // 獲取鎖的級別
        LockRangeEnum lockRangeEnum = emLock.lockRange();
        String uuid = lockRangeEnum == LockRangeEnum.APPLICATION ? appUUID : threadUUID.get();

        // 獲取鎖的key和超時時間
        String key = emLock.key();
        int timeout = emLock.timeout();
        TimeUnit timeUnit = emLock.timeUnit();

        // 加鎖
        boolean lock = redisLock.lock(key, uuid, timeout, timeUnit);

        Object proceed = null;

        try {
            if (lock) {
                log.info("獲取到鎖,繼續(xù)執(zhí)行...");
                // 繼續(xù)執(zhí)行
                proceed = point.proceed();
            }
        } finally {
            // 自動釋放,則釋放鎖
            if (emLock.autoRelease()) {
                redisLock.release(key, uuid);
            }
        }

        return proceed;
    }

}

3 使用示例

3.1 使用RedisLock

  @Autowired
private RedisLock redisLock;

public void useLock() {
    // 定義鎖的key
    String lockKey = "camera_update_key";
    String uuid = UUIDUtils.get();

    // 定義超時時間
    long timeout = 5;
    TimeUnit timeUnit = TimeUnit.SECONDS;

    // 加鎖
    boolean lock = redisLock.lock(lockKey, uuid, timeout, timeUnit);
    try {
        if (lock) {
            log.info("執(zhí)行...");
        } else {
            throw new IllegalStateException("未獲取到鎖,放棄執(zhí)行");
        }
    } finally {
        // 在finally里面進(jìn)行解鎖
        redisLock.release(lockKey, uuid);
    }
}

3.2 使用EmLock

@Component
@Slf4j
public class ScheduleTask {

    /**
     * 用在定時任務(wù)方法上,鎖的key為test_lock,指定了超時時間為2秒鐘
     * 鎖的級別為默認(rèn)的應(yīng)用級別(LockRangeEnum.APPLICATION),在這個如果應(yīng)用啟動了多個容器運行,在只會有一個容器獲取到鎖,
     * 自動釋放鎖為false,即方法執(zhí)行完成后,也不會自動釋放鎖,只有到超時時間了,鎖才會釋放
     */
    @Scheduled(cron = "0 0/1 * * * ? ")
    @EmLock(key = "test_lock", timeout = 2, timeUnit = TimeUnit.SECONDS, autoRelease = false)
    public void recordUpdateTask() {
        log.info("執(zhí)行任務(wù).......");
    }
   
   /**
     * 用在普通的方法上,鎖的key為method_Lock,指定了超時時間為1分鐘,
     * 鎖的級別為默認(rèn)的線程級別,在該應(yīng)用內(nèi)多個線程執(zhí)行該方法,則只會有一個線程獲取到鎖
     * 如果啟動了多個應(yīng)用容器,同樣多個容器內(nèi)的所有線程,也只會有一個線程獲取到鎖
     */
    @EmLock(key = "method_Lock", timeout = 1, timeUnit = TimeUnit.MINUTES, lockRange = LockRangeEnum.THREAD)
    public void recordUpdate() {
        log.info("執(zhí)行任務(wù)2.......");
    }
}

4 使用注意

使用Redis作為分布式鎖的實現(xiàn),依賴于Redis服務(wù),如果Redis服務(wù)無法正常訪問,則會導(dǎo)致整個方法無法執(zhí)行。
如果EmLock注解用在定時任務(wù)上時,如果應(yīng)用運行在不同的服務(wù)器上,或者不同的docker容器里面時,必須保證運行環(huán)境的時間一致。
如果設(shè)置了定時任務(wù)上面的鎖,不是自動釋放的,則運行環(huán)境的時間,相差不大于鎖超時時間的時候,也可以保證定時任務(wù),唯一執(zhí)行。因為在超時時間范圍內(nèi),某個應(yīng)用容器持有該鎖,其他應(yīng)用來獲取鎖時,同樣獲取不到,方法不會執(zhí)行。

作者:Knight_9
鏈接:http://www.itdecent.cn/p/5190600e44c1
來源:簡書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 簡述 利用Redis的Setnx命令,來實現(xiàn)一個分布式的加鎖方案。利用注解,在擁有該注解的方法上,進(jìn)行切面處理,在...
    Knight_9閱讀 2,076評論 2 2
  • 1、一個tomcat是一個進(jìn)程,其中有很多線程(與有多少個app無關(guān)) 2、一個tomcat啟動一個JVM,其中可...
    ZHL_e522閱讀 555評論 0 0
  • 目前實現(xiàn)分布式鎖的方式主要有數(shù)據(jù)庫、Redis和Zookeeper三種,本文主要闡述利用Redis的相關(guān)命令來實現(xiàn)...
    Aldeo閱讀 2,180評論 0 6
  • 一、背景 我們在開發(fā)很多業(yè)務(wù)場景會使用到鎖,例如庫存控制,抽獎等。一般我們會使用內(nèi)存鎖的方式來保證線性的執(zhí)行。但現(xiàn)...
    楊健kimyeung閱讀 403評論 0 0
  • 16宿命:用概率思維提高你的勝算 以前的我是風(fēng)險厭惡者,不喜歡去冒險,但是人生放棄了冒險,也就放棄了無數(shù)的可能。 ...
    yichen大刀閱讀 8,074評論 0 4

友情鏈接更多精彩內(nèi)容