??AQS是AbstractQueuedSynchronizer的簡(jiǎn)稱。AQS提供了一種實(shí)現(xiàn)阻塞鎖和一系列依賴FIFO等待隊(duì)列的同步器的框架,如下圖所示。AQS為一系列同步器依賴于一個(gè)單獨(dú)的原子變量(state)的同步器提供了一個(gè)非常有用的基礎(chǔ)。子類們必須定義改變state變量的protected方法,這些方法定義了state是如何被獲取或釋放的。鑒于此,本類中的其他方法執(zhí)行所有的排隊(duì)和阻塞機(jī)制。子類也可以維護(hù)其他的state變量,但是為了保證同步,必須原子地操作這些變量。

?? AbstractQueuedSynchronizer中對(duì)state的操作是原子的,且不能被繼承。所有的同步機(jī)制的實(shí)現(xiàn)均依賴于對(duì)改變量的原子操作。為了實(shí)現(xiàn)不同的同步機(jī)制,我們需要?jiǎng)?chuàng)建一個(gè)非共有的(non-public internal)擴(kuò)展了AQS類的內(nèi)部輔助類來(lái)實(shí)現(xiàn)相應(yīng)的同步邏輯。AbstractQueuedSynchronizer并不實(shí)現(xiàn)任何同步接口,它提供了一些可以被具體實(shí)現(xiàn)類直接調(diào)用的一些原子操作方法來(lái)重寫(xiě)相應(yīng)的同步邏輯。AQS同時(shí)提供了互斥模式(exclusive)和共享模式(shared)兩種不同的同步邏輯。一般情況下,子類只需要根據(jù)需求實(shí)現(xiàn)其中一種模式,當(dāng)然也有同時(shí)實(shí)現(xiàn)兩種模式的同步類,如
ReadWriteLock。接下來(lái)將詳細(xì)介紹AbstractQueuedSynchronizer的提供的一些具體實(shí)現(xiàn)方法。
state狀態(tài)
??AbstractQueuedSynchronizer維護(hù)了一個(gè)volatile int類型的變量,用戶表示當(dāng)前同步狀態(tài)。volatile雖然不能保證操作的原子性,但是保證了當(dāng)前變量state的可見(jiàn)性。至于volatile的具體語(yǔ)義,可以參考相關(guān)文章。state的訪問(wèn)方式有三種:
- getState()
- setState()
- compareAndSetState()
??這三種叫做均是原子操作,其中compareAndSetState的實(shí)現(xiàn)依賴于Unsafe的compareAndSwapInt()方法。代碼實(shí)現(xiàn)如下:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
自定義資源共享方式
??AQS定義兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)和Share(共享,多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch)。
??不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
- tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
源碼實(shí)現(xiàn)
??接下來(lái)我們開(kāi)始開(kāi)始講解AQS的源碼實(shí)現(xiàn)。依照acquire-release、acquireShared-releaseShared的次序來(lái)。
1. acquire(int)
? ? acquire是一種以獨(dú)占方式獲取資源,如果獲取到資源,線程直接返回,否則進(jìn)入等待隊(duì)列,直到獲取到資源為止,且整個(gè)過(guò)程忽略中斷的影響。該方法是獨(dú)占模式下線程獲取共享資源的頂層入口。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了。下面是acquire()的源碼:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
??通過(guò)注釋我們知道,acquire方法是一種互斥模式,且忽略中斷。該方法至少執(zhí)行一次tryAcquire(int)方法,如果tryAcquire(int)方法返回true,則acquire直接返回,否則當(dāng)前線程需要進(jìn)入隊(duì)列進(jìn)行排隊(duì)。函數(shù)流程如下:
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
- acquireQueued()使線程在等待隊(duì)列中獲取資源,一直獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。
- 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。
接下來(lái)一次介紹相關(guān)方法。
1.1 tryAcquire(int)
?? tryAcquire嘗試以獨(dú)占的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用于實(shí)現(xiàn)Lock中的tryLock()方法。該方法的默認(rèn)實(shí)現(xiàn)是拋出UnsupportedOperationException,具體實(shí)現(xiàn)由自定義的擴(kuò)展了AQS的同步類來(lái)實(shí)現(xiàn)。AQS在這里只負(fù)責(zé)定義了一個(gè)公共的方法框架。這里之所以沒(méi)有定義成abstract,是因?yàn)楠?dú)占模式下只用實(shí)現(xiàn)tryAcquire-tryRelease,而共享模式下只用實(shí)現(xiàn)tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個(gè)模式也要去實(shí)現(xiàn)另一模式下的接口。
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1.2 addWaiter(Node)
??該方法用于將當(dāng)前線程根據(jù)不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待隊(duì)列的隊(duì)尾,并返回當(dāng)前線程所在的結(jié)點(diǎn)。如果隊(duì)列不為空,則以通過(guò)compareAndSetTail方法以CAS的方式將當(dāng)前線程節(jié)點(diǎn)加入到等待隊(duì)列的末尾。否則,通過(guò)enq(node)方法初始化一個(gè)等待隊(duì)列,并返回當(dāng)前節(jié)點(diǎn)。源碼如下:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
1.2.1 enq(node)
??enq(node)用于將當(dāng)前節(jié)點(diǎn)插入等待隊(duì)列,如果隊(duì)列為空,則初始化當(dāng)前隊(duì)列。整個(gè)過(guò)程以CAS自旋的方式進(jìn)行,直到成功加入隊(duì)尾為止。源碼如下:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.3 acquireQueued(Node, int)
??acquireQueued()用于隊(duì)列中的線程自旋地以獨(dú)占且不可中斷的方式獲取同步狀態(tài)(acquire),直到拿到鎖之后再返回。該方法的實(shí)現(xiàn)分成兩部分:如果當(dāng)前節(jié)點(diǎn)已經(jīng)成為頭結(jié)點(diǎn),嘗試獲取鎖(tryAcquire)成功,然后返回;否則檢查當(dāng)前節(jié)點(diǎn)是否應(yīng)該被park,然后將該線程park并且檢查當(dāng)前線程是否被可以被中斷。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
//標(biāo)記是否成功拿到資源,默認(rèn)false
boolean failed = true;
try {
boolean interrupted = false;//標(biāo)記等待過(guò)程中是否被中斷過(guò)
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.3.1 shouldParkAfterFailedAcquire(Node, Node)
??shouldParkAfterFailedAcquire方法通過(guò)對(duì)當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的狀態(tài)進(jìn)行判斷,對(duì)當(dāng)前節(jié)點(diǎn)做出不同的操作,至于每個(gè)Node的狀態(tài)表示,可以參考接口文檔。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1.3.2 parkAndCheckInterrupt()
??該方法讓線程去休息,真正進(jìn)入等待狀態(tài)。park()會(huì)讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會(huì)清除當(dāng)前線程的中斷標(biāo)記位。
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
??我們?cè)倩氐絘cquireQueued(),總結(jié)下該函數(shù)的具體流程:
- 結(jié)點(diǎn)進(jìn)入隊(duì)尾后,檢查狀態(tài),找到安全休息點(diǎn);
- 調(diào)用park()進(jìn)入waiting狀態(tài),等待unpark()或interrupt()喚醒自己;
- 被喚醒后,看自己是不是有資格能拿到號(hào)。如果拿到,head指向當(dāng)前結(jié)點(diǎn),并返回從入隊(duì)到拿到號(hào)的整個(gè)過(guò)程中是否被中斷過(guò);如果沒(méi)拿到,繼續(xù)流程1。
最后,總結(jié)一下acquire()的流程:
- 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒(méi)成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
- acquireQueued()使線程在等待隊(duì)列中休息,有機(jī)會(huì)時(shí)(輪到自己,會(huì)被unpark())會(huì)去嘗試獲取資源。獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。
- 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。
2. release(int)
??release(int)方法是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。這也正是unlock()的語(yǔ)義,當(dāng)然不僅僅只限于unlock()。下面是release()的源碼:
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
??與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨(dú)占模式的自定義同步器去實(shí)現(xiàn)的。正常來(lái)說(shuō),tryRelease()都會(huì)成功的,因?yàn)檫@是獨(dú)占模式,該線程來(lái)釋放資源,那么它肯定已經(jīng)拿到獨(dú)占資源了,直接減掉相應(yīng)量的資源即可(state-=arg),也不需要考慮線程安全的問(wèn)題。但要注意它的返回值,上面已經(jīng)提到了,release()是根據(jù)tryRelease()的返回值來(lái)判斷該線程是否已經(jīng)完成釋放掉資源了!所以自義定同步器在實(shí)現(xiàn)時(shí),如果已經(jīng)徹底釋放資源(state=0),要返回true,否則返回false。
??unparkSuccessor(Node)方法用于喚醒等待隊(duì)列中下一個(gè)線程。這里要注意的是,下一個(gè)線程并不一定是當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn),而是下一個(gè)可以用來(lái)喚醒的線程,如果這個(gè)節(jié)點(diǎn)存在,調(diào)用unpark()方法喚醒。
??總之,release()是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。
3. acquireShared(int)
??acquireShared(int)方法是共享模式下線程獲取共享資源的頂層入口。它會(huì)獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源為止,整個(gè)過(guò)程忽略中斷。下面是acquireShared()的源碼:
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.1 doAcquireShared(int)
??將當(dāng)前線程加入等待隊(duì)列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。源碼如下:
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
??跟獨(dú)占模式比,還有一點(diǎn)需要注意的是,這里只有線程是head.next時(shí)(“老二”),才會(huì)去嘗試獲取資源,有剩余的話還會(huì)喚醒之后的隊(duì)友。那么問(wèn)題就來(lái)了,假如老大用完后釋放了5個(gè)資源,而老二需要6個(gè),老三需要1個(gè),老四需要2個(gè)。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會(huì)繼續(xù)park()等待其他線程釋放資源,也更不會(huì)去喚醒老三和老四了。獨(dú)占模式,同一時(shí)刻只有一個(gè)線程去執(zhí)行,這樣做未嘗不可;但共享模式下,多個(gè)線程是可以同時(shí)執(zhí)行的,現(xiàn)在因?yàn)槔隙馁Y源需求量大,而把后面量小的老三和老四也都卡住了。當(dāng)然,這并不是問(wèn)題,只是AQS保證嚴(yán)格按照入隊(duì)順序喚醒罷了(保證公平,但降低了并發(fā))。實(shí)現(xiàn)如下:
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
??此方法在setHead()的基礎(chǔ)上多了一步,就是自己蘇醒的同時(shí),如果條件符合(比如還有剩余資源),還會(huì)去喚醒后繼結(jié)點(diǎn),畢竟是共享模式!至此,acquireShared()也要告一段落了。讓我們?cè)偈崂硪幌滤牧鞒蹋?/p>
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過(guò)doAcquireShared()進(jìn)入等待隊(duì)列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個(gè)等待過(guò)程也是忽略中斷的。
4. releaseShared(int)
??releaseShared(int)方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。下面是releaseShared()的源碼:
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
??此方法的流程也比較簡(jiǎn)單,一句話:釋放掉資源后,喚醒后繼。跟獨(dú)占模式下的release()相似,但有一點(diǎn)稍微需要注意:獨(dú)占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會(huì)返回true去喚醒其他線程,這主要是基于獨(dú)占下可重入的考量;而共享模式下的releaseShared()則沒(méi)有這種要求,共享模式實(shí)質(zhì)就是控制一定量的線程并發(fā)執(zhí)行,那么擁有資源的線程在釋放掉部分資源時(shí)就可以喚醒后繼等待結(jié)點(diǎn)。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}