Java線程源碼解析之start

概述

Java開發(fā)中,會經(jīng)常使用到多線程,有必要深入了解其實現(xiàn)原理;

創(chuàng)建Thread

java.lang.Thread主要的成員變量如下:

    private char        name[];//線程名稱
    private int         priority;//優(yōu)先級
    private volatile int threadStatus = 0;//線程狀態(tài)
    private boolean     daemon = false;//是否后臺線程
    private Runnable target;//線程執(zhí)行的邏輯
   //每個線程都有一個ThreadLocalMap的成員變量,類似hashmap
   //有興趣深入了解的可以閱讀文章《ThreadLocal源碼閱讀》
    ThreadLocal.ThreadLocalMap threadLocals = null;
    private long        eetop;//實際上是個指針,指向JavaThread的地址

創(chuàng)建Thread對象時,實際上調(diào)用的是init方法,方法邏輯比較簡單,這里就不詳細(xì)介紹了。

start

我們都知道啟動線程要調(diào)用start方法,那么start方法里面都做了些什么呢?

public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        group.add(this);
        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
            }
        }
    }

    private native void start0();

可以看到主要的邏輯都是通過native方法star0實現(xiàn)的, 查看Thread.c文件可以知道,它實際上調(diào)用的是jvm.cpp文件的JVM_StartThread方法:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  bool throw_illegal_thread_state = false;
  {
   // Threads_lock代表在活動線程表上的鎖,MutexLocker會調(diào)用lock方法上鎖
    MutexLocker mu(Threads_lock);
    //實際上是判斷java.lang.Thread的eetop,正常情況下,在后續(xù)步驟中會賦值,但在此處為0,不指向任何對象;
    //其實在start方法中已經(jīng)根據(jù)threadStatus進(jìn)行了判斷,但是由于創(chuàng)建線程對象和更新threadStatus并不是原子操作,因而再次check
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;//狀態(tài)錯誤,返回
    } else {
     //在java.lang.Thread的init方法中,可設(shè)置stack的大小,此處獲取設(shè)置的大??;
     //不過通常調(diào)用構(gòu)造函數(shù)的時候都不會傳入stack大小,size=0
      jlong size =
        java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      size_t sz = size > 0 ? (size_t) size : 0;
     //在下面的[創(chuàng)建JavaThread]介紹
      native_thread = new JavaThread(&thread_entry, sz);
      if (native_thread->osthread() != NULL) {
        native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  //Java線程實際上是通過系統(tǒng)線程實現(xiàn)的,如果創(chuàng)建系統(tǒng)線程失敗,報錯;
  //有很多原因會導(dǎo)致該錯誤:比如內(nèi)存不足、max user processes設(shè)置過小 
  if (native_thread->osthread() == NULL) {
    delete native_thread;
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        "unable to create new native thread");
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }
  //在下面的[創(chuàng)建JavaThread]介紹
  Thread::start(native_thread);

JVM_END

創(chuàng)建JavaThread

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
  Thread()
#ifndef SERIALGC
  , _satb_mark_queue(&_satb_mark_queue_set),
  _dirty_card_queue(&_dirty_card_queue_set)
#endif // !SERIALGC
{
  if (TraceThreadEvents) {
    tty->print_cr("creating thread %p", this);
  }
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
 
  os::ThreadType thr_type = os::java_thread;
  //根據(jù)entry_point判斷是CompilerThread還是JavaThread
  //由于此處傳入的為&thread_entry,因此為os::java_thread
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                             os::java_thread;
 // os線程有可能創(chuàng)建失敗,在上文已經(jīng)看到對該場景的處理
  os::create_thread(this, thr_type, stack_sz);
  _safepoint_visible = false;
}

可以看到JavaThread繼承自Thread,而Thread重載了operator new:

public:
  void* operator new(size_t size) { return allocate(size, true); }
  void* operator new(size_t size, std::nothrow_t& nothrow_constant) { return allocate(size, false); }
  void  operator delete(void* p);

 protected:
   static void* allocate(size_t size, bool throw_excpt, MEMFLAGS flags = mtThread);

關(guān)于operator new描述如下:

operator new可以做為常規(guī)函數(shù)被調(diào)用;在C ++中,new是一個具有特定行為的操作符:它首先調(diào)用operator new函數(shù),用其類型說明符的大小作為第一個參數(shù),如果調(diào)用成功,則自動初始化或構(gòu)造對象。

allocate定義如下:

void* Thread::allocate(size_t size, bool throw_excpt, MEMFLAGS flags) {
  if (UseBiasedLocking) {
  //alignment=2<<10,對于偏向鎖,地址要對齊,即低10位為0
  //根據(jù)偏向鎖的實現(xiàn),要求線程指向地址的低10位為0,那么該如何實現(xiàn)呢?
  //可以看到此處在申請內(nèi)存的時候,對原申請大小做了調(diào)整;
  //假設(shè)申請到到地址為0xA11CA(低10位為0111001010),則0xA2000是滿足條件的地址,這兩地址間相差0x0E36(小于1<<10),
  //也就是說原申請內(nèi)存大?。玜lignment,則可以指針后移,找到符合條件的地址;
  //那為什么要減去sizeof(intptr_t)?因為指針后移最多為alignment-1,即可找到滿足條件的地址;
  //而第一位存儲的是_real_malloc_address,占用內(nèi)存空間為sizeof(intptr_t)
  //sizeof(intptr_t)是為了跨平臺定義的類型,在64位平臺下為8bytes,32位平臺為4bytes;
    const int alignment = markOopDesc::biased_lock_alignment;
    size_t aligned_size = size + (alignment - sizeof(intptr_t));
   //throw_excpt傳入為true
    void* real_malloc_addr = throw_excpt? AllocateHeap(aligned_size, flags, CURRENT_PC)
                                          : os::malloc(aligned_size, flags, CURRENT_PC);
    void* aligned_addr     = (void*) align_size_up((intptr_t) real_malloc_addr, alignment);
    assert(((uintptr_t) aligned_addr + (uintptr_t) size) <=
           ((uintptr_t) real_malloc_addr + (uintptr_t) aligned_size),
           "JavaThread alignment code overflowed allocated storage");
    if (TraceBiasedLocking) {
      if (aligned_addr != real_malloc_addr)
        tty->print_cr("Aligned thread " INTPTR_FORMAT " to " INTPTR_FORMAT,
                      real_malloc_addr, aligned_addr);
    }
    ((Thread*) aligned_addr)->_real_malloc_address = real_malloc_addr;
    return aligned_addr;
  } else {
    return throw_excpt? AllocateHeap(size, flags, CURRENT_PC)
                       : os::malloc(size, flags, CURRENT_PC);
  }
}
//alignment-1,再取反,即變成0xfffffffffffff800(低10位為0),該方法的效果相當(dāng)于將低10位變?yōu)?
#define align_size_up_(size, alignment) (((size) + ((alignment) - 1)) & ~((alignment) - 1))

inline intptr_t align_size_up(intptr_t size, intptr_t alignment) {
  return align_size_up_(size, alignment);
}

initialize主要是做各種初始化,這邊就不詳細(xì)介紹了;
JavaThred中有幾個成員變量比較重要:

 //用于synchronized同步塊和Object.wait()
 ParkEvent * _ParkEvent ;  
 //用于Thread.sleep()
  ParkEvent * _SleepEvent ;
 //用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport調(diào)用,
 //因此它支持了java.util.concurrent的各種鎖、條件變量等線程同步操作,是concurrent的實現(xiàn)基礎(chǔ)
  Parker*    _parker;

os::create_thread方法邏輯如下:

bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
  assert(thread->osthread() == NULL, "caller responsible");

  // 創(chuàng)建OSThread
  OSThread* osthread = new OSThread(NULL, NULL);
  if (osthread == NULL) {
    return false;
  }
  //設(shè)置線程類型
  osthread->set_thread_type(thr_type);

  // 初始化狀態(tài)為ALLOCATED
  osthread->set_state(ALLOCATED);
  thread->set_osthread(osthread);
  //linux下可以通過pthread_attr_t來設(shè)置線程屬性
  pthread_attr_t attr;
  pthread_attr_init(&attr);//linux系統(tǒng)調(diào)用,更多內(nèi)容請參考內(nèi)核文檔
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  // 線程棧大小
  if (os::Linux::supports_variable_stack_size()) {//是否支持設(shè)置棧大小
    //如果用戶創(chuàng)建線程時未指定棧大小,對于JavaThread會看是否設(shè)置了-Xss或ThreadStackSize;
    //如果未設(shè)置,則采用系統(tǒng)默認(rèn)值。對于64位操作系統(tǒng),默認(rèn)為1M;
    //操作系統(tǒng)棧大?。╱limit -s):這個配置只影響進(jìn)程的初始線程;后續(xù)用pthread_create創(chuàng)建的線程都可以指定棧大小。
    //HotSpot VM為了能精確控制Java線程的棧大小,特意不使用進(jìn)程的初始線程(primordial thread)作為Java線程
    if (stack_size == 0) {
      stack_size = os::Linux::default_stack_size(thr_type);
      switch (thr_type) {
      case os::java_thread:
      //讀取Xss和ThreadStackSize
        assert (JavaThread::stack_size_at_create() > 0, "this should be set");
        stack_size = JavaThread::stack_size_at_create();
        break;
      case os::compiler_thread:
        if (CompilerThreadStackSize > 0) {
          stack_size = (size_t)(CompilerThreadStackSize * K);
          break;
        } // else fall through:
          // use VMThreadStackSize if CompilerThreadStackSize is not defined
      case os::vm_thread:
      case os::pgc_thread:
      case os::cgc_thread:
      case os::watcher_thread:
        if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
        break;
      }
    }
   //棧最小為48k
    stack_size = MAX2(stack_size, os::Linux::min_stack_allowed);
    pthread_attr_setstacksize(&attr, stack_size);
  } else {
    // let pthread_create() pick the default value.
  }

  pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));

  ThreadState state;
  {
   //如果linux線程而且不支持設(shè)置棧大小,則先獲取創(chuàng)建線程鎖,獲取鎖之后再創(chuàng)建線程
    bool lock = os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();
    if (lock) {
      os::Linux::createThread_lock()->lock_without_safepoint_check();
    }

    pthread_t tid;
    //調(diào)用linux的pthread_create創(chuàng)建線程,傳入4個參數(shù)
    //第一個參數(shù):指向線程標(biāo)示符pthread_t的指針;
    //第二個參數(shù):設(shè)置線程的屬性
    //第三個參數(shù):線程運(yùn)行函數(shù)的起始地址
    //第四個參數(shù):運(yùn)行函數(shù)的參數(shù)
    int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
    pthread_attr_destroy(&attr);
    if (ret != 0) {//創(chuàng)建失敗,做清理工作
      if (PrintMiscellaneous && (Verbose || WizardMode)) {
        perror("pthread_create()");
      }
      // Need to clean up stuff we've allocated so far
      thread->set_osthread(NULL);
      delete osthread;
      if (lock) os::Linux::createThread_lock()->unlock();
      return false;
    }

    // 將pthread id保存到osthread
    osthread->set_pthread_id(tid);

    // 等待pthread_create創(chuàng)建的子線程完成初始化或放棄
    {
      Monitor* sync_with_child = osthread->startThread_lock();
      MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
      while ((state = osthread->get_state()) == ALLOCATED) {
        sync_with_child->wait(Mutex::_no_safepoint_check_flag);
      }
    }

    if (lock) {
      os::Linux::createThread_lock()->unlock();
    }
  }

  // Aborted due to thread limit being reached
  if (state == ZOMBIE) {
      thread->set_osthread(NULL);
      delete osthread;
      return false;
  }

  // The thread is returned suspended (in state INITIALIZED),
  // and is started higher up in the call chain
  assert(state == INITIALIZED, "race condition");
  return true;
}

