Java并發(fā)編程 - FutureTask

Callable

我們知道線程運(yùn)行任務(wù)其中的一個(gè)方法就是創(chuàng)建一個(gè)實(shí)現(xiàn)Runnable接口的類,然后通過Thread的構(gòu)造方法設(shè)置進(jìn)去,線程啟動(dòng)后就可以執(zhí)行Runnalbe的邏輯。

可以回顧一樣Runnable接口的定義:

java.lang.Runnable

public interface Runnable { 
    public abstract void run();
}

可以看到Runnable接口只定義了一個(gè)方法,而且這個(gè)方法沒有返回值。

現(xiàn)在如果我們執(zhí)行一個(gè)任務(wù)需要它返回給我們運(yùn)行結(jié)果,該怎么做?

Java為我們提供了另外一個(gè)接口Callable,貌似有這樣的功能,相比于Runnable,它允許有返回值,并且可以拋出檢查型異常信息。

java.util.concurrent.Callable

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看到它還是一個(gè)泛型接口,方法的返回值可以泛型化。

于是,我們類似這樣定義我們的任務(wù),提供返回值。

public class MyTask implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        return 1 * 1;
    }
    
}

當(dāng)你這樣定義后,你會(huì)發(fā)現(xiàn)了一個(gè)問題,任務(wù)是需要線程來運(yùn)行了,但是看了一下Thread類,發(fā)現(xiàn)沒有類似構(gòu)造方法或者是普通方法來將Callable類型的對(duì)象設(shè)置進(jìn)去,Thread只能運(yùn)行Runnable類型的任務(wù)對(duì)象。我想JDK API的制作者將這個(gè)接口的名字定義為"Callable"是有特殊含義: 使用實(shí)現(xiàn)它接口的類創(chuàng)造出來的對(duì)象只是一個(gè)可被調(diào)用的對(duì)象,而不是一個(gè)可運(yùn)行的對(duì)象。

線程運(yùn)行提供返回值,現(xiàn)在這個(gè)需求我們卡住了,先暫時(shí)放在一邊,我們繼續(xù)。

Runnable和Callable都可以理解為任務(wù)具體業(yè)務(wù)邏輯封裝接口。Java是面向?qū)ο蟮?,一切都是?duì)象,一段具體的業(yè)務(wù)邏輯代碼當(dāng)然也就需要有某種東西封裝起來,實(shí)現(xiàn)了Runnable或Callable接口的類創(chuàng)建的對(duì)象就是業(yè)務(wù)邏輯代碼的載體。

Future

有時(shí)候線程執(zhí)行任務(wù)是耗時(shí)的,這種情況下我們希望不要浪費(fèi)時(shí)間一直等待它返回結(jié)果,而是希望利用等待的時(shí)間處理其他任務(wù),只要知道有一個(gè)線程在跑,并且它在某個(gè)時(shí)刻會(huì)返回結(jié)果就行了。也就是說希望我們的執(zhí)行是異步的。

異步:當(dāng)一個(gè)異步過程調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。實(shí)際處理這個(gè)調(diào)用的部件在完成后,通過狀態(tài)、通知和回調(diào)來通知調(diào)用者。

Java提供了Future接口來實(shí)現(xiàn)對(duì)異步調(diào)用的支持。下面是接口的定義:

java.util.concurrent.Future

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

A Future represents the result of an asynchronous computation.

API對(duì)這個(gè)接口的描述是:Future代表異步計(jì)算的結(jié)果。這個(gè)接口提供了get方法,用于獲取異步執(zhí)行后的結(jié)果。

現(xiàn)在我們通過實(shí)現(xiàn)這個(gè)接口來定義我們的任務(wù)類,跟上面類似你會(huì)發(fā)現(xiàn),先不說異步獲取結(jié)果,首先這個(gè)定義出來的任務(wù)類創(chuàng)建的對(duì)象必須可以被線程調(diào)用,但是僅僅執(zhí)行這個(gè)接口的任務(wù)類不滿足。

也就是說我們需要一個(gè)既能被線程運(yùn)行又能異步獲取結(jié)果的類存在。

Java定義了RunnableFuture接口來支持這種需求。RunnableFuture接口的定義如下:

java.util.concurrent.RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture接口繼承了Runnable接口和Future接口,這樣實(shí)現(xiàn)了這個(gè)接口的任務(wù)類能被線程調(diào)用,然后也能異步獲取結(jié)果。

不過這里要注意:Future只是表示可異步獲取結(jié)果的接口類,它只是一個(gè)規(guī)定,實(shí)現(xiàn)它接口的類要實(shí)現(xiàn)它的這種語義。當(dāng)然我們可以自己實(shí)現(xiàn)如何異步調(diào)用,又如何返回異步調(diào)用的結(jié)果。不過JDK為我們提供了FutureTask接口,我們直接使用它就可以了。

FutureTask

public class FutureTask<V> implements RunnableFuture<V>

上面是FutureTask的定義,可以看到實(shí)現(xiàn)了RunnableFuture接口,這樣通過它創(chuàng)建的對(duì)象就是可被線程運(yùn)行的和可異步獲取結(jié)果的。

