走馬觀花-FutureTask和Callable

創(chuàng)建線程的方式

  • 直接創(chuàng)建Thread
        new Thread() {
                @Override
                public void run() {
                    super.run();
                    setContent("來(lái)自Thread");
                }
            }.start();
  • 通過(guò)Thread執(zhí)行Runnable
        new Thread(new Runnable() {
                @Override
                public void run() {
                    setContent("來(lái)自Runnable");
                }
            }).start();
  • 通過(guò)FutureTask和Callable
        //1.創(chuàng)建一個(gè)類實(shí)現(xiàn)Callable接口并得到實(shí)現(xiàn)類對(duì)象
        TestCallable callable = new TestCallable();
        //2.將Callable對(duì)象構(gòu)建成FutureTask
        FutureTask task = new FutureTask<>(callable);
        //通過(guò)FutureTask實(shí)例創(chuàng)建Thread對(duì)象
        Thread thread = new Thread(task);

        thread.start();

這些方法都可以開啟一個(gè)新線程。那么他們之間有什么不同呢?
Thread繼承自Runnable,Runnable的源碼:

    public interface Runnable {
        /**
         * 當(dāng)一個(gè)對(duì)象實(shí)現(xiàn)了Runnable接口去實(shí)現(xiàn)一個(gè)線程是,開啟該線程就會(huì)在此線程中回調(diào)run()方法
         */
        public abstract void run();
    }

Runnable類的介紹:

??Runnable接口應(yīng)該被任何一個(gè)想要執(zhí)行一個(gè)新線程的類對(duì)象實(shí)現(xiàn)。且這個(gè)類必須定義一個(gè)不帶參數(shù)的方法run()
??這個(gè)接口被設(shè)計(jì)來(lái)為那些存活著的時(shí)候想要去執(zhí)行某些代碼的對(duì)象提供一個(gè)協(xié)議。例如:Thread實(shí)現(xiàn)了Runnable接口,Thread對(duì)象存活意味著Thread啟動(dòng)了且還未停止。
??此外,Runnable提供了一種方法,讓類在不繼承Thread的情況下處于活動(dòng)狀態(tài)。一個(gè)實(shí)現(xiàn)了Runnable的類可以通過(guò)實(shí)例化一個(gè)Thread實(shí)例并將它自己作為目標(biāo)進(jìn)行傳遞,而無(wú)需創(chuàng)建Thread的子類。 在大多數(shù)情況下,如果您只打算覆蓋run()方法并且沒(méi)有其他Thread方法,則應(yīng)該使用Runnable接口。 這很重要,因?yàn)轭惒粦?yīng)該被子類化除非程序員打算修改或增強(qiáng)班級(jí)的基本行為.總而言之,就是借助Runnable來(lái)開啟一個(gè)線程。

Callable:

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * 返回結(jié)果,或者拋出異常
         */
        V call() throws Exception;
    }

Callable類的介紹:

??一個(gè)可以返回結(jié)果或者拋出異常的任務(wù),需要實(shí)現(xiàn)一個(gè)不帶參數(shù)的方法call()
??Callable接口通Runnable接口很相似,兩者都是被設(shè)計(jì)來(lái)開啟一個(gè)新線程的。不同的是,Runnable接口不能返回 一個(gè)結(jié)果或拋出異常。
??Executors類包含從其他常見(jiàn)形式轉(zhuǎn)換為Callable類的實(shí)用程序方法。

FutureTask類的介紹:

??可取消的異步計(jì)算。 該類提供Future的基本實(shí)現(xiàn),其中包含啟動(dòng)和取消任務(wù)的方法,查詢?nèi)蝿?wù)是否完成以及檢索任務(wù)結(jié)果的方法。
??只有在任務(wù)完成后才能檢索結(jié)果; 如果任務(wù)還沒(méi)有完成,get方法將會(huì)被阻塞。 一旦任務(wù)完成,任務(wù)就不能重新啟動(dòng)或取消(除非使用runAndReset調(diào)用任務(wù))
??FutureTask可以用來(lái)包裝一個(gè)Callable對(duì)象。因?yàn)镕utureTask實(shí)現(xiàn)了Runnable接口,且FutureTask能被Executor提交
??除了作為獨(dú)立類使用外,此類還提供受保護(hù)的功能,這在創(chuàng)建自定義任務(wù)類時(shí)可能非常有用。

