21.2 Java 線程的協(xié)作

多線程協(xié)作的基本機(jī)制 wait/notify

多線程之間除了競(jìng)爭(zhēng)訪問(wèn)同一個(gè)資源外,也經(jīng)常需要相互協(xié)作,怎么協(xié)作呢?本節(jié)就來(lái)介紹Java中多線程協(xié)作的基本機(jī)制 wait/notify。

wait 實(shí)際上做了什么呢?它在等待什么?之前我們說(shuō)過(guò),每個(gè)對(duì)象都有一把鎖和等待隊(duì)列,一個(gè)線程在進(jìn)入 synchronized 代碼塊時(shí),會(huì)嘗試獲取鎖,如果獲取不到則會(huì)把當(dāng)前線程加入等待隊(duì)列中,其實(shí),除了用于鎖的等待隊(duì)列,每個(gè)對(duì)象還有另一個(gè)等待隊(duì)列,表示條件隊(duì)列,該隊(duì)列用于線程間的協(xié)作。

notify 做的事情就是從條件隊(duì)列中選一個(gè)線程,將其從隊(duì)列中移除并喚醒,notify 和 notifyAll 的區(qū)別是,它會(huì)移除條件隊(duì)列中所有的線程并全部喚醒。

wait/notify 方法只能在 synchronized 代碼塊內(nèi)被調(diào)用,如果調(diào)用 wait/notify 方法時(shí),當(dāng)前線程沒(méi)有持有對(duì)象鎖,會(huì)拋出異常 java.lang.IllegalMonitor-StateException。

同時(shí)開(kāi)始

每個(gè)線程在開(kāi)始前進(jìn)行 wait,然后主線程通過(guò) notifyAll 喚醒所有。

同時(shí)結(jié)束

我們之前通過(guò)主線程等待子線程使用的是 join,但是 join 有時(shí)比較麻煩,需要主線程逐一等待每個(gè)子線程。

主線程先等待,只有等到所有子線程結(jié)束。然后一個(gè)條件,必須先 wait,再 notify。

異步結(jié)果

一種常見(jiàn)的模式是異步調(diào)用,異步調(diào)用返回一個(gè)一般稱為 Future 的對(duì)象,通過(guò)它可以獲得最終的結(jié)果。

package qy.basic.ch21;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * 線程池自定義任務(wù) 簡(jiǎn)單 demo
 */
public class Ch21_10_Executor {
    
    interface MyFuture<V> {
        // 阻塞直到線程運(yùn)行結(jié)束
        V get() throws InterruptedException, ExecutionException;
    }
    
    static class MyExecutor {
        // 它封裝了創(chuàng)建子線程,同步獲取結(jié)果的過(guò)程,它會(huì)創(chuàng)建一個(gè)執(zhí)行子線程
        public <V> MyFuture<V> submit(final Callable<V> callable) {
            Object lock = new Object();
            ExecutorThread<V> thread = new ExecutorThread<>(callable, lock);
            thread.start();
            
            MyFuture<V> future = new MyFuture<V>() {

                @Override
                public V get() throws InterruptedException, ExecutionException {
                    synchronized (lock) {
                        while(!thread.isDone) {
                            lock.wait();
                        }
                        if (thread.getException() != null) {
                            throw new ExecutionException(thread.getException());
                        }
                        V v = thread.getResult();
                        return v;
                    }
                }
            };
            return future;
        }
    }
    
    static class ExecutorThread<V> extends Thread {
        private V result;
        private Exception exception;
        boolean isDone = false;
        private Callable<V> callable;
        private Object lock;
        
        public ExecutorThread(Callable<V> callable, Object lock) {
            this.callable = callable;
            this.lock = lock;
        }
        
        @Override
        public void run() {
            try {
                result = callable.call();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (lock) {
                    isDone = true;
                    lock.notifyAll();
                }
            }
        }

        public V getResult() {
            return result;
        }

        public Exception getException() {
            return exception;
        }

        public boolean isDone() {
            return isDone;
        }
    }
    
    public static void main(String[] args) throws Exception {
        MyExecutor executor = new MyExecutor();
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "hello MyExecutor!";
            }
        };
        MyFuture<String> future = executor.submit(callable);
        // 獲取異步調(diào)取結(jié)果
        String result = future.get();
        System.out.println("result = " + result);
    }

}

集合點(diǎn)

各個(gè)線程先是分頭行動(dòng),各自到達(dá)一個(gè)集合點(diǎn),在集合點(diǎn)需要集齊所有線程,交換數(shù)據(jù),然后再進(jìn)行下一步動(dòng)作。

線程中斷

stop 方法看上去就可以停止線程,但這個(gè)方法被標(biāo)記為了過(guò)時(shí),簡(jiǎn)單地說(shuō),我們不應(yīng)該使用它,可以忽略它。

