死磕Synchronized底層實現(xiàn)--偏向鎖

本文為synchronized系列第二篇。主要內(nèi)容為分析偏向鎖的實現(xiàn)。

偏向鎖的誕生背景和基本原理在上文中已經(jīng)講過了,強烈建議在有看過上篇文章的基礎(chǔ)下閱讀本文。

本系列文章將對HotSpot的synchronized鎖實現(xiàn)進行全面分析,內(nèi)容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,希望給在研究synchronized路上的同學一些幫助。主要包括以下幾篇文章:

死磕Synchronized底層實現(xiàn)--概論

死磕Synchronized底層實現(xiàn)--偏向鎖

死磕Synchronized底層實現(xiàn)--輕量級鎖

死磕Synchronized底層實現(xiàn)--重量級鎖

更多文章見個人博客:https://github.com/farmerjohngit/myblog

本文將分為幾塊內(nèi)容:

1.偏向鎖的入口

2.偏向鎖的獲取流程

3.偏向鎖的撤銷流程

4.偏向鎖的釋放流程

5.偏向鎖的批量重偏向和批量撤銷

本文分析的JVM版本是JVM8,具體版本號以及代碼可以在這里看到。

偏向鎖入口

目前網(wǎng)上的很多文章,關(guān)于偏向鎖源碼入口都找錯地方了,導(dǎo)致我之前對于偏向鎖的很多邏輯一直想不通,走了很多彎路。

synchronized分為synchronized代碼塊和synchronized方法,其底層獲取鎖的邏輯都是一樣的,本文講解的是synchronized代碼塊的實現(xiàn)。上篇文章也說過,synchronized代碼塊是由monitorentermonitorexit兩個指令實現(xiàn)的。

關(guān)于HotSpot虛擬機中獲取鎖的入口,網(wǎng)上很多文章要么給出的方法入口為interpreterRuntime.cpp#monitorenter,要么給出的入口為bytecodeInterpreter.cpp#1816。包括占小狼的這篇文章關(guān)于鎖入口的位置說法也是有問題的(當然文章還是很好的,在我剛開始研究synchronized的時候,小狼哥的這篇文章給了我很多幫助)。

要找鎖的入口,肯定是要在源碼中找到對monitorenter指令解析的地方。在HotSpot的中有兩處地方對monitorenter指令進行解析:一個是在bytecodeInterpreter.cpp#1816 ,另一個是在templateTable_x86_64.cpp#3667

前者是JVM中的字節(jié)碼解釋器(bytecodeInterpreter),用C++實現(xiàn)了每條JVM指令(如monitorenterinvokevirtual等),其優(yōu)點是實現(xiàn)相對簡單且容易理解,缺點是執(zhí)行慢。后者是模板解釋器(templateInterpreter),其對每個指令都寫了一段對應(yīng)的匯編代碼,啟動時將每個指令與對應(yīng)匯編代碼入口綁定,可以說是效率做到了極致。模板解釋器的實現(xiàn)可以看這篇文章,在研究的過程中也請教過文章作者‘汪先生’一些問題,這里感謝一下。

在HotSpot中,只用到了模板解釋器,字節(jié)碼解釋器根本就沒用到,R大的讀書筆記中說的很清楚了,大家可以看看,這里不再贅述。

所以montorenter的解析入口在模板解釋器中,其代碼位于templateTable_x86_64.cpp#3667。通過調(diào)用路徑:templateTable_x86_64#monitorenter->interp_masm_x86_64#lock_object進入到偏向鎖入口macroAssembler_x86#biased_locking_enter,在這里大家可以看到會生成對應(yīng)的匯編代碼。需要注意的是,不是說每次解析monitorenter指令都會調(diào)用biased_locking_enter,而是只會在JVM啟動的時候調(diào)用該方法生成匯編代碼,之后對指令的解析是通過直接執(zhí)行匯編代碼。

