一篇講明白FeatureTask

FeatureTask 我之前是真沒見過,也沒用過。不過不是吃飽撐了沒事研究新的類,而是在 Android AsyncTask 的實現(xiàn)中使用到了 FeatureTask, AsyncTask 的實現(xiàn)基本上就是在 FeatureTask基礎(chǔ)上套了個殼。所以想理解AsyncTask 必須先理解FeatureTask。那為什么不和AsyncTask一起講,是因為FeatureTask 是 jdk concurrent 包中為了解決某一類問題(后面會講)而設(shè)計的。另外拆開來講,有助于更好的理解每塊的功能。 好的,廢話少說,系好安全帶,準備發(fā)車了。


文章目錄:

一、前言
二、正文

一、前言

正式開始之前,為了更好的理解FeatureTask,有必要先了解一下這兩個類:LockSupportsun.misc.Unsafe。原因很簡單,因為FeatureTask中使用了它們。

1.1、 LockSupport
public class LockSupport {
    // 當前線程放棄線程調(diào)度,直到獲得許可。
    // 如果獲得了許可,就會立刻返回。否則線程當前線程放棄線程調(diào)度,進入休眠狀態(tài)。
    // 如下幾種情況會被喚醒,從而繼續(xù)執(zhí)行:
    // 1.其它線程執(zhí)行unpark喚醒當前線程
    // 2.其它線程執(zhí)行 Thread#interrupt 打斷當前線程。
    public static void park() 
    // 使線程獲取許可,從而繼續(xù)執(zhí)行。如果之前線程時blocking,那么它將編程非blocking的。
    public static void unpark(Thread thread) 
}

功能類似 wait/notify,但是有一些區(qū)別:
1、park/unpark 不要求獲取對象的鎖。
2、park 不會釋放線程持有的鎖。
3、假如park 時線程處于blocking狀態(tài),Thread#interrupt之后不會拋出Exception

小例子:

class TestThread(name: String) : Thread(name) {
    override fun run() {
        println("$name: running")
        LockSupport.park()
        if (Thread.interrupted()) {
            println("$name: 被中斷了")
        }
        println("$name: 繼續(xù)執(zhí)行了")

    }
}
fun main() {
    val t1 = TestThread("T1")
    val t2 = TestThread("T2")
    t1.start()
    t2.start()

    TimeUnit.SECONDS.sleep(5)
    //會使t1獲得許可,從而繼續(xù)執(zhí)行。
    LockSupport.unpark(t1)      
    // t2被中斷,t2處于blocking狀態(tài)會被喚醒。
    t2.interrupt()

    t1.join()
    t2.join()
}

結(jié)果:

T1: running
T2: running
T1: 繼續(xù)執(zhí)行了
T2: 被中斷了
T2: 繼續(xù)執(zhí)行了

如果先執(zhí)行 unpark,再執(zhí)行park,park執(zhí)行時會認為獲得了許可,立即返回。

1.2、 sun.misc.Unsafe

sun.misc.Unsafe 這個類名字 Unsafe - 不安全,它提供了一些可以直接繞過 jvm安全檢查的一些機制(例如直接分配內(nèi)存、回收內(nèi)存),如果對它的實現(xiàn)不是特別清楚的話,用起來應(yīng)該是危險的,因此僅開放給了JDK使用,當然非要用也不是不能,利用反射可以使用。下面找?guī)讉€典型的方法:

public final class Unsafe {
// CAS 
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

// memory
public native long allocateMemory(long var1);
public native void freeMemory(long var1);

public native long objectFieldOffset(Field var1);
public native long staticFieldOffset(Field var1);
...
}

提供了CPU硬件指令級別的原子操作:compare - and - swap 的支持,我們熟悉的 Atomic 原子類的操作就是基于此實現(xiàn)的。
還有一些內(nèi)存相關(guān)的操作:分配內(nèi)存、釋放內(nèi)存
我們重點關(guān)注一下:objectFieldOffset用于獲取非靜態(tài)屬性Field在對象實例中的偏移量,然后利用偏移量可以通過 CAS 更新對象實例中的屬性,即可實現(xiàn)多線程同步機制,比使用 synchronized 加鎖效率要高。使用 CAS 有一種場景:只讓第一次修改對象實例中的屬性生效,在多線程情況下可以使用synchronized來保證,也可以使用 CAS,我們來看一個例子:

class Bean {
    // state屬性
    private volatile int state;  
    private static final int NEW          = 111;
    private static final int FINISHED     = 112;
    private static final int CANCELED     = 113;
    public Bean(){
        this.state = NEW;
    }
    public void doSomething(){
        U.compareAndSwapInt(this, STATE, NEW, FINISHED);
    }
    public void doSomething2(){
        U.compareAndSwapInt(this, STATE, NEW, CANCELED);
    }
    public int getState(){
        return state;
    }
    private static sun.misc.Unsafe U;
    private static final long STATE;       // state 的偏移地址
    static {
        try {
            // 利用反射獲取 Unsafe實例
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U = (sun.misc.Unsafe) f.get(null);
            
            // 獲取 state 屬性的偏移量
            STATE = U.objectFieldOffset(Bean.class.getDeclaredField("state"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }
}

public static void main(String[] args) {
        Bean b = new Bean();
        b.doSomething();
        b.doSomething2();
        System.out.println("b#state: " + b.getState());
}

結(jié)果:

b#state: 112

上述例子中 doSomething 、 doSomething2 只有一個方法修改 state 屬性生效,即便在多線程情況下,也能保證同步。

更多關(guān)于 sun.misc.Unsafe 的用法自行搜索

二、正文

2.1 FeatureTask 是什么,解決什么問題

我們知道線程的設(shè)計是基于 Thread-Runnable 模式的。 即沒有辦法直接從線程處理的異步任務(wù)中返回一個結(jié)果。因為Runnable的設(shè)計是這樣的:

public interface Runnable {
    public abstract void run();
}

run方法沒有參數(shù)、沒有返回值。

FeatureTask 就是一個基于現(xiàn)有的 Thread-Runnable 模式,實現(xiàn)了一個可以獲取異步任務(wù)返回值的機制。

FeatureTask類圖

FeatureTask 是一個實現(xiàn)了 RunnableFeature 接口的 具體實現(xiàn)類。
RunnableFeature 接口 = Runnable 接口 + Feature接口 「沒有多出一毛錢」

也就是說 FeatureTask 首先是一個 Runnable(無參、無返回值)
再看一下 Feature 的定義就知道了 FeatureTask 是怎么實現(xiàn)的 獲取異步任務(wù)返回值的。

// V 代表了返回值類型
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    // 獲取異步任務(wù)處理結(jié)果返回值,會一直阻塞調(diào)用者。直到異步任務(wù)處理結(jié)束,或者異步出現(xiàn)異常,又或者調(diào)用get()的線程被打斷。
    V get() throws InterruptedException, ExecutionException;
    // 增加了超時機制
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Feature 就代表了 異步任務(wù)處理 返回結(jié)果的。 定義了任務(wù)是否已經(jīng)完成、取消任務(wù) 和 獲取任務(wù)返回值的行為。 即 Feature就是為了解決異步任務(wù)返回值而定義的

也即是說 FeatureTask即是一個普通的Runnable,可以被線程執(zhí)行。又可以從它身上獲取到異步任務(wù)處理的結(jié)果。

到此是不是有一瞬間的靈感,大概知道怎么使用 FeatureTask了。類似如下這種:

// 注意這是示意代碼,不能真的運行。

val task = FeatureTask()    // 定義一個Runnable
val thread = Thread(task)  
thread.start()  // 開啟一個線程
val result = task.get()  // 等待異步任務(wù)處理完成后給出返回結(jié)果,此步驟阻塞。

如上示意代碼的整體邏輯是沒問題的。 接下來就是我該如何把我的異步任務(wù)交給 FeatureTask 去執(zhí)行呢(因為我們知道開啟的線程肯定會去執(zhí)行 FeatureTask#run ),難道在 FeatureTask 構(gòu)造的時候再傳一個 Runnable進去,此Runnable用來封裝我們的異步任務(wù)代碼片段?可是這個Runnable還是沒有返回值呀,FeatureTask 也拿不到異步任務(wù)返回值,又怎么返回給我們呢。這里引入另一個類:Callable

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

CallableRunnable 比起來的區(qū)別是:1、有返回值類型 2、方法可以拋出異常。

沒錯。Callable 就是用來替代 Runnable 讓我們封裝 有返回值的 異步任務(wù)代碼片段的。我們先看一個 FeatureTask使用的代碼示例:

data class ResultData(var flag: String = "")

class MyCall : Callable<ResultData>{
    override fun call(): ResultData {
        println("this is myCall running, threadName: ${Thread.currentThread().name}")

        // do something wasting time

        return ResultData("callable")
    }
}

fun main() {
    val executors = Executors.newCachedThreadPool()
    val task = FutureTask<ResultData>(MyCall())
    executors.execute(task)
    // 獲取異步任務(wù)返回值, maybe blocking
    val result = task.get()
    println("the call result is : $result")
}

結(jié)果:

this is myCall running, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)

至此,對FeatureTask的使用應(yīng)該有了一個基本的認識。

2.2 FeatureTask 實現(xiàn)原理

原理這塊基本上圍繞著這個問題進行:
1、如何實現(xiàn)獲取異步任務(wù)的結(jié)果。
2、如果異步任務(wù)執(zhí)行過程中出現(xiàn)異常,會怎么處理。
3、如果在異步任務(wù)執(zhí)行的過程中被取消了,會怎么處理。
4、get 超時機制如何實現(xiàn)

如果想要實現(xiàn)類似 FeatueTask的功能,上邊幾個問題是繞不開的。上邊幾個問題弄清楚了,原理也就清楚了。

2.2.1 如何實現(xiàn)獲取異步任務(wù)的結(jié)果

首先FeatureTask 內(nèi)部有五種狀態(tài):

  • NEW 初始化狀態(tài)
  • COMPLETING 處理完異步任務(wù)等待結(jié)束的狀態(tài)(短暫)
  • NORMAL 正常處理完異步任務(wù)后的狀態(tài)
  • EXCEPTIONAL 異步任務(wù)異常后的狀態(tài)
  • CANCELLED 用戶取消異步任務(wù)執(zhí)行后的狀態(tài)
  • INTERRUPTING 異步任務(wù)被打斷中的狀態(tài)(短暫)
  • INTERRUPTED 異步任務(wù)被打斷后的狀態(tài)

內(nèi)部狀態(tài)的轉(zhuǎn)化有一下幾種:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
// 代碼僅包括核心的邏輯,完整邏輯看源碼
public class FutureTask<V>{
    ...
    private Callable<V> callable;  // 最終運行的異步任務(wù)
    private Object outcome;  // 執(zhí)行完成后用于返回上層的異步任務(wù)處理結(jié)果
    public FutureTask(Callable<V> callable)   //構(gòu)造函數(shù)

    //非完整邏輯,狀態(tài)判斷等邏輯都刪除了
    public void run() {
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 異步任務(wù)執(zhí)行出現(xiàn)了未捕獲的異常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);    // 異步任務(wù)正常執(zhí)行完成
            }
        } finally {
           ...
        }
    }

