一、前言
問題引入
@Controller
public class TradeController {
@Autowired
private TradeService tradeService;
@RequestMapping("/order")
public String order() {
tradeService.decStock();
return "success";
}
}
@Service
public class TradeService {
Logger logger = Logger.getLogger(TradeService.class);
@Autowired
JdbcTemplate jdbcTemplate;
/**
* 扣減庫存
* @return
*/
public String decStock() {
Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
if (stock <= 0) {
logger.info("庫存不足,下單失??!");
return "庫存不足,下單失??!";
}
stock--;
jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
logger.info("下單成功,當(dāng)前剩余庫存:" + stock);
return "下單成功,當(dāng)前剩余庫存:" + stock;
}
}
如上代碼,用Jmeter模擬30個(gè)請(qǐng)求同時(shí)下單,結(jié)果30個(gè)請(qǐng)求都下單成功,產(chǎn)生了超賣問題。
自定義AQS解決
下面實(shí)現(xiàn)自定義一個(gè)同步器來實(shí)現(xiàn)自定義鎖
/**
* 2021/7/1
* 自定義AQS實(shí)現(xiàn)
*/
public class MyLock {
private volatile int state = 0;
private Thread lockHolder;
// 要用線程安全的隊(duì)列作為等待隊(duì)列,基于CAS實(shí)現(xiàn)
// linkedBlockedQueue基于AQS實(shí)現(xiàn),不能用
private ConcurrentLinkedDeque<Thread> waiters = new ConcurrentLinkedDeque<>();
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Thread getLockHolder() {
return lockHolder;
}
public void setLockHolder(Thread lockHolder) {
this.lockHolder = lockHolder;
}
public void lock() {
Thread currentThread = Thread.currentThread();
if (acquire()) {
return;
}
waiters.add(currentThread);
// 自旋
for (; ; ) {
// 隊(duì)列里第一個(gè)線程才能搶鎖
if (currentThread == waiters.peek() && acquire()) {
// 隊(duì)列頭線程拿到鎖 踢出等待隊(duì)列
waiters.poll();
return;
}
// 阻塞當(dāng)前線程 放棄CPU使用權(quán)
LockSupport.park();
}
}
public void unLock() {
if (Thread.currentThread() != lockHolder) {
throw new RuntimeException("lockHolder is not current thread");
}
if (compareAndSwapState(getState(), 0)) {
setLockHolder(null);
// 喚醒隊(duì)列里第一個(gè)線程
Thread first = waiters.peek();
if (first != null) {
LockSupport.unpark(first);
}
}
}
// 是否能加鎖成功
private boolean acquire() {
Thread currentThread = Thread.currentThread();
if (getState() == 0) { // 同步器尚未被持有
// 沒人排隊(duì)/自己是隊(duì)列頭,才能去嘗試原子操作改變state
if ((waiters.size() == 0 || currentThread == waiters.peek()) && compareAndSwapState(0, 1)) {
setLockHolder(currentThread);
return true;
}
}
return false;
}
// 利用Unsafe類實(shí)現(xiàn)原子操作改變值
public final boolean compareAndSwapState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private static final Unsafe unsafe = reflectGetUnsafe();
// 偏移量
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
} catch (Exception e) {
throw new Error();
}
}
// 反射獲取Unsafe類
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
return null;
}
}
}
public class TradeService {
Logger logger = Logger.getLogger(TradeService.class);
@Autowired
JdbcTemplate jdbcTemplate;
// 單例 創(chuàng)建鎖對(duì)象
MyLock myLock = new MyLock();
public String decStock() {
myLock.lock(); // 加鎖
Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
if (stock <= 0) {
logger.info("庫存不足,下單失??!");
myLock.unLock(); // 業(yè)務(wù)失敗 釋放鎖
return "庫存不足,下單失敗!";
}
stock--;
jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
logger.info("下單成功,當(dāng)前剩余庫存:" + stock);
myLock.unLock(); // 業(yè)務(wù)成功 釋放鎖
return "下單成功,當(dāng)前剩余庫存:" + stock;
}
}
二、概述
抽象同步框架,可以用來實(shí)現(xiàn)一個(gè)依賴狀態(tài)的同步器
1.可以實(shí)現(xiàn)獨(dú)占與共享兩種模式:
獨(dú)占:互斥,資源只能同時(shí)被一個(gè)線程占有,如ReentrantLock、Mutex
-
共享:資源不互斥,可以被多個(gè)線程占有,如Semaphre、CountDownLatch
一般來說同時(shí)只會(huì)實(shí)現(xiàn)一種模式,但也有ReentrantReadWriteLock同時(shí)實(shí)現(xiàn)獨(dú)占和共享兩種方式
2.核心屬性:
state:同步器的狀態(tài),即共享資源。訪問方式為:getState()、setState()、compareAndSetState()
exclusiveOwnerThread:當(dāng)前持有共享資源的線程
head:同步隊(duì)列頭
tail:同步隊(duì)列尾
3.特性:
可中斷
可重入
4.自定義AQS
一般自定義同步器的時(shí)候,只需要自定義共享資源的獲取和釋放方式。至于等待隊(duì)列的維護(hù),AQS已經(jīng)定義好了,不需要重寫。所以主要重寫以下幾種方法:
isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
[圖片上傳失敗...(image-ed6b0e-1649076723271)]
三、源碼
waitStatus表示Node節(jié)點(diǎn)的等待狀態(tài)
| waitStatus | 判斷結(jié)果 | 說明 |
|---|---|---|
| 0 | 初始化狀態(tài) | 該節(jié)點(diǎn)尚未被初始化完成 |
| 1 | 取消狀態(tài)(CANCELLED) | 說明該線程中斷或者等待超時(shí),需要移除該線程,進(jìn)入該狀態(tài)后節(jié)點(diǎn)不會(huì)變化 |
| -1 | 有效狀態(tài)(SIGNAL) | 下一個(gè)節(jié)點(diǎn)等著自己?jiǎn)拘?。?jié)點(diǎn)入隊(duì)會(huì)把前繼節(jié)點(diǎn)狀態(tài)更新為SIGNAL |
| -2 | 有效狀態(tài)(CONDITION) | 結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。 |
| -3 | 有效狀態(tài)(PROPAGATE) | 共享模式下,自己不僅會(huì)喚醒后繼節(jié)點(diǎn),同時(shí)也可能喚醒后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn) |
AQS中阻塞隊(duì)列采用雙向鏈表結(jié)構(gòu),使用prev、next連接;而條件隊(duì)列采用單向鏈表,采用nextWaiter連接
| nextWaiter狀態(tài)標(biāo)志 | 說明 |
|---|---|
| SHARED(共享模式) = new Node() | 立即喚醒下一個(gè)節(jié)點(diǎn) |
| EXCLUSIVE(獨(dú)占模式) = null | 等待當(dāng)前線程執(zhí)行完再喚醒 |
| 其他非空值 | 根據(jù)條件決定怎么喚醒下一個(gè)節(jié)點(diǎn) |
1.acquire()
acquire()是獲取共享資源的頂級(jí)入口,獲取到資源則直接返回,否則入等待隊(duì)列,直到獲取到共享資源,這個(gè)過程忽略中斷的影響(如果需要可中斷,可以調(diào)用acquireInterruptibly())。獲取到資源之后就可以執(zhí)行臨界區(qū)的代碼了
public final void acquire(int arg) {
// 嘗試獲取共享資源
if (!tryAcquire(arg) &&
//
acquireQueued(
// 加入等待隊(duì)列尾部
addWaiter(Node.EXCLUSIVE), arg)
)
// 如果在acquireQueued()被中斷過 這里自己補(bǔ)一個(gè)中斷
selfInterrupt();
}
1.1tryAcquire()
AQS中,這個(gè)方法不能被直接調(diào)用,需要子類重寫。這里不定義為抽象方法的好處在于,獨(dú)占模式下只需重寫tryAcquire(),共享模式寫重寫tryAcquireShared(),如果定義為抽象方法則都需要實(shí)現(xiàn)。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
以ReentrantLock為例,非公平鎖中中,會(huì)直接CAS嘗試獲取共享資源,公平鎖中,會(huì)先檢查隊(duì)列中有沒有正在等待的線程,才去獲取共享資源
1.2addWaiter()
如果tryAcquire()加鎖失敗,會(huì)addWaiter(Node.EXCLUSIVE), arg),創(chuàng)建一個(gè)獨(dú)占模式的節(jié)點(diǎn)加入到隊(duì)列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 嘗試快速入隊(duì)
// 這里直接進(jìn)行了一次CAS加到尾部的嘗試,失敗才去自旋 為什么要這樣呢?直接調(diào)用enq(node)效果似乎也一樣?
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 自旋 空隊(duì)列則CAS地初始化隊(duì)列,隊(duì)列有節(jié)點(diǎn)就把節(jié)點(diǎn)掛到隊(duì)列尾部
enq(node);
return node;
}
1.3acquireQueued()
把當(dāng)前節(jié)點(diǎn)加入隊(duì)列尾部之后,acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取自己的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)(正持有資源的節(jié)點(diǎn)) 自己是第二個(gè)節(jié)點(diǎn),那么可能有機(jī)會(huì)獲得資源
// 嘗試獲取資源(非公平鎖直接獲取,公平鎖會(huì)先檢查隊(duì)列中有沒有線程在等待再獲取)
// 第一次進(jìn)來就直接看能不能獲取資源 也可能是等待后,被喚醒(前驅(qū)節(jié)點(diǎn)釋放資源/中斷)循環(huán)走到這里
if (p == head && tryAcquire(arg)) {
// 獲取到資源 把當(dāng)前節(jié)點(diǎn)設(shè)為頭結(jié)點(diǎn),前驅(qū)節(jié)點(diǎn)置空,保存的線程置空(頭結(jié)點(diǎn)沒必要保存線程了)
setHead(node);
// 去掉之前的頭節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
// 返回等待過程中有沒有被中斷過
return interrupted;
}
// 如果自己不是第二個(gè)節(jié)點(diǎn) 或者自己雖然是第二個(gè)節(jié)點(diǎn),但是由于非公平鎖,新來的線程可以不入隊(duì)列,直接獲取鎖,所以這里可能被其他線程搶先了
// 檢查是否可以休息,找到可以安心休息的地方。將前驅(qū)節(jié)點(diǎn)waitStatus改為-1,即后續(xù)來喚醒自己
if (shouldParkAfterFailedAcquire(p, node) &&
// park 阻塞等待
// 檢查中斷標(biāo)志 如果是被中斷了而不是被unPark()
// 后面tryAcquire方法()獲取到資源之后會(huì)返回中斷標(biāo)志,acquire()里自行產(chǎn)生一個(gè)中斷標(biāo)志
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
嘗試獲取資源失敗后,檢查自己是否可以安心休息,如果不能就找到一個(gè)可以安心休息的地方
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驅(qū)節(jié)點(diǎn)的狀態(tài)已經(jīng)是SIGNAL,到時(shí)候會(huì)來喚醒自己,所以可以安心休息
if (ws == Node.SIGNAL)
return true;
// 前驅(qū)節(jié)點(diǎn)狀態(tài)是被取消
if (ws > 0) {
do {
// 一直往前找沒有取消的節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把自己掛到它后面
pred.next = node;
} else {
// 走到這里意味著狀態(tài)是0/PROPAGATE 則把前驅(qū)節(jié)點(diǎn)狀態(tài)置為SIGNAL
// 這里就算成功了,也不會(huì)直接返回true開始休息,而是再一輪嘗試獲取資源,獲取不到再park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 等待
LockSupport.park(this);
// 被喚醒 返回自己是不是因?yàn)橹袛啾粏拘? 中斷信號(hào)能打斷一個(gè)等待中的線程,終止等待
return Thread.interrupted();
}
總結(jié):如果當(dāng)前是第二個(gè)節(jié)點(diǎn),就去嘗試獲取資源,如果成功了直接返回。否則的話,尋找一個(gè)合適的休息點(diǎn),開始等待,直到被前驅(qū)節(jié)點(diǎn)unPark()或被中斷,才繼續(xù)判斷自己是否是第二個(gè)節(jié)點(diǎn),去嘗試獲取資源。這個(gè)方法會(huì)返回等待過程中是否被中斷過,后續(xù)用來自行產(chǎn)生一個(gè)中斷。這也是不響應(yīng)中斷的核心,等待過程不可中斷,只有獲取到資源之后,才可以被中斷
1.4selfInterrupt()
自行產(chǎn)生一個(gè)中斷
1.5總結(jié)
1.先嘗試獲取資源
2.獲取不到則把當(dāng)前線程加入隊(duì)列尾,標(biāo)記獨(dú)占模式
3.檢查自己是否為第二個(gè)節(jié)點(diǎn),是則嘗試獲取鎖,不是則進(jìn)入等待,等輪到自己了,前驅(qū)節(jié)點(diǎn)會(huì)unPark自己,自己再去嘗試獲取資源。如果被中斷過,則返回出去
4.獲取資源成功,如果被中斷過,則產(chǎn)生一個(gè)中斷
2.release()
釋放資源的頂級(jí)入口,釋放完資源,會(huì)喚醒隊(duì)列中后一個(gè)正在等待的線程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判斷頭節(jié)點(diǎn)不為空,狀態(tài)不為0
if (h != null && h.waitStatus != 0)
// 喚醒后繼節(jié)點(diǎn) 找到隊(duì)列里離head最近的一個(gè)沒取消的node,unpark恢復(fù)其運(yùn)行
unparkSuccessor(h);
return true;
}
return false;
}
2.1tryRelease()
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2.2unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 當(dāng)前結(jié)點(diǎn)狀態(tài)為負(fù) 則置為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 一般要喚醒的就是后一個(gè)節(jié)點(diǎn),但是可能該節(jié)點(diǎn)已取消 所以要從尾結(jié)點(diǎn)一直往前找,找到真正有效的節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
疑問:這里為什么要從后往前找?
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 要掛上去的節(jié)點(diǎn) prev是一定能成功指向之前的尾結(jié)點(diǎn)的
node.prev = pred;
// CAS把尾結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
// 但這里可能還沒有執(zhí)行到 此時(shí)之前的尾結(jié)點(diǎn)指向null
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
3.acquireShared()
共享模式下,嘗試獲取指定量的資源,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源。該過程忽略中斷
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.1tryAcquireShared()
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
負(fù)值:失敗,需要執(zhí)行doAcquireShared()進(jìn)入隊(duì)列
0:成功,但沒有剩余資源
正值:成功,還有剩余資源,其他線程還可以獲取
3.2doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 自己成為head
setHead(node);
// 如果還有剩余量 繼續(xù)喚醒下一個(gè)線程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
3.3總結(jié)
先嘗試獲取資源,沒有獲取到則進(jìn)入隊(duì)列等待,直到獲取到資源。與獨(dú)占模式相比,自己拿到資源之后,還會(huì)繼續(xù)喚醒下一個(gè)線程
4.releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源
doReleaseShared();//喚醒后繼結(jié)點(diǎn)
return true;
}
return false;
}
4.1tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
4.2doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒后繼
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發(fā)生變化
break;
}
}