一、Semaphore
Semaphore是一種在多線程環(huán)境下使用的設(shè)施,該設(shè)施負責協(xié)調(diào)各個線程,以保證它們能夠正確、合理的使用公共資源的設(shè)施,也是操作系統(tǒng)中用于控制進程同步互斥的量。Semaphore是一種計數(shù)信號量,用于管理一組資源,內(nèi)部是基于AQS的共享模式。它相當于給線程規(guī)定一個量從而控制允許活動的線程數(shù)。
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,后面的車會看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行。
Semaphore 是 synchronized 的加強版,作用是控制線程的并發(fā)數(shù)量。就這一點而言,單純的synchronized 關(guān)鍵字是實現(xiàn)不了的。
信號量通過一組許可證來控制對共享資源的訪問。
如果需要,可以用acquire()方法獲取許可,如果許可為0,那么會進行阻塞,通過使用release()方法釋放許可,把許可歸還給Semaphore,歸還之后,阻塞的線程就會醒來嘗試獲取許可。
Semaphore提供給了若干個api對應(yīng)不同的功能:
- Semaphore(int permits):非公平模式創(chuàng)建;
- Semaphore(int permits, boolean fair):可以指定是否公平模式創(chuàng)建;
- acquire():嘗試獲取1個許可,如果沒有許可則阻塞,可以被中斷停止等待;
- acquire(int permits):跟上一個方法類型,嘗試獲取permits個許可;
- acquireUninterruptibly():嘗試獲取一個許可,不可中斷;
- acquireUninterruptibly(int permits):嘗試獲取permits個許可,不可中斷;
- tryAcquire():嘗試獲取一個許可,獲取不到則直接返回失??;
- tryAcquire(int permits):嘗試獲取permits個許可,獲取不到則直接返回失??;
- tryAcquire(int permits, long timeout, TimeUnit unit):嘗試在timeout時間內(nèi)獲取permits個許可,超時則返回false,可被中斷;
- tryAcquire(long timeout, TimeUnit unit):嘗試在timeout時間內(nèi)獲取1個許可,超時則返回false,可被中斷;
- release():釋放一個許可;
- release(int permits):釋放n個許可;
下面演示基于公平鎖的Semaphore,獲取鎖使用acquireUninterruptibly():

這里設(shè)置的許可為2,可以發(fā)現(xiàn),同一時刻最多只能有兩個線程獲得許可。
二、執(zhí)行原理
Semaphore的執(zhí)行原理相對來說比較簡單。下面描述了可中斷非公平的信號量實現(xiàn)原理,ASQ中的state值就相當于許可的數(shù)量:
- 執(zhí)行acquire的時候,會嘗試讓state - acquires,如果發(fā)現(xiàn)許可足夠,則進行cas更新,扣減許可,否則線程進入等待隊列;
- 執(zhí)行release的時候,state + releases,把許可加回去。