可以簡單地從字面上理解這個(gè)類:未來任務(wù)類。也就是說創(chuàng)建這個(gè)任務(wù)之后直接交給線程執(zhí)行,什么時(shí)候運(yùn)行不用關(guān)心了,相信你運(yùn)行后會(huì)帶回來某些東西。

##FutureTask類概覽

FutureTask.png

上面不是說過可返回結(jié)果的Callable對(duì)象無法被線程運(yùn)行嘛,從上面可以看出FutureTask可接收Callable類型的對(duì)象,并且FutureTask還是可運(yùn)行的,那么在FutureTask的run方法運(yùn)行Callable的call方法,就到達(dá)了線程運(yùn)行Callable代碼邏輯的效果。

##可能的狀態(tài)轉(zhuǎn)移

在我們之前使用Runnable的時(shí)候,實(shí)現(xiàn)了Runnable接口的類創(chuàng)建的對(duì)象就是一個(gè)任務(wù),創(chuàng)建這個(gè)任務(wù)后交給線程,之后這個(gè)任務(wù)對(duì)我們來說就是不可控了,但是我們的"FutureTask"必須是可控的,因?yàn)闃I(yè)務(wù)邏輯執(zhí)行完成后,需要得到計(jì)算的結(jié)果。

正如上面FutureTask類概覽所示,state用來表示FutureTask的內(nèi)部狀態(tài),所擁有的狀態(tài)如上圖所示,可能的狀態(tài)轉(zhuǎn)移為:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

##FutureTask執(zhí)行原理

1. 結(jié)果的載體

上面我們已經(jīng)說了Callable對(duì)象封裝了業(yè)務(wù)邏輯代碼,結(jié)果是業(yè)務(wù)邏輯代碼執(zhí)行后產(chǎn)生的,雖然說Callable對(duì)象提供的call方法可以返回執(zhí)行后的結(jié)果,但是我們無法直接從這對(duì)象中獲取,因?yàn)檫@個(gè)對(duì)象不能被線程直接調(diào)用,能被線程直接調(diào)用的是實(shí)現(xiàn)了Runnable接口的類的對(duì)象,執(zhí)行的代碼在run方法體中,也就是Callable封裝的業(yè)務(wù)邏輯代碼必須以某種方式被放到實(shí)現(xiàn)了Runnable接口的類的對(duì)象的run方法中執(zhí)行。而FutureTask是一個(gè)實(shí)現(xiàn)了Runnable接口的類,它可以用來執(zhí)行Callable封裝的邏輯??梢钥吹紽utureTask內(nèi)部有:

private Callable<V> callable;

這個(gè)屬性,它用來接收封裝了業(yè)務(wù)邏輯代碼的Callable對(duì)象。也就是線程通過調(diào)用FutureTask就間接地調(diào)用了Callable對(duì)象封裝的邏輯代碼,這個(gè)調(diào)用發(fā)生在run方法中。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Runnable接口的run方法是被線程調(diào)用的,而且上面我們說過這個(gè)方法是沒有返回值的,就算我們持有了FutureTask對(duì)象的引用,并且Callable對(duì)象的邏輯已經(jīng)在run方法內(nèi)執(zhí)行,我們也沒法獲取到Callable對(duì)象執(zhí)行返回的值,不過既然我們有FutureTask對(duì)象的持有權(quán),而且Callable對(duì)象方法的執(zhí)行在FutureTask對(duì)象的run方法內(nèi)部,那么我們就可以在FutureTask內(nèi)部定義一個(gè)屬性來接收Callable對(duì)象方法的返回值。FutureTask的outcome屬性正是啟到這個(gè)作用:

 private Object outcome; // non-volatile, protected by state reads/writes

這個(gè)值的設(shè)置就是在run方法的內(nèi)部:

 if (ran)
     set(result);

2. 結(jié)果的獲取

FutureTask實(shí)現(xiàn)類Future接口,F(xiàn)uture定義的get方法提供了獲取異步任務(wù)的執(zhí)行結(jié)果的作用,這個(gè)執(zhí)行結(jié)果就是我們上面說的outcome的值。

Callable對(duì)象封裝的業(yè)務(wù)邏輯代碼執(zhí)行操作可能是非常耗時(shí)的,也就是說cal
l方法一時(shí)半會(huì)執(zhí)行不完,執(zhí)行不完就無法將執(zhí)行的結(jié)果設(shè)置到outcome中,如果FutureTask的方法只是簡單的像下面這樣返回outcome:

public V get() throws InterruptedException, ExecutionException {
    return (V) outcome;
}

那么返回的這個(gè)outcome就毫無意義,因?yàn)闃I(yè)務(wù)邏輯代碼還沒執(zhí)行完,根本就還沒設(shè)置結(jié)果。

如何解決呢?解決的辦法就是如果業(yè)務(wù)邏輯代碼沒有執(zhí)行完,那么當(dāng)線程調(diào)用get方法獲取結(jié)果的時(shí)候就讓他掛起,讓它知道處理還沒執(zhí)行完,你這時(shí)候要獲取這個(gè)值的話得到的也是無用的數(shù)據(jù),所以你要等等。

