原來基于Redis分布式鎖的打開方式是這樣的啊

分布式鎖是在分布式環(huán)境下(多個JVM進(jìn)程)控制多個客戶端對某一資源的同步訪問的一種實現(xiàn),與之相對應(yīng)的是線程鎖,線程鎖控制的是同一個JVM進(jìn)程內(nèi)多個線程之間的同步。分布式鎖的一般實現(xiàn)方法是在應(yīng)用服務(wù)器之外通過一個共享的存儲服務(wù)器存儲鎖資源,同一時刻只有一個客戶端能占有鎖資源來完成。通常有基于Zookeeper,Redis,或數(shù)據(jù)庫三種實現(xiàn)形式。本文介紹基于Redis的實現(xiàn)方案。

要求

基于Redis實現(xiàn)分布式鎖需要滿足如下幾點要求:

在分布式集群中,被分布式鎖控制的方法或代碼段同一時刻只能被一個客戶端上面的一個線程執(zhí)行,也就是互斥
鎖信息需要設(shè)置過期時間,避免一個線程長期占有(比如在做解鎖操作前異常退出)而導(dǎo)致死鎖
加鎖與解鎖必須一致,誰加的鎖,就由誰來解(或過期超時),一個客戶端不能解開另一個客戶端加的鎖
加鎖與解鎖的過程必須保證原子性

實現(xiàn)

1. 加鎖實現(xiàn)

基于Redis的分布式鎖加鎖操作一般使用 SETNX 命令,其含義是“將 key 的值設(shè)為 value ,當(dāng)且僅當(dāng) key 不存在。若給定的 key 已經(jīng)存在,則 SETNX 不做任何動作”。在 Spring Boot 中,可以使用 StringRedisTemplate 來實現(xiàn),如下,一行代碼即可實現(xiàn)加鎖過程。(下列代碼給出兩種調(diào)用形式——立即返回加鎖結(jié)果與給定超時時間獲取加鎖結(jié)果)

/**
    * 嘗試獲取鎖(立即返回)
    * @param key  鎖的redis key
    * @param value 鎖的value
    * @param expire 過期時間/秒
    * @return 是否獲取成功
    */
public boolean lock(String key, String value, long expire) {
    return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}

/**
    * 嘗試獲取鎖,并至多等待timeout時長
    *
    * @param key  鎖的redis key
    * @param value 鎖的value
    * @param expire 過期時間/秒
    * @param timeout 超時時長
    * @param unit    時間單位
    * @return 是否獲取成功
    */
public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {
    long waitMillis = unit.toMillis(timeout);
    long waitAlready = 0;

    while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {
        try {
            Thread.sleep(waitMillisPer);
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get a lock. key: {}", key, e);
        }
        waitAlready += waitMillisPer;
    }

    if (waitAlready < waitMillis) {
        return true;
    }
    log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
    return false;
}

上述實現(xiàn)如何滿足前面提到的幾點要求:

客戶端互斥: 可以將expire過期時間設(shè)置為大于同步代碼的執(zhí)行時間,比如同步代碼塊執(zhí)行時間為1s,則可將expire設(shè)置為3s或5s。避免同步代碼執(zhí)行過程中expire時間到,其它客戶端又可以獲取鎖執(zhí)行同步代碼塊。
通過設(shè)置過期時間expire來避免某個客戶端長期占有鎖。
通過value來控制誰加的鎖,由誰解的邏輯,比如可以使用requestId作為value,requestId唯一標(biāo)記一次請求。
setIfAbsent方法 底層通過調(diào)用 Redis 的 SETNX 命令,操作具備原子性。

錯誤示例:

網(wǎng)上有如下實現(xiàn),

public boolean lock(String key, String value, long expire) {
    boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
    if(result) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
    }
    return result;
}

該實現(xiàn)的問題是如果在result為true,但還沒成功設(shè)置expire時,程序異常退出了,將導(dǎo)致該鎖一直被占用而導(dǎo)致死鎖,不滿足第二點要求。

2. 解鎖實現(xiàn)

解鎖也需要滿足前面所述的四個要求,實現(xiàn)代碼如下:

private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
/**
    * 釋放鎖
    * @param key  鎖的redis key
    * @param value 鎖的value
    */
public boolean unLock(String key, String value) {
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
    long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
    return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}

