**移步[java多線程系列文章]
Java多線程(二十二)---LockSupport工具
Java 停止線程
一、概述
- Callable和Runnbale一樣代表著任務(wù),區(qū)別在于Callable有返回值并且可以拋出異常。
- 其使用如下:
public class CallableDemo {
static class SumTask implements Callable<Long> {
@Override
public Long call() throws Exception {
long sum = 0;
for (int i = 0; i < 9000; i++) {
sum += i;
}
return sum;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("Start:" + System.nanoTime());
FutureTask<Long> futureTask = new FutureTask<Long>(new SumTask());
Executor executor=Executors.newSingleThreadExecutor();
executor.execute(futureTask);
System.out.println(futureTask.get());
System.out.println("End:" + System.nanoTime());
}
}
從上面的代碼可以看到,使用到了一個(gè)FutureTask的變量并且還可以得到Callable執(zhí)行的結(jié)果,那么這個(gè)FutureTask是什么呢?
二、 分析
2.0 Callable接口
public interface Callable<V> {
// 子類復(fù)寫這個(gè)方法,一般都是耗時(shí)操作,并返回結(jié)果值
V call() throws Exception;
}
相當(dāng)于Runnable的run方法,一般都是耗時(shí)操作,但是不一樣的是,這個(gè)方法會(huì)返回結(jié)果值。
2.1 Future接口
- Future是一個(gè)接口,代表了一個(gè)異步計(jì)算的結(jié)果。
- 接口中的方法用來檢查計(jì)算是否完成、等待完成和得到計(jì)算的結(jié)果。
- 當(dāng)計(jì)算完成后,只能通過get()方法得到結(jié)果,get方法會(huì)阻塞直到結(jié)果準(zhǔn)備好了。
- 如果想取消,那么調(diào)用cancel()方法。
- 其他方法用于確定任務(wù)是正常完成還是取消了。
- 一旦計(jì)算完成了,那么這個(gè)計(jì)算就不能被取消。
public interface Future<V> {
/**
* 取消當(dāng)前的Future。會(huì)喚醒所有等待結(jié)果值的線程,拋出CancellationException異常
* @param mayInterruptIfRunning 是否中斷 計(jì)算結(jié)果值的那個(gè)線程
* @return 返回true表示取消成功
*/
boolean cancel(boolean mayInterruptIfRunning);
// 當(dāng)前的Future是否被取消,返回true表示已取消。
boolean isCancelled();
// 當(dāng)前Future是否已結(jié)束。包括運(yùn)行完成、拋出異常以及取消,都表示當(dāng)前Future已結(jié)束
boolean isDone();
// 獲取Future的結(jié)果值。如果當(dāng)前Future還沒有結(jié)束,那么當(dāng)前線程就等待,
// 直到Future運(yùn)行結(jié)束,那么會(huì)喚醒等待結(jié)果值的線程的。
V get() throws InterruptedException, ExecutionException;
// 獲取Future的結(jié)果值。與get()相比較多了允許設(shè)置超時(shí)時(shí)間。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
2.1 RunnableFuture接口
// 繼承自Runnable和Future接口。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
RunnableFuture接口的作用就是它的子類可以當(dāng)做Runnable接口使用,那么創(chuàng)建一個(gè)新線程的時(shí)候,就可以使用它的實(shí)例作為參數(shù)了。
2.2 FutureTask類
- FutureTask類實(shí)現(xiàn)了RunnableFuture接口,而RunnnableFuture接口繼承了Runnable和Future接口,所以說FutureTask是一個(gè)提供異步計(jì)算的結(jié)果的任務(wù)。
- FutureTask可以用來包裝Callable或者Runnbale對(duì)象。因?yàn)镕utureTask實(shí)現(xiàn)了Runnable接口,所以FutureTask也可以被提交給Executor(如上面例子那樣)。
2.3 FutureTask的狀態(tài)
FutureTask中有一個(gè)表示任務(wù)狀態(tài)的int值,初始為NEW。定義如下:
// 表示FutureTask當(dāng)前的狀態(tài)
private volatile int state;
// NEW 新建狀態(tài),表示這個(gè)FutureTask還沒有開始運(yùn)行
private static final int NEW = 0;
// COMPLETING 完成狀態(tài), 表示FutureTask任務(wù)已經(jīng)計(jì)算完畢了,
// 但是還有一些后續(xù)操作,例如喚醒等待線程操作,還沒有完成。
private static final int COMPLETING = 1;
// FutureTask任務(wù)完結(jié),正常完成,沒有發(fā)生異常
private static final int NORMAL = 2;
// FutureTask任務(wù)完結(jié),因?yàn)榘l(fā)生異常。
private static final int EXCEPTIONAL = 3;
// FutureTask任務(wù)完結(jié),因?yàn)槿∠蝿?wù)
private static final int CANCELLED = 4;
// FutureTask任務(wù)完結(jié),也是取消任務(wù),不過發(fā)起了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求。
private static final int INTERRUPTING = 5;
// FutureTask任務(wù)完結(jié),也是取消任務(wù),已經(jīng)完成了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求。
private static final int INTERRUPTED = 6;
可能的狀態(tài)轉(zhuǎn)換包括:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
2.4 構(gòu)造方法
FutureTask一共有兩個(gè)構(gòu)造方法,如下:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
第一個(gè)構(gòu)造方法好理解;第二個(gè)方法是將Runnbale和結(jié)果組合成一個(gè)Callable,這個(gè)可以通過Excutors.callable()方法得出
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
- 從上面可以看到RunnableAdapter實(shí)現(xiàn)了Callable并且在call方法中調(diào)用了Runnable的run方法,然后將結(jié)果返回,這其實(shí)就是一個(gè)適配器模式
- 所以說兩個(gè)構(gòu)造方法最終都是得到了一個(gè)Callable以及設(shè)置了初始狀態(tài)為NEW。
2.5 run方法
- 當(dāng)將FutureTask提交給Executor后,Executor執(zhí)行FutureTask時(shí)會(huì)執(zhí)行其run方法
// 開始運(yùn)行FutureTask任務(wù)
public void run() {
// 如果狀態(tài)state不是NEW,或者設(shè)置runner值失敗
// 表示有別的線程在此之前調(diào)用run方法,并成功設(shè)置了runner值
// 保證了只有一個(gè)線程可以運(yùn)行try 代碼塊中的代碼。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//嘗試調(diào)用Callable.call
try {
// 使用一個(gè)變量c記錄callable,防止多線程情況下,
// callable直接被設(shè)置為null出現(xiàn)問題
Callable<V> c = callable;
// 只有c不為null且狀態(tài)state為NEW的情況,
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調(diào)用callable的call方法,并返回結(jié)果
result = c.call();
// 運(yùn)行成功
ran = true;
} catch (Throwable ex) {
//出現(xiàn)異常了,調(diào)用setException方法
result = null;
ran = false;
// 設(shè)置異常
setException(ex);
}
// 如果運(yùn)行成功,則設(shè)置結(jié)果
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
// 當(dāng)狀態(tài)大于或等于INTERRUPTING,調(diào)用handlePossibleCancellationInterrupt方法,
// 等待別的線程將狀態(tài)設(shè)置成INTERRUPTED
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 從上面可以看到,任務(wù)可以被執(zhí)行的前提是當(dāng)前狀態(tài)為NEW以及CAS當(dāng)前執(zhí)行線程成功,也就是runner值,代表執(zhí)行Callable的線程
- 從這個(gè)看到run方法就是調(diào)用Callable的call方法,然后如果出現(xiàn)異常了就調(diào)用setException方法,如果成功執(zhí)行了,那么調(diào)用set方法
2.6 set方法
- 當(dāng)Callable成功執(zhí)行后,會(huì)調(diào)用set方法將結(jié)果傳出。
protected void set(V v) {
//完成NEW->COMPLETING->NORMAL狀態(tài)轉(zhuǎn)換
// 調(diào)用CAS函數(shù),將狀態(tài)state從NEW改成COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 延遲設(shè)置,將狀態(tài)改成NORMAL,
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 調(diào)用finishCompletion喚醒所有等待結(jié)果的線程
finishCompletion();
}
}
- 先使用CAS函數(shù)將狀態(tài)state從NEW變成COMPLETING,防止多線程沖突。
- 使用putOrderedInt方法設(shè)置狀態(tài)state是NORMAL或EXCEPTIONAL。
- 調(diào)用finishCompletion方法喚醒所有等待結(jié)果的線程。
2.7 setException方法
當(dāng)想得到FutureTask的結(jié)算結(jié)果時(shí),調(diào)用get方法,get方法可以允許多個(gè)線程調(diào)用,下面的例子展示了多個(gè)線程調(diào)用get的情況。
//完成NEW->COMPLETING->EXCEPTIONAL狀態(tài)轉(zhuǎn)換
protected void setException(Throwable t) {
// 調(diào)用CAS函數(shù),將狀態(tài)state從NEW改成COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 延遲設(shè)置,將狀態(tài)改成EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 調(diào)用finishCompletion喚醒所有等待結(jié)果的線程
finishCompletion();
}
}
- 先使用CAS函數(shù)將狀態(tài)state從NEW變成COMPLETING,防止多線程沖突。
- 使用putOrderedInt方法設(shè)置狀態(tài)state是NORMAL或EXCEPTIONAL。
- 調(diào)用finishCompletion方法喚醒所有等待結(jié)果的線程。
- 注: putOrderedInt方法的意義。因?yàn)閟tate變量是被volatile關(guān)鍵字修飾,那么它會(huì)給state變量加一個(gè)內(nèi)存屏障,來保證state變量的可見性和有序性,這樣會(huì)消耗一些性能。
- 而putOrderedInt方法的意義,就是通過它來設(shè)置volatile修飾的變量,會(huì)取消這個(gè)內(nèi)存屏障,也就是像普通變量一樣了,不保證可見性了。
2.8 get方法
當(dāng)想得到FutureTask的結(jié)算結(jié)果時(shí),調(diào)用get方法,get方法可以允許多個(gè)線程調(diào)用
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("Start:" + System.nanoTime());
FutureTask<Long> futureTask = new FutureTask<Long>(new SumTask());
Executor executor=Executors.newSingleThreadExecutor();
executor.execute(futureTask);
for(int i=0;i<5;i++){
executor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("get result "+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
});
}
System.out.println(futureTask.get());
System.out.println("End:" + System.nanoTime());
}
該例子展示了一共有5個(gè)線程想得到FutureTask的結(jié)果,一旦調(diào)用get,那么該線程就會(huì)阻塞。
FutureTask的get方法實(shí)現(xiàn)如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
/**
* 狀態(tài)小于等于COMPLETING,表示FutureTask任務(wù)還沒有完結(jié),
* 所以調(diào)用awaitDone方法,讓當(dāng)前線程等待
*/
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回結(jié)果值或者拋出異常
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
/**
* 狀態(tài)小于等于COMPLETING,表示FutureTask任務(wù)還沒有完結(jié),
* 所以調(diào)用awaitDone方法,讓當(dāng)前線程等待。
* 與get()不同的是,如果到了規(guī)定時(shí)間,任務(wù)狀態(tài)仍然是小于等于COMPLETING,
* 那么就拋出TimeoutException超時(shí)異常
*/
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 返回結(jié)果值或者拋出異常
return report(s);
}
- 從上面的代碼可以看到,如果當(dāng)前任務(wù)的狀態(tài)不大于COMPLETING,那么會(huì)調(diào)用awaitDone方法,這個(gè)方法會(huì)將調(diào)用的線程掛起;否則直接調(diào)用report方法返回結(jié)果。
- 在前面set和setException方法中可以得出結(jié)論:當(dāng)狀態(tài)從NEW變?yōu)镃OMPLETING后,才會(huì)將outcome賦值,也就是狀態(tài)是NEW或者COMPLETING時(shí),outcome都還未賦值,也就意味著計(jì)算仍在進(jìn)行,那么此時(shí)想要get到結(jié)果,就必須等待。
2.9 awaitDone 將當(dāng)前線程插入到等待隊(duì)列中
- awaitDone的兩個(gè)參數(shù)分別表示是否定時(shí),以及定時(shí)的時(shí)間多少。
- get的另一個(gè)重載方法就提供了超時(shí)限制。awaitDone方法如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計(jì)算截止日期
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
// 節(jié)點(diǎn)是否已添加
boolean queued = false;
for (;;) {
// 如果當(dāng)前線程中斷標(biāo)志位是true,
// 那么從列表中移除節(jié)點(diǎn)q,并拋出InterruptedException異常
if (Thread.interrupted()) {
// 調(diào)用removeWaiter方法從鏈表中移除節(jié)點(diǎn)q
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果狀態(tài)大于COMPLETING,說明已經(jīng)計(jì)算已經(jīng)完成了
if (s > COMPLETING) {
if (q != null)
// 將節(jié)點(diǎn)q線程設(shè)置為null,因?yàn)榫€程沒有阻塞等待
q.thread = null;
return s;
}
//狀態(tài)是COMPLETING,在set和setException方法中可以看到處于該狀態(tài)馬上就會(huì)進(jìn)入下一個(gè)狀態(tài)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//新建一個(gè)等待節(jié)點(diǎn)
// 代碼來到這里,表示狀態(tài)是NEW,那么就需要將當(dāng)前線程阻塞等待。
// 就是將它插入等待線程鏈表中,
else if (q == null)
q = new WaitNode();
//還沒有入隊(duì),嘗試入隊(duì)
else if (!queued)
// 使用CAS函數(shù)將新節(jié)點(diǎn)添加到鏈表中,如果添加失敗,那么queued為false,
// 下次循環(huán)時(shí),會(huì)繼續(xù)添加,知道成功。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// timed為true表示需要設(shè)置超時(shí)
else if (timed) {
// 得到剩余時(shí)間
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//掛起指定時(shí)間
LockSupport.parkNanos(this, nanos);
}
//無限掛起
else
LockSupport.park(this);
}
}
上面的代碼中有一個(gè)WaitNode類,該類表示等待節(jié)點(diǎn),保存等待的線程以及下一個(gè)節(jié)點(diǎn),是一個(gè)單鏈表結(jié)構(gòu),其定義如下:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
awaitDone方法中進(jìn)入死循環(huán)后,主要有幾步:
- 如果線程被中斷了,移除節(jié)點(diǎn),拋出異常
- 如果狀態(tài)大于COMPLETING,那么直接返回
- 如果狀態(tài)是COMPLETING,在set和setException可以看到,處于COMPLETING是一個(gè)暫時(shí)狀態(tài),很快就會(huì)進(jìn)入下一個(gè)狀態(tài),所以這兒就調(diào)用了Thread.yield()方法讓步一下
- 如果狀態(tài)是NEW且節(jié)點(diǎn)為null,那么創(chuàng)建一個(gè)節(jié)點(diǎn)
- 如果還沒有將當(dāng)前線程加入隊(duì)列,那么將當(dāng)前線程加入到等待隊(duì)列中。由于WaitNode是一個(gè)單鏈表結(jié)構(gòu),F(xiàn)utureTask中保存了waiters的變量,就可以沿著該變量得到所有等待的線程
- 如果限制了時(shí)間,那么計(jì)算出生出超出時(shí)間,掛起指定時(shí)間。當(dāng)解除掛起時(shí),如果計(jì)算還未完成,那么將會(huì)由于沒有時(shí)間了,調(diào)用removeWaiter方法移除節(jié)點(diǎn)。
- 如果沒有限制時(shí)間,那么將線程無限掛起
上面幾種情況下,都涉及了移除節(jié)點(diǎn),removeWaiter方法就是刪除單鏈表中一個(gè)節(jié)點(diǎn)的實(shí)現(xiàn)。
2.10 report
當(dāng)線程被解除掛起,或計(jì)算已經(jīng)完成后,將會(huì)get方法中將會(huì)調(diào)用report返回結(jié)果,其實(shí)現(xiàn)如下:
private V report(int s) throws ExecutionException {
Object x = outcome;
// 表示正常完結(jié)狀態(tài),所以返回結(jié)果值
if (s == NORMAL)
return (V)x;
// 大于或等于CANCELLED,都表示手動(dòng)取消FutureTask任務(wù),
// 所以拋出CancellationException異常
if (s >= CANCELLED)
throw new CancellationException();
// 否則就是運(yùn)行過程中,發(fā)生了異常,這里就拋出這個(gè)異常
throw new ExecutionException((Throwable)x);
}
從上面可以看到report會(huì)根據(jù)任務(wù)的狀態(tài)不同返回不同的結(jié)果。
- 如果計(jì)算正常結(jié)束,即狀態(tài)是NORMAL,那么返回正確的計(jì)算結(jié)果
- 如果計(jì)算被取消了,即狀態(tài)大于等于CANCELLED,那么拋出CancellationException
- 如果計(jì)算以異常結(jié)束,即狀態(tài)是EXCEPTIONAL,那么拋出ExecutionException
2.11 finishCompletion方法
- 在set方法和setException方法中,當(dāng)將結(jié)果賦值后,都調(diào)用了finishCompletion方法來移除和通知等待線程。
- 由于get方法中可以掛起了一群等待節(jié)點(diǎn),那么當(dāng)結(jié)果被計(jì)算出來了,自然應(yīng)該通知那些等待線程。
finishCompletion的實(shí)現(xiàn)如下:
private void finishCompletion() {
//如果有等待線程,從頭開始解除掛起
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 循環(huán)等待結(jié)果的線程鏈表
for (;;) {
//得到等待節(jié)點(diǎn)的線程,解除掛起
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 鉤子方法。子類可以復(fù)寫這個(gè)方法。
done();
callable = null; // to reduce footprint
}
- finishCompletion的實(shí)現(xiàn)比較簡單,就是遍歷等待線程的單鏈表,釋放那些等待線程。
- 當(dāng)線程被釋放后,那么在awaitDone的死循環(huán)中就會(huì)進(jìn)入下一個(gè)循環(huán),由于狀態(tài)已經(jīng)變成了NORMAL或者EXCEPTIONAL,將會(huì)直接跳出循環(huán)。
- 釋放了所有線程后,將會(huì)調(diào)用done()方法,F(xiàn)utureTask的done()方法默認(rèn)沒有任何實(shí)現(xiàn),子類可以在該方法中調(diào)用完成回調(diào)以及記錄操作等等。
2.12 cancel方法
- cancel方法用于取消Callable的計(jì)算。
- 參數(shù)mayInterruptIfRunning指明是否應(yīng)該中斷正在運(yùn)行的任務(wù),返回值表示取消是否成功了。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//如果需要中斷
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
//最終狀態(tài)INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//釋放等待線程
finishCompletion();
}
return true;
}
- 從上面可以看到如果是需要中斷正在執(zhí)行的任務(wù),那么狀態(tài)轉(zhuǎn)換將會(huì)是NEW->INTERRPUTING->INTERRUPTED;
- 如果不需要中斷正在執(zhí)行的任務(wù),那么狀態(tài)轉(zhuǎn)換將會(huì)是NEW->CANCELD。
- 不管是否中斷,最終都會(huì)調(diào)用finishCompletion()完成對(duì)等待線程的釋放。
- 當(dāng)這些線程釋放后,再進(jìn)入到awaitDone中的循環(huán)時(shí),返回的狀態(tài)將會(huì)是大于等于CANCELD,在report方法中將會(huì)得到CancellationException異常。
2.13 isDone方法
- Future接口中isDone方法表明任務(wù)是否已經(jīng)完成了,如果完成了,那么返回true,否則false。
下面是FutureTask的實(shí)現(xiàn):
public boolean isDone() {
return state != NEW;
}
可以看到只要狀態(tài)從初始狀態(tài)NEW完成了一次轉(zhuǎn)換,那么就說明任務(wù)已經(jīng)被完成了。
總結(jié)
- Callable是一種可以返回結(jié)果的任務(wù),這是它與Runnable的區(qū)別,但是通過適配器模式可以使Runnable與Callable類似。
- Future代表了一個(gè)異步的計(jì)算,可以從中得到計(jì)算結(jié)果、查看計(jì)算狀態(tài),其實(shí)現(xiàn)FutureTask可以被提交給Executor執(zhí)行,多個(gè)線程可以從中得到計(jì)算結(jié)果。
- Callable和Future是配合使用的,當(dāng)從Future中g(shù)et結(jié)果時(shí),如果結(jié)果還沒被計(jì)算出來,那么線程將會(huì)被掛起,F(xiàn)utureTak內(nèi)部使用一個(gè)單鏈表維持等待的線程;
- 當(dāng)計(jì)算結(jié)果出來后,將會(huì)對(duì)等待線程解除掛起,等待線程就都可以得到計(jì)算結(jié)果了。