多線程協(xié)作的基本機(jī)制 wait/notify
多線程之間除了競(jìng)爭(zhēng)訪問(wèn)同一個(gè)資源外,也經(jīng)常需要相互協(xié)作,怎么協(xié)作呢?本節(jié)就來(lái)介紹Java中多線程協(xié)作的基本機(jī)制 wait/notify。
wait 實(shí)際上做了什么呢?它在等待什么?之前我們說(shuō)過(guò),每個(gè)對(duì)象都有一把鎖和等待隊(duì)列,一個(gè)線程在進(jìn)入 synchronized 代碼塊時(shí),會(huì)嘗試獲取鎖,如果獲取不到則會(huì)把當(dāng)前線程加入等待隊(duì)列中,其實(shí),除了用于鎖的等待隊(duì)列,每個(gè)對(duì)象還有另一個(gè)等待隊(duì)列,表示條件隊(duì)列,該隊(duì)列用于線程間的協(xié)作。
notify 做的事情就是從條件隊(duì)列中選一個(gè)線程,將其從隊(duì)列中移除并喚醒,notify 和 notifyAll 的區(qū)別是,它會(huì)移除條件隊(duì)列中所有的線程并全部喚醒。
wait/notify 方法只能在 synchronized 代碼塊內(nèi)被調(diào)用,如果調(diào)用 wait/notify 方法時(shí),當(dāng)前線程沒(méi)有持有對(duì)象鎖,會(huì)拋出異常 java.lang.IllegalMonitor-StateException。
同時(shí)開(kāi)始
每個(gè)線程在開(kāi)始前進(jìn)行 wait,然后主線程通過(guò) notifyAll 喚醒所有。
同時(shí)結(jié)束
我們之前通過(guò)主線程等待子線程使用的是 join,但是 join 有時(shí)比較麻煩,需要主線程逐一等待每個(gè)子線程。
主線程先等待,只有等到所有子線程結(jié)束。然后一個(gè)條件,必須先 wait,再 notify。
異步結(jié)果
一種常見(jiàn)的模式是異步調(diào)用,異步調(diào)用返回一個(gè)一般稱為 Future 的對(duì)象,通過(guò)它可以獲得最終的結(jié)果。
package qy.basic.ch21;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* 線程池自定義任務(wù) 簡(jiǎn)單 demo
*/
public class Ch21_10_Executor {
interface MyFuture<V> {
// 阻塞直到線程運(yùn)行結(jié)束
V get() throws InterruptedException, ExecutionException;
}
static class MyExecutor {
// 它封裝了創(chuàng)建子線程,同步獲取結(jié)果的過(guò)程,它會(huì)創(chuàng)建一個(gè)執(zhí)行子線程
public <V> MyFuture<V> submit(final Callable<V> callable) {
Object lock = new Object();
ExecutorThread<V> thread = new ExecutorThread<>(callable, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws InterruptedException, ExecutionException {
synchronized (lock) {
while(!thread.isDone) {
lock.wait();
}
if (thread.getException() != null) {
throw new ExecutionException(thread.getException());
}
V v = thread.getResult();
return v;
}
}
};
return future;
}
}
static class ExecutorThread<V> extends Thread {
private V result;
private Exception exception;
boolean isDone = false;
private Callable<V> callable;
private Object lock;
public ExecutorThread(Callable<V> callable, Object lock) {
this.callable = callable;
this.lock = lock;
}
@Override
public void run() {
try {
result = callable.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
isDone = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public Exception getException() {
return exception;
}
public boolean isDone() {
return isDone;
}
}
public static void main(String[] args) throws Exception {
MyExecutor executor = new MyExecutor();
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "hello MyExecutor!";
}
};
MyFuture<String> future = executor.submit(callable);
// 獲取異步調(diào)取結(jié)果
String result = future.get();
System.out.println("result = " + result);
}
}
集合點(diǎn)
各個(gè)線程先是分頭行動(dòng),各自到達(dá)一個(gè)集合點(diǎn),在集合點(diǎn)需要集齊所有線程,交換數(shù)據(jù),然后再進(jìn)行下一步動(dòng)作。
線程中斷
stop 方法看上去就可以停止線程,但這個(gè)方法被標(biāo)記為了過(guò)時(shí),簡(jiǎn)單地說(shuō),我們不應(yīng)該使用它,可以忽略它。
在 Java 中,停止一個(gè)線程的主要機(jī)制是中斷,中斷并不是強(qiáng)迫終止一個(gè)線程,它是一種協(xié)作機(jī)制,是給線程傳遞一個(gè)取消信號(hào),但是由線程來(lái)決定如何以及何時(shí)退出。
void interrupt()方法 :中斷線程,例如,當(dāng)線程A運(yùn)行時(shí),線程B可以調(diào)用線程A的interrupt()方法來(lái)設(shè)置線程A的中斷標(biāo)志為 true 并立即返回。設(shè)置標(biāo)志僅僅是設(shè)置標(biāo)志,線程A實(shí)際并沒(méi)有被中斷,它會(huì)繼續(xù)往下執(zhí)行。如果線程處于了阻塞狀態(tài)(如線程調(diào)用了thread.sleep、thread.join、thread.wait、1.5中的 condition.await、以及可中斷的通道上的 I/O 操作方法后可進(jìn)入阻塞狀態(tài)),這時(shí)候若線程B調(diào)用線程A的interrupt()方法,線程A在檢查中斷標(biāo)示時(shí)如果發(fā)現(xiàn)中斷標(biāo)示為true,則會(huì)在這些阻塞方法(sleep、join、wait、1.5中的condition.await 及可中斷的通道上的 I/O 操作方法)調(diào)用處拋出 InterruptedException 異常。并且在拋出異常后立即將線程的中斷標(biāo)示位清除,即重新設(shè)置為 false。拋出異常是為了線程從阻塞狀態(tài)醒過(guò)來(lái),并在結(jié)束線程前讓程序員有足夠的時(shí)間來(lái)處理中斷請(qǐng)求
boolean isInterrupted()方法:檢測(cè)當(dāng)前線程是否被中斷,如果是返回 true,否則返回 false。并不清除中斷標(biāo)志位。
public boolean isInterrupted() {
return isInterrupted(false);
}
- boolean interrupted()方法:檢測(cè)當(dāng)前線程是否被中斷,如果是返回 true,否則返回 false。與 isInterrupted 不同的是,該方法如果發(fā)現(xiàn)當(dāng)前線程被中斷,則會(huì)清除中斷標(biāo)志,并且該方法是 static 靜態(tài)方法,可以通過(guò) Thread 類直接調(diào)用。另外從下面的代碼可以知道,在 interrupted()內(nèi)部是獲取當(dāng)前調(diào)用線程的中斷標(biāo)志而不是調(diào)用 interrupted()方法的實(shí)例對(duì)象的中斷標(biāo)志。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
線程對(duì)中斷的反應(yīng)
interrupt()對(duì)線程的影響與線程的狀態(tài)和在進(jìn)行的IO操作有關(guān)。我們主要考慮線程的狀態(tài),IO操作的影響和具體IO以及操作系統(tǒng)有關(guān),我們就不討論了。線程狀態(tài)有:
? RUNNABLE:線程在運(yùn)行或具備運(yùn)行條件只是在等待操作系統(tǒng)調(diào)度。
? WAITING/TIMED_WAITING:線程在等待某個(gè)條件或超時(shí)。
? BLOCKED:線程在等待鎖,試圖進(jìn)入同步塊。
? NEW/TERMINATED:線程還未啟動(dòng)或已結(jié)束。
RUNNABLE:如果線程在運(yùn)行中,且沒(méi)有執(zhí)行IO操作,interrupt()只是會(huì)設(shè)置線程的中斷標(biāo)志位,沒(méi)有任何其他作用。
WAITING/TIMED_WAITING:線程調(diào)用join/wait/sleep方法會(huì)進(jìn)入 WAITING 或 TIMED_WAITING狀態(tài),在這些狀態(tài)時(shí),對(duì)線程對(duì)象調(diào)用interrupt()會(huì)使得該線程拋出InterruptedException。需要注意的是,拋出異常后,中斷標(biāo)志位會(huì)被清空,而不是被設(shè)置。
捕獲到 InterruptedException,通常表示希望結(jié)束該線程,線程大致有兩種處理方式:
1)向上傳遞該異常,這使得該方法也變成了一個(gè)可中斷的方法,需要調(diào)用者進(jìn)行處理;
2)有些情況,不能向上傳遞異常,比如 Thread 的 run 方法,它的聲明是固定的,不能拋出任何受檢異常,這時(shí),應(yīng)該捕獲異常,進(jìn)行合適的清理操作,清理后,一般應(yīng)該調(diào)用 Thread 的 interrupt 方法設(shè)置中斷標(biāo)志位,使得其他代碼有辦法知道它發(fā)生了中斷。
BLOCKED:如果線程在等待鎖,對(duì)線程對(duì)象調(diào)用interrupt()只是會(huì)設(shè)置線程的中斷標(biāo)志位,線程依然會(huì)處于BLOCKED狀態(tài),也就是說(shuō),interrupt()并不能使一個(gè)在等待鎖的線程真正“中斷”。
NEW/TERMINATED:如果線程尚未啟動(dòng)(NEW),或者已經(jīng)結(jié)束(TERMINATED),則調(diào)用interrupt()對(duì)它沒(méi)有任何效果,中斷標(biāo)志位也不會(huì)被設(shè)置。
參考
- Java 編程的邏輯-微信讀書(shū)
https://weread.qq.com/web/reader/b51320f05e159eb51b29226kc81322c012c81e728d9d180