java并發(fā)之Semaphore

java并發(fā)之Semaphore

知識(shí)導(dǎo)讀

  • Semaphore相等于是一個(gè)電影院裝3D眼鏡的籃子,每個(gè)線程來會(huì)申請(qǐng)一定數(shù)量的眼鏡,如果夠就進(jìn)去看電影,不夠就排隊(duì)等待;每個(gè)看完電影的線程會(huì)將眼鏡放回籃子,并通知隊(duì)列頭部的人去再嘗試去拿眼鏡。
  • Semaphore用于控制并發(fā)的數(shù)量
  • Semaphore是AQS共享模式的一種實(shí)現(xiàn)。所以需要繼承實(shí)現(xiàn)AQS的tryAcquireShared和tryReleaseShared方法
  • Semaphore同ReentrantLock一樣,提供了公平和非公平兩種模式,實(shí)現(xiàn)原理一樣
  • Semaphore通過控制AQS的state來控制同步狀態(tài)的獲取,當(dāng)(state-申請(qǐng)數(shù)量>=0)的時(shí)候可以獲取同步狀態(tài),當(dāng)(state-申請(qǐng)數(shù)量<0)時(shí)阻塞等待。初始化的時(shí)候指定state的初始值代表可并發(fā)線程的最大數(shù)量,線程獲取同步狀態(tài)后state-申請(qǐng)數(shù)量,線程執(zhí)行完畢釋放資源時(shí)state+申請(qǐng)數(shù)量

用例

Semaphore 稱為計(jì)數(shù)信號(hào)量,它允許n個(gè)任務(wù)同時(shí)訪問某個(gè)資源。Semaphore持有一定數(shù)量的執(zhí)行許可證。

  • 線程獲取了執(zhí)行許可證就可以獲取執(zhí)行權(quán),同時(shí)Semaphore的許可證數(shù)量減1.
  • 當(dāng)占有許可證的線程釋放了許可證后,Semaphore的許可證數(shù)量加1,其他線程又可以獲取許可證
  • 當(dāng)線程無法獲取許可證的時(shí)候,會(huì)阻塞等待獲取許可證

acquire方法用于獲取許可證,release方法用于釋放許可證

public class SemaphoreTest {
  public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    final  Semaphore sp = new Semaphore(3);
    for(int i=0;i<10;i++){
      Runnable runnable = new Runnable(){
        public void run(){
          try {
            sp.acquire();
          } catch (InterruptedException e1) {
            e1.printStackTrace();
          }
          System.out.println("線程" + Thread.currentThread().getName() + 
                             "進(jìn)入,當(dāng)前已有" + (3-sp.availablePermits()) + "個(gè)并發(fā)");
          try {
            Thread.sleep((long)(Math.random()*10000));
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("線程" + Thread.currentThread().getName() + 
                             "即將離開");                   
          sp.release();
          //下面代碼有時(shí)候執(zhí)行不準(zhǔn)確,因?yàn)槠錄]有和上面的代碼合成原子單元
          System.out.println("線程" + Thread.currentThread().getName() + 
                             "已離開,當(dāng)前已有" + (3-sp.availablePermits()) + "個(gè)并發(fā)");           
        }
      };
      service.execute(runnable);            
    }
  }
}

源碼解析

Sempaphore的構(gòu)造方法,創(chuàng)建了內(nèi)部類Sync的實(shí)現(xiàn),提供了公平模式和非公平模式兩種。

//非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sempaphore中的內(nèi)部類Sync實(shí)現(xiàn)了AQS的共享鎖模式,通過控制state來控制獲取同步狀態(tài),當(dāng)state>0的時(shí)候可以獲取同步狀態(tài)。所以初始化的時(shí)候指定了state的初始值。

abstract static class Sync extends AbstractQueuedSynchronizer {
   //將state設(shè)置為 許可證的最大數(shù)量
    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
}

公平模式

FairSync提供了公平模式的實(shí)現(xiàn),覆寫AQS的tryAcquireShared方法。

  1. 先調(diào)用hasQueuedPredecessors判斷AQS同步隊(duì)列是否有排在當(dāng)前線程之前的等待線程,如果有,直接返回復(fù)數(shù)表示獲取同步狀態(tài)失敗,當(dāng)前線程加入同步隊(duì)列并阻塞
  2. 如果當(dāng)前線程是排名最靠前的,則CAS設(shè)置state減去申請(qǐng)的值
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        //判斷state是否減到0,如果減到了返回負(fù)數(shù)會(huì)阻塞,否則返回正數(shù),獲的許可證
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

非公平模式

NonfairSync提供了非公平模式的實(shí)現(xiàn),覆寫AQS的tryAcquireShared方法。非公平模式比較簡(jiǎn)單,直接修改state值

  1. 判斷state是否大于需申請(qǐng)的許可證數(shù)量
  2. 如果滿足,CAS設(shè)置state值,將值修改為減去申請(qǐng)數(shù)量后的值
static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

for循環(huán)+CAS保證并發(fā)安全

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

釋放許用于可證

Semaphore中release方法用于釋放許可證,直接調(diào)用內(nèi)部類Sync釋放許可證

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

Sync繼承了AQS,覆寫了tryReleaseShared方法。由于是共享模式,所以在釋放的時(shí)候會(huì)有多線程并發(fā)問題。這里使用for循環(huán)加CAS將state值加回去

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容