其實bytecodeInterpreter的邏輯和templateInterpreter的邏輯是大同小異的,因為templateInterpreter中都是匯編代碼,比較晦澀,所以看bytecodeInterpreter的實現(xiàn)會便于理解一點。但這里有個坑,在jdk8u之前,bytecodeInterpreter并沒有實現(xiàn)偏向鎖的邏輯。我之前看的JDK8-87ee5ee27509這個版本就沒有實現(xiàn)偏向鎖的邏輯,導(dǎo)致我看了很久都沒看懂。在這個commit中對bytecodeInterpreter加入了偏向鎖的支持,我大致了看了下和templateInterpreter對比除了棧結(jié)構(gòu)不同外,其他邏輯大致相同,所以下文就按bytecodeInterpreter中的代碼對偏向鎖邏輯進行講解。templateInterpreter的匯編代碼講解可以看這篇文章,其實匯編源碼中都有英文注釋,了解了匯編幾個基本指令的作用再結(jié)合注釋理解起來也不是很難。

偏向鎖獲取流程

下面開始偏向鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816。注意本文代碼都有所刪減

CASE(_monitorenter): {
  // lockee 就是鎖對象
  oop lockee = STACK_OBJECT(-1);
  // derefing's lockee ought to provoke implicit null check
  CHECK_NULL(lockee);
  // code 1:找到一個空閑的Lock Record
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  BasicObjectLock* entry = NULL;
  while (most_recent != limit ) {
    if (most_recent->obj() == NULL) entry = most_recent;
    else if (most_recent->obj() == lockee) break;
    most_recent++;
  }
  //entry不為null,代表還有空閑的Lock Record
  if (entry != NULL) {
    // code 2:將Lock Record的obj指針指向鎖對象
    entry->set_obj(lockee);
    int success = false;
    uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
    // markoop即對象頭的mark word
    markOop mark = lockee->mark();
    intptr_t hash = (intptr_t) markOopDesc::no_hash;
    // code 3:如果鎖對象的mark word的狀態(tài)是偏向模式
    if (mark->has_bias_pattern()) {
      uintptr_t thread_ident;
      uintptr_t anticipated_bias_locking_value;
      thread_ident = (uintptr_t)istate->thread();
     // code 4:這里有幾步操作,下文分析
      anticipated_bias_locking_value =
        (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
        ~((uintptr_t) markOopDesc::age_mask_in_place);
     // code 5:如果偏向的線程是自己且epoch等于class的epoch
      if  (anticipated_bias_locking_value == 0) {
        // already biased towards this thread, nothing to do
        if (PrintBiasedLockingStatistics) {
          (* BiasedLocking::biased_lock_entry_count_addr())++;
        }
        success = true;
      }
       // code 6:如果偏向模式關(guān)閉,則嘗試撤銷偏向鎖
      else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
        markOop header = lockee->klass()->prototype_header();
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        // 利用CAS操作將mark word替換為class中的mark word
        if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (*BiasedLocking::revoked_lock_entry_count_addr())++;
        }
      }
         // code 7:如果epoch不等于class中的epoch,則嘗試重偏向
      else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
        // 構(gòu)造一個偏向當前線程的mark word
        markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
        if (hash != markOopDesc::no_hash) {
          new_header = new_header->copy_set_hash(hash);
        }
        // CAS替換對象頭的mark word  
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::rebiased_lock_entry_count_addr())++;
        }
        else {
          // 重偏向失敗,代表存在多線程競爭,則調(diào)用monitorenter方法進行鎖升級
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
      else {
         // 走到這里說明當前要么偏向別的線程,要么是匿名偏向(即沒有偏向任何線程)
        // code 8:下面構(gòu)建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖對象的mark word
        markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
        // debugging hint
        DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
           // CAS修改成功
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
        }
        else {
          // 如果修改失敗說明存在多線程競爭,所以進入monitorenter方法
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
    }

    // 如果偏向線程不是當前線程或沒有開啟偏向模式等原因都會導(dǎo)致success==false
    if (!success) {
      // 輕量級鎖的邏輯
      //code 9: 構(gòu)造一個無鎖狀態(tài)的Displaced Mark Word,并將Lock Record的lock指向它
      markOop displaced = lockee->mark()->set_unlocked();
      entry->lock()->set_displaced_header(displaced);
      //如果指定了-XX:+UseHeavyMonitors,則call_vm=true,代表禁用偏向鎖和輕量級鎖
      bool call_vm = UseHeavyMonitors;
      // 利用CAS將對象頭的mark word替換為指向Lock Record的指針
      if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 判斷是不是鎖重入
        if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {        //code 10: 如果是鎖重入,則直接將Displaced Mark Word設(shè)置為null
          entry->lock()->set_displaced_header(NULL);
        } else {
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
      }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    // lock record不夠,重新執(zhí)行
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}

再回顧下對象頭中mark word的格式:
image

JVM中的每個類也有一個類似mark word的prototype_header,用來標記該class的epoch和偏向開關(guān)等信息。上面的代碼中lockee->klass()->prototype_header()即獲取class的prototype_header。

code 1,從當前線程的棧中找到一個空閑的Lock Record即代碼中的BasicObjectLock,下文都用Lock Record代指),判斷Lock Record是否空閑的依據(jù)是其obj字段 是否為null。注意這里是按內(nèi)存地址從低往高找到最后一個可用的Lock Record,換而言之,就是找到內(nèi)存地址最高的可用Lock Record。

