java并發(fā)之Semaphore
知識(shí)導(dǎo)讀
- Semaphore相等于是一個(gè)電影院裝3D眼鏡的籃子,每個(gè)線程來會(huì)申請(qǐng)一定數(shù)量的眼鏡,如果夠就進(jìn)去看電影,不夠就排隊(duì)等待;每個(gè)看完電影的線程會(huì)將眼鏡放回籃子,并通知隊(duì)列頭部的人去再嘗試去拿眼鏡。
- Semaphore用于控制并發(fā)的數(shù)量
- Semaphore是AQS共享模式的一種實(shí)現(xiàn)。所以需要繼承實(shí)現(xiàn)AQS的tryAcquireShared和tryReleaseShared方法
- Semaphore同ReentrantLock一樣,提供了公平和非公平兩種模式,實(shí)現(xiàn)原理一樣
- Semaphore通過控制AQS的state來控制同步狀態(tài)的獲取,當(dāng)(state-申請(qǐng)數(shù)量>=0)的時(shí)候可以獲取同步狀態(tài),當(dāng)(state-申請(qǐng)數(shù)量<0)時(shí)阻塞等待。初始化的時(shí)候指定state的初始值代表可并發(fā)線程的最大數(shù)量,線程獲取同步狀態(tài)后state-申請(qǐng)數(shù)量,線程執(zhí)行完畢釋放資源時(shí)state+申請(qǐng)數(shù)量
用例
Semaphore 稱為計(jì)數(shù)信號(hào)量,它允許n個(gè)任務(wù)同時(shí)訪問某個(gè)資源。Semaphore持有一定數(shù)量的執(zhí)行許可證。
- 線程獲取了執(zhí)行許可證就可以獲取執(zhí)行權(quán),同時(shí)Semaphore的許可證數(shù)量減1.
- 當(dāng)占有許可證的線程釋放了許可證后,Semaphore的許可證數(shù)量加1,其他線程又可以獲取許可證
- 當(dāng)線程無法獲取許可證的時(shí)候,會(huì)阻塞等待獲取許可證
acquire方法用于獲取許可證,release方法用于釋放許可證
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"進(jìn)入,當(dāng)前已有" + (3-sp.availablePermits()) + "個(gè)并發(fā)");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"即將離開");
sp.release();
//下面代碼有時(shí)候執(zhí)行不準(zhǔn)確,因?yàn)槠錄]有和上面的代碼合成原子單元
System.out.println("線程" + Thread.currentThread().getName() +
"已離開,當(dāng)前已有" + (3-sp.availablePermits()) + "個(gè)并發(fā)");
}
};
service.execute(runnable);
}
}
}
源碼解析
Sempaphore的構(gòu)造方法,創(chuàng)建了內(nèi)部類Sync的實(shí)現(xiàn),提供了公平模式和非公平模式兩種。
//非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sempaphore中的內(nèi)部類Sync實(shí)現(xiàn)了AQS的共享鎖模式,通過控制state來控制獲取同步狀態(tài),當(dāng)state>0的時(shí)候可以獲取同步狀態(tài)。所以初始化的時(shí)候指定了state的初始值。
abstract static class Sync extends AbstractQueuedSynchronizer {
//將state設(shè)置為 許可證的最大數(shù)量
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
}
公平模式
FairSync提供了公平模式的實(shí)現(xiàn),覆寫AQS的tryAcquireShared方法。
- 先調(diào)用hasQueuedPredecessors判斷AQS同步隊(duì)列是否有排在當(dāng)前線程之前的等待線程,如果有,直接返回復(fù)數(shù)表示獲取同步狀態(tài)失敗,當(dāng)前線程加入同步隊(duì)列并阻塞
- 如果當(dāng)前線程是排名最靠前的,則CAS設(shè)置state減去申請(qǐng)的值
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//判斷state是否減到0,如果減到了返回負(fù)數(shù)會(huì)阻塞,否則返回正數(shù),獲的許可證
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
非公平模式
NonfairSync提供了非公平模式的實(shí)現(xiàn),覆寫AQS的tryAcquireShared方法。非公平模式比較簡(jiǎn)單,直接修改state值
- 判斷state是否大于需申請(qǐng)的許可證數(shù)量
- 如果滿足,CAS設(shè)置state值,將值修改為減去申請(qǐng)數(shù)量后的值
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
for循環(huán)+CAS保證并發(fā)安全
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
釋放許用于可證
Semaphore中release方法用于釋放許可證,直接調(diào)用內(nèi)部類Sync釋放許可證
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
Sync繼承了AQS,覆寫了tryReleaseShared方法。由于是共享模式,所以在釋放的時(shí)候會(huì)有多線程并發(fā)問題。這里使用for循環(huán)加CAS將state值加回去
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}