1.線程的應(yīng)用
如何應(yīng)用多線程
在 Java 中,有多種方式來(lái)實(shí)現(xiàn)多線程。繼承 Thread 類、實(shí)現(xiàn) Runnable 接口、使用 ExecutorService、Callable、Future 實(shí)現(xiàn)帶返回結(jié)果的多線程。
package com.sunkang.use;
import java.util.concurrent.*;
/**
* @Project: ThreadExample
* @description: 線程的創(chuàng)建方式
* @author: sunkang
* @create: 2020-06-21 21:38
* @ModificationHistory who when What
**/
public class ThreadDemo extends Thread {
//通過(guò)繼承Thread,復(fù)寫run方法
@Override
public void run() {
System.out.println("1.通過(guò)繼承Thread來(lái)創(chuàng)建線程 ");
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//方式一:通過(guò)繼承Thread來(lái)創(chuàng)建線程
ThreadDemo threadDemo = new ThreadDemo();
threadDemo.start();
//方式二:通過(guò)實(shí)現(xiàn)Runnable接口創(chuàng)建線程
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("2.通過(guò)實(shí)現(xiàn)Runnable接口創(chuàng)建線程 ");
}
}).start();
//方式三:通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程
ExecutorService excutorService = Executors.newFixedThreadPool(1);
Future<String> future= excutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "3.通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程";
}
});
System.out.println(future.get());
excutorService.shutdown();
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "3.通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程";
}
});
new Thread(futureTask).start();
System.out.println( futureTask.get());
}
}
2.多線程的實(shí)際應(yīng)用
其實(shí)大家在工作中應(yīng)該很少有場(chǎng)景能夠應(yīng)用多線程了,因?yàn)榛跇I(yè)務(wù)開發(fā)來(lái)說(shuō),很多使用異步的場(chǎng)景我們都通過(guò)分布式消息隊(duì)列來(lái)做了。但并不是說(shuō)多線程就不會(huì)被用到,你們?nèi)绻锌匆恍┛蚣艿脑创a,會(huì)發(fā)現(xiàn)線程的使用無(wú)處不在
之前我應(yīng)用得比較多的場(chǎng)景是在做文件跑批,每天會(huì)有一些比如收益文件、對(duì)賬文件,我們會(huì)有一個(gè)定時(shí)任務(wù)去拿到數(shù)據(jù)然后通過(guò)線程去處理
一般引用多線程的場(chǎng)景基本是為了支持異步化操作,可以使用多線程,比如處理不同的任務(wù),比如銀行系統(tǒng)的財(cái)務(wù)在日終的時(shí)候要做一些日終的處理任務(wù),這時(shí)候是可以應(yīng)用到多線程
如果有看過(guò)zookeeper 源碼的時(shí)候看到一個(gè)比較有意思的異步責(zé)任鏈模式,這里邊我模仿一種責(zé)任鏈異步化處理的方式,
圖中列舉了三種處理器,
PreCheckProcessor :可認(rèn)為是前置檢查處理器,檢查不通過(guò),后置處理器不處理
LogProcessor:日志記錄處理器
SavaProcessor:存儲(chǔ)數(shù)據(jù)處理器
這里改善了鏈?zhǔn)教幚?,增加了?duì)列的方式對(duì)請(qǐng)求進(jìn)行了異步的處理,實(shí)現(xiàn)了更高的并發(fā)性

