Java 并發(fā): 從任務(wù)中產(chǎn)生返回值

通常我們會(huì)通過 Thread 類創(chuàng)建執(zhí)行線程并執(zhí)行提交的任務(wù)(Runnable)

        new Thread(new Runnable() {
            @Override
            public void run() {

            }
        }).start();

但是這種方式有個(gè)缺陷,它沒有返回值,有時(shí)候,我們想知道查詢?nèi)蝿?wù)是否完成,或者我們想在任務(wù)執(zhí)行完畢后立即通知我們,很顯單純的 Thread 類做不到。 然而, Executor, Callable, FutureTask 可以幫我們實(shí)現(xiàn),不過,最終執(zhí)行任務(wù)的方式還是 Thread + Runnable,這些類只不過包裝了一些邏輯而已。

Callable

public interface Callable<V> {
    V call() throws Exception;
}

通常我們說,如果需要返回任務(wù)執(zhí)行的結(jié)果,任務(wù)需要實(shí)現(xiàn) Callable 接口,而不是 Runnable 接口,這個(gè)說法并不是很精確,因?yàn)榫€程中執(zhí)行的任務(wù)永遠(yuǎn)是 Runnable,而 Callable 只是在 Runnablerun() 方法中調(diào)用而已,這個(gè)可以在后面的源碼分析中可知。

Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future 代表一個(gè)異步任務(wù)的結(jié)果,可以通過 get() 來查詢結(jié)果,但是會(huì)阻塞當(dāng)前線程,直到結(jié)果返回。也可以通過 cancel() 來取消一個(gè)任務(wù)。 也可以通過 isCancelled()isDone() 查詢?nèi)蝿?wù)的狀態(tài)。

其中 cancel() 方法比較特殊,它會(huì)嘗試取消當(dāng)前任務(wù)的執(zhí)行。如果當(dāng)前任務(wù)已經(jīng)完成,或者已經(jīng)被取消,或者由于某些原因不能被取消,cancel() 方法就會(huì)返回 false。 而如果 cancel() 方法返回 true ,并且當(dāng)前任務(wù)還沒有啟動(dòng),這個(gè)任務(wù)將永遠(yuǎn)不會(huì)運(yùn)行。 而如果這個(gè)任務(wù)已經(jīng)啟動(dòng)了,cancel(boolean mayInterruptIfRunning) 的參數(shù) mayInterruptIfRunning 決定了執(zhí)行當(dāng)前任務(wù)的線程時(shí)候被中斷,從而來停止任務(wù)。

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {}

Future 實(shí)現(xiàn)了 RunnableFuture 接口,而 RunnableFuture 正如它的名字一樣,繼承了 RunnableFuture 兩個(gè)接口。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

例子

public class Test {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        Future<String> future = service.submit(new Task());
        try {
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

}

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(10);
        return "Task completed!";
    }
}

執(zhí)行結(jié)果

Task completed!

執(zhí)行結(jié)果很簡單,但是 get() 方法是會(huì)阻塞主線程 10 秒的,有時(shí)候,例如在 Android 中,阻塞主線程這么長時(shí)間,這是不可接受的。那么有沒有一種方法,能夠在任務(wù)執(zhí)行完畢后,自動(dòng)通知我們呢? 當(dāng)然有,往下看。

原理

ExecutorService 接口的 submit(Callable) 方法是在 AbstractExecutorService 中實(shí)現(xiàn)的。

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }    

submit() 方法首先把 Callable 對象轉(zhuǎn)換為了 RunnableFuture 接口實(shí)現(xiàn)類對象,在這里用的是 FutureTask, 最終也是返回這個(gè) FutureTask 對象,只是最后被轉(zhuǎn)為了更為通用的 Future 接口,因此例子中的 submit() 返回結(jié)果也可以強(qiáng)制轉(zhuǎn)換為 FutureTask ,只是我們一般不這樣使用它,這個(gè)事情后面再說。從這里可以看出,最終執(zhí)行的任務(wù)還是 Runnable。

