Semaphore介紹
Semaphore,信號量,用來控制訪問特定數(shù)量的共享資源,當線程線程想訪問共享資源時,必須選獲取到一個信號量,可以認為是一個訪問許可證,才能訪問,如果沒有獲取到信號量,就需要等待,等待前面有人釋放信號量,才能獲取到信號量。
Semaphore內(nèi)部維護了一個計數(shù)器,代表能夠有多少線程可同時訪問共享資源,每次獲取信號量時,計數(shù)器會減1,釋放信號量時計數(shù)器會加1,當計數(shù)器減到0的時候,想要獲取信號量的線程會被阻塞,等待前面已經(jīng)獲取到信號量的線程釋放,釋放之后,計數(shù)器增加1,阻塞線程被喚醒,并嘗試再次獲取信號量。
Semaphore可以在初始化的時候設(shè)置信號量的個數(shù),以及是否許可公平策略。
這是Semaphore的一個簡單Demo
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(3);
private static ExecutorService executor = Executors.newFixedThreadPool(7);
private static void useSemaphore() {
try {
semaphore.acquire();
System.out.println(new Date() + ":獲取到信號量");
Thread.sleep(new Random().nextInt(10000));
semaphore.release();
System.out.println(new Date() + ":釋放信號量");
} catch (Exception e) {
}
}
public static class Task implements Runnable {
@Override
public void run() {
useSemaphore();
}
}
public static void main(String[] args) {
for (int i = 0; i < 7; i++) {
executor.execute(new Task());
}
}
}
Sat Oct 20 13:16:34 CST 2018:獲取到信號量
Sat Oct 20 13:16:34 CST 2018:獲取到信號量
Sat Oct 20 13:16:34 CST 2018:獲取到信號量
Sat Oct 20 13:16:38 CST 2018:釋放信號量
Sat Oct 20 13:16:38 CST 2018:獲取到信號量
Sat Oct 20 13:16:40 CST 2018:釋放信號量
Sat Oct 20 13:16:40 CST 2018:獲取到信號量
Sat Oct 20 13:16:41 CST 2018:釋放信號量
Sat Oct 20 13:16:41 CST 2018:獲取到信號量
Sat Oct 20 13:16:44 CST 2018:釋放信號量
Sat Oct 20 13:16:44 CST 2018:獲取到信號量
Sat Oct 20 13:16:46 CST 2018:釋放信號量
Sat Oct 20 13:16:47 CST 2018:釋放信號量
Sat Oct 20 13:16:47 CST 2018:釋放信號量
從輸出可以看出來,最多只有3個線程能夠獲取到信號量,其他線程都要阻塞,只有當前面的線程適當信號量之后,阻塞的線程才能夠獲取信號量,繼續(xù)執(zhí)行代碼。
Semaphore源碼分析
非公平信號量的獲取
基于jdk1.8分析Semaphore實現(xiàn),首先看獲取信號量的實現(xiàn),函數(shù)調(diào)用順序如下:

/ /默認為非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
Sync(int permits) {
setState(permits);
}
//最后其實就是設(shè)置了state為信號量的個數(shù)
protected final void setState(int newState) {
state = newState;
}
semaphore.acquire(),獲取1個信號,調(diào)用過程如下。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//現(xiàn)有有的信號量
int available = getState();
//這次獲取之后還剩下的信號量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
remaining < 0表示當前沒有信號量可獲取,會進入doAcquireSharedInterruptibly,等待信號量,并被掛起,直到被喚醒。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//把當前線程封裝成一個節(jié)點,加入到等待信號量鏈表的末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取當前節(jié)點的前一個節(jié)點
//如果上一節(jié)點是CLH隊列的表頭,則"嘗試獲取共享鎖"
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取成功,需要將當前節(jié)點設(shè)置為AQS隊列中的第一個節(jié)點
//這是AQS的規(guī)則,隊列的頭節(jié)點表示正在獲取鎖的節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire檢查是否需要將當前線程掛起
//parkAndCheckInterrupt掛起當前線程,并返回線程是否被中斷
//如果當前線程被中斷,拋出中斷異常,退出循環(huán)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
從循環(huán)中可以看出,當線程被喚醒,并不一定能獲得信號量,而是繼續(xù)通過tryAcquireShared方法去競爭獲取,如果這時候正好有新的線程去獲取信號量,有可能這個沒有任何等待,剛剛來的新線程獲取到信號量,這就是不公平策略的體現(xiàn)
//在等待鏈表中,加入當前線程
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果當前鏈表有尾節(jié)點,即當前鏈表不為空時
//把當前節(jié)點放入到尾節(jié)點后面,更新尾節(jié)點為當前節(jié)點
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//當前等待鏈表還沒有節(jié)點時,進行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//把當前節(jié)點放入到尾節(jié)點后面,更新尾節(jié)點為當前節(jié)點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
waitStatus是節(jié)點Node定義的,她是標識線程的等待狀態(tài),他主要有如下四個值:
1、CANCELLED = 1,在同步隊列中等待的線程等待超時或者被中斷,需要從同步隊列中取消等待,節(jié)點進入該狀態(tài)后將不會變化;
2、SIGNAL = -1:當前線程的后繼線程需要被unpark(喚醒),后續(xù)節(jié)點的線程處于等待狀態(tài),而當前節(jié)點的線程如果釋放了同步狀態(tài)或者被取消,將會通知后續(xù)節(jié)點,使后續(xù)節(jié)點的線程得以運行;
3、 CONDITION = -2 :線程(處在Condition休眠狀態(tài))在等待Condition喚醒,節(jié)點在條件隊列中,節(jié)點線程等待在Condition上,當其他線程對Condition調(diào)用了signal()方法后,該節(jié)點將會從條件隊列中轉(zhuǎn)移到同步隊列中,加入到對同步狀態(tài)的獲取中;
4、 PROPAGATE = –3,其它線程獲取到“共享鎖”,表示下一次共享式同步狀態(tài)獲取將會無條件地傳播下去
有了這四個狀態(tài),我們再來分析上面代碼,當ws == SIGNAL時表明當前節(jié)點需要unpark(喚醒),直接返回true,當ws > 0 (CANCELLED),表明當前節(jié)點已經(jīng)被取消了,則通過回溯的方法(do{}while())向前找到一個非CANCELLED的節(jié)點并返回false。其他情況則設(shè)置該節(jié)點為SIGNAL狀態(tài)。我們再回到if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()),p是當前節(jié)點的前繼節(jié)點,當該前繼節(jié)點狀態(tài)為SIGNAL時返回true,表示當前線程需要阻塞,則調(diào)用parkAndCheckInterrupt()阻塞當前線程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驅(qū)節(jié)點是初始或者共享狀態(tài)就設(shè)置為-1 使后續(xù)節(jié)點阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//調(diào)用LockSupport阻塞線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
//如果還有剩余量,繼續(xù)喚醒下一個線程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
首先,使用了CAS更換了頭節(jié)點,然后,將當前節(jié)點的下一個節(jié)點取出來,如果同樣是“shared”類型的,再做一個"releaseShared"操作。
為什么要這么做呢?這就是共享功能和獨占功能最不一樣的地方,對于獨占功能來說,有且只有一個線程,能夠獲取鎖,但是對于共享功能來說,共享的狀態(tài)是可以被共享的,也就是意味著其他AQS隊列中的其他節(jié)點也應(yīng)能第一時間知道狀態(tài)的變化,實現(xiàn)節(jié)點自身獲取共享鎖成功后,喚醒下一個共享類型節(jié)點的操作,實現(xiàn)共享狀態(tài)的向后傳遞。doReleaseShared方法,其實喚醒隊列中等待的線程,下面會講解的
非公平信號量的釋放
下面來看一下釋放信號量,其實就是釋放信號量,以及喚醒等待線程兩個主要流程
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//釋放信號量
if (tryReleaseShared(arg)) {
//喚醒等待線程
doReleaseShared();
return true;
}
return false;
}
// 核心代碼
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//當前信號量
int current = getState();
//釋放后的信號量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
semaphore.release(),可以看到釋放信號量,通過compareAndSetState方法,將信號量的數(shù)量增加
修改信號量之后,會調(diào)用doReleaseShared方法,喚醒隊列中等待的線程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節(jié)點對應(yīng)的線程是SIGNAL狀態(tài)
//意味著“頭節(jié)點的下一個節(jié)點所對應(yīng)的線程”需要被unpark喚醒
if (ws == Node.SIGNAL) {
// 設(shè)置“頭節(jié)點對應(yīng)的線程狀態(tài)”為空狀態(tài)。失敗的話,則繼續(xù)循環(huán)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒頭結(jié)點的下一個結(jié)點,頭結(jié)點為new Node()或者當前獲取信號量的線程
unparkSuccessor(h);
}
//如果本身頭節(jié)點的waitStatus是處于重置狀態(tài)(waitStatus==0)的,將其設(shè)置為“傳播”狀態(tài)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared方法,獲取頭結(jié)點,如果頭結(jié)點狀態(tài)為Node.SIGNAL,意味著,它正在等待一個信號,或者說,它在等待被喚醒,因此做兩件事,第一是重置waitStatus標志位,更新頭結(jié)點狀態(tài)為0,第二是重置成功后,喚醒頭結(jié)點的下一個結(jié)點的線程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
公平信號量的獲取
非公平信號量
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平信號量
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
從代碼就可以看出來,非公平和公平信號量,主要是tryAcquireShared方法不一樣,公平信號量在每次線程嘗試獲取的時候,都會判斷head.next節(jié)點(第一個阻塞等待的節(jié)點)是不是為當前線程的節(jié)點,如果不是就返回-1,插入到鏈表的尾節(jié)點,等待被喚醒,是的話才會去競爭獲取信號量,這樣就能保證獲取信號量的順序和加入到阻塞鏈表的順序保持一致
semaphore.acquire()在獲取不到信號量的時候會掛起當前線程,而tryAcquire,不會掛起線程,會直接返回是否獲取到了信號量,
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
通過tryAcquire(long timeout, TimeUnit unit)可以設(shè)置獲取信號量的等待時間,主要是通過doAcquireSharedNanos方法來實現(xiàn)的,我們來看一下具體是怎么實現(xiàn)的
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
//當能夠掛起當前線程時,將當前線程掛起nanosTimeout時間
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//解除node與線程的關(guān)聯(lián)關(guān)系
node.thread = null;
// 跳過node前面為cancelled狀態(tài)的節(jié)點,找到一個有效的前繼節(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//node為head的后繼節(jié)點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
大部分邏輯和doAcquireSharedInterruptibly類似,首先第一個不同在于進入方法時,計算出了一個“deadline”,每次循環(huán)的時候用當前時間和“deadline”比較,大于“dealine”說明超時時間已到,直接返回方法。
注意nanosTimeout > spinForTimeoutThreshold,從變量的字面意思可知,這是拿超時時間和超時自旋的最小作比較,在這里Doug Lea把超時自旋的閾值設(shè)置成了1000ns,即只有超時時間大于1000ns才會去掛起線程,否則,再次循環(huán),以實現(xiàn)“自旋”操作
如果超時沒有獲得信號量會返回false,同時執(zhí)行finally中的函數(shù),將node狀態(tài)設(shè)置為CANCELLED,刪除當前節(jié)點
AQS介紹
信號量中的FairSync和NotFairSync都是繼承自Sync,而Sync又是繼承自AQS,使用到了很多AQS的東西,AQS(AbstractQueuedSynchronizer):為java中管理鎖的抽象類。該類為實現(xiàn)依賴于先進先出 (FIFO) 等待隊列的阻塞鎖和相關(guān)同步器(信號量、事件,等等)提供一個框架。該類提供了一個非常重要的機制,在JDK API中是這樣描述的:為實現(xiàn)依賴于先進先出 (FIFO) 等待隊列的阻塞鎖和相關(guān)同步器(信號量、事件,等等)提供一個框架。此類的設(shè)計目標是成為依靠單個原子 int 值來表示狀態(tài)的大多數(shù)同步器的一個有用基礎(chǔ)。子類必須定義更改此狀態(tài)的受保護方法,并定義哪種狀態(tài)對于此對象意味著被獲取或被釋放。假定這些條件之后,此類中的其他方法就可以實現(xiàn)所有排隊和阻塞機制。子類可以維護其他狀態(tài)字段,但只是為了獲得同步而只追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。 這么長的話用一句話概括就是:維護鎖的當前狀態(tài)和線程等待列表。
CLH:AQS中“等待鎖”的線程隊列。我們知道在多線程環(huán)境中我們?yōu)榱吮Wo資源的安全性常使用鎖將其保護起來,同一時刻只能有一個線程能夠訪問,其余線程則需要等待,CLH就是管理這些等待鎖的隊列。
CAS(compare and swap):比較并交換函數(shù),它是原子操作函數(shù),也就是說所有通過CAS操作的數(shù)據(jù)都是以原子方式進行的。