創(chuàng)建線程時傳入了java_start,做為線程運(yùn)行函數(shù)的初始地址:

static void *java_start(Thread *thread) {
  static int counter = 0;
  int pid = os::current_process_id();
 //alloca是用來分配存儲空間的,它和malloc的區(qū)別是它是在當(dāng)前函數(shù)的棧上分配存儲空間,而不是在堆中。
 //其優(yōu)點是:當(dāng)函數(shù)返回時,自動釋放它所使用的棧。
  alloca(((pid ^ counter++) & 7) * 128);

  ThreadLocalStorage::set_thread(thread);

  OSThread* osthread = thread->osthread();
  Monitor* sync = osthread->startThread_lock();

  // non floating stack LinuxThreads needs extra check, see above
  if (!_thread_safety_check(thread)) {
    // notify parent thread
    MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);
    osthread->set_state(ZOMBIE);
    sync->notify_all();
    return NULL;
  }

  // thread_id is kernel thread id (similar to Solaris LWP id)
  osthread->set_thread_id(os::Linux::gettid());

 //優(yōu)先嘗試在請求線程當(dāng)前所處的CPU的Local內(nèi)存上分配空間。
 //如果local內(nèi)存不足,優(yōu)先淘汰local內(nèi)存中無用的Page
  if (UseNUMA) {//默認(rèn)為false
    int lgrp_id = os::numa_get_group_id();
    if (lgrp_id != -1) {
      thread->set_lgrp_id(lgrp_id);
    }
  }
  // 調(diào)用pthread_sigmask初始化signal mask:VM線程處理BREAK_SIGNAL信號
  os::Linux::hotspot_sigmask(thread);

  // initialize floating point control register
  os::Linux::init_thread_fpu_state();

  // handshaking with parent thread
  {
    MutexLockerEx ml(sync, Mutex::_no_safepoint_check_flag);

    // 設(shè)置狀態(tài)會INITIALIZED,并通過notify_all喚醒父線程
    osthread->set_state(INITIALIZED);
    sync->notify_all();

    // 一直等待父線程調(diào)用 os::start_thread()
    while (osthread->get_state() == INITIALIZED) {
      sync->wait(Mutex::_no_safepoint_check_flag);
    }
  }

  // call one more level start routine
  thread->run();

  return 0;
}

