一、簡(jiǎn)介
? ? ? ?JUC中的ReentrantLock給我們提供了方便的加鎖解鎖操作,但是我們有時(shí)候會(huì)需要有條件的對(duì)線程進(jìn)行掛起和喚醒,此時(shí)另一個(gè)工具就排上了用場(chǎng)。下面是Condition變量的常用用法。
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread(() -> {
try {
lock.lock();
System.out.println("我要等一個(gè)新信號(hào)");
condition.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("拿到一個(gè)信號(hào)??!");
lock.unlock();
}, "waitThread");
Thread thread2 = new Thread(() -> {
lock.lock();
System.out.println("我拿到鎖了");
try {
Thread.sleep(3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal();
System.out.println("我發(fā)了一個(gè)信號(hào)!!");
lock.unlock();
}, "signalThread");
thread1.start();
thread2.start();
}
運(yùn)行完之后的結(jié)果:
我要等一個(gè)新信號(hào)
我拿到鎖了
我發(fā)了一個(gè)信號(hào)??!
拿到一個(gè)信號(hào)!!
? ? ? ?線程1獲取到鎖之后調(diào)用Condition的await()方法,該方法會(huì)釋放鎖,并將當(dāng)前線程掛起。隨后線程2會(huì)拿到鎖,并執(zhí)行signal()方法,該方法會(huì)喚起線程1,并釋放鎖,然后線程1拿到鎖,執(zhí)行后續(xù)的流程。
? ? ? ?所以說(shuō)Condition是一個(gè)多線程間協(xié)調(diào)通信的工具,使得某個(gè),或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被調(diào)用)時(shí) ,這些等待線程才會(huì)被喚醒,從而重新爭(zhēng)奪鎖。
? ? ? ?那么,這些邏輯具體是怎么實(shí)現(xiàn)的呢?
? ? ? ?首先,必須要明確,在線程調(diào)用await()或者signal()/signalAll()方法時(shí),必須首先獲取鎖,否則會(huì)出現(xiàn)java.lang.IllegalMonitorStateException異常。其次,Condition(其實(shí)是ConditionObject,Condition接口的實(shí)現(xiàn))維護(hù)了一個(gè)所有等待Condition條件變量的線程的隊(duì)列,每個(gè)線程構(gòu)成一個(gè)Node結(jié)點(diǎn),也就是AQS中的Node結(jié)點(diǎn):
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
二、await()
? ? ? ?以Condition condition = lock.newCondition();為例,newCondition()會(huì)調(diào)用Sync的newCondition()方法:
public Condition newCondition() {
return sync.newCondition();
}
Sync的newCondition()方法:
final ConditionObject newCondition() {
return new ConditionObject();
}
//ConditionObject是AQS中的內(nèi)部類,實(shí)例屬性只有上面的firstWaiter和lastWaiter
public ConditionObject() { }
? ? ? ?對(duì)于await()方法,可以拋出中斷異常,
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程封裝成Node結(jié)點(diǎn),添加到Condition的等待隊(duì)列中
Node node = addConditionWaiter();
int savedState = fullyRelease(node); //釋放所占的鎖,因?yàn)樵谡{(diào)用await()方法時(shí),是占有鎖的
int interruptMode = 0;
//釋放了鎖之后,判斷當(dāng)前線程的node結(jié)點(diǎn)是不是在syncQueue中,
//什么是syncQueue呢?就是獲取鎖不成功而被掛起的線程所在的那個(gè)隊(duì)列
//如果不在syncQueue中,說(shuō)明當(dāng)前線程還不具備獲取鎖的資格,就將當(dāng)前線程掛起,直到被添加到阻塞隊(duì)列中,
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //線程被掛起,等待被signal喚醒,此時(shí)可以直接跳到下面的signal方法解析
//掛起的過(guò)程中,如果被中斷了,線程被喚醒,跳出while循環(huán)
//如果沒(méi)被中斷,則此時(shí)node已經(jīng)在阻塞隊(duì)列中了,也會(huì)跳出循環(huán)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此時(shí)線程已經(jīng)被喚醒,node結(jié)點(diǎn)已經(jīng)被添加到阻塞隊(duì)列中準(zhǔn)備獲取鎖,被誰(shuí)添加到阻塞隊(duì)列了呢,是signal
//acquireQueued嘗試獲取鎖,被中斷返回true,否則返回false
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
? ? ? ?其他版本的await()方法和這個(gè)類似,讀者可以自行嘗試分析。
三.signal()
//ConditionObject
public final void signal() {
//如果在沒(méi)有獲取鎖的情況下調(diào)用signal,會(huì)拋出IllegalMonitorStateException異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; //拿到隊(duì)列中的第一個(gè)node,此隊(duì)列是Condition隊(duì)列
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; //將第一個(gè)隊(duì)列移出隊(duì)列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//該方法是將結(jié)點(diǎn)移到阻塞隊(duì)列中,使得當(dāng)前node節(jié)點(diǎn)的線程可以有資格獲取鎖
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//在加入阻塞隊(duì)列之前,將node的waitStatus設(shè)置為0,如果失敗,說(shuō)明該節(jié)點(diǎn)已經(jīng)被取消,
//返回false,此時(shí)上面的doSignal方法會(huì)繼續(xù)遍歷Condition隊(duì)列,
//找到第一個(gè)還在等待Condition變量的結(jié)點(diǎn)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//將結(jié)點(diǎn)加入到阻塞隊(duì)列,返回的p結(jié)點(diǎn)是node的前置節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
//如果前一個(gè)節(jié)點(diǎn)已經(jīng)被取消等待(ws>0),或者修改waitStatus失敗,則直接喚醒。
//正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個(gè)判斷
//是不會(huì)為true的,所以,不會(huì)在這個(gè)時(shí)候喚醒該線程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true; //成功轉(zhuǎn)移到阻塞隊(duì)列,返回true
}
? ? ? ?那么什么時(shí)候才會(huì)喚醒呢?當(dāng)前線程調(diào)用完 signal()之后,執(zhí)行lock.unlock()方法之后,釋放鎖的時(shí)候,會(huì)喚醒其他線程,這個(gè)就是unlock()的邏輯了。
? ? ? ?signalAll()方法的邏輯與signal()類似,讀者可以自己分析一遍。
四.總結(jié)
? ? ? ?現(xiàn)在我們?cè)賮?lái)理一遍:
? ? ? ?一個(gè)線程Alock.lock()獲取鎖成功之后,調(diào)用condition.await()方法,該方法會(huì)首先將線程封裝成node加入Condition隊(duì)列,然后釋放鎖,將線程移出阻塞隊(duì)列,然后掛起;另一個(gè)線程B因?yàn)橹熬€程釋放鎖,獲取鎖成功,調(diào)用signal()方法,將Condition隊(duì)列的第一個(gè)node轉(zhuǎn)移到阻塞隊(duì)列,這個(gè)時(shí)候還沒(méi)完,執(zhí)行完signal之后會(huì)釋放鎖,此時(shí)會(huì)喚醒后面的線程,此時(shí)線程A有機(jī)會(huì)競(jìng)爭(zhēng)到鎖。