1. 分布式應(yīng)用并發(fā)問(wèn)題
分布式應(yīng)用進(jìn)行邏輯處理時(shí)經(jīng)常會(huì)遇到并發(fā)問(wèn)題,比如下面這個(gè)例子。
在Redis里對(duì)"account"這個(gè)key進(jìn)行操作,兩個(gè)客戶端同時(shí)要修改account,首先要讀取account的值,然后對(duì)其進(jìn)行修改,修改完再存回去,那么就會(huì)出現(xiàn)并發(fā)問(wèn)題,因?yàn)樽x取和保存狀態(tài)這兩個(gè)操作不是原子的。

2. Redis實(shí)現(xiàn)分布式鎖
2.1 第一個(gè)示例
2.1.1 創(chuàng)建工程
首先創(chuàng)建一個(gè)空工程RedisStart,pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RedisStart</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>redis-lock</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<profiles>
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</project>
2.1.2 創(chuàng)建子工程
創(chuàng)建子工程redis-lock,工程結(jié)構(gòu)如下

pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RedisStart</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>redis-lock</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件:
server:
port: 8090
spring:
redis:
host: 127.0.0.1
port: 6379
2.1.3 啟動(dòng)類
package com.redisson;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
2.1.4 關(guān)鍵類
首先先在Redis中執(zhí)行set account 50這條指令,加入數(shù)據(jù)。然后請(qǐng)看下面代碼:
package com.redisson;
import org.redisson.Redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class IndexController {
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct account")
public String deductAccount() throws InterruptedException {
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
return "end";
}
}
上面代碼重現(xiàn)了開頭提出的問(wèn)題,如果有多線程知識(shí)儲(chǔ)備的話,可以很容易想到,加入synchronized關(guān)鍵字即可解決。
synchronized(this){
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
return "end";
}
}
2.1.5 分布式鎖簡(jiǎn)單實(shí)現(xiàn)setnx
如果上邊的服務(wù)部署在單機(jī)模式下,并且程序沒(méi)有其他異常的話,可以解決原子性問(wèn)題,但是現(xiàn)在的服務(wù)都是在集群部署下的,如果有多臺(tái)服務(wù)的話,上面代碼無(wú)法解決這個(gè)問(wèn)題。
如果看過(guò)前面Redis應(yīng)用之常用數(shù)據(jù)類型文章的話,可以知道Redis提供了setnx指令實(shí)現(xiàn)分布式鎖,可以用以下代碼實(shí)現(xiàn)。
String lockKey = "lockKey";
//獲取鎖信息
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
//鎖還沒(méi)有被釋放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
//業(yè)務(wù)代碼執(zhí)行完,釋放鎖
stringRedisTemplate.delete(lockKey);
return "end";
這樣即使是集群部署下的服務(wù),也可以實(shí)現(xiàn)分布式鎖功能了,但是如果在執(zhí)行釋放鎖那行代碼之前,有異常的話,會(huì)導(dǎo)致該鎖一直被占用,無(wú)法釋放。
2.1.6 解決程序異常導(dǎo)致的無(wú)法釋放鎖
加入finally邏輯,可以保證即使程序有異常的話,也可以釋放鎖。
String lockKey = "lockKey";
try {
//獲取鎖信息
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
//鎖還沒(méi)有被釋放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
}finally {
//業(yè)務(wù)代碼執(zhí)行完,釋放鎖
stringRedisTemplate.delete(lockKey);
}
return "end";
上面代碼解決了,如果程序有異常的話,無(wú)法釋放鎖的問(wèn)題,但是如果在代碼執(zhí)行的一半的時(shí)候,服務(wù)掛了,或者被Kill掉的話,就不會(huì)執(zhí)行finally里邊的代碼,所以依然無(wú)法釋放鎖。
2.1.7 setnx命令的原子操作
設(shè)置鎖的時(shí)候,直接對(duì)該鎖加入超時(shí)時(shí)間,可以解決由于上邊的問(wèn)題。
String lockKey = "lockKey";
try {
//設(shè)置鎖超時(shí)時(shí)間
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock",10, TimeUnit.MILLISECONDS);
//鎖還沒(méi)有被釋放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
}finally {
//業(yè)務(wù)代碼執(zhí)行完,釋放鎖
stringRedisTemplate.delete(lockKey);
}
return "end";
如果并發(fā)不高的情況下,上面代碼可以實(shí)現(xiàn)一個(gè)良好的分布式鎖,但是如果在高并發(fā)情況下,會(huì)導(dǎo)致鎖一直失效。舉個(gè)例子:如果鎖的失效時(shí)間是1秒,A線程成功獲得了該鎖,但是由于并發(fā)比較高,導(dǎo)致程序在1秒鐘沒(méi)有執(zhí)行完,就不會(huì)執(zhí)行刪除鎖的代碼,但是這個(gè)時(shí)候由于超過(guò)了1秒,所以A線程獲取的鎖已經(jīng)失效了, 其他線程可以獲得該鎖,假設(shè)B線程獲取到了這把鎖,開始執(zhí)行后邊的邏輯,這個(gè)時(shí)候A線程執(zhí)行了finally中的邏輯,把B線程獲取到的鎖刪除了,那么其他線程又可以獲取到這把鎖,而過(guò)一段時(shí)間該鎖又失效了,而B線程就會(huì)刪除C線程過(guò)去到的鎖,以此類推,這個(gè)鎖就會(huì)一直失效。
2.1.8 鎖信息與線程信息綁定
為每一個(gè)線程設(shè)置線程ID,并將該ID作為鎖Key信息的value值,在刪除鎖的時(shí)候,進(jìn)行判斷,如果是本線程自己加的鎖,可以刪除,如果不是,不能刪除。
String lockKey = "lockKey";
String clientId = UUID.randomUUID().toString();
try {
//設(shè)置鎖超時(shí)時(shí)間
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId,30, TimeUnit.MILLISECONDS);
//鎖還沒(méi)有被釋放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
}finally {
//判斷該鎖是不是當(dāng)前線程加的
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
//業(yè)務(wù)代碼執(zhí)行完,釋放鎖
stringRedisTemplate.delete(lockKey);
}
}
return "end";
至此,已經(jīng)實(shí)現(xiàn)了一把比較完善的分布式鎖,但是還是會(huì)有一些問(wèn)題,比如業(yè)務(wù)代碼執(zhí)行時(shí)間大于鎖的過(guò)期時(shí)間,如何對(duì)該鎖進(jìn)行續(xù)期?可以簡(jiǎn)單的想到,在主線程啟動(dòng)的時(shí)候,啟動(dòng)另外一個(gè)線程,監(jiān)控這把鎖的狀態(tài),啟動(dòng)一個(gè)定時(shí)任務(wù),定期監(jiān)控該鎖,如果過(guò)一段時(shí)間,鎖還沒(méi)釋放,就把鎖的失效時(shí)間重新置位啟始值。但這事兒說(shuō)起來(lái)容易,實(shí)際寫的時(shí)候,稍不注意,就會(huì)引起很多BUG。
2.2 用Redisson實(shí)現(xiàn)一個(gè)分布式鎖
好在,貼心的Redisson已經(jīng)幫我們實(shí)現(xiàn)了功能完善的分布式鎖。請(qǐng)看下面代碼:
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加鎖,實(shí)現(xiàn)鎖續(xù)命
redissonLock.lock();
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣減成功,剩余庫(kù)存:" + realAccount + "");
}else {
System.out.println("扣減失敗,庫(kù)存不足");
}
}finally {
//釋放鎖
redissonLock.unlock();
}
可以看到加鎖邏輯十分簡(jiǎn)潔。
2.2.1 Redisson加鎖邏輯
下邊用一張簡(jiǎn)單的圖來(lái)說(shuō)明下:

2.2.2 源碼閱讀
由于整個(gè)Redisson的代碼實(shí)在太多,所以這里只看主要邏輯。
2.2.2.1 初始化邏輯
先看下邊代碼對(duì)應(yīng)的源碼
RLock redissonLock = redisson.getLock(lockKey);
會(huì)調(diào)用RedissonLock的構(gòu)造方法,初始化鎖的名稱以及鎖的失效時(shí)間為30秒
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
初始化鎖的失效時(shí)間
private long lockWatchdogTimeout = 30 * 1000;
2.2.2.2 加鎖邏輯
調(diào)用RedissonLock.lockInterruptibly()
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
下邊代碼大體邏輯就是,先嘗試獲取鎖,如果獲取成功就直接返回,如果沒(méi)有獲取成功,就自旋,直到獲取成功。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//獲取當(dāng)前線程ID
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 獲取鎖就返回
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//沒(méi)獲取鎖自旋獲取
while (true) {
//再次嘗試獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
//獲取鎖就返回
if (ttl == null) {
break;
}
//自旋的時(shí)候等待一段時(shí)間,下次再次獲取
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
獲取鎖的邏輯
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
繼續(xù)看tryAcquireAsync(leaseTime, unit, threadId)
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
由于傳入的leaseTime為-1,所以請(qǐng)看如下代碼
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
Redisson源碼大量調(diào)用了Lua腳本,用于實(shí)現(xiàn)原子性。下面來(lái)分析這幾個(gè)腳本,KEYS[1]為鎖的名稱、ARGV[1]為鎖的失效時(shí)間、ARGV[2]為當(dāng)前線程的ID。
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
上邊腳本的邏輯為,首先判斷鎖是否存在,如果鎖不存在,那么就設(shè)置該鎖,并設(shè)置失效時(shí)間,返回空;如果鎖已存在,就判斷該鎖是否是當(dāng)前線程加的鎖,如果是當(dāng)前線程加的鎖,就對(duì)這個(gè)鎖的調(diào)用次數(shù)加1,并刷新鎖的失效時(shí)間,返回空;如果不是當(dāng)前線程加的鎖,那么該返回該鎖的失效時(shí)間。
下面來(lái)看監(jiān)聽里邊相關(guān)的代碼,這部分代碼用了一個(gè)延時(shí)的任務(wù)執(zhí)行,每隔10秒就查詢當(dāng)前線程獲取的鎖狀態(tài),如果存在就對(duì)失效時(shí)間進(jìn)行刷新,并返回1,不存在就直接返回0。
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
2.2.3 存在問(wèn)題
由于Redis一般是集群部署的,所以會(huì)出現(xiàn)由于主節(jié)點(diǎn)掛掉的話,從節(jié)點(diǎn)會(huì)取而代之。這時(shí)候如果客戶端在主節(jié)點(diǎn)上申請(qǐng)成功了一把鎖,但是這把鎖還沒(méi)有來(lái)得及同步到從節(jié)點(diǎn),主節(jié)點(diǎn)突然掛了,然后從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn),這個(gè)新的節(jié)點(diǎn)內(nèi)部沒(méi)有這個(gè)鎖,所以當(dāng)另一個(gè)客戶端過(guò)來(lái)請(qǐng)求加鎖時(shí),立即就批準(zhǔn)了,這樣就導(dǎo)致系統(tǒng)中同樣一把鎖被兩個(gè)客戶端同時(shí)持有,不安全性由此產(chǎn)生。
3. RedLock算法
為了解決由于主節(jié)點(diǎn)掛掉導(dǎo)致多個(gè)客戶端同時(shí)持有一把鎖的問(wèn)題,Antirez發(fā)明了RedLock算法,它的流程比較復(fù)雜,不過(guò)已經(jīng)有了很多大神開發(fā)了開源的庫(kù),用戶可以拿來(lái)即用,比如redlock-py以及Redisson。
為了使用 Redlock,需要提供多個(gè)Redis實(shí)例,這些實(shí)例之前相互獨(dú)立沒(méi)有主從關(guān)系。同很多分布式算法一樣,redlock 也使用「大多數(shù)機(jī)制」。
加鎖時(shí),它會(huì)向過(guò)半節(jié)點(diǎn)發(fā)送 set(key, value, nx=True, ex=xxx) 指令,只要過(guò)半節(jié)點(diǎn) set成功,那就認(rèn)為加鎖成功。釋放鎖時(shí),需要向所有節(jié)點(diǎn)發(fā)送 del 指令。不過(guò) Redlock 算法還
需要考慮出錯(cuò)重試、時(shí)鐘漂移等很多細(xì)節(jié)問(wèn)題,同時(shí)因?yàn)?Redlock 需要向多個(gè)節(jié)點(diǎn)進(jìn)行讀寫,意味著相比單實(shí)例 Redis 性能會(huì)下降一些。
3.1 RedLock使用場(chǎng)景
如果你很在乎高可用性,希望掛了一臺(tái) redis 完全不受影響,那就應(yīng)該考慮 redlock。不過(guò)代價(jià)也是有的,需要更多的 redis 實(shí)例,性能也下降了,代碼上還需要引入額外的library,運(yùn)維上也需要特殊對(duì)待,這些都是需要考慮的成本,使用前請(qǐng)?jiān)偃遄谩?/p>
參考資料
- 《Redis深度歷險(xiǎn)》