之前分析過AQS的源碼,但只分析了獨占鎖的原理。
而剛好我們可以借助Semaphore來分析共享鎖。
如何使用Semaphore
public class SemaphoreDemo {
public static void main(String[] args) {
// 申請共享鎖數(shù)量
Semaphore sp = new Semaphore(3);
for(int i = 0; i < 5; i++) {
new Thread(() -> {
try {
// 獲取共享鎖
sp.acquire();
String threadName = Thread.currentThread().getName();
// 訪問API
System.out.println(threadName + " 獲取許可,訪問API。剩余許可數(shù) " + sp.availablePermits());
TimeUnit.SECONDS.sleep(1);
// 釋放共享鎖
sp.release();
System.out.println(threadName + " 釋放許可,當(dāng)前可用許可數(shù)為 " + sp.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread-" + (i+1)).start();
}
}
}
Java SDK 里面提供了 Lock,為啥還要提供一個 Semaphore ?其實實現(xiàn)一個互斥鎖,僅僅是 Semaphore 的部分功能,Semaphore 還有一個功能是 Lock 不容易實現(xiàn)的,那就是:Semaphore 可以允許多個線程訪問一個臨界區(qū)。
比較常見的需求就是我們工作中遇到的連接池、對象池、線程池等等池化資源。其中,你可能最熟悉數(shù)據(jù)庫連接池,在同一時刻,一定是允許多個線程同時使用連接池的,當(dāng)然,每個連接在被釋放前,是不允許其他線程使用的。
比如上面的代碼就演示了同時最多只允許3個線程訪問API。
如何依托AQS實現(xiàn)Semaphore
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
// 獲取鎖
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 釋放鎖
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
Semaphore的acquire/release還是使用到了AQS,它把state的值作為共享資源的數(shù)量。獲取鎖的時候state的值減去1,釋放鎖的時候state的值加上1。
鎖的獲取
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Semaphore也有公平鎖和非公平鎖兩種實現(xiàn),不過都是借助于AQS的,這里默認(rèn)實現(xiàn)是非公平鎖,所以最終會調(diào)用nonfairTryAcquireShared方法。
鎖的釋放
public void release() {
sync.releaseShared(1);
}
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
鎖的釋放成功后,會調(diào)用doReleaseShared(),這個方法后面會分析。
獲取鎖失敗
當(dāng)獲取鎖失敗后,新的線程就會被加入隊列
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 實際調(diào)用的是NonFairSync中的nonfairTryAcquireShared
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
當(dāng)鎖的數(shù)量小于0的時候,需要入隊。
共享鎖調(diào)用的方法doAcquireSharedInterruptibly()和獨占鎖調(diào)用的方法acquireQueued()只有一些細(xì)微的區(qū)別。

首先獨占鎖構(gòu)造的節(jié)點是模式是EXCLUSIVE,而共享鎖構(gòu)造模式是SHARED,它們使用的是AQS中的nextWaiter變量來區(qū)分的。
其次在在準(zhǔn)備入隊的時候,如果嘗試獲取共享鎖成功,那么會調(diào)用setHeadAndPropagate()方法,重新設(shè)置頭節(jié)點并決定是否需要喚醒后繼節(jié)點
private void setHeadAndPropagate(Node node, int propagate) {
// 舊的頭節(jié)點
Node h = head;
// 將當(dāng)前獲取到鎖的節(jié)點設(shè)置為頭節(jié)點
setHead(node);
// 如果仍然有多的鎖(propagate的值是nonfairTryAcquireShared()返回值)
// 或者舊的頭結(jié)點為空,或者頭結(jié)點的 ws 小于0
// 又或者新的頭結(jié)點為空,或者新頭結(jié)點的 ws 小于0,則喚醒后繼節(jié)點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 保證同步隊列中至少有兩個節(jié)點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 需要喚醒后繼節(jié)點
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 將節(jié)點狀態(tài)更新為PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
其實看到這里就有一些邏輯我看不懂了,比如setHeadAndPropagate()方法中的這一段邏輯
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
寫成這樣不好嗎?
if(propagate > 0 && h.waitStatus < 0)
為什么要那么復(fù)雜呢?而且看上去這樣也可以嘛,我本來想要假裝看懂的,可是我發(fā)現(xiàn)騙自己真的不容易呀。
這里應(yīng)該是有什么特殊的原因,不然Doug Lea老爺子不會這么寫。。。。。。
PROPAGATE狀態(tài)有什么用?
我就去網(wǎng)上搜索了下,結(jié)果在Java的Bug列表中發(fā)現(xiàn)是因為有一個bug才這樣修改的
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020

看到這個bug在2011年就在JDK6中被修復(fù)了,我想說那個時候我還不知道java是啥呢。。。。
這個修改可以在Doug Lead老爺子的主頁中找到,通過JSR 166找到可對比的CSV,對比1.73和1.74兩個版本
我們先看看setHeadAndPropagate中的修改對比

以前的版本中判斷條件是這樣的
if (propagate > 0 && node.waitStatus != 0)
這樣的判斷很符合我的認(rèn)知嘛。但是會造成怎樣的問題呢?按照bug的描述,我們來紙上談兵下沒有PROPAGATE狀態(tài)的時候會出什么問題。
首先 Semaphore初始化state值為0,然后4個線程分別運行4個任務(wù)。線程t1,t2同時獲取鎖,另外兩個線程t3,t3同時釋放鎖
public class TestSemaphore {
// 這里將信號量設(shè)置成了0
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
// 獲取鎖
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
// 釋放鎖
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
根據(jù)上面的代碼,我們將信號量設(shè)置為0,所以t1,t2獲取鎖會失敗。
假設(shè)某次循環(huán)中隊列中的情況如下
head --> t1 --> t2(tail)
鎖的釋放由t3先釋放,t4后釋放
時刻1: 線程t3調(diào)用releaseShared(),然后喚醒隊列中節(jié)點(線程t1),此時head的狀態(tài)從-1變成0
時刻2: 線程t1由于線程t3釋放了鎖,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

時刻3: 線程t4調(diào)用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),不滿足條件,因此不喚醒后繼節(jié)點

時刻4: 線程t1獲取鎖成功,調(diào)用setHeadAndPropagate(),因為不滿足propagate > 0(時刻2中propagate == 0),從而不會喚醒后繼節(jié)點
如果沒有PROPAGATE狀態(tài),上面的情況就會導(dǎo)致線程t2不會被喚醒。
那在引入了propagate之后這個變量又會是怎樣的情況呢?
時刻1: 線程t3調(diào)用doReleaseShared,然后喚醒隊列中結(jié)點(線程t1),此時head的狀態(tài)從-1變成0
時刻2: 線程t1由于t3釋放了信號量,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0
時刻3: 線程t4調(diào)用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),將節(jié)點狀態(tài)設(shè)置為PROPAGATE(值為-3)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
時刻4: 線程t1獲取鎖成功,調(diào)用setHeadAndPropagate(),雖然不滿足propagate > 0(時刻2中propagate == 0),但是waitStatus<0,所以會去喚醒后繼節(jié)點
至此我們知道了PROPAGATE的作用,就是為了避免線程無法會喚醒的窘境。,因為共享鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是并發(fā)執(zhí)行的,就會產(chǎn)生很多中間狀態(tài),而PROPAGATE就是為了讓這些中間狀態(tài)不影響程序的正常運行。
doReleaseShared-小方法大智慧
無論是釋放鎖還是申請到鎖都會調(diào)用doReleaseShared()方法,這個方法看似簡單,其實里面的邏輯還是很精妙的。
private void doReleaseShared() {
for (;;) {
Node h = head;
// 保證同步隊列中至少有兩個節(jié)點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 需要喚醒后繼節(jié)點
if (ws == Node.SIGNAL) {
// 可能有其他線程調(diào)用doReleaseShared(),unpark操作只需要其中一個調(diào)用就行了
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 將節(jié)點狀態(tài)設(shè)置為PROPAGATE(畫重點了)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
這其中有一個判斷條件
ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
這個if條件成立也巧妙
- 首先隊列中現(xiàn)在至少有兩個節(jié)點,簡化分析,我們認(rèn)為它只有兩個節(jié)點,head --> node
- 執(zhí)行到else if,說明跳過了前面的if條件,說明頭結(jié)點是剛成為頭結(jié)點的,它的waitStatus為0,尾節(jié)點是在這之后加入的,發(fā)生這種情況是shouldParkAfterFailedAcquire()中還沒來得及將前一個節(jié)點的ws值修改為SIGNAL
- CAS失敗說明此時頭結(jié)點的ws不為0了,也就表明shouldParkAfterFailedAcquire()已經(jīng)將前驅(qū)節(jié)點的waitStatus值修改為了SIGNAL了

而整個循環(huán)的退出條件是在h==head的時候,這個是為什么呢?
由于我們的head節(jié)點是一個虛擬節(jié)點(也可以叫做哨兵節(jié)點),假設(shè)我們的同步隊列中節(jié)點順序如下:
head --> A --> B --> C
現(xiàn)在假設(shè)A拿到了共享鎖,那么它將成為新的dummy node(虛擬節(jié)點),
head(A) --> B --> C
此時A線程會調(diào)用doReleaseShared方法喚醒后繼節(jié)點B,它很快就獲取到了鎖,并成為了新的頭節(jié)點
head(B) --> C
此時B線程也會調(diào)用該方法,并喚醒其后繼節(jié)點C,但是在B線程調(diào)用的時候,線程A可能還沒有運行結(jié)束,也正在執(zhí)行這個方法,
當(dāng)它執(zhí)行到h==head的時候發(fā)現(xiàn)head改變了,所以for循環(huán)就不會退出,又會繼續(xù)執(zhí)行for循環(huán),喚醒后繼節(jié)點。
至此我們共享鎖分析完畢,其實只要弄明白了AQS的邏輯,依賴于AQS實現(xiàn)的Semaphore就很簡單了。
在看共享鎖源碼過程中尤其需要注意的是方法是會被多個線程并發(fā)執(zhí)行的,所以其中很多判斷是多線程競爭情況下才會出現(xiàn)的。同時需要注意的是共享鎖并不能保證線程安全,仍然需要程序員自己保證對共享資源的操作是安全的。