面試官問我AQS中的PROPAGATE有什么用?

之前分析過AQS的源碼,但只分析了獨占鎖的原理。

而剛好我們可以借助Semaphore來分析共享鎖。

如何使用Semaphore

public class SemaphoreDemo {

  public static void main(String[] args) {

    // 申請共享鎖數(shù)量
    Semaphore sp = new Semaphore(3);

    for(int i = 0; i < 5; i++) {
      new Thread(() -> {
        try {

          // 獲取共享鎖
          sp.acquire();

          String threadName = Thread.currentThread().getName();

          // 訪問API
          System.out.println(threadName + " 獲取許可,訪問API。剩余許可數(shù) " + sp.availablePermits());

          TimeUnit.SECONDS.sleep(1);
          
          // 釋放共享鎖
          sp.release();

          System.out.println(threadName + " 釋放許可,當(dāng)前可用許可數(shù)為 " + sp.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

      }, "thread-" + (i+1)).start();
    }
  }
}

Java SDK 里面提供了 Lock,為啥還要提供一個 Semaphore ?其實實現(xiàn)一個互斥鎖,僅僅是 Semaphore 的部分功能,Semaphore 還有一個功能是 Lock 不容易實現(xiàn)的,那就是:Semaphore 可以允許多個線程訪問一個臨界區(qū)。

比較常見的需求就是我們工作中遇到的連接池、對象池、線程池等等池化資源。其中,你可能最熟悉數(shù)據(jù)庫連接池,在同一時刻,一定是允許多個線程同時使用連接池的,當(dāng)然,每個連接在被釋放前,是不允許其他線程使用的。

比如上面的代碼就演示了同時最多只允許3個線程訪問API。

如何依托AQS實現(xiàn)Semaphore

abstract static class Sync extends AbstractQueuedSynchronizer {
  Sync(int permits) {
    setState(permits);
  }

  // 獲取鎖
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
    }
  }

  // 釋放鎖
  protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
      }
  }

}

Semaphore的acquire/release還是使用到了AQS,它把state的值作為共享資源的數(shù)量。獲取鎖的時候state的值減去1,釋放鎖的時候state的值加上1。

鎖的獲取

public void acquire() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

// AQS
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}

Semaphore也有公平鎖和非公平鎖兩種實現(xiàn),不過都是借助于AQS的,這里默認(rèn)實現(xiàn)是非公平鎖,所以最終會調(diào)用nonfairTryAcquireShared方法。

鎖的釋放

public void release() {
  sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

鎖的釋放成功后,會調(diào)用doReleaseShared(),這個方法后面會分析。

獲取鎖失敗

當(dāng)獲取鎖失敗后,新的線程就會被加入隊列

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
  if (Thread.interrupted())
      throw new InterruptedException();
  // 實際調(diào)用的是NonFairSync中的nonfairTryAcquireShared
  if (tryAcquireShared(arg) < 0)
      doAcquireSharedInterruptibly(arg);
}

當(dāng)鎖的數(shù)量小于0的時候,需要入隊。

共享鎖調(diào)用的方法doAcquireSharedInterruptibly()和獨占鎖調(diào)用的方法acquireQueued()只有一些細(xì)微的區(qū)別。

區(qū)別

首先獨占鎖構(gòu)造的節(jié)點是模式是EXCLUSIVE,而共享鎖構(gòu)造模式是SHARED,它們使用的是AQS中的nextWaiter變量來區(qū)分的。

其次在在準(zhǔn)備入隊的時候,如果嘗試獲取共享鎖成功,那么會調(diào)用setHeadAndPropagate()方法,重新設(shè)置頭節(jié)點并決定是否需要喚醒后繼節(jié)點

private void setHeadAndPropagate(Node node, int propagate) {
  // 舊的頭節(jié)點
  Node h = head;
  // 將當(dāng)前獲取到鎖的節(jié)點設(shè)置為頭節(jié)點
  setHead(node);

  // 如果仍然有多的鎖(propagate的值是nonfairTryAcquireShared()返回值)
  // 或者舊的頭結(jié)點為空,或者頭結(jié)點的 ws 小于0
  // 又或者新的頭結(jié)點為空,或者新頭結(jié)點的 ws 小于0,則喚醒后繼節(jié)點
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
      Node s = node.next;
      if (s == null || s.isShared())
          doReleaseShared();
  }
}

