相信很多同學(xué)都聽說(shuō)過(guò)分布式鎖,但也僅僅停留在概念的理解上,這篇文章會(huì)從分布式鎖的應(yīng)用場(chǎng)景講起,從實(shí)現(xiàn)的角度上深度剖析redis如何實(shí)現(xiàn)分布式鎖。
一、超賣問(wèn)題
我們先來(lái)看超賣的概念:
當(dāng)寶貝庫(kù)存接近0時(shí),如果多個(gè)買家同時(shí)付款購(gòu)買此寶貝,或者店鋪后臺(tái)在架數(shù)量大于倉(cāng)庫(kù)實(shí)際數(shù)量,將會(huì)出現(xiàn)超賣現(xiàn)象。超賣現(xiàn)象本質(zhì)上就是買到了比倉(cāng)庫(kù)中數(shù)量更多的寶貝。
本文主要解決超賣問(wèn)題的第一種,同時(shí)多人購(gòu)買寶貝時(shí),造成超賣。
測(cè)試代碼
那么超賣問(wèn)題是如何產(chǎn)生的呢?我們準(zhǔn)備一段代碼進(jìn)行測(cè)試:
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 第一種實(shí)現(xiàn),進(jìn)程內(nèi)就存在線程安全問(wèn)題
* 可以只啟動(dòng)一個(gè)進(jìn)程測(cè)試
*/
@RequestMapping("/deduct_stock1")
public void deductStock1(){
String stock = stringRedisTemplate.opsForValue().get("stock");
int stockNum = Integer.parseInt(stock);
if(stockNum > 0){
//設(shè)置庫(kù)存減1
int realStock = stockNum - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("設(shè)置庫(kù)存" + realStock);
}else{
System.out.println("庫(kù)存不足");
}
}
這段代碼中,使用redis先獲取庫(kù)存數(shù)量(當(dāng)然實(shí)際場(chǎng)景中不會(huì)只保存一個(gè)全局庫(kù)存數(shù),應(yīng)該根據(jù)一個(gè)sku保存一份庫(kù)存數(shù))。
String stock = stringRedisTemplate.opsForValue().get("stock");
int stockNum = Integer.parseInt(stock);
接下來(lái),判斷庫(kù)存數(shù)是否大于0:
- 如果大于0,將庫(kù)存數(shù)減一,通過(guò)set命令,寫回redis
//設(shè)置庫(kù)存減1
int realStock = stockNum - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("設(shè)置庫(kù)存" + realStock);
- 如果小于等于0,提示庫(kù)存不足
JMeter測(cè)試
通過(guò)JMeter進(jìn)行并發(fā)測(cè)試,看下會(huì)不會(huì)出現(xiàn)超賣的問(wèn)題:
1.啟動(dòng)tomcat
這種情況下,只需要啟動(dòng)一個(gè)tomcat就會(huì)出現(xiàn)超賣。我們先啟動(dòng)一個(gè)tomcat在8080端口上。

2.下載JMeter
Apache JMeter是Apache組織開發(fā)的基于Java的壓力測(cè)試工具。
從官網(wǎng)上下載即可:
https://jmeter.apache.org/download_jmeter.cgi
下載完之后解壓,運(yùn)行bin目錄下的jmeter.bat,顯示如下界面:

如果嫌字體太小,可以選擇放大:

3.配置JMeter
在Test Plan上點(diǎn)擊右鍵,創(chuàng)建線程組(Thread Group)

配置一下具體參數(shù):

-
Number of Threads同時(shí)并發(fā)線程數(shù) -
Ramp-Up Period(in-seconds)代表隔多長(zhǎng)時(shí)間執(zhí)行,0代表同時(shí)并發(fā)。假設(shè)線程數(shù)為100, 估計(jì)的點(diǎn)擊率為每秒10次, 那么估計(jì)的理想ramp-up period 就是 100/10 = 10 秒 -
Loop Count循環(huán)次數(shù)
這里給出500是為了直接測(cè)試并發(fā)500搶,看看能不能正好把500個(gè)貨物搶完。
添加Http請(qǐng)求:

添加請(qǐng)求URL:

添加聚合結(jié)果,用來(lái)顯示整體的運(yùn)行情況:

到此為止JMeter的配置結(jié)束。
4.設(shè)置庫(kù)存量
啟動(dòng)redis-server,使用redis-client連接:

把庫(kù)存數(shù)設(shè)置為500。
5.開始測(cè)試
點(diǎn)擊運(yùn)行按鈕,啟動(dòng)測(cè)試:

首先我們看到聚合報(bào)告里輸出的結(jié)果:

錯(cuò)誤率0%,樣本數(shù)500,證明500個(gè)請(qǐng)求都已經(jīng)執(zhí)行,但是發(fā)現(xiàn)控制臺(tái)輸出如下:

很顯然,一份商品都被賣了多次,這顯然是不合理的。
原因分析
現(xiàn)在我們只啟動(dòng)了一個(gè)tomcat,在單jvm進(jìn)程的情況下,tomcat會(huì)使用線程池接收請(qǐng)求:

而由于每個(gè)線程可能同時(shí)獲取到庫(kù)存量,所以庫(kù)存量在兩個(gè)線程中顯示的都是500,然后兩個(gè)線程就繼續(xù)進(jìn)行扣減庫(kù)存操作,得出499寫回redis中,在這個(gè)過(guò)程中,顯然存在線程安全的問(wèn)題。同一個(gè)商品被賣出了2份,超賣問(wèn)題就出現(xiàn)了。
二、加鎖優(yōu)化
1.synchronized鎖
要保證單jvm中線程安全,最簡(jiǎn)單直接的方式就是添加synchronized關(guān)鍵字,那么這樣行不行呢,我們來(lái)做一個(gè)測(cè)試:
/**
* 第二種實(shí)現(xiàn),使用synchronized加鎖
* 可以只啟動(dòng)一個(gè)進(jìn)程測(cè)試
*/
@RequestMapping("/deduct_stock2")
public void deductStock2(){
synchronized (this){
String stock = stringRedisTemplate.opsForValue().get("stock");
int stockNum = Integer.parseInt(stock);
if(stockNum > 0){
//設(shè)置庫(kù)存減1
int realStock = stockNum - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("設(shè)置庫(kù)存" + realStock);
}else{
System.out.println("庫(kù)存不足");
}
}
}
在進(jìn)行扣減庫(kù)存前,先通過(guò)synchronized關(guān)鍵字,對(duì)資源加鎖,這樣就只有一個(gè)線程能進(jìn)入到扣減庫(kù)存的代碼塊中。來(lái)測(cè)試一下:
重置庫(kù)存
set stock 500
修改接口地址

測(cè)試

可以看到,庫(kù)存被扣減為0,并且沒(méi)有出現(xiàn)超賣的情況(設(shè)置了500庫(kù)存,并且500個(gè)人搶,正好搶完)。
但是這種方案顯然是不行的,在生產(chǎn)環(huán)境上如果部署多個(gè)tomcat實(shí)例,那么就會(huì)出現(xiàn)如下情況:

多個(gè)進(jìn)程無(wú)法共享jvm內(nèi)存中的鎖,所以會(huì)出現(xiàn)多把鎖,這種情況下也會(huì)出現(xiàn)超賣問(wèn)題。
二、分布式鎖的實(shí)現(xiàn)
多Tomcat實(shí)例下的超賣演示
接下來(lái)我們演示一下如何在多個(gè)Tomcat情況下,演示超賣的問(wèn)題:
1.啟動(dòng)兩個(gè)tomcat服務(wù)
在IDEA中配置兩個(gè)spring boot的啟動(dòng)項(xiàng),使用vm參數(shù)指定不同的端口號(hào)
-Dserver.port=8080