    //報告異步任務(wù)執(zhí)行成功
    protected void set(V v) {
        // 嘗試修改內(nèi)部狀態(tài)
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;  //存入異步任務(wù)的結(jié)果
            U.putOrderedInt(this, STATE, NORMAL); // 最終的狀態(tài)
            finishCompletion();
        }
    }
    // 報告異步任務(wù)執(zhí)行出現(xiàn)異常
     protected void setException(Throwable t) {
        // 嘗試修改內(nèi)部狀態(tài)
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;    // Exception 存入結(jié)果
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // 最終狀態(tài)
            finishCompletion();
        }
    }
    
    // 用戶主動取消異步任務(wù)(不一定會成功,因為CAS保證了一旦狀態(tài)從NEW改變了,就再也無法接受其它的情況了)
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果起始狀態(tài)不是NEW,則直接取消失敗,直接返回。
        if (!(state == NEW &&
              U.compazreAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;


        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    private void finishCompletion() {
        ...
        done();
        callable = null;        // to reduce footprint
    }
     
    // 異步任務(wù)結(jié)束后的回調(diào),無論是正常執(zhí)行結(jié)束、還是異步任務(wù)出現(xiàn)了Exception、還是用戶cancel掉了異步任務(wù),最終都會執(zhí)行done。這幾種情況是互斥的,通過CAS保證了,只能有一種實現(xiàn)。
    protected void done() { }
    ...
}

上邊的代碼已經(jīng)削減了很多,只留下了處理這個話題最核心的代碼。

Callable異步任務(wù)的執(zhí)行,最終依賴的是Runnable#run方法。即在 FeatureTask#run中調(diào)用Callable#call的調(diào)用,在子線程中完成了一次異步任務(wù)的執(zhí)行。

異步任務(wù)的執(zhí)行有幾種情況:
1、正常處理完成 通過set方法報告結(jié)果
2、有未捕獲的異常 通過setException方法報告結(jié)果
3、用戶取消異步任務(wù) 通過cancel方法報告結(jié)果

上述幾種情況的處理是基于CAS來實現(xiàn)原子操作,也即是說只有一種情況會最終執(zhí)行。無論走哪一種情況,最終都會報告結(jié)果:finishCompletion,也就是說最終都會執(zhí)行 done 方法, done 這相當于一個回調(diào),即最后一步通知用戶的回調(diào)。所以基于此,我們可以在done方法里去取異步任務(wù)處理的結(jié)果,好處是因為異步任務(wù)已經(jīng)結(jié)束,最終結(jié)果的獲取就不會blocking了。

改進一下上邊的 FeatureTask的使用:

    val executors = Executors.newCachedThreadPool()
    val task = object : FutureTask<ResultData>(MyCall()){
        override fun done() {
            println("this is call done, threadName: ${Thread.currentThread().name}")
            val result = get()
            println("the call result is : $result")
        }
    }

    executors.execute(task)

執(zhí)行結(jié)果:

this is myCall running, threadName: pool-1-thread-1
this is call done, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)