三、Semaphore用法
/**
* @Description: 演示Semaphore用法
*/
public class SemaphoreDemo {
public static Semaphore semaphore = new Semaphore(3,true);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"拿到了許可證");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName()+"釋放了許可證");
semaphore.release();
}
});
}
executorService.shutdown();
}
}
注意,如果使用的是tryAcquire失敗之后直接返回,線程不會進入AQS等待隊列。
四、源碼
公平信號量 和 非公平信號量 的區(qū)別
"公平信號量"和"非公平信號量"的釋放信號量的機制是一樣的!
不同的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對于公平信號量而言,如果當前線程不在CLH隊列的頭部,則排隊等候;而對于非公平信號量而言,無論當前線程是不是在CLH隊列的頭部,它都會直接獲取信號量。該差異具體的體現(xiàn)在,它們的tryAcquireShared()函數(shù)的實現(xiàn)不同。
4.1 Semaphore構(gòu)造方法
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
1、Semaphore 構(gòu)造器,permits 為傳入的許可證數(shù),默認非公平構(gòu)造器;
2、Semaphore 構(gòu)造器,permits 為傳入的許可證數(shù),fair 是 boolean 型的,如果傳入 true,則公平,否則不公平;
4.2 NonfairSync 和 FairSync源碼
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
兩者都繼承了 Sync 同步器,初始化時都調(diào)用了父類構(gòu)造器,同時都有一個獲取信號的方法,稍后再分析獲取信號的區(qū)別。
4.3 acquire(獲取信號量)
- 這個方法是從信號量獲取一個許可,在獲取到許可,或線程中斷之前,當前線程阻塞;獲取許可后立即返回并將許可數(shù)減一
public class Semaphore implements java.io.Serializable {
private final Sync sync;
/**
* 如果沒有許可可用,則會休眠,直到發(fā)生以下兩種情況
* 1、其他調(diào)用release方法釋放許可,并且當前線程獲取到許可
* 2、其他線程中斷了當前線程
* 1)當前線程在進入這個方法時設(shè)置了中斷標志位
* 2)等待許可時發(fā)生了中斷,則拋出中斷異常
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
- acquireSharedInterruptibly
這個方法是直接調(diào)用AQS的acquireSharedInterruptibly(int ard)方法;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 首先檢測是否中斷.中斷后拋出異常
* 嘗試獲取許可,成功退出;失敗則進入AQS隊列,直至成功獲取或中斷
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取鎖,返回剩余共享鎖的數(shù)量;小于0則加入同步隊列,自旋
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
tryAcquireShared(arg)則會調(diào)用Semaphore中兩個同步器的tryAcquireShared實現(xiàn)方法; 如果獲取失敗則加入隊列等待喚醒;
4.4 非公平模式的實現(xiàn)
非公平實現(xiàn)都是首先查看是否有可獲取的許可,如果有則獲取成功,沒有則進隊列等待;利用此可以提高并發(fā)量
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
}
- 直接調(diào)用其父類Sync中非公平共享獲取
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 自旋直到無許可或者狀態(tài)位賦值成功
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果小于0則直接返回,否則利用CAS給AQS狀態(tài)位賦值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
通過自旋+CAS來一直嘗試獲取許可,直到獲取成功或者沒有許可,返回剩余的許可數(shù)
4.5 公平模式的實現(xiàn)
公平與非公平的區(qū)別在于始終按照AQS隊列FIFO的順序來的
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
//自旋 CAS 實現(xiàn)線程安全
for (;;) {
// 判斷是否有前置任務(wù)排隊
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 如果小于0則直接返回,否則利用CAS給AQS狀態(tài)位賦值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
如果等待隊列不為空,則直接返回-1。 以上兩種模式獲取失敗后都會調(diào)用doAcquireSharedInterruptibly(int arg);自旋等待獲取鎖
- doAcquireSharedInterruptibly方法:會使得當前線程一直等待,直到當前線程獲取到鎖(或被中斷)才返回
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建“當前線程”的Node節(jié)點,且node中記錄的鎖是“共享鎖”類型,并將節(jié)點添加到CLH隊列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取前繼節(jié)點,如果前繼節(jié)點是等待鎖隊列的表頭,則嘗試獲取共享鎖
// 判斷新增的節(jié)點的前一個節(jié)點是否頭節(jié)點
final Node p = node.predecessor();
if (p == head) {
// 是頭節(jié)點,那么在此嘗試獲取共享鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取成功,把當前節(jié)點變?yōu)樾碌膆ead節(jié)點,
//并且檢查后續(xù)節(jié)點是否可以在共享模式下等待,
//并且允許繼續(xù)傳播,則調(diào)用doReleaseShared繼續(xù)喚醒下一個節(jié)點嘗試獲取鎖
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前繼節(jié)點不是頭節(jié)點,當前線程一直等待,直到獲取到鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
- shouldParkAfterFailedAcquire方法:判斷當前線程獲取鎖失敗之后是否需要掛起
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/*說明:4.shouldParkAfterFailedAcquire 返回當前線程是否應(yīng)該阻塞
(01) 關(guān)于waitStatus請參考下表(中擴號內(nèi)為waitStatus的值)
CANCELLED[1] -- 當前線程已被取消
SIGNAL[-1] -- “當前線程的后繼線程需要被unpark(喚醒)”。
一般發(fā)生情況是:當前線程的后繼線程處于阻塞狀態(tài),
而當前線程被release或cancel掉,因此需要喚醒當前線程的后繼線程。
CONDITION[-2] -- 當前線程(處在Condition休眠狀態(tài))在等待Condition喚醒
PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖”
[0] -- 當前線程不屬于上面的任何一種狀態(tài)。
(02) shouldParkAfterFailedAcquire()通過以下規(guī)則,判斷“當前線程”是否需要被阻塞。
規(guī)則1:如果前繼節(jié)點狀態(tài)為SIGNAL,表明當前節(jié)點需要被unpark(喚醒),此時則返回true。
規(guī)則2:如果前繼節(jié)點狀態(tài)為CANCELLED(ws>0),說明前繼節(jié)點已經(jīng)被取消,則通過先前回溯找到一個有效(非CANCELLED狀態(tài))的節(jié)點,并返回false。
規(guī)則3:如果前繼節(jié)點狀態(tài)為非SIGNAL、非CANCELLED,則設(shè)置前繼的狀態(tài)為SIGNAL,并返回false。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驅(qū)節(jié)點的狀態(tài)
int ws = pred.waitStatus;
// 如果前驅(qū)節(jié)點是SIGNAL狀態(tài),則意味著當前線程需要unpark喚醒,此時返回true
if (ws == Node.SIGNAL)
return true;
// 如果前繼節(jié)點是取消的狀態(tài)即前驅(qū)節(jié)點狀態(tài)為CANCELLED
if (ws > 0) {
// 從隊尾向前尋找第一個狀態(tài)不為CANCELLED的節(jié)點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅(qū)節(jié)點的狀態(tài)設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
4.6 void release()
公平和非公平使用相同的釋放 釋放許可
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public void release() {
sync.releaseShared(1);
}
}
- 調(diào)用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//目的是讓當前線程釋放它所持有的共享鎖,它首先會通過tryReleaseShared()去嘗試釋放共享鎖。
//嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
public final boolean releaseShared(int arg) {
//釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒所有共享節(jié)點線程
doReleaseShared();
return true;
}
return false;
}
}
- tryReleaseShared()在Semaphore.Sync中被重寫,釋放共享鎖,將鎖計數(shù)器加回去
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取“鎖計數(shù)器”的狀態(tài)
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通過CAS函數(shù)進行賦值。
if (compareAndSetState(current, next))
return true;
}
}
}
}
- 如果釋放許可成功,則調(diào)用AQS中的doReleaseShared()方法來喚醒AQS隊列中等待的線程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 喚醒同步隊列中的一個線程
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//是否需要喚醒后繼節(jié)點
if (ws == Node.SIGNAL) {
//修改狀態(tài)為初始0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒h.nex節(jié)點線程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
1)獲取隊列的頭節(jié)點元素,如果不為null,并且不為尾節(jié)點,說白了,就是不止一個人等待,進入判斷。
2)如果線程節(jié)點是需要喚醒的線程,則進行喚醒,獲取資源使用。
3)失敗后重試。
4)如果沒有后繼需要喚醒的節(jié)點,則退出,就相當于每人排隊上廁所了,讓出來資源就空著。
Semaphore 總結(jié)
1、Semaphore 內(nèi)部維護一組信號量,即一個 volatile 的整型 state 變量。
2、Semaphore 分為公平或非公平兩種方式,獲取信號量或釋放信號量的本質(zhì)是對 state 進行原子的減少或增加操作。
3、獲取不到信號的線程放在等待隊列里面,釋放信號的時候會喚醒后繼節(jié)點。
4、Semaphore 主要用于對線程數(shù)量、公共資源(比如數(shù)據(jù)庫連接池)等進行數(shù)量控制。
參考:
https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html
https://www.cnblogs.com/200911/p/6060359.html