由于 RunnableFuture 繼承自 Runnable 接口,因此能被 Executorexecute(Runnable) 方法執(zhí)行。當(dāng)調(diào)用 execute(ftask) 提交任務(wù)后,就會(huì)執(zhí)行 FutureTaskrun() 方法,主要源碼如下

    public void run() {
        // ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // STE1: 調(diào)用 Callable 接口的 call() 方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // STEP2: 設(shè)置結(jié)果
                    set(result);
            }
        } finally {
            // ...
        }
    }

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 這就是 get() 方法返回的結(jié)果
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }    

    private void finishCompletion() {
        // assert state > COMPLETING;
        // ...
        // STEP3: 任務(wù)執(zhí)行完畢后調(diào)用 FutureTask 的 done() 方法
        done();
        callable = null;
    }   

    protected void done() { } 

通過我對源碼的注釋,可以看到 FutureTask.run() 方法,實(shí)際做了三件事

  1. 調(diào)用 Callablerun() 方法,并獲取結(jié)果。
  2. outcome 保存返回的結(jié)果。通過 Future 接口的 get() 方法就可以獲取這個(gè)結(jié)果
  3. 任務(wù)執(zhí)行完畢后,調(diào)用 FutureTaskdone() 方法。

FutureTask 的正確使用

從剛才的源碼實(shí)現(xiàn)中可以發(fā)現(xiàn),提交的是 Callable 對象,而最終執(zhí)行的是 FutureTask 對象。那么我們完全可以自己用 Callable 創(chuàng)建一個(gè) FutureTask 對象,然后用 Executorexecute(Runnable) 方法提交任務(wù)。那么,這么做有什么好處呢?因?yàn)槲覀兛梢詮?fù)寫 FutureTask 中的 done() 方法,從而在任務(wù)執(zhí)行完畢后回調(diào)。

public class Test {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        MyFutureTask futureTask = new MyFutureTask(new Task());
        System.out.println(Thread.currentThread().getName() + ": start task");
        service.execute(futureTask);
    }

}

class MyFutureTask extends FutureTask<String> {

    public MyFutureTask(Callable<String> callable) {
        super(callable);
    }

    @Override
    protected void done() {
        try {
            System.out.println(Thread.currentThread().getName() + ": " + get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        ;
    }

}

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Task completed!";
    }
}

執(zhí)行結(jié)果如下

main: start task
pool-1-thread-1: Task completed!

根據(jù)上面講的原理,這里的結(jié)果并不意外。 從這個(gè)例子可以看出,FutureTaskdone() 方法能夠在任務(wù)執(zhí)行完畢后第一時(shí)間回調(diào),這就不需要去調(diào)用會(huì)阻塞當(dāng)前線程的get() 方法。

另外,從結(jié)果看 done() 方法并不是在主線程中執(zhí)行,如果是像在 Android 中,是不能在工作線程中直接更新 UI 的。

其實(shí)我們的代碼可以寫得再簡單點(diǎn),只是觀賞性并不好

        FutureTask<String> futureTask = new FutureTask<String>(new Task()){
            @Override
            protected void done() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": " + get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };

直接在創(chuàng)建 FutureTask 對象的時(shí)候,復(fù)寫它的 done() 方法。

ExecutorService.submit()

查看 API 發(fā)現(xiàn),ExecutorServicesubmit() 方法可以傳入一個(gè) Runnable 對象作為參數(shù)。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }    

通過上面的原理分析可知,submit(Runnable task) 方法的任務(wù)返回結(jié)果是 null,而 submit(Runnable task, T result) 返回的結(jié)果就是參數(shù) result.

Runnable 轉(zhuǎn) Callable

有時(shí)候代碼中只用了 Runnable 來執(zhí)行任務(wù),如果我們想給這個(gè)任務(wù)一個(gè)返回結(jié)果,就需要把 Runnable 轉(zhuǎn)為 Callable ,然后用 ExecutorService.submit() 來執(zhí)行。 Executors 提供了靜態(tài)方法用來轉(zhuǎn)換

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }        

如果看懂了上面的原理,這里就很簡單了 ~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容