前言
這一篇文章,想和大家分享一下Condition的源碼學(xué)習(xí)過程,Condition的應(yīng)用,其實(shí)是很簡單的,相信大家在項(xiàng)目中或者demo中或多或少都用過。最不濟(jì),在應(yīng)付面試的時候,相信也有不少小伙伴背過不少的面試題。話不多說,水平有限,文章中有錯誤的地方也請不吝指正,共同進(jìn)步。
應(yīng)用場景
ReentrantLock的Condition的設(shè)計(jì)場景,我在上一篇博客也分享過,建議先移步上一篇看一下ReentrantLock, Condition只是其中的一個應(yīng)用。
example:
先上一段代碼,看一下如何使用
@Slf4j
public class ReentrantLockTest {
private static ReentrantLock lock = new ReentrantLock();
private static boolean hasSmoke = false;
private static boolean hasSnacks = false;
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
executor.setCorePoolSize(20);
Condition smoke = lock.newCondition();
Condition snacks = lock.newCondition();
executor.submit(()->{
lock.lock();
try {
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-one,沒有煙,干活沒動力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-one,有煙了,嘬一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-two,沒有煙,干活沒動力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-two,有煙了,嘬一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-one,沒有零食吃,干活沒力氣");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-one, 有零食了,吃一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-two,沒有零食吃,干活沒力氣");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-two, 有零食了,吃一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
hasSmoke = true;
log.info("煙來嘍");
smoke.signalAll();
hasSnacks = true;
log.info("零食來嘍");
snacks.signalAll();
}finally {
lock.unlock();
}
});
executor.shutdown();
}
}
10:48:05.563 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,沒有煙,干活沒動力
10:48:05.565 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,沒有煙,干活沒動力
10:48:05.565 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,沒有零食吃,干活沒力氣
10:48:05.565 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,沒有零食吃,干活沒力氣
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 煙來嘍
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食來嘍
10:48:07.569 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-one,有煙了,嘬一口.
10:48:07.569 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-two,有煙了,嘬一口.
10:48:07.569 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-one, 有零食了,吃一口.
10:48:07.569 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-two, 有零食了,吃一口.
這個場景中,男生需要抽煙,女生需要吃零食,不然就會拒絕干活。當(dāng)煙來了或者零食來了的時候,大家就停止摸魚,開始干活。如果只給了零食,那么男生就會無限等待,同理,只給了煙, 那么女生就會一直摸魚下去,具體的例子大家可以自己跑一跑。
例子中我們可以看到,有4個員工都在摸魚,但是我們可以根據(jù)需求把這4個員工分為兩部分,一部分是需要抽煙的,一部分是需要吃零食的,如果我們有零食,我們就可以喚醒需要吃零食的人,煙同理。這樣就做到了部分喚醒。
分析
我們大致分析一下原理,我們首先使用lock.newCondition()方法new了兩個Condition,每個Condition對象都維護(hù)了一個隊(duì)列。然后boy-one和boy-two調(diào)用smoke.await(),會進(jìn)到smoke對象維護(hù)的隊(duì)列里,同理,girl-one和girl-two會進(jìn)入到snacks對象的隊(duì)列里。當(dāng)我們調(diào)用smoke.signalAll()的時候,smoke對象就會把boy-one和boy-two扔到aqs的隊(duì)列里,當(dāng)我們調(diào)用snacks.signalAll()的時候,snacks對象就會把girl-one和girl-two扔到aqs的隊(duì)列里。還記不記得之前aqs博文里的時候,node節(jié)點(diǎn)有個狀態(tài)。int CONDITION = -2; 沒錯,就是給Condition用的。
剩下的線程喚醒操作就交給了aqs。
上面的分析大致看懂了以后,我們帶著這個分析去看一下源碼:
源碼
newCondition()
// 通過調(diào)用鏈,大家已經(jīng)很清楚了吧,就是返回了一個ConditionObject對象,
// 這個對象里維護(hù)了一個node節(jié)點(diǎn)的鏈表,這個node節(jié)點(diǎn)不用猜,就是aqs封裝的node節(jié)點(diǎn)。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
}
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 這個方法貼在下面了,官方注釋解釋的簡潔明了,就是添加節(jié)點(diǎn)到隊(duì)列尾部
Node node = addConditionWaiter();
// 釋放資源當(dāng)前線程拿到的鎖資源,下面詳細(xì)解釋
int savedState = fullyRelease(node);
int interruptMode = 0;
// 調(diào)用isOnSyncQueue判斷是否在aqs隊(duì)列里,這個下面會詳細(xì)解釋。
while (!isOnSyncQueue(node)) {
// 不在aqs隊(duì)列,就調(diào)用park方法阻塞,交出鎖。
LockSupport.park(this);
// 被喚醒后,首先去檢查中斷,然后退出循環(huán)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 檢查完中斷去獲取鎖,然后做一些校驗(yàn)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 這個方法會釋放當(dāng)前節(jié)點(diǎn)占用的所有資源,注意是所有。
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 直接拿到現(xiàn)在的資源數(shù)
int savedState = getState();
// 然后釋放全部
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 注意,這里會把node節(jié)點(diǎn)的狀態(tài)置為CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
// 如果node節(jié)點(diǎn)是在CONDITION狀態(tài),肯定不在aqs隊(duì)列。不懂的看下面的transferForSignal()方法
// 如果node節(jié)點(diǎn)的prev節(jié)點(diǎn)為null,肯定不在aqs隊(duì)列。這個問題我放到最后單獨(dú)講
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 這是個兜底邏輯。就是遍歷aqs節(jié)點(diǎn),看node節(jié)點(diǎn)是否存在。
return findNodeFromTail(node);
}
signalAll()
public final void signalAll() {
// 調(diào)用signalAll()會首先校驗(yàn)調(diào)用signalAll()的線程是不是當(dāng)前鎖持有線程。不是會拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 從頭開始處理隊(duì)列
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// 官方注釋總是簡單明了。。
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 把節(jié)點(diǎn)從CONDITION設(shè)置為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 熟悉的方法,不再講
Node p = enq(node);
int ws = p.waitStatus;
// 把節(jié)點(diǎn)從0設(shè)置為SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
上述這三個方法,就是把Condition對象的隊(duì)列remove,然后add到aqs的隊(duì)列里。需要注意一點(diǎn),add到aqs隊(duì)列,并不是被喚醒?。。℃i一次只能被一個線程持有,所以還是逐個被喚醒的,只有調(diào)用signalAll()的線程在釋放鎖之后,才去喚醒next節(jié)點(diǎn)。至此,Condition的源碼已經(jīng)結(jié)束了。
流程分析:
我們用一個表格簡單分析一下流程,這里我們不考慮指令優(yōu)化,前面的代碼總是優(yōu)于后面的代碼先執(zhí)行。本來想畫個動態(tài)圖的,奈何不會AE, 湊合看吧。有好用的免費(fèi)的畫圖軟件,也請@我。
| boy-one | boy-two | girl-one | girl-two | 飼養(yǎng)員 | 隊(duì)列情況 |
|---|---|---|---|---|---|
| lock()拿到鎖 | |||||
| 調(diào)用await() | |||||
| 把自己添加到smoke隊(duì)列 | smoke:boy-one | ||||
| 釋放鎖,休眠 | |||||
| lock()拿到鎖 | |||||
| 調(diào)用await() | |||||
| 把自己添加到smoke隊(duì)列 | smoke:boy-one-->boy-two | ||||
| 釋放鎖,休眠 | |||||
| lock()拿到鎖 | |||||
| 調(diào)用await() | |||||
| 把自己添加到snacks隊(duì)列 | smoke:boy-one-->boy-two snacks:girl-one | ||||
| 釋放鎖,休眠 | |||||
| lock()拿到鎖 | |||||
| 調(diào)用await() | |||||
| 把自己添加到snacks隊(duì)列 | smoke:boy-one-->boy-two snacks: girl-one-->girl-two | ||||
| 釋放鎖,休眠 | |||||
| lock()拿到鎖 | |||||
| 調(diào)用smoke.signalAll() | smoke:snacks:girl-one-->girl-two aqs:boy-one-->boy-two | ||||
| 調(diào)用snacks.signalAll() | smoke:snacks: aqs:boy-one-->boy-two-->girl-one-->girl-two | ||||
| 釋放鎖 | |||||
| aqs喚醒boy-one | |||||
| 被喚醒,拿到鎖 | |||||
| 執(zhí)行邏輯 | |||||
| 釋放鎖,喚醒下一個 | |||||
| 被喚醒,拿到鎖 | |||||
| 執(zhí)行邏輯 | |||||
| 釋放鎖,喚醒下一個 | |||||
| 被喚醒,拿到鎖 | |||||
| 執(zhí)行邏輯 | |||||
| 釋放鎖,喚醒下一個 | |||||
| 被喚醒,拿到鎖 | |||||
| 執(zhí)行邏輯 | |||||
| 釋放鎖,喚醒下一個 | |||||
| 發(fā)現(xiàn)沒有下一個,結(jié)束。 |
我們可以看到,整個流程都是圍繞一把鎖展開的,那么如果我們不加鎖,也就是我們把lock代碼都去掉,流程又會怎么樣呢?
@Slf4j
public class ReentrantLockTest {
private static ReentrantLock lock = new ReentrantLock();
private static boolean hasSmoke = false;
private static boolean hasSnacks = false;
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
executor.setCorePoolSize(20);
Condition smoke = lock.newCondition();
Condition snacks = lock.newCondition();
executor.submit(()->{
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-one,沒有煙,干活沒動力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-one,有煙了,嘬一口.");
});
executor.submit(()->{
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-two,沒有煙,干活沒動力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-two,有煙了,嘬一口.");
});
executor.submit(()->{
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-one,沒有零食吃,干活沒力氣");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-one, 有零食了,吃一口.");
});
executor.submit(()->{
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-two,沒有零食吃,干活沒力氣");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-two, 有零食了,吃一口.");
});
executor.submit(()->{
lock.lock();
try {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
hasSmoke = true;
log.info("煙來嘍");
smoke.signalAll();
hasSnacks = true;
log.info("零食來嘍");
snacks.signalAll();
}finally {
lock.unlock();
}
});
executor.shutdown();
}
}
11:52:51.473 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,沒有零食吃,干活沒力氣
11:52:51.473 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,沒有煙,干活沒動力
11:52:51.473 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,沒有零食吃,干活沒力氣
11:52:51.473 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,沒有煙,干活沒動力
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 煙來嘍
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食來嘍
在調(diào)用signalAll()方法的時候,會強(qiáng)制校驗(yàn)當(dāng)前線程是否持有鎖,否則會拋出異常,所以飼養(yǎng)員的鎖必須加,boy和girl的鎖我們?nèi)サ袅恕?/p>
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
我們看到運(yùn)行結(jié)果里,boy和girl都不干活了,這是咋回事呢?我們分析一下。
當(dāng)boy和girl調(diào)用smoke或者snacks的await()方法,會把自己添加到相應(yīng)的隊(duì)列。然后調(diào)用fullyRelease(node)釋放鎖,釋放鎖的方法里會校驗(yàn)當(dāng)前持有線程,不是的話會拋異常
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 這里拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
然后在fullyRelease(Node node)方法里,finally的代碼塊會執(zhí)行,node節(jié)點(diǎn)ws狀態(tài)置為CANCELLED。
飼養(yǎng)員在調(diào)用signalAll()方法,把隊(duì)列挪到aqs隊(duì)列的過程中在doSignalAll(Node first)里的transferForSignal()方法中
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
會首先用cas修改node節(jié)點(diǎn)狀態(tài),由于節(jié)點(diǎn)是CANCELLED,所以cas會失敗,失敗就直接返回。節(jié)點(diǎn)從Condition隊(duì)列中移除,加入aqs隊(duì)列失敗。因此,這個節(jié)點(diǎn)就灰飛煙滅了。
遺留問題
- 如果node節(jié)點(diǎn)的prev節(jié)點(diǎn)為null,肯定不在aqs隊(duì)列
為什么這么說呢,Condition維護(hù)的隊(duì)列,是單鏈表,用nextWaiter指向下個節(jié)點(diǎn)
Aqs維護(hù)的隊(duì)列,是雙向鏈表,用prev和next指向前后節(jié)點(diǎn),所以Condition維護(hù)的隊(duì)列的prev必為null,而Aqs維護(hù)的隊(duì)列的真實(shí)node節(jié)點(diǎn)的prev必不為null,因?yàn)槌跏蓟?duì)列的時候會在頭部放一個dummy node