好了,下一步看一下,FeatureTask是以何種方式把異步任務(wù)處理結(jié)果報告給使用者呢,答案就在 get() 方法中:

  public V get() throws InterruptedException, ExecutionException {
        ...
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

我們看到了調(diào)用 ge() 方法的時候,如果異步任務(wù)處理正常(NORMAL),則直接返回異步任務(wù)的結(jié)果;如果用戶取消了異步任務(wù)(CANCELLED、INTERRUPTED),則直接拋出異常 CancellationException ; 如果異步任務(wù)有未捕獲的異常(EXCEPTIONAL),則會拋出 ExecutionException 異常。最終把主動權(quán)交給上層用戶。

一個常見的處理模型:

try {
    get()
   // do something
} catch (InterruptedException e) {
    // do something                       
} catch (ExecutionException e) {                    
    // do something
} catch (CancellationException e) {          
    // do something         
}

至此整個流程就串起來了,細節(jié)的東西需要大家實際去看代碼。

最后 get() 還可能會拋出 InterruptedException,這是處理哪一種情況呢?因為我們知道 get() 會阻塞調(diào)用者所在的線程,假如被其它線程執(zhí)行 Thread#interrupt 打斷了所阻塞的線程,get()方法就會拋出 InterruptedException,然后具體被打斷后的處理邏輯交給用戶去處理。

2.2.2 get(long timeout, TimeUnit unit) 超時機制如何實現(xiàn)

get(long timeout, TimeUnit unit)提供了超時機制,在指定時間內(nèi)還沒有返回的話,就會拋出異常 TimeoutException,然后把控制權(quán)交給使用者?!?和get()相比,會多拋出一個 TimeoutException

每一個FeatureTask 對象都維護了一個因調(diào)用此 FeatureTaskget 而進入等待狀態(tài)的線程的單鏈表。元素的節(jié)點是:

    static final class WaitNode {
        volatile Thread thread;    // 當前節(jié)點的線程
        volatile WaitNode next;    // 下一個節(jié)點
        WaitNode() { thread = Thread.currentThread(); }
    }

如果get指定了超時時間,會使用 LockSupport#parkNanos將當前線程進入阻塞狀態(tài),超過時間后回去檢查異步任務(wù)運行狀態(tài),如果異步任務(wù)還沒有完成,則會拋出異常 TimeoutException 。

如果get 沒指定超時時間,會使用LockSupport.park 無限期阻塞當前線程,直到任務(wù)處理完成調(diào)用 LockSupport.unpark 恢復運行。

我們知道 finishCompletion 無論如何都會調(diào)用,不管異步任務(wù)正常處理、還是有未捕獲的異常,或者被用戶取消了都會調(diào)用。之前講解流程的時候,忽略了這個方法中關(guān)于 等待節(jié)點的處理。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
       ...
    }

會遍歷自己維護的 WaitNode 單鏈表,依次調(diào)用對應(yīng)節(jié)點的 LockSupport.unpark(t); 讓之前阻塞的線程都恢復運行。這么以來,所有阻塞的線程在這里都能得到異步任務(wù)的處理的結(jié)果。

對于超時機制的處理,這里講的只是基本的處理。更細節(jié)的東西還是要參考代碼去學習。

很有意思的一個事情是,和超時相關(guān)的話題這已經(jīng)是第二次了。 第一次是講 okio中的超時機制(這是一個典型的 生產(chǎn)者/消費者 模型的實踐)。 第二次是講 獲取FeatureTask 異步任務(wù)處理結(jié)果的超時機制。 其實無非就是利用 wait/notify 或者 park/unpark 去處理多線程之間的關(guān)系。這兩部分的超時機制設(shè)計的都很精彩,有時間建議大家去看看。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容