在 Java 中,停止一個(gè)線程的主要機(jī)制是中斷,中斷并不是強(qiáng)迫終止一個(gè)線程,它是一種協(xié)作機(jī)制,是給線程傳遞一個(gè)取消信號(hào),但是由線程來(lái)決定如何以及何時(shí)退出。

  • void interrupt()方法 :中斷線程,例如,當(dāng)線程A運(yùn)行時(shí),線程B可以調(diào)用線程A的interrupt()方法來(lái)設(shè)置線程A的中斷標(biāo)志為 true 并立即返回。設(shè)置標(biāo)志僅僅是設(shè)置標(biāo)志,線程A實(shí)際并沒(méi)有被中斷,它會(huì)繼續(xù)往下執(zhí)行。如果線程處于了阻塞狀態(tài)(如線程調(diào)用了thread.sleep、thread.join、thread.wait、1.5中的 condition.await、以及可中斷的通道上的 I/O 操作方法后可進(jìn)入阻塞狀態(tài)),這時(shí)候若線程B調(diào)用線程A的interrupt()方法,線程A在檢查中斷標(biāo)示時(shí)如果發(fā)現(xiàn)中斷標(biāo)示為true,則會(huì)在這些阻塞方法(sleep、join、wait、1.5中的condition.await 及可中斷的通道上的 I/O 操作方法)調(diào)用處拋出 InterruptedException 異常。并且在拋出異常后立即將線程的中斷標(biāo)示位清除,即重新設(shè)置為 false。拋出異常是為了線程從阻塞狀態(tài)醒過(guò)來(lái),并在結(jié)束線程前讓程序員有足夠的時(shí)間來(lái)處理中斷請(qǐng)求

  • boolean isInterrupted()方法:檢測(cè)當(dāng)前線程是否被中斷,如果是返回 true,否則返回 false。并不清除中斷標(biāo)志位。

public boolean isInterrupted() {
    return isInterrupted(false);
}
  • boolean interrupted()方法:檢測(cè)當(dāng)前線程是否被中斷,如果是返回 true,否則返回 false。與 isInterrupted 不同的是,該方法如果發(fā)現(xiàn)當(dāng)前線程被中斷,則會(huì)清除中斷標(biāo)志,并且該方法是 static 靜態(tài)方法,可以通過(guò) Thread 類直接調(diào)用。另外從下面的代碼可以知道,在 interrupted()內(nèi)部是獲取當(dāng)前調(diào)用線程的中斷標(biāo)志而不是調(diào)用 interrupted()方法的實(shí)例對(duì)象的中斷標(biāo)志。
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

線程對(duì)中斷的反應(yīng)

interrupt()對(duì)線程的影響與線程的狀態(tài)和在進(jìn)行的IO操作有關(guān)。我們主要考慮線程的狀態(tài),IO操作的影響和具體IO以及操作系統(tǒng)有關(guān),我們就不討論了。線程狀態(tài)有:

? RUNNABLE:線程在運(yùn)行或具備運(yùn)行條件只是在等待操作系統(tǒng)調(diào)度。
? WAITING/TIMED_WAITING:線程在等待某個(gè)條件或超時(shí)。
? BLOCKED:線程在等待鎖,試圖進(jìn)入同步塊。
? NEW/TERMINATED:線程還未啟動(dòng)或已結(jié)束。

RUNNABLE:如果線程在運(yùn)行中,且沒(méi)有執(zhí)行IO操作,interrupt()只是會(huì)設(shè)置線程的中斷標(biāo)志位,沒(méi)有任何其他作用。

WAITING/TIMED_WAITING:線程調(diào)用join/wait/sleep方法會(huì)進(jìn)入 WAITING 或 TIMED_WAITING狀態(tài),在這些狀態(tài)時(shí),對(duì)線程對(duì)象調(diào)用interrupt()會(huì)使得該線程拋出InterruptedException。需要注意的是,拋出異常后,中斷標(biāo)志位會(huì)被清空,而不是被設(shè)置。

捕獲到 InterruptedException,通常表示希望結(jié)束該線程,線程大致有兩種處理方式:
1)向上傳遞該異常,這使得該方法也變成了一個(gè)可中斷的方法,需要調(diào)用者進(jìn)行處理;
2)有些情況,不能向上傳遞異常,比如 Thread 的 run 方法,它的聲明是固定的,不能拋出任何受檢異常,這時(shí),應(yīng)該捕獲異常,進(jìn)行合適的清理操作,清理后,一般應(yīng)該調(diào)用 Thread 的 interrupt 方法設(shè)置中斷標(biāo)志位,使得其他代碼有辦法知道它發(fā)生了中斷。

BLOCKED:如果線程在等待鎖,對(duì)線程對(duì)象調(diào)用interrupt()只是會(huì)設(shè)置線程的中斷標(biāo)志位,線程依然會(huì)處于BLOCKED狀態(tài),也就是說(shuō),interrupt()并不能使一個(gè)在等待鎖的線程真正“中斷”。

NEW/TERMINATED:如果線程尚未啟動(dòng)(NEW),或者已經(jīng)結(jié)束(TERMINATED),則調(diào)用interrupt()對(duì)它沒(méi)有任何效果,中斷標(biāo)志位也不會(huì)被設(shè)置。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容