Java如何實(shí)現(xiàn)線(xiàn)程堵塞

概述

Java中的鎖,object的wait、sleep都能夠堵塞線(xiàn)程,它們到底是如何實(shí)現(xiàn)的呢?

源碼分析

Java中的可重入鎖都是通過(guò)LockSupport調(diào)用Unsafe的park、unpark方法實(shí)現(xiàn)的:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  oop obj = thread->current_park_blocker();
  if (event.should_commit()) {
    event.set_klass(obj ? obj->klass() : (klassOop)NULL);
    event.set_timeout(time);
    event.set_address(obj ? (TYPE_ADDRESS) (uintptr_t) obj : 0);
    event.commit();
  }
UNSAFE_END

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  if (jthread != NULL) {
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */
    p->unpark();
  }
UNSAFE_END

可以看到底層都是通過(guò)調(diào)用thread->parker()實(shí)現(xiàn)的,而根據(jù)之前Thread的源碼分析,Thread.sleep也是采用類(lèi)似方法實(shí)現(xiàn)的;

Pakrer對(duì)象

在JavaThread中定義了成員變量:

private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

void JavaThread::initialize() {
  // Initialize fields
  _parker = Parker::Allocate(this) ;
}

這里的_parker是Parker實(shí)例,繼承自os::PlatformParker ,其park方法實(shí)現(xiàn)如下:

void os::PlatformEvent::park() { 
//_Event是個(gè)int變量,如果CAS更新成功,即成功將_Event減1,則退出循環(huán)
  int v ;
  for (;;) {
      v = _Event ;
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
 //v=_Event,v=0表示無(wú)許可,則需要堵塞等待獲得許可;
  if (v == 0) {
     int status = pthread_mutex_lock(_mutex);//獲取鎖
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
    //等待許可,調(diào)用pthread_cond_wait進(jìn)行等待
     while (_Event < 0) {
       //pthread_cond_wait會(huì)加入等待隊(duì)列,同時(shí)釋放_(tái)mutex鎖,
      //等待signal方法喚醒,喚醒之后需要重新獲取_mutex鎖,方法才能返回
        status = pthread_cond_wait(_cond, _mutex);
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;
     //獲取許可之后,設(shè)置可用許可數(shù)為0;由此可見(jiàn)許可數(shù)最大為1
    _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);//釋放鎖
     assert_status(status == 0, status, "mutex_unlock");
    OrderAccess::fence();//內(nèi)存屏障語(yǔ)句
  }
  guarantee (_Event >= 0, "invariant") ;
}
void os::PlatformEvent::unpark() {
 //使用原子方法xchg將1放入寄存器,與_Event所指的內(nèi)容交換,即_Event=1,然后返回_Event原先的值,
 //如果返回值大于等于0,表示有許可有用,方法直接返回;
  if (Atomic::xchg(1, &_Event) >= 0) return;
 //如果原來(lái)的_Event小于0,說(shuō)明有park方法進(jìn)入了pthread_cond_wait堵塞
  int status = pthread_mutex_lock(_mutex);//獲取鎖
  assert_status(status == 0, status, "mutex_lock");
  int AnyWaiters = _nParked;
  assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
   //NPTL存在瑕疵,當(dāng)pthread_cond_timedwait() 方法時(shí)間參數(shù)為過(guò)去時(shí)間,
   //會(huì)導(dǎo)致_cond變量被破壞或者線(xiàn)程被hang?。?  if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
    AnyWaiters = 0;
    //調(diào)用signal喚醒pthread_cond_wait調(diào)用線(xiàn)程,不判斷方法執(zhí)行結(jié)果
    pthread_cond_signal(_cond);
  }
  //然后釋放_(tái)mutex鎖
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  if (AnyWaiters != 0) {
   // //調(diào)用signal喚醒pthread_cond_wait調(diào)用線(xiàn)程,并判斷方法執(zhí)行結(jié)果
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

這個(gè)地方要說(shuō)明下,如果WorkAroundNPTLTimedWaitHang=true,會(huì)先調(diào)用signal,然后釋放鎖;如果WorkAroundNPTLTimedWaitHang=false,會(huì)先釋放鎖,然后調(diào)用signal;

  • 先signal,后釋放鎖 :signal之后,等待線(xiàn)程可以馬上運(yùn)行,但由于無(wú)法獲取鎖,會(huì)馬上進(jìn)入waiting狀態(tài);
  • 先釋放鎖,后signal:釋放鎖之后,如果有等待線(xiàn)程,可能pthread_cond_signal還沒(méi)運(yùn)行就發(fā)生了線(xiàn)程切換;在極少的情況下,由于pthread_cond_wait有spurious wakeup(偽喚醒)問(wèn)題,可能導(dǎo)致park方法提前返回,這個(gè)時(shí)候使用者需要判斷cond的狀態(tài),再次調(diào)用park;

關(guān)于WorkAroundNPTLTimedWaitHang還是有些疑問(wèn),后續(xù)繼續(xù)探討;

Atomic方法

在上面的源碼中,使用到了Atomic的xchg和cmpxchg方法,這兩個(gè)方法是采用匯編實(shí)現(xiàn)的:

匯編方法的語(yǔ)法如下:

asm ( assembler template  
    : output operands                   (optional)  
    : input operands                    (optional)  
    : clobbered registers list          (optional)  
    );  

output operands和inpupt operands指定參數(shù),它們從左到右依次排列,用','分割,編號(hào)從0開(kāi)始;

inline jint     Atomic::xchg    (jint     exchange_value, volatile jint*     dest) {
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

將exchange_value(%0)的值放入通用寄存器,與dest(%2)所指的內(nèi)容進(jìn)行交換,返回dest指針原指向內(nèi)容的值;

  • %0為exchange_value,%1為exchange_value,%2為dest;
  • "r"表示將dest的值讀到一個(gè)通用寄存器;
  • "0"表示和%0使用同樣的通用寄存器,此處表示將exchange_value值讀入通用寄存器;
  • "=r"表示將結(jié)果寫(xiě)入到exchange_value,而且要使用通用寄存器,由于通用寄存器的內(nèi)容已經(jīng)被設(shè)置為dest所指向的內(nèi)容,因此exchange_value等于原dest所指向的內(nèi)容;
  • asm指示編譯器在此插入?yún)R編語(yǔ)句;
  • volatile告訴編譯器,嚴(yán)禁將此處的匯編語(yǔ)句與其它的語(yǔ)句重組合優(yōu)化,即原原本本按原來(lái)的樣子處理這里的匯編;
  • memory強(qiáng)制gcc編譯器假設(shè)RAM所有內(nèi)存單元均被匯編指令修改,這樣cpu中的registers和cache中已緩存的內(nèi)存單元中的數(shù)據(jù)將作廢。cpu將不得不在需要的時(shí)候重新讀取內(nèi)存中的數(shù)據(jù)。
int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
static inline bool is_MP() {
    assert(_processor_count > 0, "invalid processor count");
    return _processor_count > 1;
  }
  • mp表示是否屬于多核cpu環(huán)境,如果是則LOCK_IF_MP會(huì)插入lock指令;
  • %0為exchange_value,%1為exchange_value,%2為compare_value,%3為dest,%4為mp;
  • "a" (compare_value)表示將compare_value讀入eax寄存器,與%3(dest)進(jìn)行比較,如果相等則將%1(exchange_value)寫(xiě)入dest;
  • "=a" (exchange_value)表示將eax寄存器的內(nèi)容寫(xiě)入到exchange_value;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容