當(dāng)子線程完成初始化,將狀態(tài)設(shè)置為NITIALIZED并喚醒父線程之后,父線程會執(zhí)行Thread::start方法:

void Thread::start(Thread* thread) {
  trace("start", thread);
  if (!DisableStartThread) {
    if (thread->is_Java_thread()) {//設(shè)置線程狀態(tài)為RUNNABLE
      java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
                                          java_lang_Thread::RUNNABLE);
    }
    os::start_thread(thread);
  }
}
void os::start_thread(Thread* thread) {
  MutexLockerEx ml(thread->SR_lock(), Mutex::_no_safepoint_check_flag);
  OSThread* osthread = thread->osthread();
//設(shè)置線程狀態(tài)為RUNNABLE, 子線程可以開始執(zhí)行thread->run()
  osthread->set_state(RUNNABLE);
  pd_start_thread(thread);
}

void os::pd_start_thread(Thread* thread) {
  OSThread * osthread = thread->osthread();
  assert(osthread->get_state() != INITIALIZED, "just checking");
  Monitor* sync_with_child = osthread->startThread_lock();
  MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
  sync_with_child->notify();//父線程會調(diào)用thread->run();
}

父線程的thread->run主要邏輯為調(diào)用thread_main_inner,源碼如下:

 void JavaThread::thread_main_inner() {//刪除部分非關(guān)鍵代碼
  if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    this->entry_point()(this, this);
  }
  this->exit(false);
  delete this;
}

