16 | Semaphore:如何快速實(shí)現(xiàn)一個(gè)限流器?

Semaphore,現(xiàn)在普遍翻譯為“信號(hào)量”,以前也曾被翻譯成“信號(hào)燈”,因?yàn)轭愃片F(xiàn)實(shí)生活里的紅綠燈,車輛能不能通行,要看是不是綠燈。同樣,在編程世界里,線程能不能執(zhí)行,也要看信號(hào)量是不是允許。

信號(hào)量是由大名鼎鼎的計(jì)算機(jī)科學(xué)家迪杰斯特拉(Dijkstra)于 1965 年提出,在這之后的 15 年,信號(hào)量一直都是并發(fā)編程領(lǐng)域的終結(jié)者,直到 1980 年管程被提出來,我們才有了第二選擇。目前幾乎所有支持并發(fā)編程的語言都支持信號(hào)量機(jī)制,所以學(xué)好信號(hào)量還是很有必要的。

下面我們首先介紹信號(hào)量模型,之后介紹如何使用信號(hào)量,最后我們再用信號(hào)量來實(shí)現(xiàn)一個(gè)限流器。

信號(hào)量模型

信號(hào)量模型還是很簡單的,可以簡單概括為:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。在信號(hào)量模型里,計(jì)數(shù)器和等待隊(duì)列對外是透明的,所以只能通過信號(hào)量模型提供的三個(gè)方法來訪問它們,這三個(gè)方法分別是:init()、down() 和 up()。你可以結(jié)合下圖來形象化地理解。

img

這三個(gè)方法詳細(xì)的語義具體如下所示。

init():設(shè)置計(jì)數(shù)器的初始值。

down():計(jì)數(shù)器的值減 1;如果此時(shí)計(jì)數(shù)器的值小于 0,則當(dāng)前線程將被阻塞,否則當(dāng)前線程可以繼續(xù)執(zhí)行。

up():計(jì)數(shù)器的值加 1;如果此時(shí)計(jì)數(shù)器的值小于或者等于 0,則喚醒等待隊(duì)列中的一個(gè)線程,并將其從等待隊(duì)列中移除。

這里提到的 init()、down() 和 up() 三個(gè)方法都是原子性的,并且這個(gè)原子性是由信號(hào)量模型的實(shí)現(xiàn)方保證的。在 Java SDK 里面,信號(hào)量模型是由 java.util.concurrent.Semaphore 實(shí)現(xiàn)的,Semaphore 這個(gè)類能夠保證這三個(gè)方法都是原子操作。如果你覺得上面的描述有點(diǎn)繞的話,可以參考下面這個(gè)代碼化的信號(hào)量模型。


class Semaphore{
  // 計(jì)數(shù)器
  int count;
  // 等待隊(duì)列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //將當(dāng)前線程插入等待隊(duì)列
      //阻塞當(dāng)前線程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待隊(duì)列中的某個(gè)線程T
      //喚醒線程T
    }
  }
}

如何使用信號(hào)量

其實(shí)很簡單,就像我們用互斥鎖一樣,只需要在進(jìn)入臨界區(qū)之前執(zhí)行一下 down() 操作,退出臨界區(qū)之前執(zhí)行一下 up() 操作就可以了。下面是 Java 代碼的示例,acquire() 就是信號(hào)量里的 down() 操作,release() 就是信號(hào)量里的 up() 操作。



static int count;
//初始化信號(hào)量
static final Semaphore s 
    = new Semaphore(1);
//用信號(hào)量保證互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

下面我們再來分析一下,信號(hào)量是如何保證互斥的。假設(shè)兩個(gè)線程 T1 和 T2 同時(shí)訪問 addOne() 方法,當(dāng)它們同時(shí)調(diào)用 acquire() 的時(shí)候,由于 acquire() 是一個(gè)原子操作,所以只能有一個(gè)線程(假設(shè) T1)把信號(hào)量里的計(jì)數(shù)器減為 0,另外一個(gè)線程(T2)則是將計(jì)數(shù)器減為 -1。對于線程 T1,信號(hào)量里面的計(jì)數(shù)器的值是 0,大于等于 0,所以線程 T1 會(huì)繼續(xù)執(zhí)行;對于線程 T2,信號(hào)量里面的計(jì)數(shù)器的值是 -1,小于 0,按照信號(hào)量模型里對 down() 操作的描述,線程 T2 將被阻塞。所以此時(shí)只有線程 T1 會(huì)進(jìn)入臨界區(qū)執(zhí)行count+=1;。當(dāng)線程 T1 執(zhí)行 release() 操作,也就是 up() 操作的時(shí)候,信號(hào)量里計(jì)數(shù)器的值是 -1,加 1 之后的值是 0,小于等于 0,按照信號(hào)量模型里對 up() 操作的描述,此時(shí)等待隊(duì)列中的 T2 將會(huì)被喚醒。于是 T2 在 T1 執(zhí)行完臨界區(qū)代碼之后才獲得了進(jìn)入臨界區(qū)執(zhí)行的機(jī)會(huì),從而保證了互斥性。

