Java的wait()、notify()學(xué)習(xí)三部曲由三篇文章組成,內(nèi)容分別是:?
一、通過閱讀openjdk8的源碼,分析和理解wait,notify在JVM中的具體執(zhí)行過程;?
二、修改JVM源碼,編譯構(gòu)建成新的JVM,把我們感興趣的參數(shù)打印出來,結(jié)合具體代碼檢查和我們的理解是否一致;?
三、修改JVM源碼,編譯構(gòu)建成新的JVM,按照我們的理解去修改關(guān)鍵參數(shù),看能否達(dá)到預(yù)期效果;
現(xiàn)在,咱們一起開始既漫長又深入的wait、notify學(xué)習(xí)之旅吧!
Java多線程開發(fā)中,我們常用到wait()和notify()方法來實(shí)現(xiàn)線程間的協(xié)作,簡單的說步驟如下:?
1. A線程取得鎖,執(zhí)行wait(),釋放鎖;?
2. B線程取得鎖,完成業(yè)務(wù)后執(zhí)行notify(),再釋放鎖;?
3. B線程釋放鎖之后,A線程取得鎖,繼續(xù)執(zhí)行wait()之后的代碼;
通常,對于synchronize(lock){…}這樣的代碼塊,編譯后會(huì)生成monitorenter和monitorexit指令,線程執(zhí)行到monitorenter指令時(shí)會(huì)嘗試取得lock對應(yīng)的monitor的所有權(quán)(CAS設(shè)置對象頭),取得后即獲取到鎖,執(zhí)行monitorexit指令時(shí)會(huì)釋放monitor的所有權(quán)即釋放鎖;
為了深入學(xué)習(xí)wait()和notify(),先用完整的demo程序來模擬場景吧,以下是源碼:
publicclassNotifyDemo{privatestaticvoidsleep(longsleepVal){try{? ? ? ? ? ? Thread.sleep(sleepVal);? ? ? ? }catch(Exception e){? ? ? ? ? ? e.printStackTrace();? ? ? ? }? ? }privatestaticvoidlog(String desc){? ? ? ? System.out.println(Thread.currentThread().getName() +" : "+ desc);? ? }? ? Object lock =newObject();publicvoidstartThreadA(){newThread(() -> {synchronized(lock){? ? ? ? ? ? ? ? log("get lock");? ? ? ? ? ? ? ? startThreadB();? ? ? ? ? ? ? ? log("start wait");try{? ? ? ? ? ? ? ? ? ? lock.wait();? ? ? ? ? ? ? ? }catch(InterruptedException e){? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? log("get lock after wait");? ? ? ? ? ? ? ? log("release lock");? ? ? ? ? ? }? ? ? ? },"thread-A").start();? ? }publicvoidstartThreadB(){newThread(()->{synchronized(lock){? ? ? ? ? ? ? ? log("get lock");? ? ? ? ? ? ? ? startThreadC();? ? ? ? ? ? ? ? sleep(100);? ? ? ? ? ? ? ? log("start notify");? ? ? ? ? ? ? ? lock.notify();? ? ? ? ? ? ? ? log("release lock");? ? ? ? ? ? }? ? ? ? },"thread-B").start();? ? }publicvoidstartThreadC(){newThread(() -> {synchronized(lock){? ? ? ? ? ? ? ? log("get lock");? ? ? ? ? ? ? ? log("release lock");? ? ? ? ? ? }? ? ? ? },"thread-C").start();? ? }publicstaticvoidmain(String[] args){newNotifyDemo().startThreadA();? ? }}
以上就是本次實(shí)戰(zhàn)用到的demo,代碼功能簡述如下:
啟動(dòng)線程A,取得鎖之后先啟動(dòng)線程B再執(zhí)行wait()方法,釋放鎖并等待;
線程B啟動(dòng)之后會(huì)等待鎖,A線程執(zhí)行wait()之后,線程B取得鎖,然后啟動(dòng)線程C,再執(zhí)行notify喚醒線程A,最后退出synchronize代碼塊,釋放鎖;
線程C啟動(dòng)之后就一直在等待鎖,這時(shí)候線程B還沒有退出synchronize代碼塊,鎖還在線程B手里;
線程A在線程B執(zhí)行notify()之后就一直在等待鎖,這時(shí)候線程B還沒有退出synchronize代碼塊,鎖還在線程B手里;
線程B退出synchronize代碼塊,釋放鎖之后,線程A和線程C競爭鎖;
把上面的代碼在Openjdk8下面執(zhí)行,反復(fù)執(zhí)行多次,都得到以下結(jié)果:
thread-A : get lock
thread-A : start wait
thread-B : get lock
thread-C : c thread is start
thread-B : start notify
thread-B : release lock
thread-A : after wait, acquire lock again
thread-A : release lock
thread-C : get lock
thread-C : release lock
1
2
3
4
5
6
7
8
9
10
針對以上結(jié)果,問題來了:?
第一個(gè)問題:?
將以上代碼反復(fù)執(zhí)行多次,結(jié)果都是B釋放鎖之后A會(huì)先得到鎖,這又是為什么呢?C為何不能先拿到鎖呢?
第二個(gè)問題:?
線程C自開始就執(zhí)行了monitorenter指令,它能得到鎖是容易理解的,但是線程A呢?在wait()之后并沒有沒有monitorenter指令,那么它又是如何取得鎖的呢?
wait()、notify()這些方法都是native方法,所以只有從JVM源碼尋找答案了,本次閱讀的是openjdk8的源碼;
按照demo代碼執(zhí)行順序,我整理了如下問題,帶著這些問題去看JVM源碼可以聚焦主線,不要被一些支線的次要的代碼卡住(例如一些異常處理,監(jiān)控和上報(bào)等):?
1. 線程A在wait()的時(shí)候做了什么??
2. 線程C啟動(dòng)后,由于此時(shí)線程B持有鎖,那么線程C此時(shí)在干啥??
3. 線程B在notify()的時(shí)候做了什么??
4. 線程B釋放鎖的時(shí)候做了什么?
在源碼中有段注釋堪稱是整篇文章最重要的說明,請大家始終記住這段信息,處處都用得上:
ObjectWaiter對象存在于WaitSet、EntryList、cxq等集合中,或者正在這些集合中移動(dòng)
原文如下:
請務(wù)必記住這三個(gè)集合:WaitSet、EntryList、cxq
好了,接下來看源碼分析問題吧:
打開hotspot/src/share/vm/runtime/objectMonitor.cpp,看ObjectMonitor::wait方法:
如上圖所示,有兩處代碼值得我們注意:?
1. 綠框中將當(dāng)前線程包裝成ObjectWaiter對象,并且狀態(tài)為TS_WAIT,這里對應(yīng)的是jstack看到的線程狀態(tài)WAITING;?
2. 紅框中調(diào)用了AddWaiter方法,跟進(jìn)去看下:
這個(gè)ObjectWaiter對象被放入了_WaitSet中,_WaitSet是個(gè)環(huán)形雙向鏈表(circular doubly linked list)
回到ObjectMonitor::wait方法接著往下看,會(huì)發(fā)現(xiàn)關(guān)鍵代碼如下圖,當(dāng)前線程通過park()方法開始掛起(suspend):
至此,我們把wait()方法要做的事情就理清了:?
1. 包裝成ObjectWaiter對象,狀態(tài)為TS_WAIT;?
2. ObjectWaiter對象被放入_WaitSet中;?
3. 當(dāng)前線程掛起;
此時(shí)的線程C無法進(jìn)入synchronized{}代碼塊,用jstack看應(yīng)該是BLOCKED狀態(tài),如下圖:
我們看看monitorenter指令對應(yīng)的源碼吧,位置:openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread*thread, BasicObjectLock*elem))#ifdefASSERTthread->last_frame().interpreter_frame_verify_monitor(elem);#endifif(PrintBiasedLockingStatistics) {? ? Atomic::inc(BiasedLocking::slow_path_entry_count_addr());? }Handleh_obj(thread, elem->obj());? assert(Universe::heap()->is_in_reserved_or_null(h_obj()),"must be NULL or an object");if(UseBiasedLocking) {// Retry fast entry if bias is revoked to avoid unnecessary inflationObjectSynchronizer::fast_enter(h_obj, elem->lock(),true, CHECK);? }else{? ? ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);? }? assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),"must be NULL or an object");#ifdefASSERTthread->last_frame().interpreter_frame_verify_monitor(elem);#endifIRT_END
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上面的代碼有個(gè)if (UseBiasedLocking)判斷,是判斷是否使用偏向鎖的,本例中的鎖顯然已經(jīng)不屬于當(dāng)前線程C了,所以我們還是直接看slow_enter(h_obj, elem->lock(), CHECK)方法吧;
打開openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:
voidObjectSynchronizer::slow_enter(Handleobj, BasicLock*lock, TRAPS) {? markOop mark=obj->mark();? assert(!mark->has_bias_pattern(),"should not see bias pattern here");//是否處于無鎖狀態(tài)if(mark->is_neutral()) {// Anticipate successful CAS -- the ST of the displaced mark must// be visible <= the ST performed by the CAS.lock->set_displaced_header(mark);//無鎖狀態(tài)就去競爭鎖if(mark==(markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {? ? ? TEVENT (slow_enter: release stacklock) ;return;? ? }// Fall through to inflate() ...}elseif(mark->has_locker()&&THREAD->is_lock_owned((address)mark->locker())) {//如果處于有鎖狀態(tài),就檢查是不是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有的,就return,然后就能執(zhí)行同步代碼塊中的代碼了assert(lock!=mark->locker(),"must not re-lock the same lock");? ? assert(lock!=(BasicLock*)obj->mark(),"don't relock with same BasicLock");? ? lock->set_displaced_header(NULL);return;? }#if0// The following optimization isn't particularly useful.if(mark->has_monitor()&&mark->monitor()->is_entered(THREAD)) {? ? lock->set_displaced_header (NULL) ;return;? }#endif// The object header will never be displaced to this lock,// so it does not matter what the value is, except that it// must be non-zero to avoid looking like a re-entrant lock,// and must not look locked either.lock->set_displaced_header(markOopDesc::unused_mark());//鎖膨脹ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
線程C在上面代碼中的執(zhí)行順序如下:?
1. 判斷是否是無鎖狀態(tài),如果是就通過Atomic::cmpxchg_ptr去競爭鎖;?
2. 不是無鎖狀態(tài),就檢查當(dāng)前鎖是否是線程C持有;?
3. 不是線程C持有,調(diào)用inflate方法開始鎖膨脹;
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
來看看鎖膨脹的源碼:
如上圖,鎖膨脹的代碼太長,我們這里只看關(guān)鍵代碼吧:?
紅框中,如果當(dāng)前狀態(tài)已經(jīng)是重量級鎖,就通過mark->monitor()方法取得ObjectMonitor指針再返回;?
綠框中,如果還不是重量級鎖,就檢查是否處于膨脹中狀態(tài)(其他線程正在膨脹中),如果是膨脹中,就調(diào)用ReadStableMark方法進(jìn)行等待,ReadStableMark方法執(zhí)行完畢后再通過continue繼續(xù)檢查,ReadStableMark方法中還會(huì)調(diào)用os::NakedYield()釋放CPU資源;
如果紅框和綠框的條件都沒有命中,目前已經(jīng)是輕量級鎖了(不是重量級鎖并且不處于鎖膨脹狀態(tài)),可以開始膨脹了,如下圖:
簡單來說,鎖膨脹就是通過CAS將監(jiān)視器對象OjectMonitor的狀態(tài)設(shè)置為INFLATING,如果CAS失敗,就在此循環(huán),再走前一副圖中的的紅框和綠框中的判斷,如果CAS設(shè)置成功,會(huì)繼續(xù)設(shè)置ObjectMonitor中的header、owner等字段,然后inflate方法返回監(jiān)視器對象OjectMonitor;
看看之前slow_enter方法中,調(diào)用inflate方法的代碼如下:
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
1
所以inflate方法返回監(jiān)視器對象OjectMonitor之后,會(huì)立刻執(zhí)行OjectMonitor的enter方法,這個(gè)方法中開始競爭鎖了,方法在openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp文件中:
如上圖,紅框中表示OjectMonitor的enter方法一進(jìn)來就通過CAS將OjectMonitor的_owner設(shè)置為當(dāng)前線程,綠框中表示設(shè)置成功的邏輯,第一個(gè)if表示重入鎖的邏輯,第二個(gè)if表示第一次設(shè)置_owner成功,都意味著競爭鎖成功,而我們的線程C顯然是競爭失敗的,會(huì)進(jìn)入下圖中的無線循環(huán),反復(fù)調(diào)用EnterI方法:
進(jìn)入EnterI方法看看:
如上圖,首先構(gòu)造一個(gè)ObjectWaiter對象node,后面的for(;;)代碼塊中來是一段非常巧妙的代碼,同一時(shí)刻可能有多個(gè)線程都競爭鎖失敗走進(jìn)這個(gè)EnterI方法,所以在這個(gè)for循環(huán)中,用CAS將_cxq地址放入node的_next,也就是把node放到_cxq隊(duì)列的首位,如果CAS失敗,就表示其他線程把node放入到_cxq的首位了,所以通過for循環(huán)再放一次,只要成功,此node就一定在最新的_cxq隊(duì)列的首位。
接下來的代碼又是一個(gè)無限循環(huán),如下圖:
從上圖可以看出,進(jìn)入循環(huán)后先調(diào)用TryLock方法競爭一次鎖,如果成功了就退出循環(huán),否則就調(diào)用Self->_ParkEvent->park方法使線程掛起,這里有自旋鎖的邏輯,也就是park方法帶了時(shí)間參數(shù),就會(huì)在掛起一段時(shí)間后自動(dòng)喚醒,如果不是自旋的條件,就一直掛起等待被其他條件喚醒,線程被喚醒后又會(huì)執(zhí)行TryLock方法競爭一次鎖,競爭不到繼續(xù)這個(gè)for循環(huán);
到這里我們已經(jīng)把線程C在BLOCK的時(shí)候的邏輯理清楚了,小結(jié)如下:
偏向鎖邏輯,未命中;
如果是無鎖狀態(tài),就通過CAS去競爭鎖,此處由于鎖已經(jīng)被線程B持有,所以不是無鎖狀態(tài);
不是無鎖狀態(tài),而且鎖不是線程C持有,執(zhí)行鎖膨脹,構(gòu)造OjectMonitor對象;
競爭鎖,競爭失敗就將線程加入_cxq隊(duì)列的首位;
開始無限循環(huán),競爭鎖成功就退出循環(huán),競爭失敗線程掛起,等待被喚醒后繼續(xù)競爭;
接下來該線程B執(zhí)行notify了,代碼是objectMonitor.cpp的ObjectMonitor::notify方法:
如上圖所示,首先是Policy的賦值,其次是調(diào)用DequeueWaiter()方法將_WaitSet隊(duì)列的第一個(gè)值取出并返回,還記得_WaitSet么?所有wait的線程都被包裝成ObjectWaiter對象然后放進(jìn)來了;?
接下來對ObjectWaiter對象的處理方式,根據(jù)Policy的不同而不同:?
Policy == 0:放入_EntryList隊(duì)列的排頭位置;?
Policy == 1:放入_EntryList隊(duì)列的末尾位置;?
Policy == 2:_EntryList隊(duì)列為空就放入_EntryList,否則放入_cxq隊(duì)列的排頭位置;
如上圖所示,請注意把ObjectWaiter的地址寫到_cxq變量的時(shí)候要用CAS操作,因?yàn)榇藭r(shí)可能有其他線程正在競爭鎖,競爭失敗的時(shí)候會(huì)將自己包裝成ObjectWaiter對象加入到_cxq中;
這里的代碼有一處疑問,期待著讀著您的指教:如果_EntryList為空,就把ObjectWaiter放入ObjectWaiter中,為什么要這樣做呢?
Policy == 3:放入_cxq隊(duì)列中,末尾位置;更新_cxq變量的值的時(shí)候,同樣要通過CAS注意并發(fā)問題;
這里有一段很巧妙的代碼,現(xiàn)將_cxq保存在Tail中,正常情況下將ObjectWaiter賦值給Tail->_next就可以了,但是此時(shí)有可能其他線程正在_cxq的尾部追加數(shù)據(jù)了,所以此時(shí)Tail對象對應(yīng)的記錄就不是最后一條了,那么它的_next就非空了,一旦發(fā)生這種情況,就執(zhí)行Tail = Tail->_next,這樣就獲得了最新的_cxq的尾部數(shù)據(jù),如下圖所示:
Policy等于其他值,立即喚醒ObjectWaiter對應(yīng)的線程;
小結(jié)一下,線程B執(zhí)行notify時(shí)候做的事情:
執(zhí)行過wait的線程都在隊(duì)列_WaitSet中,此處從_WaitSet中取出第一個(gè);
根據(jù)Policy的不同,將這個(gè)線程放入_EntryList或者_(dá)cxq隊(duì)列中的起始或末尾位置;
接下來到了揭開問題的關(guān)鍵了,我們來看objectMonitor.cpp的ObjectMonitor::exit方法;
如上圖,方法一進(jìn)來先做一些合法性判斷,接下來如紅框所示,是偏向鎖邏輯,偏向次數(shù)減一后直接返回,顯然線程B在此處不會(huì)返回,而是繼續(xù)往下執(zhí)行;
根據(jù)QMode的不同,有不同的處理方式:?
1. QMode = 2,并且_cxq非空:取_cxq隊(duì)列排頭位置的ObjectWaiter對象,調(diào)用ExitEpilog方法,該方法會(huì)喚醒ObjectWaiter對象的線程,此處會(huì)立即返回,后面的代碼不會(huì)執(zhí)行了;?
2. QMode = 3,并且_cxq非空:把_cxq隊(duì)列首元素放入_EntryList的尾部;?
3. QMode = 4,并且_cxq非空:把_cxq隊(duì)列首元素放入_EntryList的頭部;?
4. QMode = 0,不做什么,繼續(xù)往下看;
只有QMode=2的時(shí)候會(huì)提前返回,等于0、3、4的時(shí)候都會(huì)繼續(xù)往下執(zhí)行:
如果_EntryList的首元素非空,就取出來調(diào)用ExitEpilog方法,該方法會(huì)喚醒ObjectWaiter對象的線程,然后立即返回;?
如果_EntryList的首元素為空,就取_cxq的首元素,放入_EntryList,然后再從_EntryList中取出來執(zhí)行ExitEpilog方法,然后立即返回;
以上操作,均是執(zhí)行過ExitEpilog方法然后立即返回,如果取出的元素為空,就執(zhí)行循環(huán)繼續(xù)?。?/p>
小結(jié)一下,線程B釋放了鎖之后,執(zhí)行的操作如下:?
1. 偏向鎖邏輯,此處未命中;?
2. 根據(jù)QMode的不同,將ObjectWaiter從_cxq或者_(dá)EntryList中取出后喚醒;?
3. 喚醒的元素會(huì)繼續(xù)執(zhí)行掛起前的代碼,按照我們之前的分析,線程喚醒后,就會(huì)通過CAS去競爭鎖,此時(shí)由于線程B已經(jīng)釋放了鎖,那么此時(shí)應(yīng)該能競爭成功;
到了現(xiàn)在已經(jīng)將之前的幾個(gè)問題搞清了,匯總起來看看:?
1. 線程A在wait() 后被加入了_WaitSet隊(duì)列中;?
2. 線程C被線程B啟動(dòng)后競爭鎖失敗,被加入到_cxq隊(duì)列的首位;?
3. 線程B在notify()時(shí),從_WaitSet中取出第一個(gè),根據(jù)Policy的不同,將這個(gè)線程放入_EntryList或者_(dá)cxq隊(duì)列中的起始或末尾位置;?
4. 根據(jù)QMode的不同,將ObjectWaiter從_cxq或者_(dá)EntryList中取出后喚醒;;
所以,最初的問題已經(jīng)清楚了,wait()的線程被喚醒后,會(huì)進(jìn)入一個(gè)隊(duì)列,然后JVM會(huì)根據(jù)Policy和QMode的不同對隊(duì)列中的ObjectWaiter做不同的處理,被選中的ObjectWaiter會(huì)被喚醒,去競爭鎖;
至此,源碼分析已結(jié)束,但是因?yàn)槲覀儾恢繮olicy和QMode參數(shù)到底是多少,所以還不能對之前的問題有個(gè)明確的結(jié)果,這些還是留在下一章來解答吧,下一章里我們?nèi)バ薷腏VM源碼,把參數(shù)都打印出來;