Java并發(fā)編程--ReentrantLock可重入性探索
我們直接先看其公平鎖情況下的可重入性到底是怎么回事,由于我們討論的是公平鎖的情況,而相關(guān)的代碼在ReentrantLock的內(nèi)部類FairSync中。
1. lock()
public void lock() {
sync.lock();
}
由于是公平鎖,所以我們需要重FairSync中查看lock方法:
final void lock() {
acquire(1);
}
而這里的acquire方法繼承自AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先我們先提前說一下tryAcquire返回值是一個boolean,為true說明當(dāng)前線程成功獲取了ReentrantLock的鎖,并且ReentrantLock鎖是一個獨占鎖,而這個if條件,如果成功獲取了鎖,那么acquire方法就直接返回了。
AQS已經(jīng)為該方法做了方法的實現(xiàn),在FairSync中我們只要實現(xiàn)tryAcquire方法即可:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 說明當(dāng)前鎖還未被持有
// hasQueuedPredecessors返回false的情況為:當(dāng)前線程在等待隊列的頭部或者等待隊列為空
// 這就說明了:只有等待隊列的頭結(jié)點可以獲取鎖
// Q:什么情況下當(dāng)前線程已經(jīng)在頭結(jié)點了,但是還沒有獲取鎖?
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 這里說明當(dāng)前線程就是獨占的線程
int nextc = c + acquires; // 持有鎖的線程獲取鎖的次數(shù),這也表明了可重入性
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 原來可重入性是這么個意思
return true; // 如果current再次使用該所對象加鎖,那么會直接返回true,可重入就是這么個意思
}
return false;
}
如注釋中所說,當(dāng)前線程能夠成功獲得鎖有兩種情況,分別代表首次加鎖和重入鎖:
- 如果c為0的話,說明當(dāng)前鎖還沒有被任何線程獨占,這時候會對隊列進(jìn)行判斷
- hasQueuedPredecessors方法返回false有兩種情況:當(dāng)前等待隊列為空或者當(dāng)前線程就是隊列頭部;此時才可以嘗試加鎖
- 一個CAS操作將ReentrantLock的state屬性設(shè)置為acquire值(調(diào)用的時候傳遞的為1)
- setExclusiveOwnerThread方法將當(dāng)前線程設(shè)置為獨占鎖的線程
- 如果當(dāng)前線程為獨占線程,則保證可重入性:
- 將state進(jìn)行加一操作
從這一個成功加鎖的過程我們可以產(chǎn)生一些大膽的推斷:
- 獨占該鎖的線程位于等待隊列的頭部
- state屬性表示獨占的線程加鎖的次數(shù),在之后解鎖的時候可能也要這么多次數(shù)的unlock才可以釋放鎖
- 可重入性的保證就是一句current == getExclusiveOwnerThread()
注意:有沒有發(fā)現(xiàn),對于第一個加鎖的線程,它會加鎖成功,但是這個第一次加鎖的線程,沒有被封裝成一個Node節(jié)點放到隊列中;所以說,持有鎖的線程是隊列的頭結(jié)點這句話有問題:因為持有鎖的線程根本不在隊列中,何來頭結(jié)點一說。在下文分析加鎖失敗的情況會證明這一論斷。
關(guān)于hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
首先對于第一次加鎖的線程,此時由于h == t == null所以返回false,而對于后面的返回false的情況大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false,也就是只有頭結(jié)點的后繼節(jié)點調(diào)用該方法時,才會返回false表示可以嘗試加鎖。同時這也是可重入鎖中公平鎖的來源,對于之前已經(jīng)在隊列中的節(jié)點,那么新來的節(jié)點想要加鎖,該方法會返回true說明隊列中在你之前還有人在等待,得前面沒人等待了你才能返回false,才能嘗試去加鎖,保證了先來后到的公平性。
注意:對于第二個嘗試加鎖的線程,由于此時前面有一個人持有鎖,所以它在調(diào)用lock-acquire-tryAcquire方法時由于判斷state!=0且當(dāng)前線程不是獨占鎖的線程直接判斷了加鎖失敗,從而被添加到了隊列中,這條路線中不涉及到hasQueuedPredecessors方法的調(diào)用(顯然此時它是滿足h==t這個判斷的)
加鎖失敗情況
還記得acquire方法嗎:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果tryAcquire方法返回false的話,說明加鎖失敗,同時通過上面的代碼我們知道,如果加鎖失敗的話,當(dāng)前線程沒有被執(zhí)行各種處理,所以我們在分析acquireQueued方法的時候沒有任何后顧之憂,它的代碼沒有收到tryAcquire的影響:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 注意,這里的意思是頭結(jié)點的后繼節(jié)點tryAcquire成功,也就是獲取了鎖
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 這里說明了,如果頭結(jié)點的后繼節(jié)點成功獲得了鎖,直接返回false
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
對于這個入隊操作,有幾點需要說明:
- 只有頭結(jié)點的next節(jié)點才會主動調(diào)用tryAcquire方法取申請獲取鎖
- 當(dāng)頭結(jié)點的next節(jié)點成功的得到了鎖之后,通過setHead方法會將自己設(shè)為頭結(jié)點
- 移除原來的頭結(jié)點之后return false
好了,接下來我們就來說一說為啥上面說持有鎖的線程不在隊列中,如果說上文對于首次加鎖的線程沒有加入隊列產(chǎn)生懷疑的話,那么這里的setHead方法會使你幡然醒悟:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
看到了沒:node.thread = null把頭結(jié)點的線程置為空了?。?!所以,對于獨占到鎖的線程來說,它此時已經(jīng)不再隊列中了??!所以說,只有頭結(jié)點時持有鎖的節(jié)點這句話不準(zhǔn)確??!
那么問題來了,這個頭結(jié)點時怎么初始化的呢?
頭結(jié)點的初始化
答案就在調(diào)用acquireQueue方法時的addWaiter方法:
private Node addWaiter(Node mode) {
// 思路:新鍵當(dāng)前線程的Node節(jié)點;如果tail被初始化過,則直接添加到尾部;否則執(zhí)行enq操作先初始化tail再入隊
// 根據(jù)注釋:mode傳遞的值要么是Node.SHARED要么是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這里的代碼思路很簡單,就是構(gòu)建Node節(jié)點,然后插入到尾部,對于這個if判斷語句,為false的情況會執(zhí)行enq方法,在enq方法里面會有頭結(jié)點的初始化:
private Node enq(final Node node) {
// 思路:獲取尾部,將參數(shù)中的node節(jié)點直接加入尾部;然后CAS更新tail引用
for (;;) {
// 獲取尾部
Node t = tail;
// 如果tail為空,說明是第一次入隊操作,通過CAS初始化節(jié)點
// 這里初始化只是調(diào)用了默認(rèn)構(gòu)造器,目的是為了第二次for循環(huán)時tail不為null
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 因為tail是AQS在并發(fā)環(huán)境下的共享資源,所以修改tail變量要使用CAS操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
有沒有發(fā)現(xiàn),這個頭結(jié)點的thread沒有傳遞是參數(shù),是一個null,這也證明了我們之前說的是正確的。
所以我們就知道了,在第一次有線程加鎖的時候,它會成功得到鎖,那么當(dāng)?shù)诙€線程需要等待這個鎖的時候,調(diào)用addWaiter的時候會初始化鏈表的頭結(jié)點。
阻塞線程的掛起
可能有的人會問,每一個阻塞的節(jié)點都有一個無限for循環(huán)自旋,那么線程沒有被掛起的話豈不是很浪費cpu資源?掛起的操作就在for循環(huán)的后面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
其中shouldParkAfterFailedAcquire(p, node)方法用來判斷是否需要掛起獲取鎖失敗的線程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 判斷是否可以掛起的思想是:如果該節(jié)點的pred節(jié)點的waitStatus已經(jīng)被設(shè)置為了SIGNAL
// 那么說明該節(jié)點已經(jīng)料理好后事了,可以在某個時刻被喚醒,所以可以安全的掛起
return true;
if (ws > 0) { // 說明pred已經(jīng)被cancelled了,直接移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 說明pred的waitStatue是0或者PROPAGATE,此時設(shè)置為SINGAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看到了嗎,只有參數(shù)中的pred,也就是需要被掛起的節(jié)點node的前一個節(jié)點的waitStatus為SINGLE的時候,才會返回true。而我們調(diào)用addWaiter方法創(chuàng)建Node節(jié)點的時候,waitStatue都是默認(rèn)值0,所以在該方法的后面else語句中有一個CAS操作將其設(shè)置為SINGLE,這樣的話,在acquireQueued方法的for自旋中,需要被掛起的線程經(jīng)歷兩個for循環(huán)就可以使得shouldParkAfterFailedAcquire方法返回true。
之后,真正為node執(zhí)行掛起的操作位于parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這樣的話,沒有獲取到鎖的線程就真的被掛起了。
2. unlock()
public void unlock() {
sync.release(1);
}
release方法同acquire方法一樣,都是在AQS中又方法實現(xiàn)的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AQS的實現(xiàn)類只需要重寫tryRelease方法即可。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法很簡單,就是將ReentrantLock的state值減去一,然后如果此時state為0說明獨占的線程已經(jīng)完全釋放了鎖,此時可以解除綁定,否則返回false。
而真正實現(xiàn)釋放鎖后喚醒其他線程的方法位于release中的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
我們上面在分析線程的掛起的時候說到了要想掛起,那么node的前一個節(jié)點的waitStatus必須為SINGLE,而SINGLE在Node這個類中的值為-1.是小于0的,所以一定會執(zhí)行到unparkSuccessor方法。
private void unparkSuccessor(Node node) { // 通過名稱,該方法是喚醒后繼節(jié)點
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 根據(jù)注釋和代碼:這里是處理node參數(shù)的next節(jié)點為null或者已經(jīng)取消的情況
// 此時從尾部開始遍歷,找沒有被canclled的節(jié)點喚醒
// PS:為何不從node開始往后找,而是從尾部開始往前找?
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 代表參數(shù)node的next節(jié)點不為空,則喚醒其中的線程
if (s != null)
LockSupport.unpark(s.thread);
}
這里的LockSupport.unpark方法就是喚起其他線程的地方,且一次只會喚醒一個線程,大部分情況就是頭結(jié)點的后繼節(jié)點。