現(xiàn)在就有兩個(gè)問題了:如何判斷業(yè)務(wù)邏輯是否執(zhí)行完?如何掛起?什么喚醒?

@@如何判斷業(yè)務(wù)邏輯是否執(zhí)行完

邏輯邏輯代碼執(zhí)行完就是FutureTask的run方法正確無誤地調(diào)用了Callable對(duì)象的call方法,然后成功的設(shè)置了outcome值,也就是說如果到了這個(gè)時(shí)間點(diǎn)那么就可以說任務(wù)執(zhí)行完了。也就是說到這個(gè)點(diǎn)之后設(shè)置一個(gè)狀態(tài)變量來表示任務(wù)完成,F(xiàn)utureTask的state就是這樣的一個(gè)狀態(tài)變量,判斷這個(gè)狀態(tài)具有某值之后那么就表明任務(wù)已經(jīng)完成了。

所以當(dāng)線程調(diào)用get方法的時(shí)候首先就應(yīng)該判斷這個(gè)值是否就是表明任務(wù)完成的特定值,如果不是那么當(dāng)前線程就要?jiǎng)e掛起。

@@如何掛起

上面我們已經(jīng)說過了當(dāng)調(diào)用get方法,線程看到狀態(tài)變量的值不是表示任務(wù)完成的值時(shí),它要暫停執(zhí)行,想想我們學(xué)過的知識(shí)使一個(gè)線程掛起有兩種(不考慮sleep)方式:Object的wait方法和LockSupport的park方法。Object的wait方法需要同步機(jī)制的支持,而我們這里沒有共享資源訪問同步的問題(state是共享資源,但是會(huì)采用無鎖的機(jī)制),所以不適合。FutureTask內(nèi)部采用的是LockSupport的park方法來掛起線程。

LockSupport.park(this);

這里還有一個(gè)問題就是會(huì)有多個(gè)線程請(qǐng)求get方法,那么它們都需要掛起,而后面要重新喚醒它們,那么就需要有個(gè)地方暫存它們,F(xiàn)utureTask使用
Treiber Stack棧這種數(shù)據(jù)結(jié)構(gòu)來暫存掛起的線程。

Treiber Stack是一個(gè)可擴(kuò)展的無鎖棧,利用細(xì)粒度的并發(fā)原語CAS來實(shí)現(xiàn)的。

FutureTask內(nèi)部定義了代表當(dāng)前線程的WaitNode節(jié)點(diǎn):

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

通過CAS操作來實(shí)現(xiàn)節(jié)點(diǎn)的入棧:

UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q)

@@什么時(shí)候喚醒

當(dāng)業(yè)務(wù)邏輯代碼執(zhí)行完畢,并且成功設(shè)置了outcome值后就要喚醒之前棧中被掛起的線程。

FutureTask代碼中,當(dāng)run方法內(nèi)部Callable調(diào)用call執(zhí)行業(yè)務(wù)邏輯操作無誤后,就會(huì)執(zhí)行設(shè)置outcome的操作,outcome操作設(shè)置成功后,會(huì)做喚醒操作。

run

try {
    result = c.call();
    ran = true;
} catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex);
}
if (ran)
    set(result);

set

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

3. 關(guān)于取消

Future接口定義了取消任務(wù)的方法,可提供取消任務(wù)的功能:

boolean cancel(boolean mayInterruptIfRunning)

這里,我們首先要弄明白一點(diǎn),取消任務(wù)是取消什么?

FutureTask說到底還是像Runnable一樣被線程調(diào)用,線程調(diào)用就是執(zhí)行run方法,所以取消任務(wù)就是中斷執(zhí)行run方法的那個(gè)線程,哪個(gè)線程在執(zhí)行FutureTask的run方法,F(xiàn)utureTask是有記錄的:

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

這個(gè)屬性就是用來記錄執(zhí)行FutureTask的線程對(duì)象的。這個(gè)屬性在run方法中設(shè)置:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
}

cancel邏輯就是中斷這個(gè)線程:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

總結(jié)

FutureTask說白了就是一個(gè)實(shí)現(xiàn)了Runnable接口的類,線程可以調(diào)用它的run方法。但是它跟普通實(shí)現(xiàn)Runnable接口類不一樣,它又實(shí)現(xiàn)了Future接口,內(nèi)部通過包裝Callable這個(gè)能執(zhí)行業(yè)務(wù)邏輯代碼又能返回結(jié)果的對(duì)象使得它具有了返回處理結(jié)果的能力。同時(shí)通過其內(nèi)部定義的任務(wù)處理狀態(tài)的屬性的支持,使得它能感知任務(wù)處理狀態(tài),通過這個(gè)狀態(tài)來掛起和喚醒想獲取它處理結(jié)果的線程。

FutureTask是一種主動(dòng)感知狀態(tài)的異步調(diào)用模式。

其他的異步模式有:通知和回調(diào)。FutureTask是主動(dòng)去詢問狀態(tài)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容