ReentrantLock的Condition源碼解析

前言

這一篇文章,想和大家分享一下Condition的源碼學(xué)習(xí)過程,Condition的應(yīng)用,其實(shí)是很簡單的,相信大家在項(xiàng)目中或者demo中或多或少都用過。最不濟(jì),在應(yīng)付面試的時候,相信也有不少小伙伴背過不少的面試題。話不多說,水平有限,文章中有錯誤的地方也請不吝指正,共同進(jìn)步。

應(yīng)用場景

ReentrantLock的Condition的設(shè)計(jì)場景,我在上一篇博客也分享過,建議先移步上一篇看一下ReentrantLock, Condition只是其中的一個應(yīng)用。

example:

先上一段代碼,看一下如何使用

@Slf4j
public class ReentrantLockTest {
    private static ReentrantLock lock = new ReentrantLock();
    private static boolean hasSmoke = false;
    private static boolean hasSnacks = false;

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 1,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
        executor.setCorePoolSize(20);
        Condition smoke = lock.newCondition();
        Condition snacks = lock.newCondition();
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-one,沒有煙,干活沒動力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-one,有煙了,嘬一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-two,沒有煙,干活沒動力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-two,有煙了,嘬一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-one,沒有零食吃,干活沒力氣");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-one, 有零食了,吃一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-two,沒有零食吃,干活沒力氣");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-two, 有零食了,吃一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                hasSmoke = true;
                log.info("煙來嘍");
                smoke.signalAll();
                hasSnacks = true;
                log.info("零食來嘍");
                snacks.signalAll();
            }finally {
                lock.unlock();
            }
        });
        executor.shutdown();
    }
}
10:48:05.563 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,沒有煙,干活沒動力
10:48:05.565 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,沒有煙,干活沒動力
10:48:05.565 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,沒有零食吃,干活沒力氣
10:48:05.565 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,沒有零食吃,干活沒力氣
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 煙來嘍
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食來嘍
10:48:07.569 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-one,有煙了,嘬一口.
10:48:07.569 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-two,有煙了,嘬一口.
10:48:07.569 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-one, 有零食了,吃一口.
10:48:07.569 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-two, 有零食了,吃一口.

這個場景中,男生需要抽煙,女生需要吃零食,不然就會拒絕干活。當(dāng)煙來了或者零食來了的時候,大家就停止摸魚,開始干活。如果只給了零食,那么男生就會無限等待,同理,只給了煙, 那么女生就會一直摸魚下去,具體的例子大家可以自己跑一跑。
例子中我們可以看到,有4個員工都在摸魚,但是我們可以根據(jù)需求把這4個員工分為兩部分,一部分是需要抽煙的,一部分是需要吃零食的,如果我們有零食,我們就可以喚醒需要吃零食的人,煙同理。這樣就做到了部分喚醒。

分析

我們大致分析一下原理,我們首先使用lock.newCondition()方法new了兩個Condition,每個Condition對象都維護(hù)了一個隊(duì)列。然后boy-one和boy-two調(diào)用smoke.await(),會進(jìn)到smoke對象維護(hù)的隊(duì)列里,同理,girl-one和girl-two會進(jìn)入到snacks對象的隊(duì)列里。當(dāng)我們調(diào)用smoke.signalAll()的時候,smoke對象就會把boy-one和boy-two扔到aqs的隊(duì)列里,當(dāng)我們調(diào)用snacks.signalAll()的時候,snacks對象就會把girl-one和girl-two扔到aqs的隊(duì)列里。還記不記得之前aqs博文里的時候,node節(jié)點(diǎn)有個狀態(tài)。int CONDITION = -2; 沒錯,就是給Condition用的。
剩下的線程喚醒操作就交給了aqs。
上面的分析大致看懂了以后,我們帶著這個分析去看一下源碼:

源碼

newCondition()

// 通過調(diào)用鏈,大家已經(jīng)很清楚了吧,就是返回了一個ConditionObject對象,
// 這個對象里維護(hù)了一個node節(jié)點(diǎn)的鏈表,這個node節(jié)點(diǎn)不用猜,就是aqs封裝的node節(jié)點(diǎn)。
public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
           * Creates a new {@code ConditionObject} instance.
           */
    public ConditionObject() { }
}

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    // 這個方法貼在下面了,官方注釋解釋的簡潔明了,就是添加節(jié)點(diǎn)到隊(duì)列尾部
    Node node = addConditionWaiter();
    // 釋放資源當(dāng)前線程拿到的鎖資源,下面詳細(xì)解釋
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 調(diào)用isOnSyncQueue判斷是否在aqs隊(duì)列里,這個下面會詳細(xì)解釋。
    while (!isOnSyncQueue(node)) {
      // 不在aqs隊(duì)列,就調(diào)用park方法阻塞,交出鎖。
      LockSupport.park(this);
      // 被喚醒后,首先去檢查中斷,然后退出循環(huán)
      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    }
    // 檢查完中斷去獲取鎖,然后做一些校驗(yàn)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
      unlinkCancelledWaiters();
    if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
      // 
      unlinkCancelledWaiters();
      t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
      firstWaiter = node;
    else
      t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
