本文為synchronized系列第二篇。主要內(nèi)容為分析偏向鎖的實現(xiàn)。
偏向鎖的誕生背景和基本原理在上文中已經(jīng)講過了,強烈建議在有看過上篇文章的基礎(chǔ)下閱讀本文。
本系列文章將對HotSpot的synchronized鎖實現(xiàn)進行全面分析,內(nèi)容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,希望給在研究synchronized路上的同學一些幫助。主要包括以下幾篇文章:
死磕Synchronized底層實現(xiàn)--輕量級鎖
死磕Synchronized底層實現(xiàn)--重量級鎖
更多文章見個人博客:https://github.com/farmerjohngit/myblog
本文將分為幾塊內(nèi)容:
1.偏向鎖的入口
2.偏向鎖的獲取流程
3.偏向鎖的撤銷流程
4.偏向鎖的釋放流程
5.偏向鎖的批量重偏向和批量撤銷
本文分析的JVM版本是JVM8,具體版本號以及代碼可以在這里看到。
偏向鎖入口
目前網(wǎng)上的很多文章,關(guān)于偏向鎖源碼入口都找錯地方了,導(dǎo)致我之前對于偏向鎖的很多邏輯一直想不通,走了很多彎路。
synchronized分為synchronized代碼塊和synchronized方法,其底層獲取鎖的邏輯都是一樣的,本文講解的是synchronized代碼塊的實現(xiàn)。上篇文章也說過,synchronized代碼塊是由monitorenter和monitorexit兩個指令實現(xiàn)的。
關(guān)于HotSpot虛擬機中獲取鎖的入口,網(wǎng)上很多文章要么給出的方法入口為interpreterRuntime.cpp#monitorenter,要么給出的入口為bytecodeInterpreter.cpp#1816。包括占小狼的這篇文章關(guān)于鎖入口的位置說法也是有問題的(當然文章還是很好的,在我剛開始研究synchronized的時候,小狼哥的這篇文章給了我很多幫助)。
要找鎖的入口,肯定是要在源碼中找到對monitorenter指令解析的地方。在HotSpot的中有兩處地方對monitorenter指令進行解析:一個是在bytecodeInterpreter.cpp#1816 ,另一個是在templateTable_x86_64.cpp#3667。
前者是JVM中的字節(jié)碼解釋器(bytecodeInterpreter),用C++實現(xiàn)了每條JVM指令(如monitorenter、invokevirtual等),其優(yōu)點是實現(xiàn)相對簡單且容易理解,缺點是執(zhí)行慢。后者是模板解釋器(templateInterpreter),其對每個指令都寫了一段對應(yīng)的匯編代碼,啟動時將每個指令與對應(yīng)匯編代碼入口綁定,可以說是效率做到了極致。模板解釋器的實現(xiàn)可以看這篇文章,在研究的過程中也請教過文章作者‘汪先生’一些問題,這里感謝一下。
在HotSpot中,只用到了模板解釋器,字節(jié)碼解釋器根本就沒用到,R大的讀書筆記中說的很清楚了,大家可以看看,這里不再贅述。
所以montorenter的解析入口在模板解釋器中,其代碼位于templateTable_x86_64.cpp#3667。通過調(diào)用路徑:templateTable_x86_64#monitorenter->interp_masm_x86_64#lock_object進入到偏向鎖入口macroAssembler_x86#biased_locking_enter,在這里大家可以看到會生成對應(yīng)的匯編代碼。需要注意的是,不是說每次解析monitorenter指令都會調(diào)用biased_locking_enter,而是只會在JVM啟動的時候調(diào)用該方法生成匯編代碼,之后對指令的解析是通過直接執(zhí)行匯編代碼。
其實bytecodeInterpreter的邏輯和templateInterpreter的邏輯是大同小異的,因為templateInterpreter中都是匯編代碼,比較晦澀,所以看bytecodeInterpreter的實現(xiàn)會便于理解一點。但這里有個坑,在jdk8u之前,bytecodeInterpreter并沒有實現(xiàn)偏向鎖的邏輯。我之前看的JDK8-87ee5ee27509這個版本就沒有實現(xiàn)偏向鎖的邏輯,導(dǎo)致我看了很久都沒看懂。在這個commit中對bytecodeInterpreter加入了偏向鎖的支持,我大致了看了下和templateInterpreter對比除了棧結(jié)構(gòu)不同外,其他邏輯大致相同,所以下文就按bytecodeInterpreter中的代碼對偏向鎖邏輯進行講解。templateInterpreter的匯編代碼講解可以看這篇文章,其實匯編源碼中都有英文注釋,了解了匯編幾個基本指令的作用再結(jié)合注釋理解起來也不是很難。
偏向鎖獲取流程
下面開始偏向鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816。注意本文代碼都有所刪減。
CASE(_monitorenter): {
// lockee 就是鎖對象
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// code 1:找到一個空閑的Lock Record
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
//entry不為null,代表還有空閑的Lock Record
if (entry != NULL) {
// code 2:將Lock Record的obj指針指向鎖對象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
// markoop即對象頭的mark word
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// code 3:如果鎖對象的mark word的狀態(tài)是偏向模式
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
// code 4:這里有幾步操作,下文分析
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
// code 5:如果偏向的線程是自己且epoch等于class的epoch
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
// code 6:如果偏向模式關(guān)閉,則嘗試撤銷偏向鎖
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
// 利用CAS操作將mark word替換為class中的mark word
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// code 7:如果epoch不等于class中的epoch,則嘗試重偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// 構(gòu)造一個偏向當前線程的mark word
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
// CAS替換對象頭的mark word
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
// 重偏向失敗,代表存在多線程競爭,則調(diào)用monitorenter方法進行鎖升級
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
// 走到這里說明當前要么偏向別的線程,要么是匿名偏向(即沒有偏向任何線程)
// code 8:下面構(gòu)建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖對象的mark word
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// CAS修改成功
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
// 如果修改失敗說明存在多線程競爭,所以進入monitorenter方法
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
// 如果偏向線程不是當前線程或沒有開啟偏向模式等原因都會導(dǎo)致success==false
if (!success) {
// 輕量級鎖的邏輯
//code 9: 構(gòu)造一個無鎖狀態(tài)的Displaced Mark Word,并將Lock Record的lock指向它
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
//如果指定了-XX:+UseHeavyMonitors,則call_vm=true,代表禁用偏向鎖和輕量級鎖
bool call_vm = UseHeavyMonitors;
// 利用CAS將對象頭的mark word替換為指向Lock Record的指針
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 判斷是不是鎖重入
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { //code 10: 如果是鎖重入,則直接將Displaced Mark Word設(shè)置為null
entry->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// lock record不夠,重新執(zhí)行
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
再回顧下對象頭中mark word的格式:JVM中的每個類也有一個類似mark word的prototype_header,用來標記該class的epoch和偏向開關(guān)等信息。上面的代碼中lockee->klass()->prototype_header()即獲取class的prototype_header。
code 1,從當前線程的棧中找到一個空閑的Lock Record(即代碼中的BasicObjectLock,下文都用Lock Record代指),判斷Lock Record是否空閑的依據(jù)是其obj字段 是否為null。注意這里是按內(nèi)存地址從低往高找到最后一個可用的Lock Record,換而言之,就是找到內(nèi)存地址最高的可用Lock Record。
code 2,獲取到Lock Record后,首先要做的就是為其obj字段賦值。
code 3,判斷鎖對象的mark word是否是偏向模式,即低3位是否為101。
code 4,這里有幾步位運算的操作anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ? ~((uintptr_t) markOopDesc::age_mask_in_place); 這個位運算可以分為3個部分。
第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 將當前線程id和類的prototype_header相或,這樣得到的值為(當前線程id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標志 + 鎖標志位)),注意prototype_header的分代年齡那4個字節(jié)為0
第二部分 ^ (uintptr_t)mark 將上面計算得到的結(jié)果與鎖對象的markOop進行異或,相等的位全部被置為0,只剩下不相等的位。
第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place為...0001111000,取反后,變成了...1110000111,除了分代年齡那4位,其他位全為1;將取反后的結(jié)果再與上面的結(jié)果相與,將上面異或得到的結(jié)果中分代年齡給忽略掉。
code 5,anticipated_bias_locking_value==0代表偏向的線程是當前線程且mark word的epoch等于class的epoch,這種情況下什么都不用做。
code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0代表class的prototype_header或?qū)ο蟮?code>mark word中偏向模式是關(guān)閉的,又因為能走到這已經(jīng)通過了mark->has_bias_pattern()判斷,即對象的mark word中偏向模式是開啟的,那也就是說class的prototype_header不是偏向模式。
然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤銷偏向鎖,我們知道CAS會有幾個參數(shù),1是預(yù)期的原值,2是預(yù)期修改后的值 ,3是要修改的對象,與之對應(yīng),cmpxchg_ptr方法第一個參數(shù)是預(yù)期修改后的值,第2個參數(shù)是修改的對象,第3個參數(shù)是預(yù)期原值,方法返回實際原值,如果等于預(yù)期原值則說明修改成功。
code 7,如果epoch已過期,則需要重偏向,利用CAS指令將鎖對象的mark word替換為一個偏向當前線程且epoch為類的epoch的新的mark word。
code 8,CAS將偏向線程改為當前線程,如果當前是匿名偏向則能修改成功,否則進入鎖升級的邏輯。
code 9,這一步已經(jīng)是輕量級鎖的邏輯了。從上圖的mark word的格式可以看到,輕量級鎖中mark word存的是指向Lock Record的指針。這里構(gòu)造一個無鎖狀態(tài)的mark word,然后存儲到Lock Record(Lock Record的格式可以看第一篇文章)。設(shè)置mark word是無鎖狀態(tài)的原因是:輕量級鎖解鎖時是將對象頭的mark word設(shè)置為Lock Record中的Displaced Mark Word,所以創(chuàng)建時設(shè)置為無鎖狀態(tài),解鎖時直接用CAS替換就好了。
code 10, 如果是鎖重入,則將Lock Record的Displaced Mark Word設(shè)置為null,起到一個鎖重入計數(shù)的作用。
以上是偏向鎖加鎖的流程(包括部分輕量級鎖的加鎖流程),如果當前鎖已偏向其他線程||epoch值過期||偏向模式關(guān)閉||獲取偏向鎖的過程中存在并發(fā)沖突,都會進入到InterpreterRuntime::monitorenter方法, 在該方法中會對偏向鎖撤銷和升級。
偏向鎖的撤銷
這里說的撤銷是指在獲取偏向鎖的過程因為不滿足條件導(dǎo)致要將鎖對象改為非偏向鎖狀態(tài);釋放是指退出同步塊時的過程,釋放鎖的邏輯會在下一小節(jié)闡述。請讀者注意本文中撤銷與釋放的區(qū)別。
如果獲取偏向鎖失敗會進入到InterpreterRuntime::monitorenter方法
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
...
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
...
IRT_END
可以看到如果開啟了JVM偏向鎖,那會進入到ObjectSynchronizer::fast_enter方法中。
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
如果是正常的Java線程,會走上面的邏輯進入到BiasedLocking::revoke_and_rebias方法,如果是VM線程則會走到下面的BiasedLocking::revoke_at_safepoint。我們主要看BiasedLocking::revoke_and_rebias方法。這個方法的主要作用像它的方法名:撤銷或者重偏向,第一個參數(shù)封裝了鎖對象和當前線程,第二個參數(shù)代表是否允許重偏向,這里是true。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark();
if (mark->is_biased_anonymously() && !attempt_rebias) {
//如果是匿名偏向且attempt_rebias==false會走到這里,如鎖對象的hashcode方法被調(diào)用會出現(xiàn)這種情況,需要撤銷偏向鎖。
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {
// 鎖對象開啟了偏向模式會走到這里
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
//code 1: 如果對應(yīng)class關(guān)閉了偏向模式
if (!prototype_header->has_bias_pattern()) {
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
//code2: 如果epoch過期
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
} else {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
//code 3:批量重偏向與批量撤銷的邏輯
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
} else if (heuristics == HR_SINGLE_REVOKE) {
//code 4:撤銷單個線程
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
// 走到這里說明需要撤銷的是偏向當前線程的鎖,當調(diào)用Object#hashcode方法時會走到這一步
// 因為只要遍歷當前線程的棧就好了,所以不需要等到safepoint再撤銷。
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
// 下面代碼最終會在VM線程中的safepoint調(diào)用revoke_bias方法
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
//code5:批量撤銷、批量重偏向的邏輯
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
會走到該方法的邏輯有很多,我們只分析最常見的情況:假設(shè)鎖已經(jīng)偏向線程A,這時B線程嘗試獲得鎖。
上面的code 1,code 2B線程都不會走到,最終會走到code 4處,如果要撤銷的鎖偏向的是當前線程則直接調(diào)用revoke_bias撤銷偏向鎖,否則會將該操作push到VM Thread中等到safepoint的時候再執(zhí)行。
關(guān)于VM Thread這里介紹下:在JVM中有個專門的VM Thread,該線程會源源不斷的從VMOperationQueue中取出請求,比如GC請求。對于需要safepoint的操作(VM_Operationevaluate_at_safepoint返回true)必須要等到所有的Java線程進入到safepoint才開始執(zhí)行。 關(guān)于safepoint可以參考下這篇文章。
接下來我們著重分析下revoke_bias方法。第一個參數(shù)為鎖對象,第2、3個參數(shù)為都為false
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
markOop mark = obj->mark();
// 如果沒有開啟偏向模式,則直接返回NOT_BIASED
if (!mark->has_bias_pattern()) {
...
return BiasedLocking::NOT_BIASED;
}
uint age = mark->age();
// 構(gòu)建兩個mark word,一個是匿名偏向模式(101),一個是無鎖模式(001)
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
...
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 匿名偏向。當調(diào)用鎖對象的hashcode()方法可能會導(dǎo)致走到這個邏輯
// 如果不允許重偏向,則將對象的mark word設(shè)置為無鎖模式
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
...
return BiasedLocking::BIAS_REVOKED;
}
// code 1:判斷偏向線程是否還存活
bool thread_is_alive = false;
// 如果當前線程就是偏向線程
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
// 遍歷當前jvm的所有線程,如果能找到,則說明偏向的線程還存活
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
// 如果偏向的線程已經(jīng)不存活了
if (!thread_is_alive) {
// 允許重偏向則將對象mark word設(shè)置為匿名偏向狀態(tài),否則設(shè)置為無鎖狀態(tài)
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
obj->set_mark(unbiased_prototype);
}
...
return BiasedLocking::BIAS_REVOKED;
}
// 線程還存活則遍歷線程棧中所有的Lock Record
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
// 如果能找到對應(yīng)的Lock Record說明偏向的線程還在執(zhí)行同步代碼塊中的代碼
if (mon_info->owner() == obj) {
...
// 需要升級為輕量級鎖,直接修改偏向線程棧中的Lock Record。為了處理鎖重入的case,在這里將Lock Record的Displaced Mark Word設(shè)置為null,第一個Lock Record會在下面的代碼中再處理
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
} else {
...
}
}
if (highest_lock != NULL) {
// 修改第一個Lock Record為無鎖狀態(tài),然后將obj的mark word設(shè)置為執(zhí)行該Lock Record的指針
highest_lock->set_displaced_header(unbiased_prototype);
obj->release_set_mark(markOopDesc::encode(highest_lock));
...
} else {
// 走到這里說明偏向線程已經(jīng)不在同步塊中了
...
if (allow_rebias) {
//設(shè)置為匿名偏向狀態(tài)
obj->set_mark(biased_prototype);
} else {
// 將mark word設(shè)置為無鎖狀態(tài)
obj->set_mark(unbiased_prototype);
}
}
return BiasedLocking::BIAS_REVOKED;
}
需要注意下,當調(diào)用鎖對象的Object#hash或System.identityHashCode()方法會導(dǎo)致該對象的偏向鎖或輕量級鎖升級。這是因為在Java中一個對象的hashcode是在調(diào)用這兩個方法時才生成的,如果是無鎖狀態(tài)則存放在mark word中,如果是重量級鎖則存放在對應(yīng)的monitor中,而偏向鎖是沒有地方能存放該信息的,所以必須升級。具體可以看這篇文章的hashcode()方法對偏向鎖的影響小節(jié)(注意:該文中對于偏向鎖的加鎖描述有些錯誤),另外我也向該文章作者請教過一些問題,他很熱心的回答了我,在此感謝一下!
言歸正傳,revoke_bias方法邏輯:
- 查看偏向的線程是否存活,如果已經(jīng)不存活了,則直接撤銷偏向鎖。JVM維護了一個集合存放所有存活的線程,通過遍歷該集合判斷某個線程是否存活。
- 偏向的線程是否還在同步塊中,如果不在了,則撤銷偏向鎖。我們回顧一下偏向鎖的加鎖流程:每次進入同步塊(即執(zhí)行
monitorenter)的時候都會以從高往低的順序在棧中找到第一個可用的Lock Record,將其obj字段指向鎖對象。每次解鎖(即執(zhí)行monitorexit)的時候都會將最低的一個相關(guān)Lock Record移除掉。所以可以通過遍歷線程棧中的Lock Record來判斷線程是否還在同步塊中。 - 將偏向線程所有相關(guān)
Lock Record的Displaced Mark Word設(shè)置為null,然后將最高位的Lock Record的Displaced Mark Word設(shè)置為無鎖狀態(tài),最高位的Lock Record也就是第一次獲得鎖時的Lock Record(這里的第一次是指重入獲取鎖時的第一次),然后將對象頭指向最高位的Lock Record,這里不需要用CAS指令,因為是在safepoint。 執(zhí)行完后,就升級成了輕量級鎖。原偏向線程的所有Lock Record都已經(jīng)變成輕量級鎖的狀態(tài)。這里如果看不明白,請回顧上篇文章的輕量級鎖加鎖過程。
偏向鎖的釋放
偏向鎖的釋放入口在bytecodeInterpreter.cpp#1923
CASE(_monitorexit): {
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 從低往高遍歷棧的Lock Record
while (most_recent != limit ) {
// 如果Lock Record關(guān)聯(lián)的是該鎖對象
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
// 釋放Lock Record
most_recent->set_obj(NULL);
// 如果是偏向模式,僅僅釋放Lock Record就好了。否則要走輕量級鎖or重量級鎖的釋放流程
if (!lockee->mark()->has_bias_pattern()) {
bool call_vm = UseHeavyMonitors;
// header!=NULL說明不是重入,則需要將Displaced Mark Word CAS到對象頭的Mark Word
if (header != NULL || call_vm) {
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// CAS失敗或者是重量級鎖則會走到這里,先將obj還原,然后調(diào)用monitorexit方法
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
//執(zhí)行下一條命令
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
//處理下一條Lock Record
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
上面的代碼結(jié)合注釋理解起來應(yīng)該不難,偏向鎖的釋放很簡單,只要將對應(yīng)Lock Record釋放就好了,而輕量級鎖則需要將Displaced Mark Word替換到對象頭的mark word中。如果CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。該方法會在輕量級與重量級鎖的文章中講解。
批量重偏向和批量撤銷
批量重偏向和批量撤銷的背景可以看上篇文章,相關(guān)實現(xiàn)在BiasedLocking::revoke_and_rebias中:
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
...
//code 1:重偏向的邏輯
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
// 非重偏向的邏輯
...
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
//code 2:批量撤銷、批量重偏向的邏輯
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
在每次撤銷偏向鎖的時候都通過update_heuristics方法記錄下來,以類為單位,當某個類的對象撤銷偏向次數(shù)達到一定閾值的時候JVM就認為該類不適合偏向模式或者需要重新偏向另一個對象,update_heuristics就會返回HR_BULK_REVOKE或HR_BULK_REBIAS。進行批量撤銷或批量重偏向。
先看update_heuristics方法。
static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
markOop mark = o->mark();
//如果不是偏向模式直接返回
if (!mark->has_bias_pattern()) {
return HR_NOT_BIASED;
}
// 鎖對象的類
Klass* k = o->klass();
// 當前時間
jlong cur_time = os::javaTimeMillis();
// 該類上一次批量撤銷的時間
jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
// 該類偏向鎖撤銷的次數(shù)
int revocation_count = k->biased_lock_revocation_count();
// BiasedLockingBulkRebiasThreshold是重偏向閾值(默認20),BiasedLockingBulkRevokeThreshold是批量撤銷閾值(默認40),BiasedLockingDecayTime是開啟一次新的批量重偏向距離上次批量重偏向的后的延遲時間,默認25000。也就是開啟批量重偏向后,經(jīng)過了一段較長的時間(>=BiasedLockingDecayTime),撤銷計數(shù)器才超過閾值,那我們會重置計數(shù)器。
if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
(revocation_count < BiasedLockingBulkRevokeThreshold) &&
(last_bulk_revocation_time != 0) &&
(cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
// This is the first revocation we've seen in a while of an
// object of this type since the last time we performed a bulk
// rebiasing operation. The application is allocating objects in
// bulk which are biased toward a thread and then handing them
// off to another thread. We can cope with this allocation
// pattern via the bulk rebiasing mechanism so we reset the
// klass's revocation count rather than allow it to increase
// monotonically. If we see the need to perform another bulk
// rebias operation later, we will, and if subsequently we see
// many more revocation operations in a short period of time we
// will completely disable biasing for this type.
k->set_biased_lock_revocation_count(0);
revocation_count = 0;
}
// 自增撤銷計數(shù)器
if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
revocation_count = k->atomic_incr_biased_lock_revocation_count();
}
// 如果達到批量撤銷閾值則返回HR_BULK_REVOKE
if (revocation_count == BiasedLockingBulkRevokeThreshold) {
return HR_BULK_REVOKE;
}
// 如果達到批量重偏向閾值則返回HR_BULK_REBIAS
if (revocation_count == BiasedLockingBulkRebiasThreshold) {
return HR_BULK_REBIAS;
}
// 沒有達到閾值則撤銷單個對象的鎖
return HR_SINGLE_REVOKE;
}
當達到閾值的時候就會通過VM 線程在safepoint調(diào)用bulk_revoke_or_rebias_at_safepoint, 參數(shù)bulk_rebias如果是true代表是批量重偏向否則為批量撤銷。attempt_rebias_of_object代表對操作的鎖對象o是否運行重偏向,這里是true。
static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
bool bulk_rebias,
bool attempt_rebias_of_object,
JavaThread* requesting_thread) {
...
jlong cur_time = os::javaTimeMillis();
o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);
Klass* k_o = o->klass();
Klass* klass = k_o;
if (bulk_rebias) {
// 批量重偏向的邏輯
if (klass->prototype_header()->has_bias_pattern()) {
// 自增前類中的的epoch
int prev_epoch = klass->prototype_header()->bias_epoch();
// code 1:類中的epoch自增
klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
int cur_epoch = klass->prototype_header()->bias_epoch();
// code 2:遍歷所有線程的棧,更新類型為該klass的所有鎖實例的epoch
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
// We might have encountered this object already in the case of recursive locking
assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
owner->set_mark(mark->set_bias_epoch(cur_epoch));
}
}
}
}
// 接下來對當前鎖對象進行重偏向
revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
} else {
...
// code 3:批量撤銷的邏輯,將類中的偏向標記關(guān)閉,markOopDesc::prototype()返回的是一個關(guān)閉偏向模式的prototype
klass->set_prototype_header(markOopDesc::prototype());
// code 4:遍歷所有線程的棧,撤銷該類所有鎖的偏向
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
revoke_bias(owner, false, true, requesting_thread);
}
}
}
// 撤銷當前鎖對象的偏向模式
revoke_bias(o, false, true, requesting_thread);
}
...
BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;
if (attempt_rebias_of_object &&
o->mark()->has_bias_pattern() &&
klass->prototype_header()->has_bias_pattern()) {
// 構(gòu)造一個偏向請求線程的mark word
markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
klass->prototype_header()->bias_epoch());
// 更新當前鎖對象的mark word
o->set_mark(new_mark);
status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
...
}
...
return status_code;
}
該方法分為兩個邏輯:批量重偏向和批量撤銷。
先看批量重偏向,分為兩步:
code 1 將類中的撤銷計數(shù)器自增1,之后當該類已存在的實例獲得鎖時,就會嘗試重偏向,相關(guān)邏輯在偏向鎖獲取流程小節(jié)中。
code 2 處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后更新它們的epoch值。也就是說不會重偏向正在使用的鎖,否則會破壞鎖的線程安全性。
批量撤銷邏輯如下:
code 3將類的偏向標記關(guān)閉,之后當該類已存在的實例獲得鎖時,就會升級為輕量級鎖;該類新分配的對象的mark word則是無鎖模式。
code 4處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后撤銷偏向鎖。