2.配置nginx
編寫~/nginx_redis/conf/nginx.conf如下:
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
upstream redislock{
server 192.168.226.1:8080 weight=1;
server 192.168.226.1:8081 weight=1;
}
server {
listen 80;
server_name localhost;
location /{
root html;
proxy_pass http://redislock;
}
}
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
#gzip on;
include /etc/nginx/conf.d/*.conf;
}
192.168.226.1這是我宿主機(jī)的IP
準(zhǔn)備一個(gè)虛擬機(jī),使用docker啟動(dòng)nginx:
docker pull nginx
docker run -di -p 10085:80 --name nginx-redis-hc -v ~/nginx_redis/html:/usr/share/nginx/html -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf -v ~/nginx_redis/logs:/var/log/nginx nginx
在宿主機(jī)下使用虛擬機(jī)的IP地址:10085訪問(wèn)nginx,如果出現(xiàn)如下頁(yè)面就代表成功:

3.測(cè)試
修改接口地址為nginx:

運(yùn)行查看兩個(gè)tomcat的控制臺(tái):


沒(méi)有將庫(kù)存清空,證明存在超賣問(wèn)題。
手動(dòng)實(shí)現(xiàn)分布式鎖
使用redis手動(dòng)實(shí)現(xiàn)分布式鎖,需要用到命令setnx。先來(lái)介紹一下setnx:
SETNX key value[]
可用版本: >= 1.0.0
時(shí)間復(fù)雜度: O(1)
只在鍵 key 不存在的情況下, 將鍵 key 的值設(shè)置為 value 。
若鍵 key 已經(jīng)存在, 則 SETNX 命令不做任何動(dòng)作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡(jiǎn)寫。
返回值
命令在設(shè)置成功時(shí)返回 1 , 設(shè)置失敗時(shí)返回 0 。
代碼示例
redis> EXISTS job # job 不存在
# job 不存在
(integer) 0
redis> SETNX job "programmer" # job 設(shè)置成功
(integer) 1
redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗
(integer) 0
redis> GET job # 沒(méi)有被覆蓋
使用redis構(gòu)建分布式鎖流程如下:

- 線程1申請(qǐng)鎖(
setnx),拿到了鎖。 - 線程2申請(qǐng)鎖,由于線程1已經(jīng)擁有了鎖,
setnx返回0失敗,這一步用戶操作會(huì)失敗。 - 線程1執(zhí)行扣減庫(kù)存操作并釋放鎖。
- 線程2再次申請(qǐng)鎖,獲取到鎖并執(zhí)行扣減庫(kù)存,然后釋放鎖。
注意這里線程沒(méi)有拿到鎖,如果不嘗試while(true)重新獲取鎖,這個(gè)操作就直接失敗了。
代碼實(shí)現(xiàn)
/**
* 第三種實(shí)現(xiàn),使用redis中的setIfAbsent(setnx命令)實(shí)現(xiàn)分布式鎖
*/
@RequestMapping("/deduct_stock3")
public void deductStock3(){
//在獲取到鎖的時(shí)候,給鎖分配一個(gè)id
String opId = UUID.randomUUID().toString();
Boolean stockLock = stringRedisTemplate
.opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);
if(stockLock){
try{
String stock = stringRedisTemplate.opsForValue().get("stock");
int stockNum = Integer.parseInt(stock);
if(stockNum > 0){
//設(shè)置庫(kù)存減1
int realStock = stockNum - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("設(shè)置庫(kù)存" + realStock);
}else{
System.out.println("庫(kù)存不足");
}
}catch(Exception e){
e.printStackTrace();
}finally {
if(opId.equals(stringRedisTemplate
.opsForValue().get("stockLock"))){
stringRedisTemplate.delete("stockLock");
}
}
}
}
測(cè)試略過(guò),這里有幾個(gè)知識(shí)點(diǎn)需要說(shuō)明
setIfAbsent設(shè)置超時(shí)
如果setIfAbsent不設(shè)置超時(shí)時(shí)間,假設(shè)線程執(zhí)行業(yè)務(wù)代碼時(shí)間時(shí)死鎖或者其他原因?qū)е麻L(zhǎng)時(shí)間不釋放,那么會(huì)影響其他線程獲取到鎖,這個(gè)時(shí)候整體業(yè)務(wù)就會(huì)出現(xiàn)不可用。
Boolean stockLock = stringRedisTemplate
.opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);
設(shè)置超時(shí)時(shí)間為30秒,該時(shí)間一般大于業(yè)務(wù)執(zhí)行的最大時(shí)間。
每次獲取到鎖,設(shè)置唯一ID
考慮這樣的場(chǎng)景

