Callable實現(xiàn)子線程獲取函數(shù)返回值

Callable接口

Java中的子線程通常是通過Thread或者Runnable的方式實現(xiàn)的,但是這種方式只能通過回調(diào),或者共享變量等方式來傳遞數(shù)據(jù),而Callable則是可以獲取返回結(jié)果的一種子線程實現(xiàn)方式。

Callable是一個接口,源碼如下:

public interface Callable<V> {
    V call() throws Exception;
}

非常簡單,只有一個方法,和一個泛型V,所以我們創(chuàng)建Callable對象的時候,也只需要指定返回類型并實現(xiàn)call方法就可以了。

Future接口

看完了Callable接口,會發(fā)現(xiàn)它非常簡單,沒有辦法在子線程中直接通過它來獲取到返回結(jié)果的,這時候就需要Future發(fā)揮作用了。源碼如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • boolean cancel(boolean mayInterruptIfRunning) 在任務(wù)開始前取消,傳入值表示任務(wù)開始后是否允許打斷,返回值表示是否取消成功(任務(wù)已經(jīng)開始不允許打斷,已經(jīng)運行結(jié)束,已經(jīng)取消等等狀態(tài)會返回失敗)
  • boolean isCancelled() 是否已經(jīng)被取消
  • boolean isDone() 是否完成任務(wù)
  • V get() 嘗試獲取返回結(jié)果,阻塞方法
  • V get(long timeout, TimeUnit unit) 同上,可以指定超時時間

可以看到,Future實際上可以理解為Callable的管理類。

在線程池中執(zhí)行任務(wù)時,除了execute方法之外,還有一個submit方法:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

它返回的就是一個Future對象,可以通過它來回去Callable任務(wù)的執(zhí)行結(jié)果:


Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Sub Thread is calculating...");
                Thread.sleep(10000);
                return 10;
            }
        };
        
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
    try {
            System.out.println("Main Thread start waiting result... ");

            int res = future.get();
            System.out.println("Main Thread get result: " + res);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

//運行結(jié)果:
Main Thread start waiting result... 
Sub Thread is calculating...
Main Thread get result: 10

FutureTask

如果不用Java提供的線程池,直接用Thread怎樣在子線程中運行Callable呢? 這時候就要用到FutureTask類了。

FutureTask實現(xiàn)了FutureRunnable接口,這就意味著,它既可以放在Thread中去運行,又能夠?qū)θ蝿?wù)進行管理,下面是源碼:


    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以看到,構(gòu)造函數(shù)中傳入了待運行的任務(wù)Callable對象或者Runnable對象和指定的返回結(jié)果,V result用來指定運行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)來返回null。采用Callable構(gòu)造方法創(chuàng)建的FutureTask對象,執(zhí)行完畢返回的是實際運算結(jié)果,而Runnable 構(gòu)造函數(shù)返回值是傳入的result。

task的狀態(tài)

FutureTask中持有的任務(wù)對象,有以下幾種狀態(tài):

    private static final int NEW          = 0; //新建或運行中
    private static final int COMPLETING   = 1;//任務(wù)運行結(jié)束,正在處理一些后續(xù)操作
    private static final int NORMAL       = 2;//任務(wù)已經(jīng)完成,COMPLETING的下一個狀態(tài)
    private static final int EXCEPTIONAL  = 3;//任務(wù)拋出異常,COMPLETING的下一個狀態(tài)
    private static final int CANCELLED    = 4;//任務(wù)被取消
    private static final int INTERRUPTING = 5;//收到打斷指令,還沒有執(zhí)行interrupt
    private static final int INTERRUPTED  = 6;//收到打斷指令,也執(zhí)行了interrupt

可能的狀態(tài)變化主要有以下幾種:

     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED

task的執(zhí)行過程

FutureTask實現(xiàn)了Runnable接口,是可以直接放在Thread中執(zhí)行的,實際上運行的就是它的run方法:

public void run() {
    //r如果當(dāng)前狀態(tài)不是NEW,說明任務(wù)已經(jīng)執(zhí)行完成了,直接返回
    //如果當(dāng)前狀態(tài)是NEW,嘗試用CAS方式將當(dāng)前線程賦值給RUNNER,賦值前RUNNER的值應(yīng)該是null,否則賦值失敗
    //賦值失敗表示已經(jīng)有線程執(zhí)行了run方法,直接返回
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //運行中拋出異常
                    setException(ex);
                }
                //ran為true,說明正常運行結(jié)束,得到了返回結(jié)果
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