// 這個方法會釋放當(dāng)前節(jié)點(diǎn)占用的所有資源,注意是所有。
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
      // 直接拿到現(xiàn)在的資源數(shù)
      int savedState = getState();
      // 然后釋放全部
      if (release(savedState)) {
        failed = false;
        return savedState;
      } else {
        throw new IllegalMonitorStateException();
      }
    } finally {
      // 注意,這里會把node節(jié)點(diǎn)的狀態(tài)置為CANCELLED
      if (failed)
        node.waitStatus = Node.CANCELLED;
    }
}

final boolean isOnSyncQueue(Node node) {
    // 如果node節(jié)點(diǎn)是在CONDITION狀態(tài),肯定不在aqs隊(duì)列。不懂的看下面的transferForSignal()方法
    // 如果node節(jié)點(diǎn)的prev節(jié)點(diǎn)為null,肯定不在aqs隊(duì)列。這個問題我放到最后單獨(dú)講
    if (node.waitStatus == Node.CONDITION || node.prev == null)
      return false;
    if (node.next != null) // If has successor, it must be on queue
      return true;
    /*
           * node.prev can be non-null, but not yet on queue because
           * the CAS to place it on queue can fail. So we have to
           * traverse from tail to make sure it actually made it.  It
           * will always be near the tail in calls to this method, and
           * unless the CAS failed (which is unlikely), it will be
           * there, so we hardly ever traverse much.
           */
    // 這是個兜底邏輯。就是遍歷aqs節(jié)點(diǎn),看node節(jié)點(diǎn)是否存在。
    return findNodeFromTail(node);
}

signalAll()

public final void signalAll() {
    // 調(diào)用signalAll()會首先校驗(yàn)調(diào)用signalAll()的線程是不是當(dāng)前鎖持有線程。不是會拋異常
    if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
    // 從頭開始處理隊(duì)列
    Node first = firstWaiter;
    if (first != null)
      doSignalAll(first);
}
/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    // 官方注釋總是簡單明了。。
    lastWaiter = firstWaiter = null;
    do {
      Node next = first.nextWaiter;
      first.nextWaiter = null;
      transferForSignal(first);
      first = next;
    } while (first != null);
}
final boolean transferForSignal(Node node) {
  /*
   * If cannot change waitStatus, the node has been cancelled.
   */
  // 把節(jié)點(diǎn)從CONDITION設(shè)置為0
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

  /*
   * Splice onto queue and try to set waitStatus of predecessor to
   * indicate that thread is (probably) waiting. If cancelled or
   * attempt to set waitStatus fails, wake up to resync (in which
   * case the waitStatus can be transiently and harmlessly wrong).
   */
    // 熟悉的方法,不再講
    Node p = enq(node);
    int ws = p.waitStatus;
    // 把節(jié)點(diǎn)從0設(shè)置為SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      LockSupport.unpark(node.thread);
    return true;
}

上述這三個方法,就是把Condition對象的隊(duì)列remove,然后add到aqs的隊(duì)列里。需要注意一點(diǎn),add到aqs隊(duì)列,并不是被喚醒?。。℃i一次只能被一個線程持有,所以還是逐個被喚醒的,只有調(diào)用signalAll()的線程在釋放鎖之后,才去喚醒next節(jié)點(diǎn)。至此,Condition的源碼已經(jīng)結(jié)束了。

流程分析:

我們用一個表格簡單分析一下流程,這里我們不考慮指令優(yōu)化,前面的代碼總是優(yōu)于后面的代碼先執(zhí)行。本來想畫個動態(tài)圖的,奈何不會AE, 湊合看吧。有好用的免費(fèi)的畫圖軟件,也請@我。