code 2,獲取到Lock Record后,首先要做的就是為其obj字段賦值。

code 3,判斷鎖對象的mark word是否是偏向模式,即低3位是否為101。

code 4,這里有幾步位運算的操作anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ? ~((uintptr_t) markOopDesc::age_mask_in_place); 這個位運算可以分為3個部分。

第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 將當前線程id和類的prototype_header相或,這樣得到的值為(當前線程id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標志 + 鎖標志位)),注意prototype_header的分代年齡那4個字節(jié)為0

第二部分 ^ (uintptr_t)mark 將上面計算得到的結(jié)果與鎖對象的markOop進行異或,相等的位全部被置為0,只剩下不相等的位。

第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place為...0001111000,取反后,變成了...1110000111,除了分代年齡那4位,其他位全為1;將取反后的結(jié)果再與上面的結(jié)果相與,將上面異或得到的結(jié)果中分代年齡給忽略掉。

code 5anticipated_bias_locking_value==0代表偏向的線程是當前線程且mark word的epoch等于class的epoch,這種情況下什么都不用做。

code 6(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0代表class的prototype_header或?qū)ο蟮?code>mark word中偏向模式是關(guān)閉的,又因為能走到這已經(jīng)通過了mark->has_bias_pattern()判斷,即對象的mark word中偏向模式是開啟的,那也就是說class的prototype_header不是偏向模式。

然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤銷偏向鎖,我們知道CAS會有幾個參數(shù),1是預(yù)期的原值,2是預(yù)期修改后的值 ,3是要修改的對象,與之對應(yīng),cmpxchg_ptr方法第一個參數(shù)是預(yù)期修改后的值,第2個參數(shù)是修改的對象,第3個參數(shù)是預(yù)期原值,方法返回實際原值,如果等于預(yù)期原值則說明修改成功。

code 7,如果epoch已過期,則需要重偏向,利用CAS指令將鎖對象的mark word替換為一個偏向當前線程且epoch為類的epoch的新的mark word。

code 8,CAS將偏向線程改為當前線程,如果當前是匿名偏向則能修改成功,否則進入鎖升級的邏輯。

code 9,這一步已經(jīng)是輕量級鎖的邏輯了。從上圖的mark word的格式可以看到,輕量級鎖中mark word存的是指向Lock Record的指針。這里構(gòu)造一個無鎖狀態(tài)的mark word,然后存儲到Lock RecordLock Record的格式可以看第一篇文章)。設(shè)置mark word是無鎖狀態(tài)的原因是:輕量級鎖解鎖時是將對象頭的mark word設(shè)置為Lock Record中的Displaced Mark Word,所以創(chuàng)建時設(shè)置為無鎖狀態(tài),解鎖時直接用CAS替換就好了。

code 10, 如果是鎖重入,則將Lock RecordDisplaced Mark Word設(shè)置為null,起到一個鎖重入計數(shù)的作用。

