(1)多線程的使用和基礎(chǔ)

1.線程的應(yīng)用

如何應(yīng)用多線程

在 Java 中,有多種方式來(lái)實(shí)現(xiàn)多線程。繼承 Thread 類、實(shí)現(xiàn) Runnable 接口、使用 ExecutorService、Callable、Future 實(shí)現(xiàn)帶返回結(jié)果的多線程。

package com.sunkang.use;

import java.util.concurrent.*;

/**
 * @Project: ThreadExample
 * @description: 線程的創(chuàng)建方式
 * @author: sunkang
 * @create: 2020-06-21 21:38
 * @ModificationHistory who      when       What
 **/
public class ThreadDemo extends  Thread {


    //通過(guò)繼承Thread,復(fù)寫run方法
    @Override
    public void run() {
        System.out.println("1.通過(guò)繼承Thread來(lái)創(chuàng)建線程 ");
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //方式一:通過(guò)繼承Thread來(lái)創(chuàng)建線程
        ThreadDemo threadDemo = new ThreadDemo();
        threadDemo.start();

        //方式二:通過(guò)實(shí)現(xiàn)Runnable接口創(chuàng)建線程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("2.通過(guò)實(shí)現(xiàn)Runnable接口創(chuàng)建線程 ");
            }
        }).start();


        //方式三:通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程
        ExecutorService excutorService = Executors.newFixedThreadPool(1);

        Future<String>  future= excutorService.submit(new Callable<String>() {
            @Override
            public String  call() throws Exception {
                return "3.通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程";
            }
        });
        System.out.println(future.get());
        excutorService.shutdown();


        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "3.通過(guò)實(shí)現(xiàn)Callable接口來(lái)創(chuàng)建帶有返回值的線程";
            }
        });
        new Thread(futureTask).start();
        System.out.println( futureTask.get());
    }



}

2.多線程的實(shí)際應(yīng)用

其實(shí)大家在工作中應(yīng)該很少有場(chǎng)景能夠應(yīng)用多線程了,因?yàn)榛跇I(yè)務(wù)開發(fā)來(lái)說(shuō),很多使用異步的場(chǎng)景我們都通過(guò)分布式消息隊(duì)列來(lái)做了。但并不是說(shuō)多線程就不會(huì)被用到,你們?nèi)绻锌匆恍┛蚣艿脑创a,會(huì)發(fā)現(xiàn)線程的使用無(wú)處不在
之前我應(yīng)用得比較多的場(chǎng)景是在做文件跑批,每天會(huì)有一些比如收益文件、對(duì)賬文件,我們會(huì)有一個(gè)定時(shí)任務(wù)去拿到數(shù)據(jù)然后通過(guò)線程去處理

一般引用多線程的場(chǎng)景基本是為了支持異步化操作,可以使用多線程,比如處理不同的任務(wù),比如銀行系統(tǒng)的財(cái)務(wù)在日終的時(shí)候要做一些日終的處理任務(wù),這時(shí)候是可以應(yīng)用到多線程

如果有看過(guò)zookeeper 源碼的時(shí)候看到一個(gè)比較有意思的異步責(zé)任鏈模式,這里邊我模仿一種責(zé)任鏈異步化處理的方式,

圖中列舉了三種處理器,
PreCheckProcessor :可認(rèn)為是前置檢查處理器,檢查不通過(guò),后置處理器不處理
LogProcessor:日志記錄處理器
SavaProcessor:存儲(chǔ)數(shù)據(jù)處理器

這里改善了鏈?zhǔn)教幚?,增加了?duì)列的方式對(duì)請(qǐng)求進(jìn)行了異步的處理,實(shí)現(xiàn)了更高的并發(fā)性

image.png

代碼如下:

請(qǐng)求上下文
/**
 * @Project: ThreadExample
 * @description: 請(qǐng)求上下文,可以存儲(chǔ)很多請(qǐng)求相關(guān)的上下文
 * @author: sunkang
 * @create: 2020-06-21 22:04
 * @ModificationHistory who      when       What
 **/
public class RequestContext {

    private String requestId;

    private Map<String,String> data;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Map<String, String> getData() {
        return data;
    }