快速實(shí)現(xiàn)一個(gè)限流器

其實(shí)前不久,我在工作中也遇到了一個(gè)對象池的需求。所謂對象池呢,指的是一次性創(chuàng)建出 N 個(gè)對象,之后所有的線程重復(fù)利用這 N 個(gè)對象,當(dāng)然對象在被釋放前,也是不允許其他線程使用的。對象池,可以用 List 保存實(shí)例對象,這個(gè)很簡單。但關(guān)鍵是限流器的設(shè)計(jì),這里的限流,指的是不允許多于 N 個(gè)線程同時(shí)進(jìn)入臨界區(qū)。那如何快速實(shí)現(xiàn)一個(gè)這樣的限流器呢?這種場景,我立刻就想到了信號(hào)量的解決方案。

信號(hào)量的計(jì)數(shù)器,在上面的例子中,我們設(shè)置成了 1,這個(gè) 1 表示只允許一個(gè)線程進(jìn)入臨界區(qū),但如果我們把計(jì)數(shù)器的值設(shè)置成對象池里對象的個(gè)數(shù) N,就能完美解決對象池的限流問題了。下面就是對象池的示例代碼。



class ObjPool<T, R> {
  final List<T> pool;
  // 用信號(hào)量實(shí)現(xiàn)限流器
  final Semaphore sem;
  // 構(gòu)造函數(shù)
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用對象池的對象,調(diào)用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}
// 創(chuàng)建對象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通過對象池獲取t,之后執(zhí)行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

我們用一個(gè) List來保存對象實(shí)例,用 Semaphore 實(shí)現(xiàn)限流器。關(guān)鍵的代碼是 ObjPool 里面的 exec() 方法,這個(gè)方法里面實(shí)現(xiàn)了限流的功能。在這個(gè)方法里面,我們首先調(diào)用 acquire() 方法(與之匹配的是在 finally 里面調(diào)用 release() 方法),假設(shè)對象池的大小是 10,信號(hào)量的計(jì)數(shù)器初始化為 10,那么前 10 個(gè)線程調(diào)用 acquire() 方法,都能繼續(xù)執(zhí)行,相當(dāng)于通過了信號(hào)燈,而其他線程則會(huì)阻塞在 acquire() 方法上。對于通過信號(hào)燈的線程,我們?yōu)槊總€(gè)線程分配了一個(gè)對象 t(這個(gè)分配工作是通過 pool.remove(0) 實(shí)現(xiàn)的),分配完之后會(huì)執(zhí)行一個(gè)回調(diào)函數(shù) func,而函數(shù)的參數(shù)正是前面分配的對象 t ;執(zhí)行完回調(diào)函數(shù)之后,它們就會(huì)釋放對象(這個(gè)釋放工作是通過 pool.add(t) 實(shí)現(xiàn)的),同時(shí)調(diào)用 release() 方法來更新信號(hào)量的計(jì)數(shù)器。如果此時(shí)信號(hào)量里計(jì)數(shù)器的值小于等于 0,那么說明有線程在等待,此時(shí)會(huì)自動(dòng)喚醒等待的線程。簡言之,使用信號(hào)量,我們可以輕松地實(shí)現(xiàn)一個(gè)限流器,使用起來還是非常簡單的。

總結(jié)

信號(hào)量在 Java 語言里面名氣并不算大,但是在其他語言里卻是很有知名度的。Java 在并發(fā)編程領(lǐng)域走的很快,重點(diǎn)支持的還是管程模型。 管程模型理論上解決了信號(hào)量模型的一些不足,主要體現(xiàn)在易用性和工程化方面,例如用信號(hào)量解決我們曾經(jīng)提到過的阻塞隊(duì)列問題,就比管程模型麻煩很多,你如果感興趣,可以課下了解和嘗試一下。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念,程序并不能單獨(dú)運(yùn)行,只有...
    go以恒閱讀 1,796評(píng)論 0 6
  • Semaphore:信號(hào)量模型 信號(hào)量模型可以簡單概括為:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。在信號(hào)量模型里,計(jì)數(shù)...
    woshishui1243閱讀 339評(píng)論 0 0
  • Java SE 基礎(chǔ): 封裝、繼承、多態(tài) 封裝: 概念:就是把對象的屬性和操作(或服務(wù))結(jié)合為一個(gè)獨(dú)立的整體,并盡...
    Jayden_Cao閱讀 2,247評(píng)論 0 8
  • Semaphore(信號(hào)量) 信號(hào)量可以簡單的概括為:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。在信號(hào)量模型里,計(jì)數(shù)器和...
    Easy的幸福閱讀 287評(píng)論 0 0
  • 線程狀態(tài)新建,就緒,運(yùn)行,阻塞,死亡。 線程同步多線程可以同時(shí)運(yùn)行多個(gè)任務(wù),線程需要共享數(shù)據(jù)的時(shí)候,可能出現(xiàn)數(shù)據(jù)不...
    KevinCool閱讀 878評(píng)論 0 0

友情鏈接更多精彩內(nèi)容