這段實現(xiàn)使用一個Lua腳本來實現(xiàn)解鎖操作,保證操作的原子性。傳入的value值需與該線程加鎖時的value一致,可以使用requestId(具體實現(xiàn)下面給出)。

錯誤示例:

   public boolean unLock(String key, String value) {
        String oldValue = stringRedisTemplate.opsForValue().get(key);
        if(value.equals(oldValue)) {
            stringRedisTemplate.delete(key);
        }
}

該實現(xiàn)先獲取鎖的當(dāng)前值,判斷兩值相等則刪除??紤]一種極端情況,如果在判斷為true時,剛好該鎖過期時間到,另一個客戶端加鎖成功,則接下來的delete將不管三七二十一將別人加的鎖直接刪掉了,不滿足第三點要求。該示例主要是因為沒有保證解鎖操作的原子性導(dǎo)致。

3. 注解支持

為了方便使用,添加一個注解,可以放于方法上控制方法在分布式環(huán)境中的同步執(zhí)行。

/**
* 標(biāo)注在方法上的分布式鎖注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
    String key();
    String prefix() default "disLock:";
    long expire() default 10L; // 默認(rèn)10s過期
}

添加一個切面來解析注解的處理,

/**
* 分布式鎖注解處理切面
*/
@Aspect
@Slf4j
public class DistributedLockAspect {

    private DistributedLock lock;

    public DistributedLockAspect(DistributedLock lock) {
        this.lock = lock;
    }

    /**
     * 在方法上執(zhí)行同步鎖
     */
    @Around(value = "@annotation(lockable)")
    public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
        boolean locked = false;
        String key = lockable.prefix() + lockable.key();
        try {
            locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());
            if(locked) {
                return point.proceed();
            } else {
                log.info("Did not get a lock for key {}", key);
                return null;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if(locked) {
                if(!lock.unLock(key, WebUtil.getRequestId())){
                    log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());
                }
            }
        }
    }
}

RequestId 的實現(xiàn)如下,通過注冊一個Filter,在請求開始時生成一個uuid存于ThreadLocal中,在請求返回時清除。

public class WebUtil {

    public static final String REQ_ID_HEADER = "Req-Id";

    private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();

    public static void setRequestId(String requestId) {
        reqIdThreadLocal.set(requestId);
    }

    public static String getRequestId(){
        String requestId = reqIdThreadLocal.get();
        if(requestId == null) {
            requestId = ObjectId.next();
            reqIdThreadLocal.set(requestId);
        }
        return requestId;
    }

    public static void removeRequestId() {
        reqIdThreadLocal.remove();
    }
}

public class RequestIdFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
        String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
        //沒有則生成一個
        if (StringUtils.isEmpty(reqId)) {
            reqId = ObjectId.next();
        }
        WebUtil.setRequestId(reqId);
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            WebUtil.removeRequestId();
        }
    }
}

//在配置類中注冊Filter

/**
* 添加RequestId
* @return
*/
@Bean
public FilterRegistrationBean requestIdFilter() {
    RequestIdFilter reqestIdFilter = new RequestIdFilter();
    FilterRegistrationBean registrationBean = new FilterRegistrationBean();
    registrationBean.setFilter(reqestIdFilter);
    List<String> urlPatterns = Collections.singletonList("/*");
    registrationBean.setUrlPatterns(urlPatterns);
    registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
    return registrationBean;
}

4. 使用注解

@DistributedLockable(key = "test", expire = 10)
public void test(){
    System.out.println("線程-"+Thread.currentThread().getName()+"開始執(zhí)行..." + LocalDateTime.now());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("線程-"+Thread.currentThread().getName()+"結(jié)束執(zhí)行..." + LocalDateTime.now());
}

總結(jié)

本文給出了基于Redis的分布式鎖的實現(xiàn)方案與常見的錯誤示例。要保障分布式鎖的正確運行,需滿足本文所提的四個要求,尤其注意保證加鎖解鎖操作的原子性,設(shè)置過期時間,及對同一個鎖的加鎖解鎖線程一致。

感謝你看到這里,我是程序員麥冬,一個java開發(fā)從業(yè)者,深耕行業(yè)六年了,每天都會分享java相關(guān)技術(shù)文章或行業(yè)資訊

歡迎大家關(guān)注和轉(zhuǎn)發(fā)文章,后期還有福利贈送!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容