以上是偏向鎖加鎖的流程(包括部分輕量級鎖的加鎖流程),如果當前鎖已偏向其他線程||epoch值過期||偏向模式關(guān)閉||獲取偏向鎖的過程中存在并發(fā)沖突,都會進入到InterpreterRuntime::monitorenter方法, 在該方法中會對偏向鎖撤銷和升級。

偏向鎖的撤銷

這里說的撤銷是指在獲取偏向鎖的過程因為不滿足條件導(dǎo)致要將鎖對象改為非偏向鎖狀態(tài);釋放是指退出同步塊時的過程,釋放鎖的邏輯會在下一小節(jié)闡述。請讀者注意本文中撤銷與釋放的區(qū)別。

如果獲取偏向鎖失敗會進入到InterpreterRuntime::monitorenter方法

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...
IRT_END

可以看到如果開啟了JVM偏向鎖,那會進入到ObjectSynchronizer::fast_enter方法中。

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}

如果是正常的Java線程,會走上面的邏輯進入到BiasedLocking::revoke_and_rebias方法,如果是VM線程則會走到下面的BiasedLocking::revoke_at_safepoint。我們主要看BiasedLocking::revoke_and_rebias方法。這個方法的主要作用像它的方法名:撤銷或者重偏向,第一個參數(shù)封裝了鎖對象和當前線程,第二個參數(shù)代表是否允許重偏向,這里是true。

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
    
  markOop mark = obj->mark();
  if (mark->is_biased_anonymously() && !attempt_rebias) {
     //如果是匿名偏向且attempt_rebias==false會走到這里,如鎖對象的hashcode方法被調(diào)用會出現(xiàn)這種情況,需要撤銷偏向鎖。
    markOop biased_value       = mark;
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) {
    // 鎖對象開啟了偏向模式會走到這里
    Klass* k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //code 1: 如果對應(yīng)class關(guān)閉了偏向模式
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
    //code2: 如果epoch過期
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else {
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }
  //code 3:批量重偏向與批量撤銷的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {
    //code 4:撤銷單個線程
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      // 走到這里說明需要撤銷的是偏向當前線程的鎖,當調(diào)用Object#hashcode方法時會走到這一步
      // 因為只要遍歷當前線程的棧就好了,所以不需要等到safepoint再撤銷。
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {
      // 下面代碼最終會在VM線程中的safepoint調(diào)用revoke_bias方法
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }
    
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
   //code5:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

會走到該方法的邏輯有很多,我們只分析最常見的情況:假設(shè)鎖已經(jīng)偏向線程A,這時B線程嘗試獲得鎖。

上面的code 1,code 2B線程都不會走到,最終會走到code 4處,如果要撤銷的鎖偏向的是當前線程則直接調(diào)用revoke_bias撤銷偏向鎖,否則會將該操作push到VM Thread中等到safepoint的時候再執(zhí)行。

關(guān)于VM Thread這里介紹下:在JVM中有個專門的VM Thread,該線程會源源不斷的從VMOperationQueue中取出請求,比如GC請求。對于需要safepoint的操作(VM_Operationevaluate_at_safepoint返回true)必須要等到所有的Java線程進入到safepoint才開始執(zhí)行。 關(guān)于safepoint可以參考下這篇文章。

接下來我們著重分析下revoke_bias方法。第一個參數(shù)為鎖對象,第2、3個參數(shù)為都為false

static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
  markOop mark = obj->mark();
  // 如果沒有開啟偏向模式,則直接返回NOT_BIASED
  if (!mark->has_bias_pattern()) {
    ...
    return BiasedLocking::NOT_BIASED;
  }

  uint age = mark->age();
  // 構(gòu)建兩個mark word,一個是匿名偏向模式(101),一個是無鎖模式(001)
  markOop   biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
  markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);

  ...

  JavaThread* biased_thread = mark->biased_locker();
  if (biased_thread == NULL) {
     // 匿名偏向。當調(diào)用鎖對象的hashcode()方法可能會導(dǎo)致走到這個邏輯
     // 如果不允許重偏向,則將對象的mark word設(shè)置為無鎖模式
    if (!allow_rebias) {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // code 1:判斷偏向線程是否還存活
  bool thread_is_alive = false;
  // 如果當前線程就是偏向線程 
  if (requesting_thread == biased_thread) {
    thread_is_alive = true;
  } else {
     // 遍歷當前jvm的所有線程,如果能找到,則說明偏向的線程還存活
    for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
      if (cur_thread == biased_thread) {
        thread_is_alive = true;
        break;
      }
    }
  }
  // 如果偏向的線程已經(jīng)不存活了
  if (!thread_is_alive) {
    // 允許重偏向則將對象mark word設(shè)置為匿名偏向狀態(tài),否則設(shè)置為無鎖狀態(tài)
    if (allow_rebias) {
      obj->set_mark(biased_prototype);
    } else {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // 線程還存活則遍歷線程棧中所有的Lock Record
  GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  BasicLock* highest_lock = NULL;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    MonitorInfo* mon_info = cached_monitor_info->at(i);
    // 如果能找到對應(yīng)的Lock Record說明偏向的線程還在執(zhí)行同步代碼塊中的代碼
    if (mon_info->owner() == obj) {
      ...
      // 需要升級為輕量級鎖,直接修改偏向線程棧中的Lock Record。為了處理鎖重入的case,在這里將Lock Record的Displaced Mark Word設(shè)置為null,第一個Lock Record會在下面的代碼中再處理
      markOop mark = markOopDesc::encode((BasicLock*) NULL);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != NULL) {
    // 修改第一個Lock Record為無鎖狀態(tài),然后將obj的mark word設(shè)置為執(zhí)行該Lock Record的指針
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markOopDesc::encode(highest_lock));
    ...
  } else {
    // 走到這里說明偏向線程已經(jīng)不在同步塊中了
    ...
    if (allow_rebias) {
       //設(shè)置為匿名偏向狀態(tài)
      obj->set_mark(biased_prototype);
    } else {
      // 將mark word設(shè)置為無鎖狀態(tài)
      obj->set_mark(unbiased_prototype);
    }
  }

  return BiasedLocking::BIAS_REVOKED;
}

需要注意下,當調(diào)用鎖對象的Object#hashSystem.identityHashCode()方法會導(dǎo)致該對象的偏向鎖或輕量級鎖升級。這是因為在Java中一個對象的hashcode是在調(diào)用這兩個方法時才生成的,如果是無鎖狀態(tài)則存放在mark word中,如果是重量級鎖則存放在對應(yīng)的monitor中,而偏向鎖是沒有地方能存放該信息的,所以必須升級。具體可以看這篇文章hashcode()方法對偏向鎖的影響小節(jié)(注意:該文中對于偏向鎖的加鎖描述有些錯誤),另外我也向該文章作者請教過一些問題,他很熱心的回答了我,在此感謝一下!

言歸正傳,revoke_bias方法邏輯:

  1. 查看偏向的線程是否存活,如果已經(jīng)不存活了,則直接撤銷偏向鎖。JVM維護了一個集合存放所有存活的線程,通過遍歷該集合判斷某個線程是否存活。
  2. 偏向的線程是否還在同步塊中,如果不在了,則撤銷偏向鎖。我們回顧一下偏向鎖的加鎖流程:每次進入同步塊(即執(zhí)行monitorenter)的時候都會以從高往低的順序在棧中找到第一個可用的Lock Record,將其obj字段指向鎖對象。每次解鎖(即執(zhí)行monitorexit)的時候都會將最低的一個相關(guān)Lock Record移除掉。所以可以通過遍歷線程棧中的Lock Record來判斷線程是否還在同步塊中。
  3. 將偏向線程所有相關(guān)Lock RecordDisplaced Mark Word設(shè)置為null,然后將最高位的Lock RecordDisplaced Mark Word 設(shè)置為無鎖狀態(tài),最高位的Lock Record也就是第一次獲得鎖時的Lock Record(這里的第一次是指重入獲取鎖時的第一次),然后將對象頭指向最高位的Lock Record,這里不需要用CAS指令,因為是在safepoint。 執(zhí)行完后,就升級成了輕量級鎖。原偏向線程的所有Lock Record都已經(jīng)變成輕量級鎖的狀態(tài)。這里如果看不明白,請回顧上篇文章的輕量級鎖加鎖過程。

偏向鎖的釋放

偏向鎖的釋放入口在bytecodeInterpreter.cpp#1923

CASE(_monitorexit): {
  oop lockee = STACK_OBJECT(-1);
  CHECK_NULL(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  // 從低往高遍歷棧的Lock Record
  while (most_recent != limit ) {
    // 如果Lock Record關(guān)聯(lián)的是該鎖對象
    if ((most_recent)->obj() == lockee) {
      BasicLock* lock = most_recent->lock();
      markOop header = lock->displaced_header();
      // 釋放Lock Record
      most_recent->set_obj(NULL);
      // 如果是偏向模式,僅僅釋放Lock Record就好了。否則要走輕量級鎖or重量級鎖的釋放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = UseHeavyMonitors;
        // header!=NULL說明不是重入,則需要將Displaced Mark Word CAS到對象頭的Mark Word
        if (header != NULL || call_vm) {
          if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // CAS失敗或者是重量級鎖則會走到這里,先將obj還原,然后調(diào)用monitorexit方法
            most_recent->set_obj(lockee);
            CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
          }
        }
      }
      //執(zhí)行下一條命令
      UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    }
    //處理下一條Lock Record
    most_recent++;
  }
  // Need to throw illegal monitor state exception
  CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  ShouldNotReachHere();
}