代碼如下:
請(qǐng)求上下文
/**
* @Project: ThreadExample
* @description: 請(qǐng)求上下文,可以存儲(chǔ)很多請(qǐng)求相關(guān)的上下文
* @author: sunkang
* @create: 2020-06-21 22:04
* @ModificationHistory who when What
**/
public class RequestContext {
private String requestId;
private Map<String,String> data;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Map<String, String> getData() {
return data;
}
public void setData(Map<String, String> data) {
this.data = data;
}
@Override
public String toString() {
return "RequestContext{" +
"requestId='" + requestId + '\'' +
", data=" + data +
'}';
}
}
處理器接口
/**
* @Project: ThreadExample
* @description: 處理器接口
* @author: sunkang
* @create: 2020-06-21 22:04
* @ModificationHistory who when What
**/
public abstract class IRequestProcessor extends Thread{
void process(RequestContext context) {
}
}
前置檢查處理器
/**
* @Project: ThreadExample
* @description: 前置檢查處理器
* @author: sunkang
* @create: 2020-06-21 22:07
* @ModificationHistory who when What
**/
public class PreCheckProcessor extends IRequestProcessor {
private volatile boolean isFinish = false;
private IRequestProcessor nextProcessor;
private LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();
public PreCheckProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
public void shutdown(){
isFinish =true;
}
@Override
public void process(RequestContext context) {
//TODO 做些前置檢查
//放入阻塞隊(duì)列中
requests.add(context);
}
@Override
public void run() {
while(!isFinish){ //不建議這么寫
try {
RequestContext request=requests.take();//阻塞式獲取數(shù)據(jù)
//真正的處理邏輯
System.out.println("prevProcessor:"+request);
//交給下一個(gè)責(zé)任鏈
if(nextProcessor != null){
nextProcessor.process(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
打印日志信息處理器
/**
* @Project: ThreadExample
* @description: 主要是打印日志信息
* @author: sunkang
* @create: 2020-06-21 22:16
* @ModificationHistory who when What
**/
public class LogProcessor extends IRequestProcessor {
private volatile boolean isFinish = false;
private IRequestProcessor nextProcessor;
private LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();
public LogProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
public void shutdown(){
isFinish =true;
}
@Override
public void process(RequestContext context) {
//TODO 按照實(shí)際業(yè)務(wù)來(lái)寫
//放入阻塞隊(duì)列中
requests.add(context);
}
@Override
public void run() {
while(!isFinish){ //不建議這么寫
try {
RequestContext request=requests.take();//阻塞式獲取數(shù)據(jù)
//真正的處理邏輯
System.out.println("LogProcessor:"+request);
//交給下一個(gè)責(zé)任鏈
//交給下一個(gè)責(zé)任鏈
if(nextProcessor != null){
nextProcessor.process(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
存儲(chǔ)處理器
這里的偽代碼一直,就不在展示
啟動(dòng)入口
/**
* @Project: ThreadExample
* @description:程序入口
* @author: sunkang
* @create: 2020-06-21 22:19
* @ModificationHistory who when What
**/
public class App {
public static void main(String[] args) {
//初始化責(zé)任鏈,并啟動(dòng)線程
IRequestProcessor logProcessor = new LogProcessor(null);
logProcessor.start();
IRequestProcessor preCheckProcessor = new PreCheckProcessor(logProcessor);
preCheckProcessor.start();
//構(gòu)造請(qǐng)求
RequestContext requestContext = new RequestContext();
requestContext.setRequestId("request");
//鏈?zhǔn)教幚碚?qǐng)求
preCheckProcessor.process(requestContext);
}
}
3.java并發(fā)編程的基礎(chǔ)
基本應(yīng)用搞清楚以后,我們?cè)賮?lái)基于Java線程的基礎(chǔ)切入,來(lái)逐步去深入挖掘線程的整體模型。
線程的生命周期
Java 線程既然能夠創(chuàng)建,那么也勢(shì)必會(huì)被銷毀,所以線程是存在生命周期的,那么我們接下來(lái)從線程的生命周期開始去了解線程。
線程一共有 6 種狀態(tài)(NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED)
NEW:初始狀態(tài),線程被構(gòu)建,但是還沒(méi)有調(diào)用 start 方法
RUNNABLED:運(yùn)行狀態(tài),JAVA 線程把操作系統(tǒng)中的就緒(READY)和運(yùn)行(RUNNING)兩種狀態(tài)統(tǒng)一稱為“運(yùn)行中”
BLOCKED:阻塞狀態(tài),表示線程進(jìn)入等待狀態(tài),也就是線程因?yàn)槟撤N原因放棄了 CPU 使用權(quán),阻塞也分為幾種情況
等待阻塞:運(yùn)行的線程執(zhí)行 wait 方法,jvm 會(huì)把當(dāng)前線程放入到等待隊(duì)列
同步阻塞:運(yùn)行的線程在獲取對(duì)象的同步鎖時(shí),若該同步鎖被其他線程鎖占用了,那么 jvm 會(huì)把當(dāng)前的線程放入到鎖池中
其他阻塞:運(yùn)行的線程執(zhí)行 Thread.sleep 或者 t.join 方法,或者發(fā)出了 I/O 請(qǐng)求時(shí),JVM 會(huì)把當(dāng)前線程設(shè)置為阻塞狀態(tài),當(dāng) sleep 結(jié)束、join 線程終止、io 處理完畢則線程恢復(fù)
TIME_WAITING:超時(shí)等待狀態(tài),超時(shí)以后自動(dòng)返回
TERMINATED:終止?fàn)顟B(tài),表示當(dāng)前線程執(zhí)行完畢

通過(guò)代碼的方式來(lái)演示線程的狀態(tài)
代碼示例
package com.sunkang.use;
import java.util.concurrent.TimeUnit;
/**
* @Project: ThreadExample
* @description: 線程的狀態(tài)演示
* @author: sunkang
* @create: 2020-06-21 22:54
* @ModificationHistory who when What
**/
public class ThreadStatusDemo {
public static void main(String[] args) {
//Time_Waiting的狀態(tài)演示
new Thread(()->{
while(true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Time_Waiting_Thread").start();
//WAITINaG的狀態(tài)演示
new Thread(()->{
while(true){
synchronized (ThreadStatusDemo.class) {
try {
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"Waiting_Thread").start();
//RUNNING的狀態(tài)演示
new Thread(()->{
for(;;);
},"Running_Thread").start();
//BLOCKED的狀態(tài)演示
new Thread(new BlockedDemo(),"Block01_Thread").start();
new Thread(new BlockedDemo(),"Block02_Thread").start();
}
static class BlockedDemo implements Runnable{
@Override
public void run() {
synchronized (BlockedDemo.class){
while(true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
啟動(dòng)一個(gè)線程前,最好為這個(gè)線程設(shè)置線程名稱,因?yàn)檫@樣在使用 jstack 分析程序或者進(jìn)行問(wèn)題排查時(shí),就會(huì)給開發(fā)人員提供一些提示顯示線程的狀態(tài)
運(yùn)行該示例,打開終端或者命令提示符,鍵入“jps”, (JDK1.5 提供的一個(gè)顯示當(dāng)前所有 java 進(jìn)程 pid 的命
令)
根據(jù)上一步驟獲得的 pid,繼續(xù)輸入 jstack pid(jstack是 java 虛擬機(jī)自帶的一種堆棧跟蹤工具。jstack 用于打印出給定的 java 進(jìn)程 ID 或 core file 或遠(yuǎn)程調(diào)試服務(wù)的 Java 堆棧信息)
下面的jstack的日志輸出的關(guān)鍵信息
"Block02_Thread" #16 prio=5 os_prio=0 tid=0x0000000018dcb000 nid=0x3fb8 waiting on condition [0x0000000019b3e000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
- locked <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
at java.lang.Thread.run(Thread.java:745)
"Block01_Thread" #15 prio=5 os_prio=0 tid=0x0000000018dca000 nid=0x3b6c waiting for monitor entry [0x0000000019a3f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
- waiting to lock <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
at java.lang.Thread.run(Thread.java:745)
"Running_Thread" #14 prio=5 os_prio=0 tid=0x0000000018dc9800 nid=0x3470 runnable [0x000000001993f000]
java.lang.Thread.State: RUNNABLE
at com.sunkang.use.ThreadStatusDemo.lambda$main$3(ThreadStatusDemo.java:45)
at com.sunkang.use.ThreadStatusDemo$$Lambda$4/1831932724.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Waiting_Thread" #13 prio=5 os_prio=0 tid=0x0000000018dc8800 nid=0x3f98 in Object.wait() [0x000000001983f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
at java.lang.Object.wait(Object.java:502)
at com.sunkang.use.ThreadStatusDemo.lambda$main$2(ThreadStatusDemo.java:35)
- locked <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
at com.sunkang.use.ThreadStatusDemo$$Lambda$3/1078694789.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Time_Waiting_Thread" #12 prio=5 os_prio=0 tid=0x0000000018dc1000 nid=0x31d0 waiting on condition [0x000000001973e000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.sunkang.use.ThreadStatusDemo.lambda$main$1(ThreadStatusDemo.java:23)
at com.sunkang.use.ThreadStatusDemo$$Lambda$2/1096979270.run(Unknown Source)
通過(guò)上面的分析,我們了解到了線程的生命周期,現(xiàn)在在整個(gè)生命周期中并不是固定的處于某個(gè)狀態(tài),而是隨著代碼的執(zhí)行在不同的狀態(tài)之間進(jìn)行切換
線程的啟動(dòng)
前面我們通過(guò)一些案例演示了線程的啟動(dòng),也就是調(diào)用start()方法去啟動(dòng)一個(gè)線程,當(dāng) run 方法中的代碼執(zhí)行完畢以后,線程的生命周期也將終止。調(diào)用 start 方法的語(yǔ)義是
當(dāng)前線程告訴 JVM,啟動(dòng)調(diào)用 start 方法的線程。
線程的啟動(dòng)原理
啟動(dòng)一個(gè)線程為什么是調(diào)用 start 方法,而不是 run 方法,這做一個(gè)簡(jiǎn)單
的分析,先簡(jiǎn)單看一下 start 方法的定義
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
我們看到調(diào)用 start 方法實(shí)際上是調(diào)用一個(gè) native 方法start0()來(lái)啟動(dòng)一個(gè)線程,首先 start0()這個(gè)方法是在Thread 的靜態(tài)塊中來(lái)注冊(cè)的,代碼如下
private static native void registerNatives();
static {
registerNatives();
}
registerNatives 的 本 地 方 法 的 定 義 在 文 件Thread.c,Thread.c 定義了各個(gè)操作系統(tǒng)平臺(tái)要用的關(guān)于線程的公共數(shù)據(jù)和操作,以下是 Thread.c 的全部?jī)?nèi)容
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c
<pre class="sourcelines stripes4 wrap" style="margin: 0px; font-size: 12px; position: relative; counter-reset: lineno 0; color: rgb(0, 0, 0); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-style: initial; text-decoration-color: initial;">static JNINativeMethod methods[] = {[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l42)
{"start0", "()V", (void *)&JVM_StartThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l43)
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l44)
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l45)
{"suspend0", "()V", (void *)&JVM_SuspendThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l46)
{"resume0", "()V", (void *)&JVM_ResumeThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l47)
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l48)
{"yield", "()V", (void *)&JVM_Yield},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l49)
{"sleep", "(J)V", (void *)&JVM_Sleep},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l50)
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l51)
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l52)
{"interrupt0", "()V", (void *)&JVM_Interrupt},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l53)
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l54)
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l55)
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l56)
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l57)
};</pre>
從 這 段 代 碼 可 以 看 出 , start0() , 實(shí) 際 會(huì) 執(zhí) 行JVM_StartThread 方法,這個(gè)方法是干嘛的呢? 從名字上來(lái)看,似乎是在 JVM 層面去啟動(dòng)一個(gè)線程,如果真的是這樣,那么在 JVM 層面,一定會(huì)調(diào)用 Java 中定義的 run 方法。那接下來(lái)繼續(xù)去找找答案。我們找到 jvm.cpp 這個(gè)文件;這個(gè)文件需要下載 hotspot 的源碼才能找到.
大概調(diào)用流程如下:
先去jvm.cpp 執(zhí)行 native_thread = new Thread(&thread_entry,sz);
然后thread.cpp 調(diào)用os::create_thread,實(shí)際就是調(diào)用平臺(tái)創(chuàng)建線程的方法來(lái)創(chuàng)建線程
接下來(lái)就是線程的啟動(dòng),會(huì)調(diào)用 Thread.cpp 文件中的Thread::start(Thread* thread)方法,start 方法中有一個(gè)函數(shù)調(diào)用: os::start_thread(thread);,調(diào)用平臺(tái)啟動(dòng)線程的方法,最終會(huì)調(diào)用 Thread.cpp 文件中的 JavaThread::run()方法
線程的終止
如何終止一個(gè)線程呢? 這是面試過(guò)程中針對(duì) 3 年左右的人喜歡問(wèn)到的一個(gè)題目。
線程的終止,并不是簡(jiǎn)單的調(diào)用 stop 命令去。雖然 api 仍然可以調(diào)用,但是和其他的線程控制方法如 suspend、resume 一樣都是過(guò)期了的不建議使用,就拿 stop 來(lái)說(shuō),stop 方法在結(jié)束一個(gè)線程時(shí)并不會(huì)保證線程的資源正常釋放,因此會(huì)導(dǎo)致程序可能出現(xiàn)一些不確定的狀態(tài)。
要優(yōu)雅的去中斷一個(gè)線程,在線程中提供了一個(gè) interrupt方法
interrupt 方法
當(dāng)其他線程通過(guò)調(diào)用當(dāng)前線程的 interrupt 方法,表示向當(dāng)前線程打個(gè)招呼,告訴他可以中斷線程的執(zhí)行了,至于什么時(shí)候中斷,取決于當(dāng)前線程自己。
線程通過(guò)檢查資深是否被中斷來(lái)進(jìn)行相應(yīng),可以通過(guò)isInterrupted()來(lái)判斷是否被中斷
這種通過(guò)標(biāo)識(shí)位或者中斷操作的方式能夠使線程在終止時(shí)有機(jī)會(huì)去清理資源,而不是武斷地將線程停止,因此這種終止線程的做法顯得更加安全和優(yōu)雅。
線程復(fù)位
第一種就是Thread.interrupted()可以設(shè)置對(duì)線程的中斷標(biāo)識(shí)的線程復(fù)位操作
還有一 種 被 動(dòng) 復(fù) 位 的 場(chǎng) 景 , 就 是 對(duì) 拋 出InterruptedException 異 常 的 方 法 , 在InterruptedException 拋出之前,JVM 會(huì)先把線程的中斷標(biāo)識(shí)位清除,然后才會(huì)拋出 InterruptedException,這個(gè)時(shí)候如果調(diào)用 isInterrupted 方法,將會(huì)返回 false
為什么要復(fù)位
Thread.interrupted()是屬于當(dāng)前線程的,是當(dāng)前線程對(duì)外界中斷信號(hào)的一個(gè)響應(yīng),表示自己已經(jīng)得到了中斷信號(hào),但不會(huì)立刻中斷自己,具體什么時(shí)候中斷由自己決定,讓外界知道在自身中斷前,他的中斷狀態(tài)仍然是 false,這就是復(fù)位的原因。
其實(shí)本質(zhì)是會(huì)調(diào)用當(dāng)前線程的isInterrupted,只不過(guò)傳了ture的參數(shù),表示需要清除中斷標(biāo)志
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
下面是綜合的一個(gè)例子,輸出方式跟步驟的執(zhí)行一樣
/**
* @Project: ThreadExample
* @description: 線程中斷標(biāo)識(shí)的演示
* @author: sunkang
* @create: 2020-06-21 23:39
* @ModificationHistory who when What
**/
public class ThreadInterrupt {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread=new Thread(()->{
while(!Thread.currentThread().isInterrupted()){//默認(rèn)是false _interrupted state?
System.out.println("步驟01:"+Thread.currentThread().isInterrupted());//此時(shí)中斷表示為false
try {
TimeUnit.SECONDS.sleep(10); //中斷一個(gè)處于阻塞狀態(tài)的線程。join/wait/queue.take..
System.out.println("不會(huì)輸出");
} catch (InterruptedException e) { //讓其自己判斷是否怎么處理,是拋異常還是繼續(xù)處理
System.out.println("步驟02:"+Thread.currentThread().isInterrupted());//拋出異常會(huì)清除中斷標(biāo)識(shí),此時(shí)中斷標(biāo)識(shí)表示狀態(tài)為false
//再次中斷
Thread.currentThread().interrupt();
System.out.println("步驟03:"+Thread.currentThread().isInterrupted());//再次中斷狀態(tài)為true
Thread.interrupted();//復(fù)位操作
System.out.println("步驟04:"+Thread.currentThread().isInterrupted());//復(fù)位操作,此時(shí)狀態(tài)為false
//再次中斷
Thread.currentThread().interrupt();
System.out.println("步驟05:"+Thread.currentThread().isInterrupted());//再次中斷,此時(shí)狀態(tài)為true,退出循環(huán)
}
}
//對(duì)當(dāng)前線程進(jìn)行標(biāo)志復(fù)位
},"thread-01");
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); //把isInterrupted設(shè)置成true
}
}
線程的終止原理
thread.interrupt()方法做了什么事情
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
這個(gè)方法里會(huì)調(diào)用interrupt0(),是個(gè)native方法,可以在jvm.cpp文件找到j(luò)vm_Interrupt的定義
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);//調(diào)用Thread的interrupt的方法
}
JVM_END
這個(gè)方法會(huì)調(diào)用Thread::interrupt(thr),這個(gè)方法定義在Thread.cpp的文件中,代碼如下
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
Thread::interrupt()會(huì)調(diào)用os:interrupt(),這個(gè)會(huì)調(diào)用平臺(tái)的interrupt方法, jvm是跨平臺(tái)的,所以對(duì)于不同的平臺(tái)的操作系統(tǒng),現(xiàn)場(chǎng)的調(diào)度方式不一樣,以os_linux.cpp的文件為例,可以看出Thread::interrupt()做了兩個(gè)事情,一個(gè)是設(shè)置中斷標(biāo)識(shí)為ture,通過(guò)內(nèi)存屏障操作使得中斷標(biāo)識(shí)對(duì)于其他的線程可見,第二個(gè)使得調(diào)用該方法的線程的被喚醒
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();//獲得本地線程
if (!osthread->interrupted()) {//判斷本地線程是否中斷
osthread->set_interrupted(true);//設(shè)置中斷標(biāo)記為true
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();//這里設(shè)置了內(nèi)存屏障,使得interrupted狀態(tài)對(duì)于其他線程立即可見
ParkEvent * const slp = thread->_SleepEvent ;//_SleepEvent相當(dāng)于Thread.sleep的方法,線程調(diào)用sleep方法,則通過(guò)uppark喚醒
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;//_ParkEvent用于synchronized同步塊和object.wait()方法,相當(dāng)于也是通過(guò)unpark進(jìn)行喚醒
if (ev != NULL) ev->unpark() ;
}
set_interrupted(true)實(shí)際調(diào)用的是osThread.hpp中的set_interrupted()方法,在osThread()的方法中,定義了一個(gè)成員屬性volatile jint _interrupted來(lái)實(shí)現(xiàn)的
volatile jint _interrupted; // Thread.isInterrupted state
void set_interrupted(bool z) { _interrupted = z ? 1 : 0; }
thread.interrupt()方法實(shí)際就是設(shè)置一個(gè) interrupted 狀態(tài)標(biāo)識(shí)為 true、并且通過(guò)ParkEvent 的 unpark 方法來(lái)喚醒線程。
- 對(duì)于 synchronized 阻塞的線程,被喚醒以后會(huì)繼續(xù)嘗試獲取鎖,如果失敗仍然可能被 park
- 在調(diào)用 ParkEvent 的 park 方法之前,會(huì)先判斷線程的中斷狀態(tài),如果為 true,會(huì)清除當(dāng)前線程的中斷標(biāo)識(shí)
- Object.wait 、 Thread.sleep 、 Thread.join 會(huì) 拋 出InterruptedException
這里給大家普及一個(gè)知識(shí)點(diǎn),為什么 Object.wait、Thread.sleep 和 Thread.join 都 會(huì) 拋 出InterruptedException?
你會(huì)發(fā)現(xiàn)這幾個(gè)方法有一個(gè)共同點(diǎn),都是屬于阻塞的方法而阻塞方法的釋放會(huì)取決于一些外部的事件,但是阻塞方法可能因?yàn)榈炔坏酵獠康挠|發(fā)事件而導(dǎo)致無(wú)法終止,
所以它允許一個(gè)線程請(qǐng)求自己來(lái)停止它正在做的事情。當(dāng)一個(gè)方法拋出InterruptedException 時(shí),它是在告訴調(diào)用者如果執(zhí)行該方法的線程被中斷,它會(huì)嘗試停止正在做的事情并且通過(guò)拋出 InterruptedException 表示提前返回。
所以,這個(gè)異常的意思是表示一個(gè)阻塞被其他線程中斷了。然 后 , 由 于 線 程 調(diào) 用 了 interrupt() 中 斷 方 法 , 那 么Object.wait、Thread.sleep 等被阻塞的線程被喚醒以后會(huì)
通過(guò) is_interrupted 方法判斷中斷標(biāo)識(shí)的狀態(tài)變化,如果發(fā)現(xiàn)中斷標(biāo)識(shí)為 true,則先清除中斷標(biāo)識(shí),然后拋出InterruptedException
需要注意的是,InterruptedException 異常的拋出并不意味著線程必須終止,而是提醒當(dāng)前線程有中斷的操作發(fā)生,至于接下來(lái)怎么處理取決于線程本身,比如
- 直接捕獲異常不做任何處理
- 將異常往外拋出
- 停止當(dāng)前線程,并打印異常信息
為 了 讓 大 家 能 夠 更 好 的 理 解 上 面 這 段 話 , 我 們 以Thread.sleep 為例直接從 jdk 的源碼中找到中斷標(biāo)識(shí)的清除以及異常拋出的方法代碼找 到 is_interrupted() 方法, linux 平 臺(tái) 中 的 實(shí) 現(xiàn) 在os_linux.cpp 文件中,代碼如下
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();//獲取線程的中斷標(biāo)識(shí)
if (interrupted && clear_interrupted) {//如果中斷標(biāo)識(shí)為true,并且需要清除標(biāo)記為true
osthread->set_interrupted(false);//設(shè)置中斷標(biāo)識(shí)為false
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
Thread.sleep 這個(gè)操作在jdk的源碼怎么實(shí)現(xiàn),代碼在jvm.cpp文件中,大概可以理解到先會(huì)調(diào)用Thread.is_interrupted的方法,這個(gè)會(huì)先判斷線程的中斷標(biāo)記,如果設(shè)置的清除線程的標(biāo)記是true,則會(huì)設(shè)置線程標(biāo)記為false, 也就是先線程復(fù)位,然后在拋出InterruptedException異常
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {//先判斷is_interrupted的狀態(tài),這個(gè)是調(diào)用is_interrupted的方法,并這里是需要清除線程中斷標(biāo)記的
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); //拋異常
}
// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);