線程的啟動(dòng)的實(shí)現(xiàn)原理
線程停止的實(shí)現(xiàn)原理分析
為什么中斷線程會(huì)拋出InterruptedException
線程的啟動(dòng)原理
前面我們簡(jiǎn)單分析過(guò)了線程的使用,通過(guò)調(diào)用線程的start方法來(lái)啟動(dòng)線程,線程啟動(dòng)后會(huì)調(diào)用run方法執(zhí)行業(yè)務(wù)邏輯,run方法執(zhí)行完畢后,線程的生命周期也就終止了。
很多同學(xué)最早學(xué)習(xí)線程的時(shí)候會(huì)比較疑惑,啟動(dòng)一個(gè)線程為什么是調(diào)用start方法,而不是run方法,這做一個(gè)簡(jiǎn)單的分析,先簡(jiǎn)單看一下start方法的定義
public class Thread implements Runnable {
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0(); //注意這里
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();//注意這里
我們看到調(diào)用start方法實(shí)際上是調(diào)用一個(gè)native方法start0()來(lái)啟動(dòng)一個(gè)線程,首先start0()這個(gè)方法是在Thread的靜態(tài)塊中來(lái)注冊(cè)的,代碼如下
public class Thread implements Runnable {
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
這個(gè)registerNatives的作用是注冊(cè)一些本地方法提供給Thread類(lèi)來(lái)使用,比如start0()、isAlive()、currentThread()、sleep();這些都是大家很熟悉的方法。
registerNatives的本地方法的定義在文件 Thread.c,
Thread.c定義了各個(gè)操作系統(tǒng)平臺(tái)要用的關(guān)于線程的公共數(shù)據(jù)和操作,以下是Thread.c的全部?jī)?nèi)容
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
#undef THD
#undef OBJ
#undef STE
#undef STR
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
從這段代碼可以看出,start0(),實(shí)際會(huì)執(zhí)行 JVM_StartThread方法,這個(gè)方法是干嘛的呢? 從名字上來(lái)看,似乎是在JVM層面去啟動(dòng)一個(gè)線程,如果真的是這樣,那么在JVM層面,一定會(huì)調(diào)用Java中定義的run方法。那接下來(lái)繼續(xù)去找找答案。我們找到 jvm.cpp這個(gè)文件;這個(gè)文件需要下載hotspot的源碼才能找到.
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
native_thread = new JavaThread(&thread_entry, sz);
JVM_ENTRY是用來(lái)定義 JVM_StartThread函數(shù)的,在這個(gè)函數(shù)里面創(chuàng)建了一個(gè)真正和平臺(tái)有關(guān)的本地線程. 本著打破砂鍋查到底的原則,繼續(xù)看看 newJavaThread做了什么事情,繼續(xù)尋找JavaThread的定義
在hotspot的源碼中 thread.cpp文件中1558行的位置可以找到如下代碼
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
#if INCLUDE_ALL_GCS
, _satb_mark_queue(&_satb_mark_queue_set),
_dirty_card_queue(&_dirty_card_queue_set)
#endif // INCLUDE_ALL_GCS
{
if (TraceThreadEvents) {
tty->print_cr("creating thread %p", this);
}
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
os::create_thread(this, thr_type, stack_sz);
_safepoint_visible = false;
// The _osthread may be NULL here because we ran out of memory (too many threads active).
// We need to throw and OutOfMemoryError - however we cannot do this here because the caller
// may hold a lock and all locks must be unlocked before throwing the exception (throwing
// the exception consists of creating the exception object & initializing it, initialization
// will leave the VM via a JavaCall and then all locks must be unlocked).
//
// The thread is still suspended when we reach here. Thread must be explicit started
// by creator! Furthermore, the thread must also explicitly be added to the Threads list
// by calling Threads:add. The reason why this is not done here, is because the thread
// object must be fully initialized (take a look at JVM_Start)
}
這個(gè)方法有兩個(gè)參數(shù),第一個(gè)是函數(shù)名稱(chēng),線程創(chuàng)建成功之后會(huì)根據(jù)這個(gè)函數(shù)名稱(chēng)調(diào)用對(duì)應(yīng)的函數(shù);第二個(gè)是當(dāng)前進(jìn)程內(nèi)已經(jīng)有的線程數(shù)量。最后我們重點(diǎn)關(guān)注與一下 os::create_thread,實(shí)際就是調(diào)用平臺(tái)創(chuàng)建線程的方法來(lái)創(chuàng)建線程。
接下來(lái)就是線程的啟動(dòng),會(huì)調(diào)用Thread.cpp文件中的Thread::start(Thread* thread)方法,代碼如下
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}
start方法中有一個(gè)函數(shù)調(diào)用: os::start_thread(thread);,調(diào)用平臺(tái)啟動(dòng)線程的方法,最終會(huì)調(diào)用Thread.cpp文件中的JavaThread::run()方法
// The first routine called by a new Java thread
void JavaThread::run() {
// initialize thread-local alloc buffer related fields
this->initialize_tlab();
// used to test validitity of stack trace backs
this->record_base_of_stack_pointer();
// Record real stack base and size.
this->record_stack_base_and_size();
// Initialize thread local storage; set before calling MutexLocker
this->initialize_thread_local_storage();
this->create_stack_guard_pages();
this->cache_global_variables();
// Thread is now sufficient initialized to be handled by the safepoint code as being
// in the VM. Change thread state from _thread_new to _thread_in_vm
ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);
assert(JavaThread::current() == this, "sanity check");
assert(!Thread::current()->owns_locks(), "sanity check");
DTRACE_THREAD_PROBE(start, this);
// This operation might block. We call that after all safepoint checks for a new thread has
// been completed.
this->set_active_handles(JNIHandleBlock::allocate_block());
if (JvmtiExport::should_post_thread_life()) {
JvmtiExport::post_thread_start(this);
}
EventThreadStart event;
if (event.should_commit()) {
event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));
event.commit();
}
// We call another function to do the rest so we are sure that the stack addresses used
// from there will be lower than the stack base just computed
thread_main_inner();
// Note, thread is no longer valid at this point!
}
這個(gè)方法中主要是做一系列的初始化操作,最后有一個(gè)方法 thread_main_inner, 接下來(lái)看看這個(gè)方法的邏輯是什么樣的
void JavaThread::thread_main_inner() {
assert(JavaThread::current() == this, "sanity check");
assert(this->threadObj() != NULL, "just checking");
// Execute thread entry point unless this thread has a pending exception
// or has been stopped before starting.
// Note: Due to JVM_StopThread we can have pending exceptions already!
if (!this->has_pending_exception() &&
!java_lang_Thread::is_stillborn(this->threadObj())) {
{
ResourceMark rm(this);
this->set_native_thread_name(this->get_thread_name());
}
HandleMark hm(this);
this->entry_point()(this, this);
}
DTRACE_THREAD_PROBE(stop, this);
this->exit(false);
delete this;
}
和主流程無(wú)關(guān)的代碼咱們先不去看,直接找到最核心的代碼塊 this->entry_point()(this,this);, 這個(gè)entrypoint應(yīng)該比較熟悉了,因?yàn)槲覀冊(cè)谇懊嫣岬搅?,?:JavaThread這個(gè)方法中傳遞的第一個(gè)參數(shù),代表函數(shù)名稱(chēng),線程啟動(dòng)的時(shí)候會(huì)調(diào)用這個(gè)函數(shù)。
如果大家還沒(méi)有暈車(chē)的話,應(yīng)該記得我們?cè)趈vm.cpp文件中看到的代碼,在創(chuàng)建 native_thread=newJavaThread(&thread_entry,sz); 的時(shí)候傳遞了一個(gè)threadentry函數(shù),所以我們?cè)趈vm.cpp中找到這個(gè)函數(shù)的定義如下
static void thread_entry(JavaThread* thread, TRAPS) {
{
HandleMark hm(THREAD);
Handle obj(THREAD, thread->threadObj());
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
obj,
KlassHandle(THREAD, SystemDictionary::Thread_klass()),
vmSymbols::run_method_name(), //注意這里
vmSymbols::void_method_signature(),
THREAD);
}
可以看到 vmSymbols::run_method_name()這個(gè)調(diào)用,其實(shí)就是通過(guò)回調(diào)方法調(diào)用Java線程中定義的run方法, run_method_name是一個(gè)宏定義,在vmSymbols.hpp文件中可以找到如下代碼
#define VM_SYMBOLS_DO(template, do_alias)
...
template(run_method_name, "run")
...
所以結(jié)論就是,Java里面創(chuàng)建線程之后必須要調(diào)用start方法才能真正的創(chuàng)建一個(gè)線程,該方法會(huì)調(diào)用虛擬機(jī)啟動(dòng)一個(gè)本地線程,本地線程的創(chuàng)建會(huì)調(diào)用當(dāng)前系統(tǒng)創(chuàng)建線程的方法進(jìn)行創(chuàng)建,并且線程被執(zhí)行的時(shí)候會(huì)回調(diào) run方法進(jìn)行業(yè)務(wù)邏輯的處理
線程的終止方法及原理
線程的終止有主動(dòng)和被動(dòng)之分,被動(dòng)表示線程出現(xiàn)異常退出或者run方法執(zhí)行完畢,線程會(huì)自動(dòng)終止。主動(dòng)的方式是 Thread.stop()來(lái)實(shí)現(xiàn)線程的終止,但是stop()方法是一個(gè)過(guò)期的方法,官方是不建議使用,理由很簡(jiǎn)單,stop()方法在中介一個(gè)線程時(shí)不會(huì)保證線程的資源正常釋放,也就是不會(huì)給線程完成資源釋放工作的機(jī)會(huì),相當(dāng)于我們?cè)趌inux上通過(guò)kill -9強(qiáng)制結(jié)束一個(gè)進(jìn)程。
那么如何安全的終止一個(gè)線程呢?
我們先看一下下面的代碼,代碼演示了一個(gè)正確終止線程的方法,至于它的實(shí)現(xiàn)原理,稍后我們?cè)俜治?/p>
public class InterruptedDemo implements Runnable{
@Override
public void run() {
long i=0l;
while(!Thread.currentThread().isInterrupted()){//notice here
i++;
}
System.out.println("result:"+i);
}
public static void main(String[] args) throws InterruptedException {
InterruptedDemo interruptedDemo=new InterruptedDemo();
Thread thread=new Thread(interruptedDemo);
thread.start();
Thread.sleep(1000);//睡眠一秒
thread.interrupt();//notice here
}
}
代碼中有兩處需要注意,在main線程中,調(diào)用了線程的interrupt()方法、在run方法中,while循環(huán)中通過(guò) Thread.currentThread().isInterrupted()來(lái)判斷線程中斷的標(biāo)識(shí)。所以我們?cè)谶@里猜想一下,應(yīng)該是在線程中維護(hù)了一個(gè)中斷標(biāo)識(shí),通過(guò) thread.interrupt()方法去改變了中斷標(biāo)識(shí)的值使得run方法中while循環(huán)的判斷不成立而跳出循環(huán),因此run方法執(zhí)行完畢以后線程就終止了。
線程中斷的原理分析
我們來(lái)看一下 thread.interrupt()方法做了什么事情
public class Thread implements Runnable {
...
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
...
這個(gè)方法里面,調(diào)用了interrupt0(),這個(gè)方法在前面分析start方法的時(shí)候見(jiàn)過(guò),是一個(gè)native方法,這里就不再重復(fù)貼代碼了,同樣,我們找到j(luò)vm.cpp文件,找到JVM_Interrupt的定義
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
這個(gè)方法比較簡(jiǎn)單,直接調(diào)用了 Thread::interrupt(thr)這個(gè)方法,這個(gè)方法的定義在Thread.cpp文件中,代碼如下
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
Thread::interrupt方法調(diào)用了os::interrupt方法,這個(gè)是調(diào)用平臺(tái)的interrupt方法,這個(gè)方法的實(shí)現(xiàn)是在 os_*.cpp文件中,其中星號(hào)代表的是不同平臺(tái),因?yàn)閖vm是跨平臺(tái)的,所以對(duì)于不同的操作平臺(tái),線程的調(diào)度方式都是不一樣的。我們以os_linux.cpp文件為例
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//獲取本地線程對(duì)象
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {//判斷本地線程對(duì)象是否為中斷
osthread->set_interrupted(true);//設(shè)置中斷狀態(tài)為true
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
//這里是內(nèi)存屏障,這塊在后續(xù)的文章中會(huì)剖析;內(nèi)存屏障的目的是使得interrupted狀態(tài)對(duì)其他線程立即可見(jiàn)
OrderAccess::fence();
//_SleepEvent相當(dāng)于Thread.sleep,表示如果線程調(diào)用了sleep方法,則通過(guò)unpark喚醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//_ParkEvent用于synchronized同步塊和Object.wait(),這里相當(dāng)于也是通過(guò)unpark進(jìn)行喚醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
通過(guò)上面的代碼分析可以知道,thread.interrupt()方法實(shí)際就是設(shè)置一個(gè)interrupted狀態(tài)標(biāo)識(shí)為true、并且通過(guò)ParkEvent的unpark方法來(lái)喚醒線程。
對(duì)于synchronized阻塞的線程,被喚醒以后會(huì)繼續(xù)嘗試獲取鎖,如果失敗仍然可能被park
在調(diào)用ParkEvent的park方法之前,會(huì)先判斷線程的中斷狀態(tài),如果為true,會(huì)清除當(dāng)前線程的中斷標(biāo)識(shí)
Object.wait、Thread.sleep、Thread.join會(huì)拋出InterruptedException
這里給大家普及一個(gè)知識(shí)點(diǎn),為什么Object.wait、Thread.sleep和Thread.join都會(huì)拋出InterruptedException?首先,這個(gè)異常的意思是表示一個(gè)阻塞被其他線程中斷了。然后,由于線程調(diào)用了interrupt()中斷方法,那么Object.wait、Thread.sleep等被阻塞的線程被喚醒以后會(huì)通過(guò)is_interrupted方法判斷中斷標(biāo)識(shí)的狀態(tài)變化,如果發(fā)現(xiàn)中斷標(biāo)識(shí)為true,則先清除中斷標(biāo)識(shí),然后拋出InterruptedException
需要注意的是,InterruptedException異常的拋出并不意味著線程必須終止,而是提醒當(dāng)前線程有中斷的操作發(fā)生,至于接下來(lái)怎么處理取決于線程本身,比如
直接捕獲異常不做任何處理
將異常往外拋出
停止當(dāng)前線程,并打印異常信息
關(guān)注我的技術(shù)公眾號(hào)【架構(gòu)技術(shù)匯】一周出產(chǎn)1-2篇技術(shù)文章。
Q裙681179158分享并發(fā)編程,分布式,微服務(wù)架構(gòu),性能優(yōu)化,源碼,設(shè)計(jì)模式,高并發(fā),高可用,Spring,Netty,tomcat,JVM等技術(shù)視頻。
為了讓大家能夠更好的理解上面這段話,我們以Thread.sleep為例直接從jdk的源碼中找到中斷標(biāo)識(shí)的清除以及異常拋出的方法代碼
找到 is_interrupted()方法,linux平臺(tái)中的實(shí)現(xiàn)在os_linux.cpp文件中,代碼如下
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted(); //獲取線程的中斷標(biāo)識(shí)
if (interrupted && clear_interrupted) {//如果中斷標(biāo)識(shí)為true
osthread->set_interrupted(false);//設(shè)置中斷標(biāo)識(shí)為false
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
找到Thread.sleep這個(gè)操作在jdk中的源碼體現(xiàn),怎么找?相信如果前面大家有認(rèn)真看的話,應(yīng)該能很快找到,代碼在jvm.cpp文件中
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//判斷并清除線程中斷狀態(tài),如果中斷狀態(tài)為true,拋出中斷異常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);
注意上面加了中文注釋的地方的代碼,先判斷is_interrupted的狀態(tài),然后拋出一個(gè)InterruptedException異常。到此為止,我們就已經(jīng)分析清楚了中斷的整個(gè)流程。
Java線程的中斷標(biāo)識(shí)判斷
了解了thread.interrupt方法的作用以后,再回過(guò)頭來(lái)看Java中 Thread.currentThread().isInterrupted()這段代碼,就很好理解了。由于前者先設(shè)置了一個(gè)中斷標(biāo)識(shí)為true,所以 isInterrupted()這個(gè)方法的返回值為true,故而不滿(mǎn)足while循環(huán)的判斷條件導(dǎo)致退出循環(huán)。
這里有必要再提一句,就是這個(gè)線程中斷標(biāo)識(shí)有兩種方式復(fù)位,第一種是前面提到過(guò)的InterruptedException;另一種是通過(guò)Thread.interrupted()對(duì)當(dāng)前線程的中斷標(biāo)識(shí)進(jìn)行復(fù)位。