private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

private void doReleaseShared() {    
  for (;;) {
      Node h = head;
      // 保證同步隊列中至少有兩個節(jié)點
      if (h != null && h != tail) {
          int ws = h.waitStatus;

          // 需要喚醒后繼節(jié)點
          if (ws == Node.SIGNAL) {
              if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;            // loop to recheck cases
              unparkSuccessor(h);
          }
          // 將節(jié)點狀態(tài)更新為PROPAGATE
          else if (ws == 0 &&
                   !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
              continue;                // loop on failed CAS
      }
      if (h == head)                   // loop if head changed
          break;
  }
}

其實看到這里就有一些邏輯我看不懂了,比如setHeadAndPropagate()方法中的這一段邏輯

if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {

寫成這樣不好嗎?

if(propagate > 0  && h.waitStatus < 0)

為什么要那么復(fù)雜呢?而且看上去這樣也可以嘛,我本來想要假裝看懂的,可是我發(fā)現(xiàn)騙自己真的不容易呀。

這里應(yīng)該是有什么特殊的原因,不然Doug Lea老爺子不會這么寫。。。。。。

PROPAGATE狀態(tài)有什么用?

我就去網(wǎng)上搜索了下,結(jié)果在Java的Bug列表中發(fā)現(xiàn)是因為有一個bug才這樣修改的

https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020

JDK Bug

看到這個bug在2011年就在JDK6中被修復(fù)了,我想說那個時候我還不知道java是啥呢。。。。

這個修改可以在Doug Lead老爺子的主頁中找到,通過JSR 166找到可對比的CSV,對比1.73和1.74兩個版本

http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?r1=1.73&r2=1.74

我們先看看setHeadAndPropagate中的修改對比

對比

以前的版本中判斷條件是這樣的

if (propagate > 0 && node.waitStatus != 0)

這樣的判斷很符合我的認(rèn)知嘛。但是會造成怎樣的問題呢?按照bug的描述,我們來紙上談兵下沒有PROPAGATE狀態(tài)的時候會出什么問題。

首先 Semaphore初始化state值為0,然后4個線程分別運行4個任務(wù)。線程t1,t2同時獲取鎖,另外兩個線程t3,t3同時釋放鎖

public class TestSemaphore {

  // 這里將信號量設(shè)置成了0
  private static Semaphore sem = new Semaphore(0);

  private static class Thread1 extends Thread {
    @Override
    public void run() {
      // 獲取鎖
      sem.acquireUninterruptibly();
    }
  }

  private static class Thread2 extends Thread {
    @Override
    public void run() {
      // 釋放鎖
      sem.release();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 10000000; i++) {
      Thread t1 = new Thread1();
      Thread t2 = new Thread1();
      Thread t3 = new Thread2();
      Thread t4 = new Thread2();
      t1.start();
      t2.start();
      t3.start();
      t4.start();
      t1.join();
      t2.join();
      t3.join();
      t4.join();
      System.out.println(i);
    }
  }
}

根據(jù)上面的代碼,我們將信號量設(shè)置為0,所以t1,t2獲取鎖會失敗。

假設(shè)某次循環(huán)中隊列中的情況如下

head --> t1 --> t2(tail)

鎖的釋放由t3先釋放,t4后釋放

時刻1: 線程t3調(diào)用releaseShared(),然后喚醒隊列中節(jié)點(線程t1),此時head的狀態(tài)從-1變成0

時刻2: 線程t1由于線程t3釋放了鎖,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

再次獲取鎖

時刻3: 線程t4調(diào)用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),不滿足條件,因此不喚醒后繼節(jié)點

diff

時刻4: 線程t1獲取鎖成功,調(diào)用setHeadAndPropagate(),因為不滿足propagate > 0(時刻2中propagate == 0),從而不會喚醒后繼節(jié)點

如果沒有PROPAGATE狀態(tài),上面的情況就會導(dǎo)致線程t2不會被喚醒。

那在引入了propagate之后這個變量又會是怎樣的情況呢?

時刻1: 線程t3調(diào)用doReleaseShared,然后喚醒隊列中結(jié)點(線程t1),此時head的狀態(tài)從-1變成0

時刻2: 線程t1由于t3釋放了信號量,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

時刻3: 線程t4調(diào)用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),將節(jié)點狀態(tài)設(shè)置為PROPAGATE(值為-3)

 else if (ws == 0 &&
        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
   continue;     // loop on failed CAS

時刻4: 線程t1獲取鎖成功,調(diào)用setHeadAndPropagate(),雖然不滿足propagate > 0(時刻2中propagate == 0),但是waitStatus<0,所以會去喚醒后繼節(jié)點

至此我們知道了PROPAGATE的作用,就是為了避免線程無法會喚醒的窘境。,因為共享鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是并發(fā)執(zhí)行的,就會產(chǎn)生很多中間狀態(tài),而PROPAGATE就是為了讓這些中間狀態(tài)不影響程序的正常運行。

doReleaseShared-小方法大智慧

無論是釋放鎖還是申請到鎖都會調(diào)用doReleaseShared()方法,這個方法看似簡單,其實里面的邏輯還是很精妙的。

private void doReleaseShared() {    
  for (;;) {
    Node h = head;
    // 保證同步隊列中至少有兩個節(jié)點
    if (h != null && h != tail) {
      int ws = h.waitStatus;

      // 需要喚醒后繼節(jié)點
      if (ws == Node.SIGNAL) {
          // 可能有其他線程調(diào)用doReleaseShared(),unpark操作只需要其中一個調(diào)用就行了
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
              continue;     // loop to recheck cases
          unparkSuccessor(h);
      }
      // 將節(jié)點狀態(tài)設(shè)置為PROPAGATE(畫重點了)
      else if (ws == 0 &&
            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;    // loop on failed CAS
    }
    if (h == head)        // loop if head changed
      break;
  }
}

這其中有一個判斷條件

ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

這個if條件成立也巧妙

  1. 首先隊列中現(xiàn)在至少有兩個節(jié)點,簡化分析,我們認(rèn)為它只有兩個節(jié)點,head --> node
  2. 執(zhí)行到else if,說明跳過了前面的if條件,說明頭結(jié)點是剛成為頭結(jié)點的,它的waitStatus為0,尾節(jié)點是在這之后加入的,發(fā)生這種情況是shouldParkAfterFailedAcquire()中還沒來得及將前一個節(jié)點的ws值修改為SIGNAL
  3. CAS失敗說明此時頭結(jié)點的ws不為0了,也就表明shouldParkAfterFailedAcquire()已經(jīng)將前驅(qū)節(jié)點的waitStatus值修改為了SIGNAL了
更新前一個節(jié)點狀態(tài)

而整個循環(huán)的退出條件是在h==head的時候,這個是為什么呢?

由于我們的head節(jié)點是一個虛擬節(jié)點(也可以叫做哨兵節(jié)點),假設(shè)我們的同步隊列中節(jié)點順序如下:

head --> A --> B --> C

現(xiàn)在假設(shè)A拿到了共享鎖,那么它將成為新的dummy node(虛擬節(jié)點),

head(A) --> B --> C

此時A線程會調(diào)用doReleaseShared方法喚醒后繼節(jié)點B,它很快就獲取到了鎖,并成為了新的頭節(jié)點

head(B) --> C

此時B線程也會調(diào)用該方法,并喚醒其后繼節(jié)點C,但是在B線程調(diào)用的時候,線程A可能還沒有運行結(jié)束,也正在執(zhí)行這個方法,
當(dāng)它執(zhí)行到h==head的時候發(fā)現(xiàn)head改變了,所以for循環(huán)就不會退出,又會繼續(xù)執(zhí)行for循環(huán),喚醒后繼節(jié)點。

至此我們共享鎖分析完畢,其實只要弄明白了AQS的邏輯,依賴于AQS實現(xiàn)的Semaphore就很簡單了。

在看共享鎖源碼過程中尤其需要注意的是方法是會被多個線程并發(fā)執(zhí)行的,所以其中很多判斷是多線程競爭情況下才會出現(xiàn)的。同時需要注意的是共享鎖并不能保證線程安全,仍然需要程序員自己保證對共享資源的操作是安全的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容