/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
譯文:可運行的Future。 成功執(zhí)行run方法會導(dǎo)致Future的完成,并允許訪問其結(jié)果。
從源碼可知,RunnableFuture繼承了Future和Runnable,所以其就具有了可以運行在線程中,能夠取消并且能夠異步獲取到運行結(jié)果。接下來學(xué)習(xí)一下它的一個具體實現(xiàn)類FutureTask
概論
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
* <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V>
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
譯文:可取消的異步計算。 此類提供Future的基本實現(xiàn),其中包含啟動和取消計算,查詢計算是否完成以及返回計算結(jié)果的方法。 只有在計算完成后才能返回結(jié)果; 如果計算尚未完成,則get方法將阻塞。 一旦計算完成,除非使用runAndReset調(diào)用計算,否則無法重新啟動或取消計算。
FutureTask主要成員
public class FutureTask<V> implements RunnableFuture<V> {
/*
* FutureTask中定義了一個state變量,用于記錄任務(wù)執(zhí)行的相關(guān)狀態(tài) ,狀態(tài)的變化過程如下
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
//主流程狀態(tài)
private static final int NEW = 0; //當(dāng)FutureTask實例剛剛創(chuàng)建到callbale的call方法執(zhí)行完成前,處于此狀態(tài)
private static final int COMPLETING = 1; //callable的call方法執(zhí)行完成或出現(xiàn)異常時,首先進行此狀態(tài)
private static final int NORMAL = 2;//callable的call方法正常結(jié)束時,進入此狀態(tài),將outcom設(shè)置為正常結(jié)果
private static final int EXCEPTIONAL = 3;//callable的call方法異常結(jié)束時,進入此狀態(tài),將outcome設(shè)置為拋出的異常
//取消任務(wù)執(zhí)行時可能處于的狀態(tài)
private static final int CANCELLED= 4;// FutureTask任務(wù)尚未執(zhí)行,即還在任務(wù)隊列的時候,調(diào)用了cancel方法,進入此狀態(tài)
private static final int INTERRUPTING = 5;// FutureTask的run方法已經(jīng)在執(zhí)行,收到中斷信號,進入此狀態(tài)
private static final int INTERRUPTED = 6;// 任務(wù)成功中斷后,進入此狀態(tài)
private Callable<V> callable;//需要執(zhí)行的任務(wù),提示:如果提交的是Runnable對象,會先轉(zhuǎn)換為Callable對象,這是構(gòu)造方法參數(shù)
private Object outcome; //任務(wù)運行的結(jié)果
private volatile Thread runner;//執(zhí)行此任務(wù)的線程
//等待該FutureTask的線程鏈表,對于同一個FutureTask,如果多個線程調(diào)用了get方法,對應(yīng)的線程都會加入到waiters鏈表中,同時當(dāng)FutureTask執(zhí)行完成后,也會告知所有waiters中的線程
private volatile WaitNode waiters;
......
}
FutureTask的成員變量并不復(fù)雜,主要記錄以下幾部分信息:
1、狀態(tài)
2、任務(wù)(callable)
3、結(jié)果(outcome)
4、等待線程(waiters)
構(gòu)造方法
Future 是一個接口,而 FutureTask 是一個實實在在的工具類,這個工具類有兩個構(gòu)造函數(shù),它們的參數(shù)和前面介紹的 submit() 方法類似。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
FutureTask的方法
run()方法
FutureTask執(zhí)行任務(wù)的方法當(dāng)然還是run方法:
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執(zhí)行任務(wù)
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//設(shè)置結(jié)果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//判斷該任務(wù)是否正在響應(yīng)中斷,如果中斷沒有完成,則等待中斷操作完成
handlePossibleCancellationInterrupt(s);
}
}
run方法的大概邏輯如下:
1、如果狀態(tài)不為new或者運行線程runner失敗,說明當(dāng)前任務(wù)已經(jīng)被其他線程啟動或者已經(jīng)被執(zhí)行過,直接返回false
2、調(diào)用call方法執(zhí)行核心任務(wù)邏輯。如果調(diào)用成功則執(zhí)行set(result)方法,將state狀態(tài)設(shè)置成NORMAL。如果調(diào)用失敗拋出異常則執(zhí)行setException(ex)方法,將state狀態(tài)設(shè)置成EXCEPTIONAL,喚醒所有在get()方法上等待的線程
3、如果當(dāng)前狀態(tài)為INTERRUPTING(步驟2已CAS失敗),則一直調(diào)用Thread.yield()直至狀態(tài)不為INTERRUPTING
set()方法
除非已經(jīng)設(shè)置或取消了該Future,否則將此Future的結(jié)果設(shè)置為給定值
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
finishCompletion()
刪除并發(fā)送所有等待線程的信號,調(diào)用done(),并使callable無效。
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
////通過CAS把棧頂?shù)脑刂脼閚ull,相當(dāng)于彈出棧頂元素
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒等待的線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion的邏輯也比較簡單:
1、遍歷waiters鏈表,取出每一個節(jié)點:每個節(jié)點都代表一個正在等待該FutureTask結(jié)果(即調(diào)用過get方法)的線程
2、通過 LockSupport.unpark(t)喚醒每一個節(jié)點,通知每個線程,該任務(wù)執(zhí)行完成
get()方法
get方法很簡單,主要就是調(diào)用awaitDone方法:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
awaitDone()
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion or at timeout
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) { //如果state狀態(tài)大于COMPLETING 則說明任務(wù)執(zhí)行完成,或取消
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)//如果state=COMPLETING,則使用yield,因為此狀態(tài)的時間特別短,通過yield比掛起響應(yīng)更快。
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) { //如果該線程執(zhí)行interrupt()方法,則從隊列中移除該節(jié)點,并拋出異常
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) { //構(gòu)建節(jié)點
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)//把當(dāng)前節(jié)點入棧
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
//如果需要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間
//如果到指定時間還沒執(zhí)行完,則從隊列中移除該節(jié)點,并返回當(dāng)前狀態(tài)
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);//阻塞當(dāng)前線程nanos秒
}
else
LockSupport.park(this);//阻塞當(dāng)前線程
}
}
整個方法的大致邏輯主要分為以下幾步:
1>如果當(dāng)前狀態(tài)值大于COMPLETING,說明已經(jīng)執(zhí)行完成或者取消,直接返回
2>如果state=COMPLETING,則使用yield,因為此狀態(tài)的時間特別短,通過yield比掛起響應(yīng)更快
3>如果當(dāng)前線程是首次進入循環(huán),為當(dāng)前線程創(chuàng)建wait節(jié)點加入到waiters鏈表中
4>根據(jù)是否定時將當(dāng)前線程掛起(LockSupport.parkNanos LockSupport.park)來阻塞當(dāng)前線程,直到超時或者線程被finishCompletion方法喚醒
5>當(dāng)線程掛起超時或者被喚醒后,重新循環(huán)執(zhí)行上述邏輯
cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
//根據(jù)mayInterruptIfRunning是否為true,CAS設(shè)置狀態(tài)為INTERRUPTING或CANCELLED,設(shè)置成功,繼續(xù)第二步,否則返回false
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//如果mayInterruptIfRunning為true,調(diào)用runner.interupt(),設(shè)置狀態(tài)為INTERRUPTED
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
//喚醒所有在get()方法等待的線程
finishCompletion();
}
return true;
}
public class FutureTaskDemo {
public static void main(String[] args) {
FutureTask<String> futureTask=new FutureTask<>(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Hello world");
Thread thread = new Thread(futureTask,"Thread Future");
thread.start();
try {
String result=futureTask.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
運行時線程堆棧信息
D:\android\progect\IdeaDemo>jps
10624 Launcher
1296 FutureTaskDemo
5360 Jps
12872
D:\android\progect\IdeaDemo>jstack 1296
2021-05-13 00:03:51
Full thread dump Java HotSpot(TM) Client VM (25.202-b08 mixed mode, sharing):
"Thread Future" #9 prio=5 os_prio=0 tid=0x15a1d800 nid=0x2040 waiting on condition [0x15cef000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.idea.future.FutureTaskDemo$1.run(FutureTaskDemo.java:13)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
"main" #1 prio=5 os_prio=0 tid=0x02a0dc00 nid=0x18a8 waiting on condition [0x00d4f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x04cfd7e8> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.idea.future.FutureTaskDemo.main(FutureTaskDemo.java:22)
由于我在run方法里面設(shè)置了相當(dāng)于耗時的操作,從堆棧信息可以得知,"Thread Future"的狀態(tài)是TIMED_WAITING,而主線程"main"則處于WAITING的狀態(tài)
實現(xiàn)了 Runnable 接口,所以可以將 FutureTask 對象作為任務(wù)提交給 ThreadPoolExecutor 去執(zhí)行,也可以直接被 Thread 執(zhí)行;
又因為實現(xiàn)了 Future 接口,所以也能用來獲得任務(wù)的執(zhí)行結(jié)果。
提交給 ThreadPoolExecutor 去執(zhí)行
下面的示例代碼是將 FutureTask 對象提交給 ThreadPoolExecutor 去執(zhí)行。
// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 創(chuàng)建線程池
ExecutorService es =
Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 獲取計算結(jié)果
Integer result = futureTask.get();
直接被 Thread 執(zhí)行
FutureTask 對象直接被 Thread 執(zhí)行的示例代碼如下所示。相信你已經(jīng)發(fā)現(xiàn)了,利用 FutureTask 對象可以很容易獲取子線程的執(zhí)行結(jié)果。
// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 創(chuàng)建并啟動線程
Thread T1 = new Thread(futureTask);
T1.start();
// 獲取計算結(jié)果
Integer result = futureTask.get();
總結(jié)
利用 Java 并發(fā)包提供的 Future 可以很容易獲得異步任務(wù)的執(zhí)行結(jié)果,無論異步任務(wù)是通過線程池 ThreadPoolExecutor 執(zhí)行的,還是通過手工創(chuàng)建子線程來執(zhí)行的。
利用多線程可以快速將一些串行的任務(wù)并行化,從而提高性能;如果任務(wù)之間有依賴關(guān)系,比如當(dāng)前任務(wù)依賴前一個任務(wù)的執(zhí)行結(jié)果,這種問題基本上都可以用 Future 來解決。