boy-one boy-two girl-one girl-two 飼養(yǎng)員 隊(duì)列情況
lock()拿到鎖
調(diào)用await()
把自己添加到smoke隊(duì)列 smoke:boy-one
釋放鎖,休眠
lock()拿到鎖
調(diào)用await()
把自己添加到smoke隊(duì)列 smoke:boy-one-->boy-two
釋放鎖,休眠
lock()拿到鎖
調(diào)用await()
把自己添加到snacks隊(duì)列 smoke:boy-one-->boy-two snacks:girl-one
釋放鎖,休眠
lock()拿到鎖
調(diào)用await()
把自己添加到snacks隊(duì)列 smoke:boy-one-->boy-two snacks: girl-one-->girl-two
釋放鎖,休眠
lock()拿到鎖
調(diào)用smoke.signalAll() smoke:snacks:girl-one-->girl-two aqs:boy-one-->boy-two
調(diào)用snacks.signalAll() smoke:snacks: aqs:boy-one-->boy-two-->girl-one-->girl-two
釋放鎖
aqs喚醒boy-one
被喚醒,拿到鎖
執(zhí)行邏輯
釋放鎖,喚醒下一個
被喚醒,拿到鎖
執(zhí)行邏輯
釋放鎖,喚醒下一個
被喚醒,拿到鎖
執(zhí)行邏輯
釋放鎖,喚醒下一個
被喚醒,拿到鎖
執(zhí)行邏輯
釋放鎖,喚醒下一個
發(fā)現(xiàn)沒有下一個,結(jié)束。

我們可以看到,整個流程都是圍繞一把鎖展開的,那么如果我們不加鎖,也就是我們把lock代碼都去掉,流程又會怎么樣呢?

@Slf4j
public class ReentrantLockTest {
    private static ReentrantLock lock = new ReentrantLock();
    private static boolean hasSmoke = false;
    private static boolean hasSnacks = false;

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 1,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
        executor.setCorePoolSize(20);
        Condition smoke = lock.newCondition();
        Condition snacks = lock.newCondition();
        executor.submit(()->{
            
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-one,沒有煙,干活沒動力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-one,有煙了,嘬一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-two,沒有煙,干活沒動力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-two,有煙了,嘬一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-one,沒有零食吃,干活沒力氣");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-one, 有零食了,吃一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-two,沒有零食吃,干活沒力氣");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-two, 有零食了,吃一口.");
           
        });
        executor.submit(()->{
            lock.lock();
            try {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                hasSmoke = true;
                log.info("煙來嘍");
                smoke.signalAll();
                hasSnacks = true;
                log.info("零食來嘍");
                snacks.signalAll();
            }finally {
                lock.unlock();
            }
        });
        executor.shutdown();
    }
}
11:52:51.473 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,沒有零食吃,干活沒力氣
11:52:51.473 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,沒有煙,干活沒動力
11:52:51.473 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,沒有零食吃,干活沒力氣
11:52:51.473 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,沒有煙,干活沒動力
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 煙來嘍
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食來嘍

在調(diào)用signalAll()方法的時候,會強(qiáng)制校驗(yàn)當(dāng)前線程是否持有鎖,否則會拋出異常,所以飼養(yǎng)員的鎖必須加,boy和girl的鎖我們?nèi)サ袅恕?/p>

public final void signalAll() {
  if (!isHeldExclusively())
  throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
  doSignalAll(first);
}

我們看到運(yùn)行結(jié)果里,boy和girl都不干活了,這是咋回事呢?我們分析一下。

當(dāng)boy和girl調(diào)用smoke或者snacks的await()方法,會把自己添加到相應(yīng)的隊(duì)列。然后調(diào)用fullyRelease(node)釋放鎖,釋放鎖的方法里會校驗(yàn)當(dāng)前持有線程,不是的話會拋異常

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 這里拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

然后在fullyRelease(Node node)方法里,finally的代碼塊會執(zhí)行,node節(jié)點(diǎn)ws狀態(tài)置為CANCELLED。

飼養(yǎng)員在調(diào)用signalAll()方法,把隊(duì)列挪到aqs隊(duì)列的過程中在doSignalAll(Node first)里的transferForSignal()方法中

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

會首先用cas修改node節(jié)點(diǎn)狀態(tài),由于節(jié)點(diǎn)是CANCELLED,所以cas會失敗,失敗就直接返回。節(jié)點(diǎn)從Condition隊(duì)列中移除,加入aqs隊(duì)列失敗。因此,這個節(jié)點(diǎn)就灰飛煙滅了。

遺留問題

  1. 如果node節(jié)點(diǎn)的prev節(jié)點(diǎn)為null,肯定不在aqs隊(duì)列
    為什么這么說呢,Condition維護(hù)的隊(duì)列,是單鏈表,用nextWaiter指向下個節(jié)點(diǎn)
    Aqs維護(hù)的隊(duì)列,是雙向鏈表,用prev和next指向前后節(jié)點(diǎn),所以Condition維護(hù)的隊(duì)列的prev必為null,而Aqs維護(hù)的隊(duì)列的真實(shí)node節(jié)點(diǎn)的prev必不為null,因?yàn)槌跏蓟?duì)列的時候會在頭部放一個dummy node
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容