上面的代碼結(jié)合注釋理解起來應(yīng)該不難,偏向鎖的釋放很簡單,只要將對應(yīng)Lock Record釋放就好了,而輕量級鎖則需要將Displaced Mark Word替換到對象頭的mark word中。如果CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。該方法會在輕量級與重量級鎖的文章中講解。

批量重偏向和批量撤銷

批量重偏向和批量撤銷的背景可以看上篇文章,相關(guān)實現(xiàn)在BiasedLocking::revoke_and_rebias中:

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  ...
  //code 1:重偏向的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  // 非重偏向的邏輯
  ...
      
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");  
   //code 2:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

在每次撤銷偏向鎖的時候都通過update_heuristics方法記錄下來,以類為單位,當某個類的對象撤銷偏向次數(shù)達到一定閾值的時候JVM就認為該類不適合偏向模式或者需要重新偏向另一個對象,update_heuristics就會返回HR_BULK_REVOKEHR_BULK_REBIAS。進行批量撤銷或批量重偏向。

先看update_heuristics方法。

static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
  markOop mark = o->mark();
  //如果不是偏向模式直接返回
  if (!mark->has_bias_pattern()) {
    return HR_NOT_BIASED;
  }
 
  // 鎖對象的類
  Klass* k = o->klass();
  // 當前時間
  jlong cur_time = os::javaTimeMillis();
  // 該類上一次批量撤銷的時間
  jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
  // 該類偏向鎖撤銷的次數(shù)
  int revocation_count = k->biased_lock_revocation_count();
  // BiasedLockingBulkRebiasThreshold是重偏向閾值(默認20),BiasedLockingBulkRevokeThreshold是批量撤銷閾值(默認40),BiasedLockingDecayTime是開啟一次新的批量重偏向距離上次批量重偏向的后的延遲時間,默認25000。也就是開啟批量重偏向后,經(jīng)過了一段較長的時間(>=BiasedLockingDecayTime),撤銷計數(shù)器才超過閾值,那我們會重置計數(shù)器。
  if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
      (revocation_count <  BiasedLockingBulkRevokeThreshold) &&
      (last_bulk_revocation_time != 0) &&
      (cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
    // This is the first revocation we've seen in a while of an
    // object of this type since the last time we performed a bulk
    // rebiasing operation. The application is allocating objects in
    // bulk which are biased toward a thread and then handing them
    // off to another thread. We can cope with this allocation
    // pattern via the bulk rebiasing mechanism so we reset the
    // klass's revocation count rather than allow it to increase
    // monotonically. If we see the need to perform another bulk
    // rebias operation later, we will, and if subsequently we see
    // many more revocation operations in a short period of time we
    // will completely disable biasing for this type.
    k->set_biased_lock_revocation_count(0);
    revocation_count = 0;
  }

  // 自增撤銷計數(shù)器
  if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
    revocation_count = k->atomic_incr_biased_lock_revocation_count();
  }
  // 如果達到批量撤銷閾值則返回HR_BULK_REVOKE
  if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
  }
  // 如果達到批量重偏向閾值則返回HR_BULK_REBIAS
  if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
  }
  // 沒有達到閾值則撤銷單個對象的鎖
  return HR_SINGLE_REVOKE;
}

