FeatureTask我之前是真沒見過,也沒用過。不過不是吃飽撐了沒事研究新的類,而是在 AndroidAsyncTask的實現(xiàn)中使用到了FeatureTask,AsyncTask的實現(xiàn)基本上就是在FeatureTask基礎(chǔ)上套了個殼。所以想理解AsyncTask必須先理解FeatureTask。那為什么不和AsyncTask一起講,是因為FeatureTask 是 jdk concurrent 包中為了解決某一類問題(后面會講)而設(shè)計的。另外拆開來講,有助于更好的理解每塊的功能。 好的,廢話少說,系好安全帶,準備發(fā)車了。
文章目錄:
一、前言
二、正文
一、前言
正式開始之前,為了更好的理解FeatureTask,有必要先了解一下這兩個類:LockSupport 和 sun.misc.Unsafe。原因很簡單,因為FeatureTask中使用了它們。
1.1、 LockSupport
public class LockSupport {
// 當前線程放棄線程調(diào)度,直到獲得許可。
// 如果獲得了許可,就會立刻返回。否則線程當前線程放棄線程調(diào)度,進入休眠狀態(tài)。
// 如下幾種情況會被喚醒,從而繼續(xù)執(zhí)行:
// 1.其它線程執(zhí)行unpark喚醒當前線程
// 2.其它線程執(zhí)行 Thread#interrupt 打斷當前線程。
public static void park()
// 使線程獲取許可,從而繼續(xù)執(zhí)行。如果之前線程時blocking,那么它將編程非blocking的。
public static void unpark(Thread thread)
}
功能類似 wait/notify,但是有一些區(qū)別:
1、park/unpark 不要求獲取對象的鎖。
2、park 不會釋放線程持有的鎖。
3、假如park 時線程處于blocking狀態(tài),Thread#interrupt之后不會拋出Exception
小例子:
class TestThread(name: String) : Thread(name) {
override fun run() {
println("$name: running")
LockSupport.park()
if (Thread.interrupted()) {
println("$name: 被中斷了")
}
println("$name: 繼續(xù)執(zhí)行了")
}
}
fun main() {
val t1 = TestThread("T1")
val t2 = TestThread("T2")
t1.start()
t2.start()
TimeUnit.SECONDS.sleep(5)
//會使t1獲得許可,從而繼續(xù)執(zhí)行。
LockSupport.unpark(t1)
// t2被中斷,t2處于blocking狀態(tài)會被喚醒。
t2.interrupt()
t1.join()
t2.join()
}
結(jié)果:
T1: running
T2: running
T1: 繼續(xù)執(zhí)行了
T2: 被中斷了
T2: 繼續(xù)執(zhí)行了
如果先執(zhí)行 unpark,再執(zhí)行park,park執(zhí)行時會認為獲得了許可,立即返回。
1.2、 sun.misc.Unsafe
sun.misc.Unsafe 這個類名字 Unsafe - 不安全,它提供了一些可以直接繞過 jvm安全檢查的一些機制(例如直接分配內(nèi)存、回收內(nèi)存),如果對它的實現(xiàn)不是特別清楚的話,用起來應(yīng)該是危險的,因此僅開放給了JDK使用,當然非要用也不是不能,利用反射可以使用。下面找?guī)讉€典型的方法:
public final class Unsafe {
// CAS
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// memory
public native long allocateMemory(long var1);
public native void freeMemory(long var1);
public native long objectFieldOffset(Field var1);
public native long staticFieldOffset(Field var1);
...
}
提供了CPU硬件指令級別的原子操作:compare - and - swap 的支持,我們熟悉的 Atomic 原子類的操作就是基于此實現(xiàn)的。
還有一些內(nèi)存相關(guān)的操作:分配內(nèi)存、釋放內(nèi)存
我們重點關(guān)注一下:objectFieldOffset :用于獲取非靜態(tài)屬性Field在對象實例中的偏移量,然后利用偏移量可以通過 CAS 更新對象實例中的屬性,即可實現(xiàn)多線程同步機制,比使用 synchronized 加鎖效率要高。使用 CAS 有一種場景:只讓第一次修改對象實例中的屬性生效,在多線程情況下可以使用synchronized來保證,也可以使用 CAS,我們來看一個例子:
class Bean {
// state屬性
private volatile int state;
private static final int NEW = 111;
private static final int FINISHED = 112;
private static final int CANCELED = 113;
public Bean(){
this.state = NEW;
}
public void doSomething(){
U.compareAndSwapInt(this, STATE, NEW, FINISHED);
}
public void doSomething2(){
U.compareAndSwapInt(this, STATE, NEW, CANCELED);
}
public int getState(){
return state;
}
private static sun.misc.Unsafe U;
private static final long STATE; // state 的偏移地址
static {
try {
// 利用反射獲取 Unsafe實例
Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
U = (sun.misc.Unsafe) f.get(null);
// 獲取 state 屬性的偏移量
STATE = U.objectFieldOffset(Bean.class.getDeclaredField("state"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
public static void main(String[] args) {
Bean b = new Bean();
b.doSomething();
b.doSomething2();
System.out.println("b#state: " + b.getState());
}
結(jié)果:
b#state: 112
上述例子中 doSomething 、 doSomething2 只有一個方法修改 state 屬性生效,即便在多線程情況下,也能保證同步。
更多關(guān)于 sun.misc.Unsafe 的用法自行搜索
二、正文
2.1 FeatureTask 是什么,解決什么問題
我們知道線程的設(shè)計是基于 Thread-Runnable 模式的。 即沒有辦法直接從線程處理的異步任務(wù)中返回一個結(jié)果。因為Runnable的設(shè)計是這樣的:
public interface Runnable {
public abstract void run();
}
run方法沒有參數(shù)、沒有返回值。
FeatureTask 就是一個基于現(xiàn)有的 Thread-Runnable 模式,實現(xiàn)了一個可以獲取異步任務(wù)返回值的機制。

FeatureTask 是一個實現(xiàn)了 RunnableFeature 接口的 具體實現(xiàn)類。
RunnableFeature 接口 = Runnable 接口 + Feature接口 「沒有多出一毛錢」
也就是說 FeatureTask 首先是一個 Runnable(無參、無返回值)
再看一下 Feature 的定義就知道了 FeatureTask 是怎么實現(xiàn)的 獲取異步任務(wù)返回值的。
// V 代表了返回值類型
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
// 獲取異步任務(wù)處理結(jié)果返回值,會一直阻塞調(diào)用者。直到異步任務(wù)處理結(jié)束,或者異步出現(xiàn)異常,又或者調(diào)用get()的線程被打斷。
V get() throws InterruptedException, ExecutionException;
// 增加了超時機制
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Feature 就代表了 異步任務(wù)處理 返回結(jié)果的。 定義了任務(wù)是否已經(jīng)完成、取消任務(wù) 和 獲取任務(wù)返回值的行為。 即 Feature就是為了解決異步任務(wù)返回值而定義的
也即是說 FeatureTask即是一個普通的Runnable,可以被線程執(zhí)行。又可以從它身上獲取到異步任務(wù)處理的結(jié)果。
到此是不是有一瞬間的靈感,大概知道怎么使用 FeatureTask了。類似如下這種:
// 注意這是示意代碼,不能真的運行。
val task = FeatureTask() // 定義一個Runnable
val thread = Thread(task)
thread.start() // 開啟一個線程
val result = task.get() // 等待異步任務(wù)處理完成后給出返回結(jié)果,此步驟阻塞。
如上示意代碼的整體邏輯是沒問題的。 接下來就是我該如何把我的異步任務(wù)交給 FeatureTask 去執(zhí)行呢(因為我們知道開啟的線程肯定會去執(zhí)行 FeatureTask#run ),難道在 FeatureTask 構(gòu)造的時候再傳一個 Runnable進去,此Runnable用來封裝我們的異步任務(wù)代碼片段?可是這個Runnable還是沒有返回值呀,FeatureTask 也拿不到異步任務(wù)返回值,又怎么返回給我們呢。這里引入另一個類:Callable
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable 和 Runnable 比起來的區(qū)別是:1、有返回值類型 2、方法可以拋出異常。
沒錯。Callable 就是用來替代 Runnable 讓我們封裝 有返回值的 異步任務(wù)代碼片段的。我們先看一個 FeatureTask使用的代碼示例:
data class ResultData(var flag: String = "")
class MyCall : Callable<ResultData>{
override fun call(): ResultData {
println("this is myCall running, threadName: ${Thread.currentThread().name}")
// do something wasting time
return ResultData("callable")
}
}
fun main() {
val executors = Executors.newCachedThreadPool()
val task = FutureTask<ResultData>(MyCall())
executors.execute(task)
// 獲取異步任務(wù)返回值, maybe blocking
val result = task.get()
println("the call result is : $result")
}
結(jié)果:
this is myCall running, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)
至此,對FeatureTask的使用應(yīng)該有了一個基本的認識。
2.2 FeatureTask 實現(xiàn)原理
原理這塊基本上圍繞著這個問題進行:
1、如何實現(xiàn)獲取異步任務(wù)的結(jié)果。
2、如果異步任務(wù)執(zhí)行過程中出現(xiàn)異常,會怎么處理。
3、如果在異步任務(wù)執(zhí)行的過程中被取消了,會怎么處理。
4、get 超時機制如何實現(xiàn)
如果想要實現(xiàn)類似 FeatueTask的功能,上邊幾個問題是繞不開的。上邊幾個問題弄清楚了,原理也就清楚了。
2.2.1 如何實現(xiàn)獲取異步任務(wù)的結(jié)果
首先FeatureTask 內(nèi)部有五種狀態(tài):
- NEW 初始化狀態(tài)
- COMPLETING 處理完異步任務(wù)等待結(jié)束的狀態(tài)(短暫)
- NORMAL 正常處理完異步任務(wù)后的狀態(tài)
- EXCEPTIONAL 異步任務(wù)異常后的狀態(tài)
- CANCELLED 用戶取消異步任務(wù)執(zhí)行后的狀態(tài)
- INTERRUPTING 異步任務(wù)被打斷中的狀態(tài)(短暫)
- INTERRUPTED 異步任務(wù)被打斷后的狀態(tài)
內(nèi)部狀態(tài)的轉(zhuǎn)化有一下幾種:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
// 代碼僅包括核心的邏輯,完整邏輯看源碼
public class FutureTask<V>{
...
private Callable<V> callable; // 最終運行的異步任務(wù)
private Object outcome; // 執(zhí)行完成后用于返回上層的異步任務(wù)處理結(jié)果
public FutureTask(Callable<V> callable) //構(gòu)造函數(shù)
//非完整邏輯,狀態(tài)判斷等邏輯都刪除了
public void run() {
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 異步任務(wù)執(zhí)行出現(xiàn)了未捕獲的異常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); // 異步任務(wù)正常執(zhí)行完成
}
} finally {
...
}
}
//報告異步任務(wù)執(zhí)行成功
protected void set(V v) {
// 嘗試修改內(nèi)部狀態(tài)
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v; //存入異步任務(wù)的結(jié)果
U.putOrderedInt(this, STATE, NORMAL); // 最終的狀態(tài)
finishCompletion();
}
}
// 報告異步任務(wù)執(zhí)行出現(xiàn)異常
protected void setException(Throwable t) {
// 嘗試修改內(nèi)部狀態(tài)
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t; // Exception 存入結(jié)果
U.putOrderedInt(this, STATE, EXCEPTIONAL); // 最終狀態(tài)
finishCompletion();
}
}
// 用戶主動取消異步任務(wù)(不一定會成功,因為CAS保證了一旦狀態(tài)從NEW改變了,就再也無法接受其它的情況了)
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果起始狀態(tài)不是NEW,則直接取消失敗,直接返回。
if (!(state == NEW &&
U.compazreAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
private void finishCompletion() {
...
done();
callable = null; // to reduce footprint
}
// 異步任務(wù)結(jié)束后的回調(diào),無論是正常執(zhí)行結(jié)束、還是異步任務(wù)出現(xiàn)了Exception、還是用戶cancel掉了異步任務(wù),最終都會執(zhí)行done。這幾種情況是互斥的,通過CAS保證了,只能有一種實現(xiàn)。
protected void done() { }
...
}
上邊的代碼已經(jīng)削減了很多,只留下了處理這個話題最核心的代碼。
Callable異步任務(wù)的執(zhí)行,最終依賴的是Runnable#run方法。即在 FeatureTask#run中調(diào)用Callable#call的調(diào)用,在子線程中完成了一次異步任務(wù)的執(zhí)行。
異步任務(wù)的執(zhí)行有幾種情況:
1、正常處理完成 通過set方法報告結(jié)果
2、有未捕獲的異常 通過setException方法報告結(jié)果
3、用戶取消異步任務(wù) 通過cancel方法報告結(jié)果
上述幾種情況的處理是基于CAS來實現(xiàn)原子操作,也即是說只有一種情況會最終執(zhí)行。無論走哪一種情況,最終都會報告結(jié)果:finishCompletion,也就是說最終都會執(zhí)行 done 方法, done 這相當于一個回調(diào),即最后一步通知用戶的回調(diào)。所以基于此,我們可以在done方法里去取異步任務(wù)處理的結(jié)果,好處是因為異步任務(wù)已經(jīng)結(jié)束,最終結(jié)果的獲取就不會blocking了。
改進一下上邊的 FeatureTask的使用:
val executors = Executors.newCachedThreadPool()
val task = object : FutureTask<ResultData>(MyCall()){
override fun done() {
println("this is call done, threadName: ${Thread.currentThread().name}")
val result = get()
println("the call result is : $result")
}
}
executors.execute(task)
執(zhí)行結(jié)果:
this is myCall running, threadName: pool-1-thread-1
this is call done, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)
好了,下一步看一下,FeatureTask是以何種方式把異步任務(wù)處理結(jié)果報告給使用者呢,答案就在 get() 方法中:
public V get() throws InterruptedException, ExecutionException {
...
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
我們看到了調(diào)用 ge() 方法的時候,如果異步任務(wù)處理正常(NORMAL),則直接返回異步任務(wù)的結(jié)果;如果用戶取消了異步任務(wù)(CANCELLED、INTERRUPTED),則直接拋出異常 CancellationException ; 如果異步任務(wù)有未捕獲的異常(EXCEPTIONAL),則會拋出 ExecutionException 異常。最終把主動權(quán)交給上層用戶。
一個常見的處理模型:
try {
get()
// do something
} catch (InterruptedException e) {
// do something
} catch (ExecutionException e) {
// do something
} catch (CancellationException e) {
// do something
}
至此整個流程就串起來了,細節(jié)的東西需要大家實際去看代碼。
最后 get() 還可能會拋出 InterruptedException,這是處理哪一種情況呢?因為我們知道 get() 會阻塞調(diào)用者所在的線程,假如被其它線程執(zhí)行 Thread#interrupt 打斷了所阻塞的線程,get()方法就會拋出 InterruptedException,然后具體被打斷后的處理邏輯交給用戶去處理。
2.2.2 get(long timeout, TimeUnit unit) 超時機制如何實現(xiàn)
get(long timeout, TimeUnit unit)提供了超時機制,在指定時間內(nèi)還沒有返回的話,就會拋出異常 TimeoutException,然后把控制權(quán)交給使用者?!?和get()相比,會多拋出一個 TimeoutException 』
每一個FeatureTask 對象都維護了一個因調(diào)用此 FeatureTask 的 get 而進入等待狀態(tài)的線程的單鏈表。元素的節(jié)點是:
static final class WaitNode {
volatile Thread thread; // 當前節(jié)點的線程
volatile WaitNode next; // 下一個節(jié)點
WaitNode() { thread = Thread.currentThread(); }
}
如果get指定了超時時間,會使用 LockSupport#parkNanos將當前線程進入阻塞狀態(tài),超過時間后回去檢查異步任務(wù)運行狀態(tài),如果異步任務(wù)還沒有完成,則會拋出異常 TimeoutException 。
如果get 沒指定超時時間,會使用LockSupport.park 無限期阻塞當前線程,直到任務(wù)處理完成調(diào)用 LockSupport.unpark 恢復運行。
我們知道 finishCompletion 無論如何都會調(diào)用,不管異步任務(wù)正常處理、還是有未捕獲的異常,或者被用戶取消了都會調(diào)用。之前講解流程的時候,忽略了這個方法中關(guān)于 等待節(jié)點的處理。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
...
}
會遍歷自己維護的 WaitNode 單鏈表,依次調(diào)用對應(yīng)節(jié)點的 LockSupport.unpark(t); 讓之前阻塞的線程都恢復運行。這么以來,所有阻塞的線程在這里都能得到異步任務(wù)的處理的結(jié)果。
對于超時機制的處理,這里講的只是基本的處理。更細節(jié)的東西還是要參考代碼去學習。
很有意思的一個事情是,和超時相關(guān)的話題這已經(jīng)是第二次了。 第一次是講
okio中的超時機制(這是一個典型的 生產(chǎn)者/消費者 模型的實踐)。 第二次是講 獲取FeatureTask異步任務(wù)處理結(jié)果的超時機制。 其實無非就是利用wait/notify或者park/unpark去處理多線程之間的關(guān)系。這兩部分的超時機制設(shè)計的都很精彩,有時間建議大家去看看。