經(jīng)常聽到分布式鎖這個(gè)名詞,覺得高大上又離我很遙遠(yuǎn),于是也沒有進(jìn)行什么深究。直到面試被反復(fù)問道,我覺得有必要了解一下,以跟上這個(gè)時(shí)代的節(jié)奏。
常見的分布式鎖實(shí)現(xiàn),一個(gè)是ZooKeeper,另一個(gè)就是Redis了。Redis這么熟悉,當(dāng)然是拿它先開刀了。在進(jìn)行鎖的實(shí)現(xiàn)的時(shí)候,有幾個(gè)基本概念,需要先說明一下。
第一個(gè)就是鎖的概念,這個(gè)鎖不像java或者jvm中那種復(fù)雜又難懂的鎖,你可以把鎖理解為一個(gè)值,根據(jù)這個(gè)值的存在與否,來決定是否獲取到鎖。
在Redis中,有一種稱為樂觀鎖的東西,什么叫樂觀鎖,意思就是這種策略是樂觀的,總樂觀的認(rèn)為別的客戶端連接不會(huì)修改該值,因此除了本客戶端連接可以讀取到某種資源(存儲(chǔ)在Redis上的鍵值對(duì)信息),其它客戶端連接也可以讀取到。那萬一其余客戶端連接修改了資源呢?這個(gè)時(shí)候Redis Server會(huì)給加了樂觀鎖的客戶端連接發(fā)送消息,該訴你該數(shù)據(jù)已經(jīng)修改過了,這個(gè)時(shí)候本客戶端連接可以選擇循環(huán)重試或者直接退出。這個(gè)聽起來是不是像CAS的操作呢? 我的感覺是像極了。 具體涉及的命令就是watch unwatch multi exec,這些命令什么意思,下文詳細(xì)說明。
另一種鎖稱之為悲觀鎖。什么是悲觀鎖呢?就是客戶端連接總是認(rèn)為數(shù)據(jù)隨時(shí)都可能被其余客戶端連接修改掉,所以加了悲觀鎖,其余的客戶端連接在此刻是無法讀取到資源,自然也無法進(jìn)行操作。當(dāng)本客戶端連接釋放了鎖,其余的客戶端連接才可以獲取鎖,進(jìn)而進(jìn)行操作。涉及的命令不僅包括watch unwatch multi exec,也包括setnx ttl expire等命令,同樣后文細(xì)說。
不像java中已經(jīng)提供了各種鎖以及同步容器,并發(fā)工具,Redis本身它不提供任何鎖的實(shí)現(xiàn),也沒有樂觀鎖、悲觀鎖,超時(shí)鎖等東西,這需要我們組合使用Redis的各種原子命令以及不同的命令特性,來自己實(shí)現(xiàn)鎖,同時(shí)在代碼層面一致的使用和釋放鎖。
再來說一下Redis客戶端。Redis是用c語言寫的,要么使用自帶的redis-cli進(jìn)行連接和交互,要么使用根據(jù)通信協(xié)議封裝好的不同語言的客戶端庫(kù)。Redis的java客戶端庫(kù)很多,常用的有Jedis、Lettuce 。并且Spring SpringBoot默認(rèn)的就是支持這兩種客戶端,所以學(xué)習(xí)了這兩個(gè)客戶端對(duì)以后也很有幫助。Lettuce 的功能更加強(qiáng)大,它基于netty,支持Redis的哨兵模式和集群模式。但是由于我知識(shí)淺薄,只能拿Jedis來作為演示啦。
由于樂觀鎖會(huì)導(dǎo)致無謂的修改循環(huán)重試,導(dǎo)致很少能夠修改成功,耗費(fèi)資源。而悲觀鎖,雖然在獲取鎖時(shí)不斷重試,但對(duì)于修改資源,卻是一次就成功了,在資源競(jìng)爭(zhēng)嚴(yán)重的時(shí)候,悲觀鎖策略性能更好,因此這里主要選擇悲觀鎖這種思想來進(jìn)行代碼演示。
首先我們?cè)O(shè)定一個(gè)鍵為鎖(Redis就是Key Value型存儲(chǔ)),并使用setnx命令,該命令的特點(diǎn)是,如果鍵存在,則設(shè)置定指定的值,并返回1,如果鍵存在,則什么都不做,并返回0。對(duì)應(yīng)到代碼中來就是,如果執(zhí)行setnx命令返回了1,則說明獲取到了鎖,代碼可以繼續(xù)往下執(zhí)行,如果執(zhí)行setnx命令返回0,則說明沒有獲取到鎖,當(dāng)前線程等待或者重試。
獲取到鎖,執(zhí)行代碼完畢,需要釋放鎖。怎么釋放呢,就是很簡(jiǎn)單的執(zhí)行del命令即可,即把鎖的key給刪除,這樣其余的連接就可以獲取鎖了。
偽代碼如下:
while(con.setnx(lockKey,lockValue)==0){
//休眠 重試獲取鎖
}
// 執(zhí)行業(yè)務(wù)代碼
con.del(lockKey);
一切看起來很美好,不是嗎?但現(xiàn)實(shí)總是千變?nèi)f化的,一種可能是一段代碼的執(zhí)行,需要在不同地方獲取不同的鎖,導(dǎo)致死鎖的發(fā)生,又或者是網(wǎng)絡(luò)故障或者客戶進(jìn)程崩潰,造成鎖永遠(yuǎn)無法釋放。這就會(huì)導(dǎo)致其余的Redis連接一直無法獲取到鎖,因?yàn)橐慌_(tái)機(jī)器的代碼問題,網(wǎng)絡(luò)問題,機(jī)器故障等原因,導(dǎo)致所有的服務(wù)都變得不可用,這是讓人無法接受的,這違背了分布式的初衷。
怎么解決這個(gè)問題呢?我們可以指定一個(gè)鎖的過期時(shí)間,比如10s后這個(gè)鎖會(huì)過期,并且極限條件下業(yè)務(wù)執(zhí)行時(shí)間也不會(huì)超過10s。(在服務(wù)有互相依賴,復(fù)雜的服務(wù)調(diào)用中,調(diào)用鏈越長(zhǎng),超時(shí)時(shí)間越不好預(yù)估,但這個(gè)也是在解決問題和性能之間做一個(gè)平衡,超時(shí)時(shí)間設(shè)置太長(zhǎng),性能會(huì)大大降低,超時(shí)時(shí)間太短,會(huì)造成并發(fā)問題,因?yàn)橐粋€(gè)連接中代碼還沒有執(zhí)行完,鎖已經(jīng)被刪除,同時(shí)另一個(gè)連接獲取到了鎖,執(zhí)行業(yè)務(wù)代碼)。設(shè)置了鎖的過期時(shí)間,解決了鎖不釋放的問題,但是同時(shí)引入的新的問題,那就是可能會(huì)刪除其余連接的鎖。比如A連接獲取到鎖key1,執(zhí)行很長(zhǎng)時(shí)間,此時(shí)鎖過期,被刪除,另一個(gè)連接B獲取到鎖key1,并執(zhí)行對(duì)應(yīng)的代碼,此時(shí)A連接執(zhí)行結(jié)束,于是釋放鎖,但是此時(shí),其實(shí)它的鎖已經(jīng)被釋放了,在鎖過期的時(shí)候,現(xiàn)在它釋放的是B鏈接的鎖,那是不對(duì)的。假如此刻C連接進(jìn)來,是能夠獲取到鎖的,那么就意味著B C在同時(shí)執(zhí)行業(yè)務(wù)代碼,違背了鎖當(dāng)初設(shè)計(jì)的本意,因此絕對(duì)不能釋放其余連接的鎖,而只能釋放自己的。
那么如何解決這個(gè)問題?其實(shí)我們可以在鏈接獲取鎖的時(shí)候,設(shè)置一個(gè)只有當(dāng)前連接知道的唯一值,釋放的時(shí)候會(huì)先取出鎖的值,進(jìn)行比較,只有跟存入的值是一致的時(shí)候,才會(huì)釋放鎖,也就是刪除鍵,否則,什么也不做。
分析了這么多,我們可以看看獲取鎖的工具類代碼:
/**
* 在指定的等待時(shí)限內(nèi)獲取鎖
* @param jedis 連接
* @param lockName 鎖名稱
* @param timeOutMillionSeconds 獲取鎖超時(shí)時(shí)間 -1:一直等待,直到獲取到鎖
* @return 如果獲取到鎖,返回一個(gè)鎖標(biāo)識(shí)符,否則返回null
*/
public static String acquireLock(Jedis jedis,String lockName,long timeOutMillionSeconds){
// 略去各類校驗(yàn)
String identifier = UUID.randomUUID().toString();
long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
while(System.currentTimeMillis()<=timeEnd){
long result = jedis.setnx(lockName,identifier);
if(result==1){// 等于1 說明沒有設(shè)置過,獲取鎖成功
return identifier;
}
try {
TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值 根據(jù)業(yè)務(wù)來定
}catch (InterruptedException e){//假設(shè)在本機(jī)沒有多線程編程 不會(huì)通知另一方線程中斷 根據(jù)具體業(yè)務(wù)來
throw new RuntimeException(e);
}
}
return null;
}
注意代碼中的jedis沒有close,根據(jù)需要,也可以在工具類中close掉。
以及超時(shí)鎖,為了防止setnx和expire之間,程序奔潰,造成超時(shí)時(shí)間沒有設(shè)置上,因此其余的連接在獲取不到鎖的時(shí)候,會(huì)先判斷鎖有沒有過期時(shí)間,如果沒有,給鎖加上過期時(shí)間:
/**
*在指定的等待時(shí)限內(nèi)獲取鎖,該鎖自身帶有超時(shí)特性
* @param jedis 連接
* @param lockName 鎖名稱
* @param timeOutMillionSeconds 獲取鎖超時(shí)時(shí)間 -1:一直等待,直到獲取到鎖
* @param lockTimeOutSeconds 鎖的超時(shí)時(shí)間
* @return 如果獲取到鎖,返回一個(gè)鎖標(biāo)識(shí)符,否則返回null
*/
public static String acquireLockTimeOut(Jedis jedis,String lockName,long timeOutMillionSeconds,int lockTimeOutSeconds){
// 略去各類校驗(yàn)
String identifier = UUID.randomUUID().toString();
long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
while(System.currentTimeMillis()<=timeEnd){
long result = jedis.setnx(lockName,identifier);
if(result==1){// 等于1 說明沒有設(shè)置過,獲取鎖成功
jedis.expire(lockName,lockTimeOutSeconds);
return identifier;
}else{
if(jedis.ttl(lockName)==-1){
jedis.expire(lockName,lockTimeOutSeconds);
}
}
try {
TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值 根據(jù)業(yè)務(wù)來定
}catch (InterruptedException e){//假設(shè)在本機(jī)沒有多線程編程 不會(huì)通知另一方線程中斷 根據(jù)具體業(yè)務(wù)來
throw new RuntimeException(e);
}
}
return null;
}
釋放鎖的代碼:
/**
* 關(guān)于watch multi exec等命令,參考https://redis.io/topics/transactions 才能深刻理解
* @param jedis 連接
* @param lockName 鎖名稱
* @param identifier 鎖標(biāo)識(shí)符
* @return 是否成功釋放鎖 僅作為參考
*/
public static boolean releaseLock(Jedis jedis,String lockName,String identifier){
// 略去各類校驗(yàn)
boolean releaseNormal = false;// 鎖是否正常釋放
jedis.watch(lockName);
String identifierInRedis = jedis.get(lockName);
if(identifier.equalsIgnoreCase(identifierInRedis)){// 如果標(biāo)識(shí)符沒有改動(dòng),則說明可以解鎖
Transaction transaction = jedis.multi();
transaction.del(lockName);
transaction.exec();
releaseNormal = true;
}else{
jedis.unwatch();
releaseNormal = false;
}
return releaseNormal;
}
這里重點(diǎn)解釋一下釋放鎖的操作:只有現(xiàn)在取出的跟當(dāng)時(shí)存入的值一致,才會(huì)進(jìn)行刪除操作。但為了防止get 和del之間的某個(gè)時(shí)候,另一個(gè)連接修改了鎖的值,(為什么會(huì)修改?是因?yàn)楫?dāng)前連接A在執(zhí)行完get之后,鎖過期了,因此另一個(gè)連接B可以獲取到鎖,現(xiàn)在A執(zhí)行刪除操作,就是刪除B連接獲取到的鎖),因此需要watch 操作,如果現(xiàn)在取出的值和當(dāng)初存入的不一致,那么直接執(zhí)行unwatch并返回。為什么要執(zhí)行unwatch呢?因?yàn)闉榱税踩尤氩粓?zhí)行unwatch就返回,在后續(xù)的代碼中執(zhí)行multi和exec,那就有很大的問題,當(dāng)鎖被刪除或者修改,就會(huì)打斷當(dāng)前的事務(wù),但是該事物跟鎖是沒有任何關(guān)系的,所以u(píng)nwatch是一個(gè)需要執(zhí)行的操作。另一個(gè)情況是假如當(dāng)前連接取出的鎖的值,跟存入的一直,就需要執(zhí)行刪除鎖的操作??赡苡型瑢W(xué)就會(huì)問了,Redis的所有操作都是原子操作,執(zhí)行del和包裹在multi exec中執(zhí)行del不是一樣的原子操作嗎?為何還要多此一舉,讓代碼變得不好理解。在這一點(diǎn)上,它們確實(shí)并無任何區(qū)別,但是重點(diǎn)是之前有一個(gè)watch命令。假如沒有執(zhí)行watch multi del exec這樣的順序,就會(huì)有釋放掉其余連接的鎖的風(fēng)險(xiǎn),為什么會(huì)這樣,上文已經(jīng)做了分析。在get和del之間發(fā)生的事情,當(dāng)前連接是不知道的,get del的執(zhí)行不是原子性的。有了watch multi del exec這個(gè)順序,當(dāng)前連接A get執(zhí)行之后,鎖失效,且被另一個(gè)連接B獲取到鎖,也就是修改了鎖,因?yàn)閣atch(key),所以當(dāng)前連接就知道了有人修改了。當(dāng)執(zhí)行exec的時(shí)候,就會(huì)丟棄掉del命令,因?yàn)閣atch的通知使得事務(wù)已經(jīng)失效了,這保證了其余連接的鎖不會(huì)被刪除。同時(shí),當(dāng)執(zhí)行exec的時(shí)候,不論事務(wù)成功與否,都unwatch了。最終呢,釋放鎖的代碼看起來就是這樣了。
寫好了工具類,我們應(yīng)該測(cè)試一下,看看是不是真的,我們可以使用線程池,放入200個(gè)任務(wù),每個(gè)任務(wù)都是執(zhí)行獲取Redis的某個(gè)鍵,并加一,再設(shè)置回Redis。在執(zhí)行代碼前,進(jìn)行鎖獲取,執(zhí)行完畢,進(jìn)行鎖釋放。為了等到所有線程執(zhí)行完畢,便于獲取最終執(zhí)行結(jié)果,使用CountDownLatch進(jìn)行等待線程池所有任務(wù)的執(zhí)行完畢。另外的一些部分是初始化動(dòng)作,防止鎖已經(jīng)設(shè)置了或者指定的鍵已經(jīng)有值了。代碼如下:
public class LockTest {
private static final Log log = LogFactory.getLog(LockTest.class);
public static void main(String[] args)throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10);
final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
final CountDownLatch countDownLatch = new CountDownLatch(200);
final String lockName = "lock_a";
Jedis jedisInit = jedisPool.getResource();
jedisInit.del(lockName);
final String testResource = "test_str";
jedisInit.set(testResource,"0");
jedisInit.close();
for(int i=0;i<200;i++){
threadPoolExecutor.execute(new Runnable() {
public void run() {
Jedis jedis = jedisPool.getResource();
String identifiler = RedisSetnxLock.acquireLock(jedis,lockName,-1);
if(identifiler==null) {
log.info("獲取鎖失敗");
countDownLatch.countDown();
jedis.close();
return;
}
try{
String value = jedis.get(testResource);
Thread.sleep(200);// 故意休眠
jedis.set(testResource,(Integer.parseInt(value)+1)+"");
}catch (Exception e){
e.printStackTrace();
}finally {
RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
jedis.close();
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
log.info(jedisPool.getResource().get(testResource));
threadPoolExecutor.shutdown();
}
}
最后的執(zhí)行結(jié)果符合預(yù)期。
為了演示連接掛掉或者執(zhí)行超常任務(wù)的情形,可以執(zhí)行下面的測(cè)試:
public class LockTest {
private static final Log log = LogFactory.getLog(LockTest.class);
public static void main(String[] args)throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10);
final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
final CountDownLatch countDownLatch = new CountDownLatch(200);
final String lockName = "lock_a";
Jedis jedisInit = jedisPool.getResource();
jedisInit.del(lockName);
final String testResource = "test_str";
jedisInit.set(testResource,"0");
jedisInit.close();
for(int i=0;i<200;i++){
threadPoolExecutor.execute(new Runnable() {
public void run() {
Jedis jedis = jedisPool.getResource();
// 鎖的超時(shí)時(shí)間為1s
String identifiler = RedisSetnxLock.acquireLockTimeOur(jedis,lockName,-1,1);
if(identifiler==null) {
log.info("獲取鎖失敗");
countDownLatch.countDown();
jedis.close();
return;
}
try{
String value = jedis.get(testResource);
Thread.sleep(200);
jedis.set(testResource,(Integer.parseInt(value)+1)+"");
}catch (Exception e){
e.printStackTrace();
}finally {
// 每次不釋放鎖,模擬執(zhí)行超常任務(wù)或者進(jìn)程掛掉的情形
//RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
jedis.close();
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
log.info(jedisPool.getResource().get(testResource));
threadPoolExecutor.shutdown();
}
}
執(zhí)行結(jié)果同樣正確,只是執(zhí)行時(shí)間變長(zhǎng)了。
相關(guān)maven pom:
<!--jedis client-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<!--jedis連接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.1</version>
</dependency>
參考書籍:《Redis實(shí)戰(zhàn)》,一本非常棒的書。