當達到閾值的時候就會通過VM 線程在safepoint調(diào)用bulk_revoke_or_rebias_at_safepoint, 參數(shù)bulk_rebias如果是true代表是批量重偏向否則為批量撤銷。attempt_rebias_of_object代表對操作的鎖對象o是否運行重偏向,這里是true

static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
                                                                   bool bulk_rebias,
                                                                   bool attempt_rebias_of_object,
                                                                   JavaThread* requesting_thread) {
  ...
  jlong cur_time = os::javaTimeMillis();
  o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);


  Klass* k_o = o->klass();
  Klass* klass = k_o;

  if (bulk_rebias) {
    // 批量重偏向的邏輯
    if (klass->prototype_header()->has_bias_pattern()) {
      // 自增前類中的的epoch
      int prev_epoch = klass->prototype_header()->bias_epoch();
      // code 1:類中的epoch自增
      klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
      int cur_epoch = klass->prototype_header()->bias_epoch();

      // code 2:遍歷所有線程的棧,更新類型為該klass的所有鎖實例的epoch
      for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
        GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
        for (int i = 0; i < cached_monitor_info->length(); i++) {
          MonitorInfo* mon_info = cached_monitor_info->at(i);
          oop owner = mon_info->owner();
          markOop mark = owner->mark();
          if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
            // We might have encountered this object already in the case of recursive locking
            assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
            owner->set_mark(mark->set_bias_epoch(cur_epoch));
          }
        }
      }
    }

    // 接下來對當前鎖對象進行重偏向
    revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
  } else {
    ...

    // code 3:批量撤銷的邏輯,將類中的偏向標記關(guān)閉,markOopDesc::prototype()返回的是一個關(guān)閉偏向模式的prototype
    klass->set_prototype_header(markOopDesc::prototype());

    // code 4:遍歷所有線程的棧,撤銷該類所有鎖的偏向
    for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
      GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
      for (int i = 0; i < cached_monitor_info->length(); i++) {
        MonitorInfo* mon_info = cached_monitor_info->at(i);
        oop owner = mon_info->owner();
        markOop mark = owner->mark();
        if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
          revoke_bias(owner, false, true, requesting_thread);
        }
      }
    }

    // 撤銷當前鎖對象的偏向模式
    revoke_bias(o, false, true, requesting_thread);
  }

  ...
  
  BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;

  if (attempt_rebias_of_object &&
      o->mark()->has_bias_pattern() &&
      klass->prototype_header()->has_bias_pattern()) {
    // 構(gòu)造一個偏向請求線程的mark word
    markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
                                           klass->prototype_header()->bias_epoch());
    // 更新當前鎖對象的mark word
    o->set_mark(new_mark);
    status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
    ...
  }

  ...

  return status_code;
}

該方法分為兩個邏輯:批量重偏向和批量撤銷。

先看批量重偏向,分為兩步:

code 1 將類中的撤銷計數(shù)器自增1,之后當該類已存在的實例獲得鎖時,就會嘗試重偏向,相關(guān)邏輯在偏向鎖獲取流程小節(jié)中。

code 2 處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后更新它們的epoch值。也就是說不會重偏向正在使用的鎖,否則會破壞鎖的線程安全性。

批量撤銷邏輯如下:

code 3將類的偏向標記關(guān)閉,之后當該類已存在的實例獲得鎖時,就會升級為輕量級鎖;該類新分配的對象的mark word則是無鎖模式。

code 4處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后撤銷偏向鎖。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容