一、 分布式鎖的實(shí)現(xiàn)
關(guān)于鎖,其實(shí)我們或多或少都有接觸過(guò)一些,比如synchronized、 Lock這些,這類鎖的目的很簡(jiǎn)單,在多線程環(huán)境下,對(duì)共享資源的訪問(wèn)造成的線程安全問(wèn)題,通過(guò)鎖的機(jī)制來(lái)實(shí)現(xiàn)資源訪問(wèn)互斥。那么什么是分布式鎖呢?或者為什么我們需要通過(guò)Redis來(lái)構(gòu)建分布式鎖,其實(shí)最根本原因就是Score(范圍),因?yàn)樵诜植际郊軜?gòu)中,所有的應(yīng)用都是進(jìn)程隔離的,在多進(jìn)程訪問(wèn)共享資源的時(shí)候我們需要滿足互斥性,就需要設(shè)定一個(gè)所有進(jìn)程都能看得到的范圍,而這個(gè)范圍就是Redis本身。所以我們才需要把鎖構(gòu)建到Redis中。
Redis里面提供了一些比較具有能夠?qū)崿F(xiàn)鎖特性的命令,比如SETEX(在鍵不存在的情況下為鍵設(shè)置值),那么我們可以基于這個(gè)命令來(lái)去實(shí)現(xiàn)一些簡(jiǎn)單的鎖的操作
二、 分布式鎖的實(shí)戰(zhàn)
分布式鎖的實(shí)現(xiàn)會(huì)采用兩種方式來(lái)實(shí)現(xiàn),一種自己手動(dòng)來(lái)實(shí)現(xiàn),一種是基于Redission的客戶端來(lái)實(shí)現(xiàn)
基于jedis手動(dòng)實(shí)現(xiàn)分布式鎖
代碼如下:
/**
* @Project: redis
* @description: redis的分布式鎖的手動(dòng)實(shí)現(xiàn)
* @author: sunkang
* @create: 2019-01-12 16:54
* @ModificationHistory who when What
**/
public class DistributedLock {
/**
* 獲取鎖
* @param lockName 鎖的名稱
* @param accquireTimeout 獲取鎖的超時(shí)時(shí)間
* @param lockTimeout 鎖本身過(guò)期時(shí)間
* @return
*/
public String acquireLock(String lockName ,long accquireTimeout,long lockTimeout){
String identify = UUID.randomUUID().toString();
String lockKey = "lock:"+lockName;
//獲取redis客戶端
long endTime = System.currentTimeMillis()+ accquireTimeout;
Jedis redis = JedisPoolUtils.getRedis();
int expireTime = (int) (lockTimeout/1000);
try{
while (System.currentTimeMillis()<endTime){
//setnx表示如果鍵不存在則設(shè)置,返回1證明key設(shè)置成功了
if( redis.setnx(lockKey,identify) ==1 ){
redis.expire(lockKey, expireTime);
return identify;
}
//說(shuō)明key的過(guò)期時(shí)間沒(méi)有設(shè)置,但是key存在,設(shè)置過(guò)期時(shí)間
if(redis.ttl(lockKey) == -1){
redis.expire(lockKey,expireTime);
}
try {
//等待片刻后進(jìn)行獲取鎖的重試
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
//最終關(guān)閉連接
redis.close();
}
return null;
}
/**
* 釋放鎖
* @param lockName
* @param identifer
*/
public boolean releaseLock(String lockName,String identifer){
System.out.println(lockName+"開(kāi)始釋放鎖:"+identifer);
boolean release = false;
String lockKey = "lock:"+ lockName;
Jedis jedis = JedisPoolUtils.getRedis();
try {
while (true){
//命令用于監(jiān)視一個(gè)(或多個(gè)) key ,如果在事務(wù)執(zhí)行之前這個(gè)key,被其他命令所改動(dòng),那么事務(wù)將被打斷
//保證了開(kāi)啟監(jiān)控到執(zhí)行對(duì)該key執(zhí)行事務(wù)提交之間,不能修改,保證了這段期間key的數(shù)據(jù)原子性。
jedis.watch(lockKey);
//如果是同一把鎖
if(jedis.get(lockKey).equals(identifer)){
Transaction transaction = jedis.multi();
//主動(dòng)刪除鎖
transaction.del(lockKey);
//執(zhí)行結(jié)果為空,說(shuō)明執(zhí)行有問(wèn)題,在輪訓(xùn)一次
if(transaction.exec().isEmpty()){
release = false;
continue;
}
release =true;
}
jedis.unwatch();
break;
}
}finally {
jedis.close();
}
return release;
}
/**
* 通過(guò)lua腳本來(lái)釋放鎖
* lua腳本執(zhí)行保證了原子性
* @param lockName
* @param identifer
* @return
*/
public boolean releaseLockWithLua(String lockName,String identifer){
System.out.println(lockName+"開(kāi)始釋放鎖:"+identifer);
Jedis jedis = JedisPoolUtils.getRedis();
String lockKey = "lock:"+ lockName;
String lua ="if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1])" +
"else return 0 end ";
Long rs=(Long)jedis.eval(lua,1,lockKey,identifer);
jedis.close();
if(rs.intValue()> 0){
return true;
}
return false;
}
}
- 測(cè)試代碼
/**
* @Project: redis
* @description: 分布式測(cè)試
* 開(kāi)啟10個(gè)線程,模擬十個(gè)客戶端請(qǐng)求鎖
* @author: sunkang
* @create: 2019-01-12 17:45
* @ModificationHistory who when What
**/
public class LockTest extends Thread{
@Override
public void run() {
while(true){
DistributedLock distributedLock=new DistributedLock();
String rs=distributedLock.acquireLock("updateOrder",
2000,5000);
if(rs!=null){
System.out.println(Thread.currentThread().getName()+"-> 成功獲得鎖:"+rs);
try {
Thread.sleep(1000);
//釋放鎖
distributedLock.releaseLockWithLua("updateOrder",rs);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
public static void main(String[] args) {
LockTest lockTest=new LockTest();
for(int i=0;i<10;i++){
new Thread(lockTest,"tName:"+i).start();
}
}
}
- 輸出結(jié)果 : 基本是鎖釋放了之后,其他的線程才得到了鎖
tName:7-> 成功獲得鎖:38c683cb-1797-4abb-aa57-200e770f7445
updateOrder開(kāi)始釋放鎖:38c683cb-1797-4abb-aa57-200e770f7445
tName:6-> 成功獲得鎖:873dfe4e-00ee-4d2b-9bc5-bb16e83f8879
updateOrder開(kāi)始釋放鎖:873dfe4e-00ee-4d2b-9bc5-bb16e83f8879
tName:8-> 成功獲得鎖:e6c48dea-87d9-4fd5-ab04-09ed2f725307
updateOrder開(kāi)始釋放鎖:e6c48dea-87d9-4fd5-ab04-09ed2f725307
tName:0-> 成功獲得鎖:e0d764a5-30b5-4b6b-8490-73ffd78727e0
...
總結(jié):
acquireLock的方法主要是競(jìng)爭(zhēng)鎖的時(shí)間內(nèi),不斷輪訓(xùn)去查看lockKey的是否存在,不存在則設(shè)置鎖,返回鎖,存在會(huì)一直嘗試獲取鎖,直到獲取鎖的超時(shí)時(shí)間結(jié)束,然后釋放redis連接在釋放鎖的方式,有兩種方式,一種是給鎖本身設(shè)置超時(shí)時(shí)間,時(shí)間到了,鎖本身釋放了,一種是自動(dòng)方式來(lái)刪除鎖,刪除鎖為了鎖在釋放過(guò)程中的原子性,一個(gè)使用了lua腳本,一個(gè)使用redis的watch機(jī)制和事務(wù)刪除的方式
基于redisson實(shí)現(xiàn)分布式鎖
maven依賴:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.15.0</version>
</dependency>
redisson的分布式代碼如下
/**
* @Project: redis
* @description: 基于redission的api實(shí)現(xiàn)分布式鎖
* @author: sunkang
* @create: 2019-01-12 18:08
* @ModificationHistory who when What
**/
public class RedissionLock {
public static void main(String[] args) {
Config config = new Config();
//使用單個(gè)節(jié)點(diǎn)連接,并且設(shè)置連接地址
config.useSingleServer().setAddress("redis://192.168.44.129:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("updateOrder");
try {
//嘗試獲取鎖100秒,鎖釋放時(shí)間為10秒
lock.tryLock(100,10,TimeUnit.SECONDS);
System.out.println("得到鎖");
Thread.sleep(100);
//釋放鎖
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
基于redission 的實(shí)現(xiàn)分布式鎖的方式,獲取鎖的關(guān)鍵代碼在org.redisson.RedissonLock#tryLockInnerAsync這個(gè)方法上:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
通過(guò)lua腳本來(lái)實(shí)現(xiàn)加鎖的操作
- 判斷l(xiāng)ock鍵是否存在,不存在直接調(diào)用hset存儲(chǔ)當(dāng)前線程信息并且設(shè)置過(guò)期時(shí)間,返回nil,告訴客戶端直接獲取到鎖。
- 判斷l(xiāng)ock鍵是否存在,存在則將重入次數(shù)加1,并重新設(shè)置過(guò)期時(shí)間,返回nil,告訴客戶端直接獲取到鎖。
- 被其它線程已經(jīng)鎖定,返回鎖有效期的剩余時(shí)間,告訴客戶端需要等待。
釋放鎖的關(guān)鍵代碼在方法: org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
- 如果lock鍵不存在,發(fā)消息說(shuō)鎖已經(jīng)可用,發(fā)送一個(gè)消息
- 如果鎖不是被當(dāng)前線程鎖定,則返回nil
- 由于支持可重入,在解鎖時(shí)將重入次數(shù)需要減1
- 如果計(jì)算后的重入次數(shù)>0,則重新設(shè)置過(guò)期時(shí)間
- 如果計(jì)算后的重入次數(shù)<=0,則發(fā)消息說(shuō)鎖已經(jīng)可用