- 線程1獲取鎖扣減庫(kù)存,但是由于操作不當(dāng),長(zhǎng)時(shí)間卡住,這樣會(huì)觸發(fā)超時(shí)時(shí)間鎖被釋放。
- 線程2獲取到鎖,扣減庫(kù)存。
- 線程1的代碼拋出異常,執(zhí)行finally釋放鎖,但是釋放的是進(jìn)程B的鎖。
解決方案就是在加鎖前生成UUID,釋放的時(shí)候校驗(yàn)UUID是否正確,如果不正確,說(shuō)明加鎖線程不是當(dāng)前線程。
使用Redisson實(shí)現(xiàn)分布式鎖
setnx雖好,但是實(shí)現(xiàn)起來(lái)畢竟太過(guò)麻煩,一不小心就可能陷入并發(fā)編程的陷阱中,那么有沒(méi)有更加簡(jiǎn)單的實(shí)現(xiàn)方式呢?答案就是redisson。
Redisson是架設(shè)在Redis基礎(chǔ)上的一個(gè)Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)?!?a target="_blank">Redis官方推薦】
Redisson在基于NIO的Netty框架上,充分的利用了Redis鍵值數(shù)據(jù)庫(kù)提供的一系列優(yōu)勢(shì),在Java實(shí)用工具包中常用接口的基礎(chǔ)上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡(jiǎn)化了分布式環(huán)境中程序相互之間的協(xié)作。
總而言之,redisson提供了一系列較為完善的工具類,其中就包含了分布式鎖。用redisson實(shí)現(xiàn)分布式鎖的流程極為簡(jiǎn)單。
引入依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.14.0</version>
</dependency>
創(chuàng)建Redisson實(shí)例
@Bean
public RedissonClient redisson(){
// 1. Create config object
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// config.useClusterServers()
// // use "rediss://" for SSL connection
// .addNodeAddress("redis://127.0.0.1:7181");
return Redisson.create(config);
}
編寫分布式鎖代碼
@Autowired
private RedissonClient redissonClient;
/**
* 第四種實(shí)現(xiàn),使用redisson實(shí)現(xiàn)
*/
@RequestMapping("/deduct_stock4")
public void deductStock4(){
RLock lock = redissonClient.getLock("redisson:stockLock");
try{
//加鎖
lock.lock();
String stock = stringRedisTemplate.opsForValue().get("stock");
int stockNum = Integer.parseInt(stock);
if(stockNum > 0){
//設(shè)置庫(kù)存減1
int realStock = stockNum - 1;
stringRedisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("設(shè)置庫(kù)存" + realStock);
}else{
System.out.println("庫(kù)存不足");
}
}catch(Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
其中加鎖代碼基本與進(jìn)程內(nèi)加鎖一致,就不再詳細(xì)解讀,讀者自行實(shí)踐即可。
Redisson分布式鎖原理
Redisson分布式鎖的主要原理非常簡(jiǎn)單,利用了lua腳本的原子性。
在分布式環(huán)境下產(chǎn)生并發(fā)問(wèn)題的主要原因是三個(gè)操作并不是原子操作:
- 獲取庫(kù)存
- 扣減庫(kù)存
- 寫入庫(kù)存
那么如果我們把三個(gè)操作合并為一個(gè)操作,在默認(rèn)單線程的Redis中運(yùn)行,是不會(huì)產(chǎn)生并發(fā)問(wèn)題的。源碼如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
這一段源碼中,redisson利用了lua腳本的原子性,校驗(yàn)key是否存在,如果不存在就創(chuàng)建key并利用incrby加一操作(這步操作主要是為了實(shí)現(xiàn)可重入性)。redisson實(shí)現(xiàn)的分布式鎖具備如下特性:
- 鎖失效
- 鎖續(xù)命
執(zhí)行時(shí)間長(zhǎng)的鎖快要到期時(shí)會(huì)自動(dòng)續(xù)命
- 可重入
- 操作原子性
總結(jié)
本文介紹了超賣問(wèn)題產(chǎn)生的原因:操作不具備原子性,同時(shí)提出了集中解決思路。
-
synchronized鎖,無(wú)法保證多實(shí)例下的線程安全 -
setnx手動(dòng)實(shí)現(xiàn),坑很多、代碼較為復(fù)雜 -
redisson實(shí)現(xiàn),能夠保證多實(shí)例下線程安全,代碼簡(jiǎn)單可靠
加餐:
如何使用spring boot + redis + lua搭建高性能、線程安全的扣減庫(kù)存?我將在下一篇文章中給出答案。