主要參考了博客JUC框架 源碼解析系列文章目錄 JDK8
AbstractQueuedSynchronizer
概述
實現(xiàn)大量依賴樂觀鎖的方式(即CAS+自旋)。它實現(xiàn)了一個FIFO的等待隊列用于等待獲取同步狀態(tài),而獲取/釋放同步器狀態(tài)的函數(shù)則依靠子類來實現(xiàn)。
雖然AQS是一個抽象類,但卻沒有任何抽象方法。如果定義為抽象方法確實不合適,因為繼承使用AQS并不一定需要使用到AQS提供的所有功能(獨占鎖和共享鎖),這樣子類反而需要實現(xiàn)所有抽象方法。如果定義為空實現(xiàn)的普通方法,雖然不需要子類實現(xiàn)所有空方法了,但這樣還是不夠明確。現(xiàn)在AQS將這些方法的實現(xiàn)為拋出UnsupportedOperationException異常,那么如果是子類需要使用的方法,就覆蓋掉它;如果是子類不需要使用的方法,一旦調(diào)用就會拋出異常。
AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。
簡單使用案例(實現(xiàn)一個共享鎖):
自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。
public class MyLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) throw new IllegalArgumentException();
setState(count);
}
@Override
protected int tryAcquireShared(int acquireCount) {
while (true) {
int cur = getState();
int newCount = cur - acquireCount;
if (newCount < 0 || compareAndSetState(cur, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int releaseCount) {
while (true) {
int cur = getState();
int newCount = cur + releaseCount;
if (compareAndSetState(cur, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
}
重要屬性
state
- state用volatile修飾,保證了它的可見性。
- 如果是多線程并發(fā)修改的話,采用compareAndSetState來操作state
- 如果是在沒有線程安全的環(huán)境下對state操作(例如ReentrantLock釋放鎖,因為它之前已經(jīng)獲取到獨占鎖,所以沒必要用CAS),采用setState方法
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
等待隊列
AQS中已經(jīng)為我們實現(xiàn)了一個FIFO的等待隊列,它是一個雙向鏈表。由于同步器的state一般不能讓所有線程同時獲得,所以將這些需要暫時等待的線程包裝成一個節(jié)點放到隊列中去,當(dāng)獲取state的條件滿足時,會將這個節(jié)點內(nèi)的線程喚醒,以便它接下來去嘗試獲取state。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 表明node代表線程的狀態(tài)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter; // 表明當(dāng)前node的線程是想要獲取共享鎖還是獨占鎖
final boolean isShared() {
return nextWaiter == SHARED;
}
}
private transient volatile Node head; // 固定是一個dummy node,因為它的thread成員固定為null
private transient volatile Node tail; // 請求鎖失敗的線程,會包裝成node,放到隊尾
- head節(jié)點中thread成員為null,可以理解為將它的thread成員放到AQS的
exclusiveOwnerThread屬性上
- 即使等待線程只有一個,等待隊列中的節(jié)點個數(shù)也肯定是2個,因為第一個節(jié)點總是dummy node。
acquire(int arg)
流程

- 首先調(diào)用子類的
tryAcquire嘗試獲取獨占鎖一次,try的意思就是只試一次,要么成功,要么失敗。 - 獲取不到則調(diào)用
addWaiter(Node.EXCLUSIVE)將該線程加入等待隊列的尾部,并標(biāo)記為獨占模式 -
acquireQueued使線程在等待隊列中獲取資源,中途可能不斷經(jīng)歷阻塞/喚醒狀態(tài),一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。 - 如果線程在等待過程中被中斷過,它是不響應(yīng)的。但是當(dāng)
acquireQueued返回真時,代表這期間函數(shù)曾經(jīng)檢測到過中斷狀態(tài),并且將中斷狀態(tài)消耗掉了(Thread.interrupted()),所以需要在退出acquire之前,將中斷狀態(tài)重新設(shè)置上。
/**
Acquires in exclusive mode, ignoring interrupts.
Implemented by invoking at least once tryAcquire, returning on success.
Otherwise the thread is queued,
possibly repeatedly blocking and unblocking, invoking tryAcquire until success.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
該方法的默認(rèn)實現(xiàn)是拋出UnsupportedOperationException,具體實現(xiàn)由自定義的擴(kuò)展了AQS的同步類來實現(xiàn)。
AQS在這里只負(fù)責(zé)定義了一個公共的方法框架。
沒有定義成abstract,是因為獨占模式下只用實現(xiàn)tryAcquire-tryRelease,
而共享模式下只用實現(xiàn)tryAcquireShared-tryReleaseShared。
如果都定義成abstract,那么每個模式也要去實現(xiàn)另一模式下的接口。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
addWaiter(Node)

- 將當(dāng)前線程封裝成一個節(jié)點(
Node.EXCLUSIVE互斥模式、Node.SHARED共享模式) - 嘗試快速入隊:通過一次CAS加入到等待隊列的隊尾。
- 如果CAS失敗或者隊列為空,則通過enq(node)方法初始化一個等待隊列
- 在enq(node)中,如果隊列為空,則會給頭部設(shè)置一個空節(jié)點:
compareAndSetHead(new Node()),隨后不斷自旋直到把node加入到等待隊列隊尾。這個循環(huán)只有在compareAndSetTail(t, node)成功時才會退出循環(huán),這就保證了enq最終肯定能將參數(shù)node放到隊尾。就算只有一個線程入隊,入隊完畢后隊列將有兩個node,第一個node稱為dummy node,因為它的thread成員為null;第二個node才算是實際意義的隊頭,它的thread成員不為null。新建的是空node,它的所有成員都是默認(rèn)值。thread成員為null,waitStatus為0。之后你會發(fā)現(xiàn),隊尾node的waitStatus總是0,因為默認(rèn)初始化。 - 返回當(dāng)前線程所在的結(jié)點
注意點
如果是多線程執(zhí)行,可能導(dǎo)致多個node.prev鏈接到了tail,但是通過CAS保證tail.next只會鏈接到其中一個Node,并且其他的Node在不斷的自旋中最終還是會加入到等待隊列中
prev的有效性:有可能產(chǎn)生這樣一種中間狀態(tài),即node.prev指向了原先的tail,但是tail.next還沒來得及指向node。這時如果另一個線程通過next指針遍歷隊列,就會漏掉最后一個node。但是如果是通過tail.prev來遍歷等待隊列,就不會漏掉節(jié)點
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
acquireQueued

- 每次循環(huán)都會判斷是否可以嘗試獲取鎖(判斷前驅(qū)節(jié)點p是否為head),如果可以,那么嘗試tryAcquire(arg)
- 如果不可以嘗試,或者獲取鎖失敗,則通過parkAndCheckInterrupt阻塞線程并檢查線程中斷狀態(tài)
- 如果線程被unpark/interrupt,則會從park中返回,接著從parkAndCheckInterrupt()返回,繼續(xù)往下執(zhí)行
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node); // 該方法不會被執(zhí)行
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire
Node的狀態(tài)
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
CANCELLED代表線程已經(jīng)取消等待
SIGNAL說明這個node的后繼node的代表線程已經(jīng)阻塞或馬上阻塞。當(dāng)前node成為head并釋放鎖時,會根據(jù)SIGNAL來喚醒后繼node。即SIGNAL是喚醒后繼節(jié)點的標(biāo)志。
- 一個node新建的時候,它的waitStatus是默認(rèn)初始化為0
說明
獲取鎖失敗了才會執(zhí)行該函數(shù):
- p == head為false,即當(dāng)前線程的node的前驅(qū)不是head
- 雖然 p == head為true,雖然當(dāng)前線程雖然已經(jīng)排到等待隊列的最前面,但獲取鎖還是失敗了
只有當(dāng)該函數(shù)返回true時,才會去執(zhí)行parkAndCheckInterrupt
作用:跳過無效前驅(qū),把node的有效前驅(qū)(有效是指node不是CANCELLED的)找到,并且將有效前驅(qū)的狀態(tài)設(shè)置為SIGNAL,之后便返回true代表馬上可以阻塞了。給前一個節(jié)點設(shè)置SIGNAL,相當(dāng)于一個鬧鐘,當(dāng)前一個節(jié)點釋放鎖時,喚醒當(dāng)前節(jié)點
執(zhí)行兩次
如果剛開始前驅(qū)的狀態(tài)為0,那么需要第一次執(zhí)行compareAndSetWaitStatus(pred, ws, Node.SIGNAL)返回false并進(jìn)入下一次循環(huán),第二次才能進(jìn)入if (ws == Node.SIGNAL)分支,所以說至少執(zhí)行兩次。死循環(huán)保證了最終一定能設(shè)置前驅(qū)為SIGNAL成功的。(考慮當(dāng)前線程一直不能獲取到鎖)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) return true;
if (ws > 0) {
do {
// 是CANCELLED,說明前驅(qū)節(jié)點已經(jīng)因為超時或響應(yīng)了中斷,而取消了自己,需要跳過他們
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 把一個node的狀態(tài)變成SIGNAL
}
return false;
}
parkAndCheckInterrupt
調(diào)用完LockSupport.park(this),當(dāng)前線程就阻塞在這里,直到有別的線程unpark了當(dāng)前線程,或者中斷了當(dāng)前線程。而返回的Thread.interrupted()代表當(dāng)前線程在阻塞的過程中,有沒有被別的線程中斷過,如果有,則返回true。注意,Thread.interrupted()會消耗掉中斷的狀態(tài),即第一次執(zhí)行能返回true,但緊接著第二次執(zhí)行就只會返回false了。
如果是別的線程unpark了當(dāng)前線程,那么調(diào)用Thread.interrupted()返回false。
如果是別的線程中斷了當(dāng)前線程,那么調(diào)用Thread.interrupted()返回true。
回到acquireQueued的邏輯中,發(fā)現(xiàn)一旦當(dāng)前線程被中斷過一次,那么parkAndCheckInterrupt就返回了true,那么執(zhí)行interrupted = true,interrupted局部變量就一定是true的了。(該中斷狀態(tài)會永久保留,用于最外層acquire中恢復(fù)用戶中斷)
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
注意點
忽略中斷
整個過程忽略用戶發(fā)出的中斷信號(也就是由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊列中移出),直到acquireQueued執(zhí)行結(jié)束后,才通過selfInterrupt恢復(fù)用戶的中斷
為什么tryAcquire(arg)的前提是p == head?
從enq的邏輯可知,head只會是一個dummy node,實際意義的node只會在head的后面。而node的前驅(qū)是head(final Node p = node.predecessor()),則代表node已經(jīng)是隊列中的第一個實際node了,排在最前面的node自然可以嘗試去獲取鎖了。
回想整個調(diào)用過程,是最開始在acquire里調(diào)用tryAcquire就已經(jīng)失敗了,然而此時第一次循環(huán)時,又可能馬上去調(diào)tryAcquire(說可能,是因為需要p == head成立),這會不會是一次肯定失敗的tryAcquire?
考慮這種場景,線程1獲取了鎖正在使用還沒釋放,此時隊列為空,線程2此時也來獲取鎖,自然最開始在acquire里調(diào)用tryAcquire會失敗,假設(shè)線程2剛開始執(zhí)行acquireQueued,此時線程1釋放了鎖,此時線程2肯定排在head后面,那么線程2馬上tryAcquire,然后就可以獲取成功。
執(zhí)行acquireQueued的線程是誰?
一定是node參數(shù)的thread成員,雖然執(zhí)行過程中,可能會經(jīng)歷不斷阻塞和被喚醒的過程。
為什么剛執(zhí)行完addWaiter方法時,才把代表當(dāng)前線程的node放到隊尾,怎么之后一判斷就會發(fā)現(xiàn)自己處于head的后繼了?
考慮addWaiter時,隊列中有許多node。這說明從head到當(dāng)前方法棧中的node之間的那些node,它們自己也會在執(zhí)行acquireQueued,它們依次執(zhí)行成功(指p == head && tryAcquire(arg)成功),每次執(zhí)行成功相當(dāng)于把head成員從隊列上后移一個node,當(dāng)它們都執(zhí)行完畢,當(dāng)前方法棧中的node自然也就是head的后繼了。
“之間的那些node”的最后一個node執(zhí)行acquireQueued成功后(代表 最后一個node的代表線程獲得鎖成功,它自己成為了head),當(dāng)前方法還在阻塞之中,只有當(dāng)這“最后一個node”釋放獨占鎖時,才會執(zhí)行unparkSuccessor(head),當(dāng)前線程才會被喚醒。
finally塊是否會執(zhí)行cancelAcquire(node)?
雖然號稱此函數(shù)是不響應(yīng)中斷的函數(shù),但不響應(yīng)中斷只是對于AQS的使用者來說,如果一個線程阻塞在parkAndCheckInterrupt這里,別的線程來中斷它,它是會馬上喚醒的,然后繼續(xù)這個循環(huán)。不過想要退出這個函數(shù),只有通過return interrupted,而前一句就是failed = false,所以finally塊里,是永遠(yuǎn)不會去執(zhí)行cancelAcquire(node)的。
release(int arg)
獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
釋放鎖的過程,根本不會區(qū)分公平或不公平、響應(yīng)中斷或不響應(yīng)中斷、超時或不超時。這是因為,這些區(qū)別都只是存在于嘗試獲取鎖的方式上而已,既然已經(jīng)獲得了鎖,也就不需要有這些區(qū)別。
細(xì)節(jié)
- 如果遇到
s == null,說明我們遇到一種中間狀態(tài),next指針還沒有指好。如果遇到s.waitStatus > 0,說明head后繼剛?cè)∠?。這兩種情況,都需要從隊尾的prev往前找。 - 注意循環(huán)條件
t != null && t != node,它會從隊尾一直往前找,直到t是null或t已經(jīng)到達(dá)了node。一般情況下,不會出現(xiàn)t != null,所以,這樣循環(huán)肯定能找到node之后第一個不是取消狀態(tài)的節(jié)點。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果從頭到尾都只有一個線程在使用鎖,那么隊列也不會初始化,head肯定為null。
// 當(dāng)隊列只有一個dummy node時,它的狀態(tài)為0,也就不會執(zhí)行unparkSuccessor(h)了
// 當(dāng)head的狀態(tài)為SIGNAL時,說明head后繼已經(jīng)設(shè)置了鬧鐘,會執(zhí)行unparkSuccessor(h)。
if (h != null && h.waitStatus != 0) unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/*
* head后繼一般能直接通過next找到,但只有prev是肯定有效的。
* 所以遇到next為null,肯定需要從隊尾的prev往前找。
* 遇到next的狀態(tài)為取消,也需要從隊尾的prev往前找。
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
if (s != null) LockSupport.unpark(s.thread);
}
acquireInterruptibly(int arg)
進(jìn)入這個方法后,會第一次進(jìn)行tryAcquire嘗試。但不同的,此acquireInterruptibly函數(shù)中,會去檢測Thread.interrupted(),并拋出異常。
對于acquireInterruptibly這個方法而言,既可以是公平的,也可以是不公平的,這完全取決于tryAcquire的實現(xiàn)(即取決于ReentrantLock當(dāng)初是怎么構(gòu)造的)。
如果檢測到中斷信號,首先線程會從LockSupport.park()中返回,并且拋出InterruptedException異常,執(zhí)行cancelAcquire方法,將該線程代表的節(jié)點從等待隊列中移除,并根據(jù)情況選擇是否unparkSuccessor后續(xù)節(jié)點
doAcquireInterruptibly不需要返回值,因為該函數(shù)中如果檢測到了中斷狀態(tài),就直接拋出異常就好了。
doAcquireInterruptibly方法的finally塊是可能會執(zhí)行到cancelAcquire(node)的,而acquireQueued方法不可能去執(zhí)行cancelAcquire(node)的。在doAcquireInterruptibly方法中,如果線程阻塞在parkAndCheckInterrupt這里后,別的線程來中斷阻塞線程,阻塞線程會被喚醒,然后拋出異常。本來拋出異常該函數(shù)就馬上結(jié)束掉的,但由于有finally塊,所以在結(jié)束掉之前會去執(zhí)行finally塊,并且由于failed為true,則會執(zhí)行cancelAcquire(node)。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed) cancelAcquire(node);
}
}

private void cancelAcquire(Node node) {
if (node == null) return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 跳過CANCELLED的節(jié)點
// 執(zhí)行完循環(huán),pred會指向node的有效前驅(qū)
Node predNext = pred.next;
// 如果別的線程在執(zhí)行這步之后,別的線程將會跳過這個node。
// 如果別的線程在執(zhí)行這步之前,別的線程還是會將這個node當(dāng)作有效節(jié)點。
node.waitStatus = Node.CANCELLED;
// 如果node是隊尾,直接設(shè)置pred為隊尾,然后設(shè)置pred的后繼為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
tryAcquireNanos(int arg, long nanosTimeout)
tryAcquireNanos這個方法與不響應(yīng)中斷的acquire方法對應(yīng)。同樣的,進(jìn)入這個方法后,會第一次進(jìn)行tryAcquire嘗試。但不同的,此tryAcquireNanos函數(shù)中,會先去檢測Thread.interrupted(),并拋出異常。
但注意,對于tryAcquireNanos這個方法而言,既可以是公平的,也可以是不公平的,這完全取決于tryAcquire的實現(xiàn)(即取決于ReentrantLock當(dāng)初是怎么構(gòu)造的)。
差別
每次循環(huán)都會檢查時間是否到達(dá)deadline。
當(dāng)剩余時間小于spinForTimeoutThreshold時,則不能調(diào)用LockSupport.parkNanos,因為時間太短,反而無法精確控制阻塞時間,所以不如在剩余的時間里一直循環(huán)。
LockSupport.parkNanos除了會因為別人的park而喚醒,也會因為別人的中斷而喚醒,當(dāng)然最重要的,時間到了,它自己會喚醒。
不管哪種情況,被喚醒后,都會檢查中斷狀態(tài)。每個循環(huán)都會檢查一次。
如果中斷,也同樣進(jìn)入cancelAcquire方法
final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L) return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted()) throw new InterruptedException();
}
} finally {
if (failed) cancelAcquire(node);
}
}
acquireShared
共享鎖與獨占鎖的區(qū)別
- 獨占鎖是線程獨占的,同一時刻只有一個線程能擁有獨占鎖,AQS里將這個線程放置到exclusiveOwnerThread成員上去。
- 共享鎖是線程共享的,同一時刻能有多個線程擁有共享鎖,但AQS里并沒有用來存儲獲得共享鎖的多個線程的成員。
- 如果一個線程剛獲取了共享鎖,那么在其之后等待的線程也很有可能能夠獲取到鎖。但獨占鎖不會這樣做,因為鎖是獨占的。
- 當(dāng)然,如果一個線程剛釋放了鎖,不管是獨占鎖還是共享鎖,都需要喚醒在后面等待的線程
流程
- 創(chuàng)建的節(jié)點不同。共享鎖使用addWaiter(Node.SHARED),所以會創(chuàng)建出想要獲取共享鎖的節(jié)點。而獨占鎖使用addWaiter(Node.EXCLUSIVE)。
- 獲取鎖成功后的善后操作不同。共享鎖使用setHeadAndPropagate(node, r),因為剛獲取共享鎖成功后,后面的線程也有可能成功獲取,所以需要在一定條件喚醒head后繼。而獨占鎖使用setHead(node)。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted) selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
/**
setHead函數(shù)只是將剛成為將成為head的節(jié)點變成一個dummy node。
而setHeadAndPropagate里也會調(diào)用setHead函數(shù)。
但是它在一定條件下還可能會調(diào)用doReleaseShared
“如果一個線程剛獲取了共享鎖,那么在其之后等待的線程也很有可能能夠獲取到鎖”。
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* 復(fù)原中斷狀態(tài),雖然這個版本的函數(shù)不用響應(yīng)中斷。
* 當(dāng)acquireQueued返回真時,代表這期間函數(shù)曾經(jīng)檢測到過中斷狀態(tài),并且將中斷狀態(tài)消耗掉了
* 所以需要在退出acquire之前,將中斷狀態(tài)重新設(shè)置上。
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 在獲取共享鎖成功時,也可能會調(diào)用到doReleaseShared。
private void doReleaseShared() {
for (; ; ) {
Node h = head;
// 如果隊列從來沒有初始化過(head為null),或者h(yuǎn)ead就是tail,則直接判斷head是否變化過。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
}
/*
循環(huán)檢測到head沒有變化時就會退出循環(huán)
head變化一定是因為acquire thread被喚醒,之后它成功獲取鎖,然后setHead設(shè)置了新head
保證了只要在某個循環(huán)的過程中有線程剛獲取了鎖且設(shè)置了新head,就會再次循環(huán)
目的當(dāng)然是為了再次執(zhí)行unparkSuccessor(h),即喚醒隊列中第一個等待的線程
*/
if (h == head) break;
}
}