簡(jiǎn)介
Semaphore是一種同步輔助工具,翻譯過來(lái)就是信號(hào)量,用來(lái)實(shí)現(xiàn)流量控制,它可以控制同一時(shí)間內(nèi)對(duì)資源的訪問次數(shù).
無(wú)論是Synchroniezd還是ReentrantLock,一次都只允許一個(gè)線程訪問一個(gè)資源,但是Semaphore可以指定多個(gè)線程同時(shí)訪問某一個(gè)資源.
Semaphore有一個(gè)構(gòu)造函數(shù),可以傳入一個(gè)int型整數(shù)n,表示某段代碼最多只有n個(gè)線程可以訪問,如果超出了n,那么請(qǐng)等待,等到某個(gè)線程執(zhí)行完畢這段代碼塊,下一個(gè)線程再進(jìn)入。
信號(hào)量上定義兩種操作:
acquire(獲取):當(dāng)一個(gè)線程調(diào)用acquire操作時(shí),它要么成功獲取到信號(hào)量(信號(hào)量減1),要么一直等下去,直到有線程釋放信號(hào)量,或超時(shí),Semaphore內(nèi)部會(huì)維護(hù)一個(gè)等待隊(duì)列用于存儲(chǔ)這些被暫停的線程.
release(釋放)實(shí)際上會(huì)將信號(hào)量的值+1,然后喚醒相應(yīng)Sepmaphore實(shí)例的等待隊(duì)列中的一個(gè)任意等待線程.
應(yīng)用場(chǎng)景
信號(hào)量主要用于兩個(gè)目的:
用于多個(gè)共享資源的互斥使用.
用于并發(fā)線程數(shù)的控制.
例子
以下的例子:5個(gè)線程搶3個(gè)車位,同時(shí)最多只有3個(gè)線程能搶到車位,等其他線程釋放信號(hào)量后,才能搶到車位.
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//申請(qǐng)資源
System.out.println(Thread.currentThread().getName()+"搶到車位");
ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
System.out.println(Thread.currentThread().getName()+"歸還車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//釋放資源
semaphore.release();
}
}
},"線程"+i).start();
}
}
復(fù)制代碼注意事項(xiàng)
Semaphore.acquire()和Semaphore.release()總是配對(duì)使用的,這點(diǎn)需要由應(yīng)用代碼自身保證.
Semaphore.release()調(diào)用應(yīng)該放在finally塊中,已避免應(yīng)用代碼出現(xiàn)異常的情況下,當(dāng)前線程所獲得的信號(hào)量無(wú)法返還.
如果Semaphore構(gòu)造器中的參數(shù)permits值設(shè)置為1,所創(chuàng)建的Semaphore相當(dāng)于一個(gè)互斥鎖.與其他互斥鎖不同的是,這種互斥鎖允許一個(gè)線程釋放另外一個(gè)線程所持有的鎖.因?yàn)橐粋€(gè)線程可以在未執(zhí)行過Semaphore.acquire()的情況下執(zhí)行相應(yīng)的Semaphore.release().
默認(rèn)情況下,Semaphore采用的是非公平性調(diào)度策略.
原理
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
復(fù)制代碼Semaphore內(nèi)部使用Sync類,Sync又是繼承AbstractQueuedSynchronizer,所以Sync底層還是使用AQS實(shí)現(xiàn)的.Sync有兩個(gè)實(shí)現(xiàn)類NonfairSync和FairSync,用來(lái)指定獲取信號(hào)量時(shí)是否采用公平策略.
初始化方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
復(fù)制代碼如上所示,Semaphore默認(rèn)采用非公平策略,如果需要使用公平策略則可以使用帶兩個(gè)參數(shù)的構(gòu)造函數(shù)來(lái)構(gòu)造Semaphore對(duì)象。
參數(shù)permits被傳遞給AQS的state值,用來(lái)表示當(dāng)前持有的信號(hào)量個(gè)數(shù).
void acquire()方法
當(dāng)前線程調(diào)用該方法的目的是希望獲取一個(gè)信號(hào)量資源。
如果當(dāng)前信號(hào)量個(gè)數(shù)大于0,則當(dāng)前信號(hào)量的計(jì)數(shù)會(huì)減1,然后該方法直接返回。否則如果當(dāng)前信號(hào)量個(gè)數(shù)等0,則當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列。當(dāng)其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程時(shí),則當(dāng)前線程會(huì)拋出InterruptedException異常返回。
//Semaphore方法
public void acquire() throws InterruptedException {
//傳遞參數(shù)為1,說(shuō)明要獲取1個(gè)信號(hào)量資源
sync.acquireSharedInterruptibly(1);
}
//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)如果線程被中斷,則拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否則調(diào)用Sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)確定使用公平策略
if (tryAcquireShared(arg) < 0)
//如果獲取失敗則放入阻塞隊(duì)列.然后再次嘗試,如果使用則調(diào)用park方法掛起當(dāng)前線程
doAcquireSharedInterruptibly(arg);
}
復(fù)制代碼由如上代碼可知,acquire()在內(nèi)部調(diào)用了Sync的acquireSharedlnterruptibly方法,后者會(huì)對(duì)中斷進(jìn)行響應(yīng)(如果當(dāng)前線程被中斷,則拋出中斷異常)。嘗試獲取信號(hào)量資源的AQS的方法 tryAcquireShared是由Sync的子類實(shí)現(xiàn)的,所以這里分別從兩 方面來(lái)討論。
先討論非公平策略NonfairSync類的tryAcquireShared方法,代碼如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取當(dāng)前信號(hào)量值
int available = getState();
//計(jì)算當(dāng)前剩余值
int remaining = available - acquires;
//如果當(dāng)前剩余值小于0或則CAS設(shè)置成功則返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
復(fù)制代碼如上代碼先獲取當(dāng)前信號(hào)量值(available),然后減去需要獲取的值(acquires),得到剩余的信號(hào)量個(gè)數(shù)(remaining),如果剩余值小于0則說(shuō)明當(dāng)前信號(hào)量個(gè)數(shù)滿足不了需求,那么直接返回負(fù)數(shù),這時(shí)當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列而被掛起。如果剩余值大于0,則使用CAS操作設(shè)置當(dāng)前信號(hào)量值為剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平獲取的,也就是說(shuō)先調(diào)用aquire方法獲取信號(hào)量的線程不一定比后來(lái)者先獲取到信號(hào)量。
考慮下面場(chǎng)景,如果線程A先調(diào)用了aquire()方法獲取信號(hào)量,但是當(dāng)前信號(hào)量個(gè)數(shù)為0,那么線程A會(huì)被放入AQS的阻塞隊(duì)列 。過一段時(shí)間后線程C調(diào)用了release()方法釋放了一個(gè)信號(hào)量,如果當(dāng)前沒有其他線程獲取信號(hào)量,那么線程A就會(huì)被激活,然后獲取該信號(hào)量,但是假如線程C釋放信號(hào)量后,線程C調(diào)用了aquire方法,那么線程C就會(huì)和線程A去競(jìng)爭(zhēng)這個(gè)信號(hào)量資源 。 如果采用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A被激活前,或者激活后先于線程 A獲取到該信號(hào)量,也就是在這種模式下阻塞線程和當(dāng)前請(qǐng)求的線程是競(jìng)爭(zhēng)關(guān)系,而不遵循先來(lái)先得的策略。
下面看公平性的FairSync類是如何保證公平性的。
protected int tryAcquireShared(int acquires) {
for (;;) {
//查詢是否當(dāng)前線程節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)也在等待獲取該資源,有的話直接返回
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
復(fù)制代碼可見公平性還是靠hasQueuedPredecessors這個(gè)函數(shù)來(lái)保證的。所以Semaphore的公平策略是看當(dāng)前線程節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否也在等待獲取該資源,如果是則自己放棄獲取的權(quán)限,然后當(dāng)前線程會(huì)被放入AQS阻塞隊(duì)列,否則就去獲取。
void acquire(int permits)方法
該方法與acquire()方法不同,后者只需要獲取一個(gè)信號(hào)量值, 而前者則獲取permits個(gè)。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
復(fù)制代碼void acquireUninterruptibly()方法
該方法與acquire()類似,不同之處在于該方法對(duì)中斷不響應(yīng),也就是當(dāng)當(dāng)前線程調(diào)用了 acquireUninterruptibly獲取資源時(shí)(包含被阻塞后),其他線程調(diào)用了當(dāng)前線程的interrupt() 方法設(shè)置了當(dāng)前線程的中斷標(biāo)志,此時(shí)當(dāng)前線程并不會(huì)拋出IllegalArgumentException異常而返回。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
復(fù)制代碼void release()方法
該方法的作用是把當(dāng)前Semaphore對(duì)象的信號(hào)量值增加1,如果當(dāng)前有線程因?yàn)檎{(diào)用aquire方法被阻塞而被放入了AQS的阻塞 隊(duì)列,則會(huì)根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)能被滿足的線程進(jìn)行激活, 激活的線程會(huì)嘗試獲取剛增加的信號(hào)量.
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)嘗試釋放資源
if (tryReleaseShared(arg)) {
//(3)資源釋放成功則調(diào)用park方法喚醒AQS隊(duì)列里面最先掛起的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當(dāng)前信號(hào)量值
int current = getState();
//將當(dāng)前信號(hào)量值增加releases,這里為增加1
int next = current + releases;
//移除處理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS保證更新信號(hào)量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
復(fù)制代碼由代碼release()->sync.releaseShared(1),可知,release方法每次只會(huì)對(duì)信號(hào)量值增加1,tryReleaseShared方法是無(wú)限循環(huán),使用CAS保證了release方法對(duì)信號(hào)量遞增1的原子性操作.tryReleaseShared方法增加信號(hào)量值成功后會(huì)執(zhí)行代碼(3),即調(diào)用AQS的方法來(lái)激活因?yàn)檎{(diào)用acquire方法而被阻塞的線程。
void release(int permits)方法
該方法與不帶參數(shù)的release方法的不同之處在于,前者每次調(diào)用會(huì)在信號(hào)量值原來(lái)的基礎(chǔ)上增加 permits,而后者每次增加l。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
復(fù)制代碼另外可以看到,這里的sync.releaseShared是共享方法,這說(shuō)明該信號(hào)量是線程共享的,信號(hào)量沒有和固定線程綁定,多個(gè)線程可以同時(shí)使用CAS去更新信號(hào)量的值而不會(huì)被阻塞。