FutureTask運(yùn)行狀態(tài):

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

??任務(wù)的運(yùn)行狀態(tài),初始化為NEW。 運(yùn)行狀態(tài)僅在方法set,setException和cancel中轉(zhuǎn)換為終端狀態(tài)。
??在完成期間,狀態(tài)可能會(huì)呈現(xiàn)COMPLETING(結(jié)果正在設(shè)置)或INTERRUPTING(中斷賽跑者以滿足取消(true))的瞬態(tài)值。
??從這些中間狀態(tài)到最終狀態(tài)的轉(zhuǎn)換使用更便宜的有序/惰性寫入,因?yàn)橹凳俏ㄒ坏牟⑶也荒苓M(jìn)一步修改。

可能的狀態(tài)轉(zhuǎn)換:

         NEW -> COMPLETING -> NORMAL
         NEW -> COMPLETING -> EXCEPTIONAL
         NEW -> CANCELLED
         NEW -> INTERRUPTING -> INTERRUPTED

FutureTask構(gòu)造器:

        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }


        //以適配模式將Runnable對(duì)象適配成一個(gè)Callable對(duì)象
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }

啟動(dòng)線程時(shí),會(huì)調(diào)用FutureTask的run()方法:

    public void run() {
        //如果當(dāng)前FutureTask的裝填不是New,直接return
        //直接將runner變成當(dāng)前線程
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //調(diào)用Callable的call方法(異步線程,可以做耗時(shí)操作)
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

compareAndSwapInt()方法作用及其參數(shù)意思

    /**
     *public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5)方法
     *  var1:需要改變的對(duì)象
     *  var2:偏移量
     *  var4:期待的值
     *  var5:改變后的值
     *  作用:調(diào)用該方法時(shí),若var1所在地址偏移了var2之后所在的地址位置取到的值(value)和var4相等,則將
     *        value的值替換成var5
     *  返回:true :value的值被更改
     *
     *public native void putOrderedInt(Object obj, long offset, int value);
     *  作用:設(shè)置obj對(duì)象中offset偏移地址對(duì)應(yīng)的整型field的值為指定值。這是一個(gè)有序或者
     *  有延遲的putIntVolatile方法,并且不保證值的改變被其他線程立即看到
     */

set()方法:

      protected void set(V v) {
             //若當(dāng)前對(duì)象的state偏移了STATE之后的值等于NEW,則將state置為COMPLETETING
             if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                 outcome = v;
                 //將state置為NORMAL
                 U.putOrderedInt(this, STATE, NORMAL); // final state
                 finishCompletion();
             }
         }

finishCompletion()方法:

    private void finishCompletion() {
        // assert state > COMPLETING;
        ...

        //任務(wù)執(zhí)行完成之后調(diào)用,protected方法,子類實(shí)現(xiàn)
        done();

        //重置Callable
        callable = null;
    }

cancel(boolean)方法:

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

cancel()方法可以看到,將正在運(yùn)行的線程通過(guò)調(diào)用interrupt()方法打斷,然后重置state的值為INTERRUPTED.最后調(diào)用finishCompletion()方法重置一些變量與狀態(tài)。

isDone()和isCancelled():

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

get()方法獲取結(jié)果:
??get()是一個(gè)阻塞方法,為什么阻塞?其實(shí)就是調(diào)用該方法之后,開啟一個(gè)無(wú)限循環(huán),直到state的值變?yōu)镹ormal或大于等于CANCELLED才將值通過(guò)report()方法返回。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

運(yùn)行中,進(jìn)入awaitDone(false, 0L)方法開啟循環(huán)等待:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // 0L表示未停止
        WaitNode q = null;      //等待節(jié)點(diǎn)
        boolean queued = false; //是否入隊(duì)

        //開啟死循環(huán),知道結(jié)果出現(xiàn)
        for (;;) {
            int s = state;
            //執(zhí)行完成,返回結(jié)果
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //正在執(zhí)行
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                //線程處于可執(zhí)行狀態(tài)
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            //new 一個(gè)WaitNode對(duì)象
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

其實(shí)可以看到,返回結(jié)果要么正常,要么拋出異常。最后調(diào)用report(int)方法將結(jié)果返回:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            //返回正常結(jié)果
            return (V)x;
        if (s >= CANCELLED)
            //拋出異常
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容