    public void setData(Map<String, String> data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "RequestContext{" +
                "requestId='" + requestId + '\'' +
                ", data=" + data +
                '}';
    }
}
處理器接口
/**
 * @Project: ThreadExample
 * @description:  處理器接口
 * @author: sunkang
 * @create: 2020-06-21 22:04
 * @ModificationHistory who      when       What
 **/
public abstract class IRequestProcessor extends Thread{

    void process(RequestContext context) {

    }
}
前置檢查處理器
/**
 * @Project: ThreadExample
 * @description: 前置檢查處理器
 * @author: sunkang
 * @create: 2020-06-21 22:07
 * @ModificationHistory who      when       What
 **/
public class PreCheckProcessor extends IRequestProcessor {

    private volatile boolean isFinish = false;

    private IRequestProcessor nextProcessor;

   private  LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();

    public PreCheckProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void shutdown(){
        isFinish =true;
    }

    @Override
    public void process(RequestContext context) {
        //TODO 做些前置檢查

        //放入阻塞隊(duì)列中
        requests.add(context);
    }

    @Override
    public void run() {
        while(!isFinish){ //不建議這么寫
            try {
                RequestContext request=requests.take();//阻塞式獲取數(shù)據(jù)
                //真正的處理邏輯
                System.out.println("prevProcessor:"+request);
                //交給下一個(gè)責(zé)任鏈
                if(nextProcessor != null){
                    nextProcessor.process(request);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
打印日志信息處理器
/**
 * @Project: ThreadExample
 * @description: 主要是打印日志信息
 * @author: sunkang
 * @create: 2020-06-21 22:16
 * @ModificationHistory who      when       What
 **/
public class LogProcessor extends IRequestProcessor {


    private volatile boolean isFinish = false;

    private IRequestProcessor nextProcessor;

    private LinkedBlockingQueue<RequestContext> requests = new LinkedBlockingQueue<>();

    public LogProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void shutdown(){
        isFinish =true;
    }

    @Override
    public void process(RequestContext context) {
        //TODO 按照實(shí)際業(yè)務(wù)來(lái)寫

        //放入阻塞隊(duì)列中
        requests.add(context);
    }

    @Override
    public void run() {
        while(!isFinish){ //不建議這么寫
            try {
                RequestContext request=requests.take();//阻塞式獲取數(shù)據(jù)
                //真正的處理邏輯
                System.out.println("LogProcessor:"+request);
                //交給下一個(gè)責(zé)任鏈
                //交給下一個(gè)責(zé)任鏈
                if(nextProcessor != null){
                    nextProcessor.process(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
存儲(chǔ)處理器

這里的偽代碼一直,就不在展示

啟動(dòng)入口
/**
 * @Project: ThreadExample
 * @description:程序入口
 * @author: sunkang
 * @create: 2020-06-21 22:19
 * @ModificationHistory who      when       What
 **/
public class App {
    
    public static void main(String[] args) {
        //初始化責(zé)任鏈,并啟動(dòng)線程
        IRequestProcessor logProcessor = new LogProcessor(null);
        logProcessor.start();

        IRequestProcessor preCheckProcessor = new PreCheckProcessor(logProcessor);
        preCheckProcessor.start();

        //構(gòu)造請(qǐng)求
        RequestContext requestContext = new RequestContext();
        requestContext.setRequestId("request");

        //鏈?zhǔn)教幚碚?qǐng)求
        preCheckProcessor.process(requestContext);
    }
}

3.java并發(fā)編程的基礎(chǔ)

基本應(yīng)用搞清楚以后,我們?cè)賮?lái)基于Java線程的基礎(chǔ)切入,來(lái)逐步去深入挖掘線程的整體模型。

線程的生命周期

Java 線程既然能夠創(chuàng)建,那么也勢(shì)必會(huì)被銷毀,所以線程是存在生命周期的,那么我們接下來(lái)從線程的生命周期開始去了解線程。

線程一共有 6 種狀態(tài)(NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED)

NEW:初始狀態(tài),線程被構(gòu)建,但是還沒(méi)有調(diào)用 start 方法
RUNNABLED:運(yùn)行狀態(tài),JAVA 線程把操作系統(tǒng)中的就緒(READY)和運(yùn)行(RUNNING)兩種狀態(tài)統(tǒng)一稱為“運(yùn)行中”
BLOCKED:阻塞狀態(tài),表示線程進(jìn)入等待狀態(tài),也就是線程因?yàn)槟撤N原因放棄了 CPU 使用權(quán),阻塞也分為幾種情況

等待阻塞:運(yùn)行的線程執(zhí)行 wait 方法,jvm 會(huì)把當(dāng)前線程放入到等待隊(duì)列
同步阻塞:運(yùn)行的線程在獲取對(duì)象的同步鎖時(shí),若該同步鎖被其他線程鎖占用了,那么 jvm 會(huì)把當(dāng)前的線程放入到鎖池中
其他阻塞:運(yùn)行的線程執(zhí)行 Thread.sleep 或者 t.join 方法,或者發(fā)出了 I/O 請(qǐng)求時(shí),JVM 會(huì)把當(dāng)前線程設(shè)置為阻塞狀態(tài),當(dāng) sleep 結(jié)束、join 線程終止、io 處理完畢則線程恢復(fù)

TIME_WAITING:超時(shí)等待狀態(tài),超時(shí)以后自動(dòng)返回
TERMINATED:終止?fàn)顟B(tài),表示當(dāng)前線程執(zhí)行完畢

image.png

通過(guò)代碼的方式來(lái)演示線程的狀態(tài)

代碼示例

package com.sunkang.use;

import java.util.concurrent.TimeUnit;

/**
 * @Project: ThreadExample
 * @description: 線程的狀態(tài)演示
 * @author: sunkang
 * @create: 2020-06-21 22:54
 * @ModificationHistory who      when       What
 **/
public class ThreadStatusDemo {

    public static void main(String[] args) {
        
        //Time_Waiting的狀態(tài)演示
        new Thread(()->{
            while(true){
                try {
                    TimeUnit.SECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Time_Waiting_Thread").start();

        //WAITINaG的狀態(tài)演示
        new Thread(()->{
            while(true){
                synchronized (ThreadStatusDemo.class) {
                    try {
                        ThreadStatusDemo.class.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        },"Waiting_Thread").start();

        //RUNNING的狀態(tài)演示
        new Thread(()->{
            for(;;);
        },"Running_Thread").start();

        //BLOCKED的狀態(tài)演示
        new Thread(new BlockedDemo(),"Block01_Thread").start();
        new Thread(new BlockedDemo(),"Block02_Thread").start();
    }
    static class BlockedDemo implements   Runnable{
        @Override
        public void run() {
            synchronized (BlockedDemo.class){
                while(true){
                    try {
                        TimeUnit.SECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

啟動(dòng)一個(gè)線程前,最好為這個(gè)線程設(shè)置線程名稱,因?yàn)檫@樣在使用 jstack 分析程序或者進(jìn)行問(wèn)題排查時(shí),就會(huì)給開發(fā)人員提供一些提示顯示線程的狀態(tài)

運(yùn)行該示例,打開終端或者命令提示符,鍵入“jps”, (JDK1.5 提供的一個(gè)顯示當(dāng)前所有 java 進(jìn)程 pid 的命
令)
根據(jù)上一步驟獲得的 pid,繼續(xù)輸入 jstack pid(jstack是 java 虛擬機(jī)自帶的一種堆棧跟蹤工具。jstack 用于打印出給定的 java 進(jìn)程 ID 或 core file 或遠(yuǎn)程調(diào)試服務(wù)的 Java 堆棧信息)
下面的jstack的日志輸出的關(guān)鍵信息

"Block02_Thread" #16 prio=5 os_prio=0 tid=0x0000000018dcb000 nid=0x3fb8 waiting on condition [0x0000000019b3e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
        - locked <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
        at java.lang.Thread.run(Thread.java:745)

"Block01_Thread" #15 prio=5 os_prio=0 tid=0x0000000018dca000 nid=0x3b6c waiting for monitor entry [0x0000000019a3f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at com.sunkang.use.ThreadStatusDemo$BlockedDemo.run(ThreadStatusDemo.java:58)
        - waiting to lock <0x00000000d62b91a0> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo$BlockedDemo)
        at java.lang.Thread.run(Thread.java:745)

"Running_Thread" #14 prio=5 os_prio=0 tid=0x0000000018dc9800 nid=0x3470 runnable [0x000000001993f000]
   java.lang.Thread.State: RUNNABLE
        at com.sunkang.use.ThreadStatusDemo.lambda$main$3(ThreadStatusDemo.java:45)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$4/1831932724.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

"Waiting_Thread" #13 prio=5 os_prio=0 tid=0x0000000018dc8800 nid=0x3f98 in Object.wait() [0x000000001983f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
        at java.lang.Object.wait(Object.java:502)
        at com.sunkang.use.ThreadStatusDemo.lambda$main$2(ThreadStatusDemo.java:35)
        - locked <0x00000000d5f8db10> (a java.lang.Class for com.sunkang.use.ThreadStatusDemo)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$3/1078694789.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

"Time_Waiting_Thread" #12 prio=5 os_prio=0 tid=0x0000000018dc1000 nid=0x31d0 waiting on condition [0x000000001973e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.sunkang.use.ThreadStatusDemo.lambda$main$1(ThreadStatusDemo.java:23)
        at com.sunkang.use.ThreadStatusDemo$$Lambda$2/1096979270.run(Unknown Source)

通過(guò)上面的分析,我們了解到了線程的生命周期,現(xiàn)在在整個(gè)生命周期中并不是固定的處于某個(gè)狀態(tài),而是隨著代碼的執(zhí)行在不同的狀態(tài)之間進(jìn)行切換

線程的啟動(dòng)

前面我們通過(guò)一些案例演示了線程的啟動(dòng),也就是調(diào)用start()方法去啟動(dòng)一個(gè)線程,當(dāng) run 方法中的代碼執(zhí)行完畢以后,線程的生命周期也將終止。調(diào)用 start 方法的語(yǔ)義是
當(dāng)前線程告訴 JVM,啟動(dòng)調(diào)用 start 方法的線程。

線程的啟動(dòng)原理

啟動(dòng)一個(gè)線程為什么是調(diào)用 start 方法,而不是 run 方法,這做一個(gè)簡(jiǎn)單
的分析,先簡(jiǎn)單看一下 start 方法的定義

 public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

我們看到調(diào)用 start 方法實(shí)際上是調(diào)用一個(gè) native 方法start0()來(lái)啟動(dòng)一個(gè)線程,首先 start0()這個(gè)方法是在Thread 的靜態(tài)塊中來(lái)注冊(cè)的,代碼如下

    private static native void registerNatives();
    static {
        registerNatives();
    }

registerNatives 的 本 地 方 法 的 定 義 在 文 件Thread.c,Thread.c 定義了各個(gè)操作系統(tǒng)平臺(tái)要用的關(guān)于線程的公共數(shù)據(jù)和操作,以下是 Thread.c 的全部?jī)?nèi)容
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c

<pre class="sourcelines stripes4 wrap" style="margin: 0px; font-size: 12px; position: relative; counter-reset: lineno 0; color: rgb(0, 0, 0); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-style: initial; text-decoration-color: initial;">static JNINativeMethod methods[] = {[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l42)
 {"start0",           "()V",        (void *)&JVM_StartThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l43)
 {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l44)
 {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l45)
 {"suspend0",         "()V",        (void *)&JVM_SuspendThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l46)
 {"resume0",          "()V",        (void *)&JVM_ResumeThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l47)
 {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l48)
 {"yield",            "()V",        (void *)&JVM_Yield},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l49)
 {"sleep",            "(J)V",       (void *)&JVM_Sleep},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l50)
 {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l51)
 {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l52)
 {"interrupt0",       "()V",        (void *)&JVM_Interrupt},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l53)
 {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l54)
 {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l55)
 {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l56)
 {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},[](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c#l57)
};</pre>

從 這 段 代 碼 可 以 看 出 , start0() , 實(shí) 際 會(huì) 執(zhí) 行JVM_StartThread 方法,這個(gè)方法是干嘛的呢? 從名字上來(lái)看,似乎是在 JVM 層面去啟動(dòng)一個(gè)線程,如果真的是這樣,那么在 JVM 層面,一定會(huì)調(diào)用 Java 中定義的 run 方法。那接下來(lái)繼續(xù)去找找答案。我們找到 jvm.cpp 這個(gè)文件;這個(gè)文件需要下載 hotspot 的源碼才能找到.

大概調(diào)用流程如下:
先去jvm.cpp 執(zhí)行 native_thread = new Thread(&thread_entry,sz);
然后thread.cpp 調(diào)用os::create_thread,實(shí)際就是調(diào)用平臺(tái)創(chuàng)建線程的方法來(lái)創(chuàng)建線程

接下來(lái)就是線程的啟動(dòng),會(huì)調(diào)用 Thread.cpp 文件中的Thread::start(Thread* thread)方法,start 方法中有一個(gè)函數(shù)調(diào)用: os::start_thread(thread);,調(diào)用平臺(tái)啟動(dòng)線程的方法,最終會(huì)調(diào)用 Thread.cpp 文件中的 JavaThread::run()方法

線程的終止

如何終止一個(gè)線程呢? 這是面試過(guò)程中針對(duì) 3 年左右的人喜歡問(wèn)到的一個(gè)題目。

線程的終止,并不是簡(jiǎn)單的調(diào)用 stop 命令去。雖然 api 仍然可以調(diào)用,但是和其他的線程控制方法如 suspend、resume 一樣都是過(guò)期了的不建議使用,就拿 stop 來(lái)說(shuō),stop 方法在結(jié)束一個(gè)線程時(shí)并不會(huì)保證線程的資源正常釋放,因此會(huì)導(dǎo)致程序可能出現(xiàn)一些不確定的狀態(tài)。

要優(yōu)雅的去中斷一個(gè)線程,在線程中提供了一個(gè) interrupt方法

interrupt 方法

當(dāng)其他線程通過(guò)調(diào)用當(dāng)前線程的 interrupt 方法,表示向當(dāng)前線程打個(gè)招呼,告訴他可以中斷線程的執(zhí)行了,至于什么時(shí)候中斷,取決于當(dāng)前線程自己。
線程通過(guò)檢查資深是否被中斷來(lái)進(jìn)行相應(yīng),可以通過(guò)isInterrupted()來(lái)判斷是否被中斷

這種通過(guò)標(biāo)識(shí)位或者中斷操作的方式能夠使線程在終止時(shí)有機(jī)會(huì)去清理資源,而不是武斷地將線程停止,因此這種終止線程的做法顯得更加安全和優(yōu)雅。

線程復(fù)位

第一種就是Thread.interrupted()可以設(shè)置對(duì)線程的中斷標(biāo)識(shí)的線程復(fù)位操作
還有一 種 被 動(dòng) 復(fù) 位 的 場(chǎng) 景 , 就 是 對(duì) 拋 出InterruptedException 異 常 的 方 法 , 在InterruptedException 拋出之前,JVM 會(huì)先把線程的中斷標(biāo)識(shí)位清除,然后才會(huì)拋出 InterruptedException,這個(gè)時(shí)候如果調(diào)用 isInterrupted 方法,將會(huì)返回 false

為什么要復(fù)位

Thread.interrupted()是屬于當(dāng)前線程的,是當(dāng)前線程對(duì)外界中斷信號(hào)的一個(gè)響應(yīng),表示自己已經(jīng)得到了中斷信號(hào),但不會(huì)立刻中斷自己,具體什么時(shí)候中斷由自己決定,讓外界知道在自身中斷前,他的中斷狀態(tài)仍然是 false,這就是復(fù)位的原因。

其實(shí)本質(zhì)是會(huì)調(diào)用當(dāng)前線程的isInterrupted,只不過(guò)傳了ture的參數(shù),表示需要清除中斷標(biāo)志

  public static boolean interrupted() {
        return currentThread().isInterrupted(true);
     }

下面是綜合的一個(gè)例子,輸出方式跟步驟的執(zhí)行一樣

/**
 * @Project: ThreadExample
 * @description: 線程中斷標(biāo)識(shí)的演示
 * @author: sunkang
 * @create: 2020-06-21 23:39
 * @ModificationHistory who      when       What
 **/
public class ThreadInterrupt {

    private static int i;

    public static void main(String[] args) throws InterruptedException {
        Thread thread=new Thread(()->{
            while(!Thread.currentThread().isInterrupted()){//默認(rèn)是false  _interrupted state?

                System.out.println("步驟01:"+Thread.currentThread().isInterrupted());//此時(shí)中斷表示為false
                try {
                    TimeUnit.SECONDS.sleep(10); //中斷一個(gè)處于阻塞狀態(tài)的線程。join/wait/queue.take..
                    System.out.println("不會(huì)輸出");
                } catch (InterruptedException e) { //讓其自己判斷是否怎么處理,是拋異常還是繼續(xù)處理
                    System.out.println("步驟02:"+Thread.currentThread().isInterrupted());//拋出異常會(huì)清除中斷標(biāo)識(shí),此時(shí)中斷標(biāo)識(shí)表示狀態(tài)為false
                    //再次中斷
                    Thread.currentThread().interrupt();
                    System.out.println("步驟03:"+Thread.currentThread().isInterrupted());//再次中斷狀態(tài)為true
                    Thread.interrupted();//復(fù)位操作
                    System.out.println("步驟04:"+Thread.currentThread().isInterrupted());//復(fù)位操作,此時(shí)狀態(tài)為false
                    //再次中斷
                    Thread.currentThread().interrupt();
                    System.out.println("步驟05:"+Thread.currentThread().isInterrupted());//再次中斷,此時(shí)狀態(tài)為true,退出循環(huán)
                }
            }

            //對(duì)當(dāng)前線程進(jìn)行標(biāo)志復(fù)位
        },"thread-01");
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt(); //把isInterrupted設(shè)置成true
    }
}
線程的終止原理

thread.interrupt()方法做了什么事情

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }

這個(gè)方法里會(huì)調(diào)用interrupt0(),是個(gè)native方法,可以在jvm.cpp文件找到j(luò)vm_Interrupt的定義

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);//調(diào)用Thread的interrupt的方法
  }
JVM_END

這個(gè)方法會(huì)調(diào)用Thread::interrupt(thr),這個(gè)方法定義在Thread.cpp的文件中,代碼如下

void Thread::interrupt(Thread* thread) {
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);
}

Thread::interrupt()會(huì)調(diào)用os:interrupt(),這個(gè)會(huì)調(diào)用平臺(tái)的interrupt方法, jvm是跨平臺(tái)的,所以對(duì)于不同的平臺(tái)的操作系統(tǒng),現(xiàn)場(chǎng)的調(diào)度方式不一樣,以os_linux.cpp的文件為例,可以看出Thread::interrupt()做了兩個(gè)事情,一個(gè)是設(shè)置中斷標(biāo)識(shí)為ture,通過(guò)內(nèi)存屏障操作使得中斷標(biāo)識(shí)對(duì)于其他的線程可見,第二個(gè)使得調(diào)用該方法的線程的被喚醒

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();//獲得本地線程

  if (!osthread->interrupted()) {//判斷本地線程是否中斷
    osthread->set_interrupted(true);//設(shè)置中斷標(biāo)記為true
    // More than one thread can get here with the same value of osthread,
    // resulting in multiple notifications.  We do, however, want the store
    // to interrupted() to be visible to other threads before we execute unpark().
    OrderAccess::fence();//這里設(shè)置了內(nèi)存屏障,使得interrupted狀態(tài)對(duì)于其他線程立即可見
    ParkEvent * const slp = thread->_SleepEvent ;//_SleepEvent相當(dāng)于Thread.sleep的方法,線程調(diào)用sleep方法,則通過(guò)uppark喚醒
    if (slp != NULL) slp->unpark() ;
  }

  // For JSR166. Unpark even if interrupt status already was set
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();

  ParkEvent * ev = thread->_ParkEvent ;//_ParkEvent用于synchronized同步塊和object.wait()方法,相當(dāng)于也是通過(guò)unpark進(jìn)行喚醒
  if (ev != NULL) ev->unpark() ;

}

set_interrupted(true)實(shí)際調(diào)用的是osThread.hpp中的set_interrupted()方法,在osThread()的方法中,定義了一個(gè)成員屬性volatile jint _interrupted來(lái)實(shí)現(xiàn)的

  volatile jint _interrupted;     // Thread.isInterrupted state
  void set_interrupted(bool z)                      { _interrupted = z ? 1 : 0; }

thread.interrupt()方法實(shí)際就是設(shè)置一個(gè) interrupted 狀態(tài)標(biāo)識(shí)為 true、并且通過(guò)ParkEvent 的 unpark 方法來(lái)喚醒線程。

  1. 對(duì)于 synchronized 阻塞的線程,被喚醒以后會(huì)繼續(xù)嘗試獲取鎖,如果失敗仍然可能被 park
  2. 在調(diào)用 ParkEvent 的 park 方法之前,會(huì)先判斷線程的中斷狀態(tài),如果為 true,會(huì)清除當(dāng)前線程的中斷標(biāo)識(shí)
  3. Object.wait 、 Thread.sleep 、 Thread.join 會(huì) 拋 出InterruptedException

這里給大家普及一個(gè)知識(shí)點(diǎn),為什么 Object.wait、Thread.sleep 和 Thread.join 都 會(huì) 拋 出InterruptedException?

你會(huì)發(fā)現(xiàn)這幾個(gè)方法有一個(gè)共同點(diǎn),都是屬于阻塞的方法而阻塞方法的釋放會(huì)取決于一些外部的事件,但是阻塞方法可能因?yàn)榈炔坏酵獠康挠|發(fā)事件而導(dǎo)致無(wú)法終止,
所以它允許一個(gè)線程請(qǐng)求自己來(lái)停止它正在做的事情。當(dāng)一個(gè)方法拋出InterruptedException 時(shí),它是在告訴調(diào)用者如果執(zhí)行該方法的線程被中斷,它會(huì)嘗試停止正在做的事情并且通過(guò)拋出 InterruptedException 表示提前返回。

所以,這個(gè)異常的意思是表示一個(gè)阻塞被其他線程中斷了。然 后 , 由 于 線 程 調(diào) 用 了 interrupt() 中 斷 方 法 , 那 么Object.wait、Thread.sleep 等被阻塞的線程被喚醒以后會(huì)
通過(guò) is_interrupted 方法判斷中斷標(biāo)識(shí)的狀態(tài)變化,如果發(fā)現(xiàn)中斷標(biāo)識(shí)為 true,則先清除中斷標(biāo)識(shí),然后拋出InterruptedException

需要注意的是,InterruptedException 異常的拋出并不意味著線程必須終止,而是提醒當(dāng)前線程有中斷的操作發(fā)生,至于接下來(lái)怎么處理取決于線程本身,比如

  1. 直接捕獲異常不做任何處理
  2. 將異常往外拋出
  3. 停止當(dāng)前線程,并打印異常信息

為 了 讓 大 家 能 夠 更 好 的 理 解 上 面 這 段 話 , 我 們 以Thread.sleep 為例直接從 jdk 的源碼中找到中斷標(biāo)識(shí)的清除以及異常拋出的方法代碼找 到 is_interrupted() 方法, linux 平 臺(tái) 中 的 實(shí) 現(xiàn) 在os_linux.cpp 文件中,代碼如下

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  bool interrupted = osthread->interrupted();//獲取線程的中斷標(biāo)識(shí)

  if (interrupted && clear_interrupted) {//如果中斷標(biāo)識(shí)為true,并且需要清除標(biāo)記為true
    osthread->set_interrupted(false);//設(shè)置中斷標(biāo)識(shí)為false
    // consider thread->_SleepEvent->reset() ... optional optimization
  }

  return interrupted;
}

Thread.sleep 這個(gè)操作在jdk的源碼怎么實(shí)現(xiàn),代碼在jvm.cpp文件中,大概可以理解到先會(huì)調(diào)用Thread.is_interrupted的方法,這個(gè)會(huì)先判斷線程的中斷標(biāo)記,如果設(shè)置的清除線程的標(biāo)記是true,則會(huì)設(shè)置線程標(biāo)記為false, 也就是先線程復(fù)位,然后在拋出InterruptedException異常

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }

  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {//先判斷is_interrupted的狀態(tài),這個(gè)是調(diào)用is_interrupted的方法,并這里是需要清除線程中斷標(biāo)記的
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); //拋異常
  }

  // Save current thread state and restore it at the end of this block.
  // And set new thread state to SLEEPING.
  JavaThreadSleepState jtss(thread);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容