執(zhí)行結(jié)果其實是比較簡單的,通過RUNNER來記錄執(zhí)行任務(wù)的線程,從而保證只有一個線程可以執(zhí)行該任務(wù)。運行結(jié)束后有兩個出口:

  • setException(ex); 運行中出錯,拋出異常
  • set(result); 任務(wù)執(zhí)行完畢,獲取到返回值
    protected void setException(Throwable t) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //喚醒等待線程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

這兩個方法實際上是一樣的,都是將狀態(tài)賦值為COMPLETING,然后保存結(jié)果(運行結(jié)果或錯誤信息),再執(zhí)行finishCompletion方法,通知WAITERS里記錄的等待線程繼續(xù)執(zhí)行,并清空WAITERS。

獲取返回結(jié)果

獲取返回結(jié)果是通過get()方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
            //cas機制,將新建節(jié)點q的next指向原來的節(jié)點workers,然后將workers更新為新建的節(jié)點。workers(WAITERS)實際上就是持有了所有等待線程的一個鏈表
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

get()方法比較簡單,先判斷當(dāng)前狀態(tài),如果狀態(tài) < COMPLETING,說明任務(wù)沒有執(zhí)行完畢,直接調(diào)用awaitDone方法。

awaitDone方法可以接受兩個參數(shù),用來指定是否設(shè)置超時時間。它內(nèi)部是一個無限for循環(huán)。下面是awaitDone方法的執(zhí)行步驟(忽略超時設(shè)置):

  1. 進入awaitDone方法時,state一定是小于COMPLETING的,第一次會走else if (q == null)分支,創(chuàng)建一個WaitNode()對象用來保存當(dāng)前線程

  2. 第二次循環(huán)q已經(jīng)不是null了,如果任務(wù)仍然沒有結(jié)束,會執(zhí)行else if (!queued)分支,queued表示創(chuàng)建的WaitNode()是否已經(jīng)添加到鏈表里,如果沒有嘗試添加,直到添加成功為止。

  3. 等待線程添加成功以后進入下一個循環(huán),此時如果任務(wù)仍然沒有結(jié)束,會走到else分支,掛起當(dāng)前線程(阻塞)

  4. 此處阻塞的是等待結(jié)果的線程,也就是調(diào)用FutureTaskget()方法的線程,而不是執(zhí)行任務(wù)的線程。阻塞線程用的是LockSupport.park(this)方法,喚醒的方法是LockSupport.unpark(),該方法在上邊的finishCompletion()中出現(xiàn)了,也就是說,任務(wù)執(zhí)行結(jié)束(運行完,拋出異常,被取消)時,等待的線程才會被喚醒,繼續(xù)下一次循環(huán)。

  5. 任務(wù)結(jié)束以后,如果state是COMPLETING狀態(tài),說明一些清理任務(wù)還沒有執(zhí)行完,等待的線程會讓出cpu,讓其他線程優(yōu)先執(zhí)行

  6. 直到state 大于COMPLETING,說明FutureTask已經(jīng)完全結(jié)束了,此時會會執(zhí)行(s > COMPLETING)分支,把節(jié)點置空,并返回。

awaitDone返回以后,說明任務(wù)已經(jīng)執(zhí)行完成了,會進入report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

可以看到,如果是正常結(jié)束,或者拋出異常結(jié)束,會返回結(jié)果,而如果是被取消,則會拋出異常。

使用

Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("正在計算結(jié)果...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("結(jié)果為:" + result);

總結(jié)

  1. FutureTask可以視為一個管理Callable任務(wù)的工具類,執(zhí)行Callable任務(wù)的是FutureTaskrun方法,所以,可以通過 new Thread(futuretask)的方法來實現(xiàn)子線程執(zhí)行任務(wù)

  2. 獲取執(zhí)行結(jié)果是通過FutureTaskget方法,調(diào)用該方法后,如果線程會被掛起,知道任務(wù)結(jié)束為止

  3. 獲取結(jié)果的線程數(shù)量沒有限定,可以是任意個線程

  4. 獲取結(jié)果的線程被掛起以后,可以通過取消,超時等方法在任務(wù)執(zhí)行完畢以前結(jié)束掛起狀態(tài)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容