那這兒的entry_point是什么呢?實際上在創(chuàng)建JavaThread時,會傳入entrypoint:

static void thread_entry(JavaThread* thread, TRAPS) {
  HandleMark hm(THREAD);
  Handle obj(THREAD, thread->threadObj());
  JavaValue result(T_VOID);
  JavaCalls::call_virtual(&result,
                          obj,
                          KlassHandle(THREAD, SystemDictionary::Thread_klass()),
                          vmSymbols::run_method_name(),
                          vmSymbols::void_method_signature(),
                          THREAD);
}

可以看到實際上就是調(diào)用java.lang.Thread的run方法;

另外當(dāng)run方法執(zhí)行結(jié)束,會調(diào)用JavaThread::exit方法清理資源

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,734評論 18 399
  • 本文主要講了java中多線程的使用方法、線程同步、線程數(shù)據(jù)傳遞、線程狀態(tài)及相應(yīng)的一些線程函數(shù)用法、概述等。 首先講...
    李欣陽閱讀 2,599評論 1 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,578評論 19 139
  • Java多線程學(xué)習(xí) [-] 一擴(kuò)展javalangThread類 二實現(xiàn)javalangRunnable接口 三T...
    影馳閱讀 3,111評論 1 18
  • 1、拒絕購買旅游紀(jì)念品; 2、至少保留一層書架或抽屜是空的; 3、盡量減少臺面上的物品,不是每天都用的東西收起來,...
    企鵝的北極星閱讀 760評論 0 0

友情鏈接更多精彩內(nèi)容