Java并發(fā)編程 - 深入剖析ReentrantLock之非公平鎖加鎖流程(第1篇)
Java并發(fā)編程 - 深入剖析ReentrantLock之非公平鎖解鎖流程(第2篇)
之前的文章講過ReentrantLock,通過調(diào)試示例分析其加鎖和結(jié)鎖的流程,ReentrantLock是獨(dú)占鎖,每次都只允許一個(gè)線程加鎖和解鎖。其內(nèi)部的state狀態(tài)只有0和1兩種值,當(dāng)線程無法通過CAS將其設(shè)為1的時(shí)候,則說明有其他線程持有鎖,那么該線程就進(jìn)入同步隊(duì)列進(jìn)行等待。
獨(dú)占鎖每次只允許一個(gè)線程持有它的使用權(quán),與之相對(duì)應(yīng)的有共享鎖模式,共享鎖允許多個(gè)線程同時(shí)擁有它。
Semaphore可以作為共享鎖使用,這篇文章我們通過它的加鎖和解鎖來分析其內(nèi)部實(shí)現(xiàn)原理。
澡堂洗澡情景
Bathhouse.java
import java.util.concurrent.Semaphore;
public class Bathhouse {
private static final Semaphore bathhouseManager = new Semaphore(2);// 澡堂管理員手上有2張?jiān)枧?
public void bathe() {
try {
bathhouseManager.acquire();
System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ": 出澡堂...");
bathhouseManager.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Bathhouse bathhouse = new Bathhouse();
for (int i=1; i<=5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
bathhouse.bathe();
}
}, "洗澡君" + i).start();
}
}
}
上面的情景中,Semaphore作為共享鎖使用,每次允許兩個(gè)人洗澡。
有5個(gè)"洗澡君",用5個(gè)線程表示,我們依次稱作為線程A、B、C、D、E。
下面的調(diào)試過程中,通過IDE多線程調(diào)試工具控制線程的執(zhí)行順序,并發(fā)有影響的代碼會(huì)做說明。如果按照實(shí)際多線程并發(fā)的情形,代碼不好分析。
請(qǐng)求鎖流程分析
執(zhí)行步驟:線程A、B、C、D、E依次執(zhí)行
# 第一步:線程A、B獲取鎖不釋放,線程C、D、E啟動(dòng)獲取鎖
創(chuàng)建Semaphore對(duì)象,傳遞2個(gè)令牌。
Semaphore bathhouseManager = new Semaphore(2)
此時(shí)Semaphore內(nèi)部數(shù)據(jù)下圖所示:

1. 線程A執(zhí)行acquire獲取令牌
Semaphore.java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly在AbstractQueuedSynchronizer中定義:
AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared用于判斷是否獲取到鎖的擁有權(quán):
Semaphore->NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
接下來調(diào)用nonfairTryAcquireShared, 這個(gè)方法在Sync中定義:
Semaphore->Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
這樣首先獲取當(dāng)前的令牌數(shù),然后減去1。如果remaining小于0,說明當(dāng)前無令牌可用,直接返回remaining。如果remaining大于0,那么重設(shè)令牌數(shù),但是CAS可能會(huì)設(shè)置失敗,因?yàn)樵谝粋€(gè)線程執(zhí)行到這里查看到令牌的數(shù)量之后,可能已經(jīng)有線程把令牌拿走了,那么我們看到的這個(gè)令牌數(shù)量就是無效的,CAS設(shè)置就會(huì)失敗。我們可以看到這里是個(gè)無限循環(huán),所以繼續(xù)執(zhí)行循環(huán)直到當(dāng)前線程看到無令牌或者成功設(shè)置了令牌剩余數(shù)。
回到acquireSharedInterruptibly方法,我們調(diào)試的時(shí)候當(dāng)前只有線程A在操作,A拿走了一個(gè)令牌,所以tryAcquireShared返回1。
tryAcquireShared(arg) < 0
條件不滿足,線程A獲取令牌執(zhí)行結(jié)束。
2. 線程B執(zhí)行acquire獲取令牌
執(zhí)行流程和線程B類似,不做重復(fù)。線程B執(zhí)行完后,當(dāng)前是令牌數(shù)為0,及state=0。
3. 線程C執(zhí)行acquire獲取令牌
當(dāng)前令牌數(shù)為0,tryAcquireShared返回-1。
if (tryAcquireShared(arg) < 0)
條件滿足,執(zhí)行if語句塊內(nèi)容,執(zhí)行doAcquireSharedInterruptibly。
AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
##第一句:創(chuàng)建代表線程C的節(jié)點(diǎn),并加入到同步隊(duì)列(節(jié)點(diǎn)入隊(duì)列不做說明)
執(zhí)行完后,Semaphore內(nèi)部數(shù)據(jù)如下:

##第二句:獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
當(dāng)成node節(jié)點(diǎn)為C節(jié)點(diǎn),根據(jù)我們上面的圖p=head。
這里要注意一下,由于共享鎖可多線程獲取的影響,如果其他線程搶先線程C執(zhí)行addWaiter方法,那么這里的p就不會(huì)是head。不是head的執(zhí)行流程與我們下面要講述的D節(jié)點(diǎn)類似。
##第三句:重試獲取令牌
int r = tryAcquireShared(arg);
這里由于多線程的影響,可能此時(shí)可以獲取到令牌,前提是當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),我們這里因?yàn)榫€程A和線程B還沒釋放鎖,所以r=-1。
這里可以獲得令牌的情況,在同步隊(duì)列中的線程被喚醒階段說明。
##第四句:將節(jié)點(diǎn)的前置節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL(-1)
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這里不做具體說明,想了解可以參考文章頭部的兩篇文章。
我們這里shouldParkAfterFailedAcquire返回false,第一次循環(huán)結(jié)束,進(jìn)行第二次循環(huán),同樣的第一個(gè)if語句不執(zhí)行,繼續(xù)執(zhí)行第二個(gè)語句,此時(shí)調(diào)用shouldParkAfterFailedAcquire,因?yàn)閜也就是我們的head節(jié)點(diǎn)的waitStatus在第一次循環(huán)時(shí)已經(jīng)設(shè)置為-1,所以shouldParkAfterFailedAcquire返回true,執(zhí)行parkAndCheckInterrupt,線程C被掛起。
AbstractQueuedSynchronizer.java
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
此時(shí)Semaphore內(nèi)部數(shù)據(jù)如下所示:

4. 線程D、E執(zhí)行acquire獲取令牌
執(zhí)行流程和線程C操作類似,這里不再做說明,線程D、E執(zhí)行完后,Semaphore內(nèi)部數(shù)據(jù)如下:

釋放鎖流程分析
執(zhí)行步驟:線程A先執(zhí)行,執(zhí)行到doReleaseShared的unparkSuccessor行后停?。ǚ椒ɡ锩娴腘ode s = node.next行),然后線程B執(zhí)行
# 第一步:線程A執(zhí)行
執(zhí)行release方法釋放令牌:
Semaphore.java
public void release() {
sync.releaseShared(1);
}
releaseShared在AbstractQueuedSynchronizer重定義:
AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
執(zhí)行tryReleaseShared:
Semaphore->Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
由于多線程的影響,需要循環(huán)執(zhí)行,直到設(shè)置正確的值。我們這里執(zhí)行后state=1。
- @@releaseShared方法執(zhí)行流程
AbstractQueuedSynchronizer.java
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
先來看這個(gè)方法的作用, 方法的注釋如下:
Release action for shared mode -- signals successor and ensures propagation. (Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.)
共享模式下的釋放操作 —— 通知后繼者保證傳播性。(注意:對(duì)于互斥模式,釋放僅僅意味如果head節(jié)點(diǎn)需要通知的話就調(diào)用unparkSuccessor方法。)
Ensure that a release propagates, even if there are other in-progress acquires/releases. This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal. But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues. Additionally, we must loop in case a new node is added while we are doing this. Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
即使有其他的acquires/releases操作正在執(zhí)行,也要確保釋放的傳播。如果head節(jié)點(diǎn)需要傳播那么嘗試使用普通方式來執(zhí)行uparkSuccessor。如果不是,那么當(dāng)前節(jié)點(diǎn)的狀態(tài)要設(shè)置成PROPAGAE以保證上面的釋放,傳播繼續(xù)。另外,我們必須循環(huán)處理因?yàn)楫?dāng)我們執(zhí)行的時(shí)候可能會(huì)有新的節(jié)點(diǎn)添加進(jìn)行。再者,不想其他地方使用unparkSuccessor,我們需要知道CAS設(shè)置狀態(tài)是否失敗,如果失敗就重新檢查。
##第1句:獲取head節(jié)點(diǎn)
Node h = head;
unparkSuccessor操作,總是從head節(jié)點(diǎn)開始。需要注意的是上面說過了,在我們處理的過程中,可能會(huì)有其他的線程改變了head節(jié)點(diǎn),也就是說這個(gè)h保存的是當(dāng)前線程這一時(shí)刻看到的head節(jié)點(diǎn)。這個(gè)代碼之后,如果有其他線程改變了同步隊(duì)列的內(nèi)容,那么這個(gè)h指向的是舊的head,此時(shí)h指向的節(jié)點(diǎn)的內(nèi)部屬性可能已經(jīng)改變。
##第2句:判斷h是否為空并且是否為tail節(jié)點(diǎn)
有人會(huì)說,h怎么會(huì)為空呢?
因?yàn)槲覀儓?zhí)行完h=head之后h指向的這個(gè)頭節(jié)點(diǎn)可能已經(jīng)被其他線程移出了同步隊(duì)列,這個(gè)時(shí)候它的next為空,可能會(huì)進(jìn)行垃圾回收使得
h指向的對(duì)象不再存在。
為什么還要判斷是否等于tail節(jié)點(diǎn)?
因?yàn)榭赡茉谖覀儓?zhí)行h= head之前,已經(jīng)其他線程把隊(duì)列中所有的節(jié)點(diǎn)都處理完了,在將ReentrantLock鎖釋放的那篇文章的時(shí)候,最后的那個(gè)圖我們可以看到head和tail指向同一個(gè)節(jié)點(diǎn)了,這種情況下就沒有節(jié)點(diǎn)需要處理。
多線程的影響使得這里的判斷會(huì)有點(diǎn)復(fù)雜,不過其實(shí)也就兩方面:h=head執(zhí)行前多線程的影響和h=head執(zhí)行后多線程的影響。
##第3句:獲取waitStatus狀態(tài)
int ws = h.waitStatus;
##第4句:判斷現(xiàn)在我們處理的這個(gè)節(jié)點(diǎn)的狀態(tài)是否還是SIGNAL,如果是就處理
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
if能執(zhí)行的前提條件當(dāng)前的線程這一刻搶到了先處理同步隊(duì)列的權(quán)利,頭節(jié)點(diǎn)還未被其他線程處理并更新狀態(tài)。
##第5句:通過CAS將head節(jié)點(diǎn)的waitStatus狀態(tài)設(shè)置為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
這里如果CAS設(shè)值失敗的話,就繼續(xù)循環(huán)。有人會(huì)說我上面不是已經(jīng)搶到了處理頭節(jié)點(diǎn)的權(quán)利了嗎,這里怎么會(huì)失敗呢?因?yàn)槟闵弦豢淌菗尩搅?,但是下一刻被其他線程給搶到了,并且對(duì)隊(duì)列進(jìn)行了處理,也就是說此時(shí)此刻h指向的那個(gè)頭節(jié)點(diǎn),已經(jīng)被移出了隊(duì)列,waitStatus的值不再是SIGNAL。
##第6句:執(zhí)行unparkSuccessor
unparkSuccessor(h);
此時(shí)此刻,說明當(dāng)前線程已經(jīng)完全搶到了隊(duì)列head節(jié)點(diǎn)的處理權(quán)。只要我們這里不再執(zhí)行,其他線程永遠(yuǎn)進(jìn)不來這個(gè)地方。
此方法執(zhí)行會(huì)喚醒head的后繼節(jié)點(diǎn)。
##第7句:如果當(dāng)前線程看到的head節(jié)點(diǎn)的waitStatus為0,嘗試將它改為PROPAGATE
不是在unparkSuccessor之前已經(jīng)把waitStatus改為0,然后被喚醒的線程會(huì)把當(dāng)前表示自己的節(jié)點(diǎn)設(shè)置為head,這時(shí)候原h(huán)ead節(jié)點(diǎn)就已經(jīng)出隊(duì)列了嗎,出隊(duì)列的節(jié)點(diǎn)設(shè)置這個(gè)狀態(tài)有什么用?故事就發(fā)生我們的上一步,一個(gè)線程已經(jīng)把head節(jié)點(diǎn)的狀態(tài)設(shè)置為0了,但是它還沒來得及,或者被喚醒的線程還沒來得級(jí)重設(shè)隊(duì)列頭,頭節(jié)點(diǎn)的狀態(tài)已經(jīng)被成0了,但是還在隊(duì)列中。這時(shí)候我們這個(gè)線程來了,看到了當(dāng)前頭結(jié)點(diǎn)的狀態(tài)為0,然后就把它設(shè)置成PROPAGATE。
還會(huì)設(shè)置不成功? 對(duì)的,因?yàn)榭赡軙?huì)同時(shí)有兩個(gè)線程到達(dá)這里,一個(gè)線程設(shè)置成功了,另一個(gè)線程就會(huì)設(shè)置失敗,設(shè)置失敗的線程重新循環(huán)。
在我們這個(gè)例子中這樣測(cè)試:線程A先執(zhí)行,然后到達(dá)unparkSuccessor這個(gè)地方停?。痪€程B在執(zhí)行,這時(shí)候線程B到這里,你就會(huì)看到它會(huì)把頭節(jié)點(diǎn)的waitStatus設(shè)置為PROPAGATE。
##第8句:檢查當(dāng)前線程看到的是不是還是隊(duì)列的頭節(jié)點(diǎn)
if (h == head) // loop if head changed
break;
線程執(zhí)行doReleaseShared可能的情況總結(jié)
線程執(zhí)行doReleaseShared可能出現(xiàn)的執(zhí)行情況如下:
- 第一種:線程搶到了調(diào)度unparkSuccessor權(quán)利,做了喚醒頭節(jié)點(diǎn)的任務(wù),運(yùn)行到h == head這里,喚醒的線程還沒有做完更新同步隊(duì)列的操作,頭節(jié)點(diǎn)并未改變,這時(shí)候h=head,循環(huán)結(jié)束;喚醒的線程已經(jīng)做完了更新同步隊(duì)列的操作,頭結(jié)點(diǎn)改變了,這時(shí)候h!=head,這個(gè)線程執(zhí)行新一輪循環(huán);
- 第二種:線程執(zhí)行,發(fā)現(xiàn)其他線程正在對(duì)頭結(jié)點(diǎn)進(jìn)行處理(compareAndSetWaitStatus設(shè)置waitStatus為0失敗),于是它把當(dāng)前頭結(jié)點(diǎn)的waitStatus設(shè)置為PROPAGATE,運(yùn)行到h == head這里,這是否發(fā)現(xiàn)同步隊(duì)列還未被更新,頭結(jié)點(diǎn)并未改變,這時(shí)候h=head,循環(huán)結(jié)束;喚醒的線程已經(jīng)做完了更新同步隊(duì)列的操作,頭結(jié)點(diǎn)改變了,這時(shí)候h!=head,這個(gè)線程執(zhí)行新一輪的循環(huán);
- 第三種:線程執(zhí)行,發(fā)現(xiàn)其他線程正在對(duì)頭結(jié)點(diǎn)進(jìn)行處理(compareAndSetWaitStatus設(shè)置waitStatus為0失敗)并且嘗試設(shè)置waitStatus為PROPAGATE失?。ǔ霈F(xiàn)第二種情況),線程執(zhí)行新一輪的循環(huán)。
從這里可以看出,線程執(zhí)行release操作,并不一定能喚醒同步隊(duì)列中的線程,也許只是做了設(shè)置waitStatus為PROPAGATE這個(gè)功能。
# 第二步:線程B執(zhí)行
按照我們上面規(guī)定的動(dòng)作,線程A執(zhí)行到unparkSuccessor時(shí)候停住(方法內(nèi)部Node s = node.next行),然后線程B執(zhí)行,線程B執(zhí)行完,線程A再從unparkSuccessor開始執(zhí)行。
按照我們上面的執(zhí)行步驟,執(zhí)行完成后,Semaphore內(nèi)部數(shù)據(jù)如下:

被喚醒的線程執(zhí)行流程分析
在"請(qǐng)求鎖流程分析"小節(jié),線程C在doAcquireSharedInterruptibly方法內(nèi)部停住:
AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
調(diào)用parkAndCheckInterrupt這個(gè)方法的時(shí)候停住的。
現(xiàn)在經(jīng)過我們上面釋放鎖步驟的執(zhí)行,線程C被喚醒了,繼續(xù)執(zhí)行循環(huán)。
##第1句:獲取C節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
在我們的示例當(dāng)中,p==head,執(zhí)行第一個(gè)if語句塊。
##第2句:獲取令牌
int r = tryAcquireShared(arg);
這個(gè)r是令牌的余數(shù),所以r=1,此時(shí)state=1;
##第3句:判斷令牌余數(shù)
if (r >= 0)
喚醒的線程不一定能獲得令牌,所以這里要做判斷。為什么令牌等于0也可以呢?當(dāng)前線程獲得了一個(gè)令牌后,令牌余數(shù)它此時(shí)看到的是0,但是有可能下一刻其他線程歸還了令牌。
##第4句:設(shè)置頭部并傳播
setHeadAndPropagate(node, r);
這里先提一個(gè)注意點(diǎn):我們進(jìn)入到setHeadAndPropagate方法內(nèi)部去,發(fā)現(xiàn)設(shè)置頭部知識(shí)簡(jiǎn)單地調(diào)用了setHead方法,有的人會(huì)說不是多線程會(huì)影響嗎?不應(yīng)該用CAS來設(shè)置嗎?
雖然有多線程的影響,但是這里不用考慮。因?yàn)橥ㄟ^上面的p=head的判斷,當(dāng)前線程已經(jīng)可以進(jìn)入if塊了,此時(shí)如果同步隊(duì)列中其他線程被喚醒了,代表它們的節(jié)點(diǎn)的前置節(jié)點(diǎn)不是head節(jié)點(diǎn),只要當(dāng)前線程不調(diào)用setHead重設(shè)頭結(jié)點(diǎn),那么其他的線程就無法進(jìn)入。setHead的調(diào)用前后在多線程情況下的影響非常關(guān)鍵。
AbstractQueuedSynchronizer.java
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
##第5句:記錄老頭結(jié)點(diǎn)
Node h = head; // Record old head for check below
##第6句:重設(shè)頭節(jié)點(diǎn)
setHead(node);
##第7句:根據(jù)條件判斷是否可以傳播
首先,我們需要弄清楚傳播了什么?
我們回憶一下,在使用ReentrantLock的時(shí)候,當(dāng)一個(gè)線程釋放鎖后,同步隊(duì)列中head節(jié)點(diǎn)的后繼節(jié)點(diǎn)被喚醒了,然后從掛起點(diǎn)開始,重新執(zhí)行循環(huán)獲取鎖,獲取鎖后它的操作是這樣的:
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
這里可以看到,僅僅是把代表它的節(jié)點(diǎn)重設(shè)成了頭節(jié)點(diǎn)。那么新的頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)所代表的線程被喚醒是什么時(shí)候?是等到重設(shè)頭節(jié)點(diǎn)的這個(gè)線程釋放鎖的時(shí)候。為什么這樣喚醒?因?yàn)殒i是獨(dú)占的,重設(shè)頭節(jié)點(diǎn)的這個(gè)線程持有鎖,那么其他的線程就無法持有鎖,無法持有鎖喚醒干嘛!
也就是獨(dú)占鎖執(zhí)行步驟是:被喚醒的線程請(qǐng)求到鎖->將代表它的節(jié)點(diǎn)設(shè)置為新頭->被喚醒的線程釋放鎖->喚醒同步隊(duì)列新頭的后繼節(jié)點(diǎn)...依次循環(huán)。
那么共享鎖模式呢?被喚醒的線程獲得鎖之后來這里設(shè)置新頭,當(dāng)它調(diào)用setHead設(shè)置新頭之后,它還必須等到它釋放鎖之后再喚醒新頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)嗎?不是必須的,因?yàn)榱钆朴卸鄠€(gè),當(dāng)前線程你不釋放掉你的這個(gè)令牌,新頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)所代表的線程也可以獲取其他線程歸還的令牌。
傳播:同步隊(duì)列更新后的喚醒傳播。
比如說:
共享模式:房間里有2個(gè)醫(yī)生在看病,A,C,D,E依次在排隊(duì)等待進(jìn)入, A被準(zhǔn)許進(jìn)入了,這時(shí)候隊(duì)頭是C了,A看到還有一個(gè)醫(yī)生在空閑著,他就會(huì)告訴C,對(duì)C說這里還有一個(gè)醫(yī)生沒在看病,你可以試試進(jìn)來。C可能就進(jìn)去了,也可能沒進(jìn)去,因?yàn)樵诜枪侥J较氯绻鸈比較蠻橫不排隊(duì),直接就闖進(jìn)去了,C就只能繼續(xù)等待。
獨(dú)占模式::房間里有1個(gè)醫(yī)生在看病,A,C,D,E依次在排隊(duì)等待進(jìn)入, A被準(zhǔn)許進(jìn)入了,只有等他出來通常C,C出來再通知D,D出來再通知E,以這樣的方式看完病才行。同時(shí)人出來了,下一個(gè)對(duì)頭的人也不一定能進(jìn)去,因?yàn)榉枪侥J较聲?huì)有不排隊(duì)闖入的人存在。
就行來看什么情況下可以進(jìn)行喚醒傳播:
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
propagate > 0
propagate就是令牌的余數(shù),大于0說明此時(shí)看到有令牌。h == null
h為老頭結(jié)點(diǎn)的引用,此時(shí)老頭結(jié)點(diǎn)已經(jīng)被移除隊(duì)列了,新隊(duì)列產(chǎn)生了,并且其他線程調(diào)用了p.next = null; // help GC,老頭結(jié)點(diǎn)被垃圾回收了。h.waitStatus < 0
我們回到doReleaseShared方法:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
當(dāng)前線程通過unparkSuccessor被喚醒了,那么此時(shí)的head節(jié)點(diǎn)的waitStatus就會(huì)被設(shè)置為0,表明我通知的任務(wù)已經(jīng)完成了,那為什么上面要判斷是否小于0呢,其實(shí)我們上面已經(jīng)分析了,因?yàn)檫€有這里的操作:
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
這個(gè)做標(biāo)記了明確指定了需要傳播。
PROPAGATE是如何設(shè)置的看釋放鎖流程的說明。
- (h = head) == null || h.waitStatus < 0
能執(zhí)行到這里來,說明前面都是false,個(gè)人覺得這兩個(gè)條件要聯(lián)合判斷,if條件要成立的話,那么(h = head) == null這個(gè)條件就不滿足,也就是實(shí)際上要為true就是(h = head) != null && h.waitStatus < 0, 這就是設(shè)置完新頭結(jié)點(diǎn),新頭waitStatus=-1這種情況。
單獨(dú)(h = head) == null為空成立,還讓繼續(xù)往下走,目前沒想到是什么樣的情況。
##第8句:執(zhí)行傳播
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
- s == null
我理解的node節(jié)點(diǎn)是尾節(jié)點(diǎn),但是下一個(gè)時(shí)刻可能會(huì)有新節(jié)點(diǎn)入隊(duì)列。 - s.isShared
節(jié)點(diǎn)就是需要傳播的節(jié)點(diǎn)。共享模式下入隊(duì)列的節(jié)點(diǎn)初始態(tài)s.isShared都滿足。
總結(jié)
共享鎖之所以能共享,是因?yàn)樗鼉?nèi)部管理者多張令牌,某一時(shí)刻多個(gè)線程可以同時(shí)獲取,同時(shí),釋放也能并發(fā)得發(fā)生。相反的獨(dú)占鎖就只有一個(gè)令牌,只能嚴(yán)格按照獲取和釋放這樣的步驟來獲取令牌。