并發(fā)編程之ScheduledThreadPoolExecutor(四)

一、ScheduledThreadPoolExecutor的爺爺類AbstractExecutorService

本篇把上一篇漏掉的一些如submit等方法的解析補回來,盡量給大家構(gòu)建一個完整的體系。想要了解submit首先得了解FutureTask!

1、FutureTask類
1.1、FutureTask實現(xiàn)的接口,頂層是Runnable和Future
public class FutureTask<V> implements RunnableFuture<V> {···}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
1.2、FutureTask重要的成員變量
    private volatile int state;  //任務(wù)的狀態(tài)
    /*可能的任務(wù)狀態(tài)切換
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private static final int NEW          = 0;  //新建狀態(tài)
    private static final int COMPLETING   = 1;  //正在執(zhí)行狀態(tài)
    private static final int NORMAL       = 2;  //正??扇〗Y(jié)果狀態(tài)
    private static final int EXCEPTIONAL  = 3;  //例外狀態(tài)
    private static final int CANCELLED    = 4;  //取消狀態(tài)
    private static final int INTERRUPTING = 5;  //正在中斷
    private static final int INTERRUPTED  = 6;  //中斷完成
    /** 內(nèi)部組合了可返回結(jié)果的狀態(tài)類 */
    private Callable<V> callable;
    /** 任務(wù)執(zhí)行結(jié)果 */
    private Object outcome; 
    /** 執(zhí)行callable的線程*/
    private volatile Thread runner;
    /**等待任務(wù)執(zhí)行完成的線程隊列*/
    private volatile WaitNode waiters;
1.3、FutureTask重要的成員方法
//用于獲取任務(wù)結(jié)果,多線程競爭無所謂,只是讀取
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
/**取消任務(wù)方法,改變?nèi)蝿?wù)狀態(tài),中斷執(zhí)行任務(wù)的線程,喚醒等待隊列中的線程并把等待隊列線程節(jié)點設(shè)置為空*/
public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
/** 獲取結(jié)果,如果當前任務(wù)還沒完成,調(diào)用awaitDone等待*/
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
/**定時等待任務(wù)完成,內(nèi)部排隊掛起*/
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            //如果任務(wù)完成則把線程從阻塞隊列中移除
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) 
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
/**任務(wù)執(zhí)行入口*/
public void run() {
        //這里的cas操作設(shè)置了runner線程為當前線程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))。
            return;
        try {
            //構(gòu)造方法里設(shè)置的Runnable或者Callable
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //真正執(zhí)行任務(wù)的call方法,返回執(zhí)行結(jié)果,阻塞調(diào)用
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

在認識了FutureTask本質(zhì)是Runnable和Callable任務(wù)之后感覺FutureTask真是沒啥東西,所謂阻塞獲取無非就是死循環(huán)取Callable.call()方法的結(jié)果,等待一定時間獲取也是調(diào)用了LockSupport掛起線程一定時間而已。


2、最重要的方法submit:
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}
2.1、submit方法解析:

submit前兩種都調(diào)用了newTaskFor方法,該方法返回一個FutureTask<T>,其內(nèi)部實現(xiàn)實際上調(diào)用了Executors的callable方法,Executor給Runnable和Callable提供了一個適配器,讓Runnable有結(jié)果可以返回,這里暫且知道FutureTask內(nèi)部是由Callable任務(wù)類型實現(xiàn)的就可以了。而真正執(zhí)行任務(wù)的方法還是execute(ftask),該方法的調(diào)用鏈是這樣的

execute(ftask)-->ThreadPoolExecutor.execute(ftask)-->ThreadPoolExecutor.addWorker()
-->t.start()-->(FutureTask)ftask.run()-->Callable.call()-->Executors.RunnableAdapter.call()-->(Runnable)task.run

這個調(diào)用鏈很長,最終的最終還是調(diào)用了task的run方法,中間過程為了保存線程執(zhí)行結(jié)果使用了FutureTask的一些方法。
submit的第三種方法我們是很常用的,直接提交一個Callable任務(wù)給線程池執(zhí)行,并返回一個可以獲取線程執(zhí)行結(jié)果的FutureTask。
以上就是對上一篇的補充,接著分析定時線程池


3、ScheduledThreadPoolExecutor類:

3.1、該類能夠執(zhí)行定時任務(wù)主要依賴于一個內(nèi)部類:ScheduledFutureTask
//該類繼承了FutureTask并實現(xiàn)了RunnableScheduledFuture(這個接口內(nèi)部又繼承了Delayed接口),接口不了解不要緊,只要知道必須實現(xiàn)兩個接口方法isPeriodic和getDelayed即可
 private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        //任務(wù)編號
        private final long sequenceNumber;
        //任務(wù)可以開始執(zhí)行的時間
        private long time;
        //任務(wù)重復(fù)執(zhí)行的時間間隔
        private final long period;
        /** reExecutePeriodic方法調(diào)用后重新入隊的任務(wù) */
        RunnableScheduledFuture<V> outerTask = this;
        // 支持快速取消,Delayed隊列的索引值 
        int heapIndex;
        //這個比較方法是任務(wù)入隊的規(guī)則,即隊列按照這個比較方法對任務(wù)排序,實際上就是按照任務(wù)設(shè)定的開始時間創(chuàng)建的任務(wù)隊列
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
        //任務(wù)的run方法,
        public void run() {
            //是否可重復(fù)執(zhí)行
            boolean periodic = isPeriodic();
            //如果任務(wù)當前狀態(tài)不可執(zhí)行,那么取消該任務(wù)
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            //如果任務(wù)可重復(fù)執(zhí)行并且狀態(tài)也是可執(zhí)行的,那么直接執(zhí)行該任務(wù)并設(shè)置下一次要執(zhí)行的時間
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                //重新周期執(zhí)行該任務(wù),內(nèi)部把任務(wù)放到待執(zhí)行的隊列中(如果可執(zhí)行)或者取消任務(wù)
                reExecutePeriodic(outerTask);
            }
        }
        //取消任務(wù)方法,調(diào)用了ThreadPoolExecutor的取消方法
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            //如果任務(wù)取消成功那么從任務(wù)隊列中移除任務(wù)
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

3.1小結(jié):

定時線程池能夠定時執(zhí)行任務(wù)全靠內(nèi)部類ScheduledFutureTask,該類維護了一個DelayedWorkQueue(按照自己指定順序加入元素的隊列),任務(wù)在隊列中的順序?qū)嶋H上是任務(wù)開始執(zhí)行的時間的順序。當任務(wù)執(zhí)行的時候會調(diào)用ThreadPoolExecutor的執(zhí)行方法runAndReset(線程執(zhí)行完交出執(zhí)行權(quán)),執(zhí)行完會設(shè)置下一次將要執(zhí)行的時間并把任務(wù)重新放到任務(wù)隊列等待其他線程執(zhí)行。判斷當前任務(wù)是否可執(zhí)行要看當前線程池是否處于SHUTDOWN并有殘留任務(wù)或者處于RUNNING狀態(tài),如果線程池處于其他狀態(tài),那么當前任務(wù)是要取消的。

3.2、定時線程池最重要的方法schedule
//該方法對普通任務(wù)進行了裝飾,即裝飾城ScheduledFutureTask,真正執(zhí)行任務(wù)的是delayedExecute
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}
//本方法真正執(zhí)行定時任務(wù)
private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果線程池關(guān)閉,拒絕接受任務(wù)
        if (isShutdown())
            reject(task);
        else {
            //任務(wù)入隊
            super.getQueue().add(task);
            //如果線程池關(guān)閉或者不能繼續(xù)執(zhí)行任務(wù),從隊列里移除任務(wù)并取消任務(wù)
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            //如果可以執(zhí)行任務(wù)那么確保線程池各方面狀態(tài)正常
            else
                ensurePrestart();
        }
}
//該方法在線程池中啟動了一定數(shù)量的工作線程,addWorker方法我們前面講過,該方法會自己取任務(wù)執(zhí)行
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

3.2小結(jié):

分析至此我們可以明白定時任務(wù)的執(zhí)行原理了,
首先:任務(wù)創(chuàng)建成功之后根據(jù)任務(wù)開始執(zhí)行的時間先后順序把任務(wù)加到任務(wù)隊列里(比如隊列里原來有個任務(wù)10點執(zhí)行,你這個任務(wù)11點執(zhí)行,那你就在它后面,如果再來一個任務(wù)10點半執(zhí)行,那么10點半的任務(wù)在你倆中間)
然后:啟動線程池,線程池的工人們來這個時序隊列里取任務(wù)執(zhí)行,如果在線程池關(guān)閉的時候恰好來了一個任務(wù),那么丟棄該任務(wù)。線程池的線程執(zhí)行完任務(wù)后會交出任務(wù)的使用權(quán)(runner=null),并把任務(wù)下一次執(zhí)行時間設(shè)置成你創(chuàng)建任務(wù)時設(shè)置的時間間隔+原來的執(zhí)行時間,并再一次把任務(wù)加到任務(wù)隊列(如果線程池沒關(guān)閉的話)。

總結(jié):從AQS框架到ScheduledThreadPoolExecutor一層層的類和接口構(gòu)成了層次清晰的知識樹,解析到這里不得不佩服作者!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 譯序 本指南根據(jù) Jakob Jenkov 最新博客翻譯,請隨時關(guān)注博客更新:http://tutorials.j...
    高廣超閱讀 5,482評論 1 68
  • 一.線程安全性 線程安全是建立在對于對象狀態(tài)訪問操作進行管理,特別是對共享的與可變的狀態(tài)的訪問 解釋下上面的話: ...
    黃大大吃不胖閱讀 972評論 0 3
  • 早上和媽媽一起回憶了小時候,才發(fā)現(xiàn)原來父母為我們真的做了很多很多。因為二姐的出生,我們一家的命運都有所改變,至少媽...
    劉芷寧running閱讀 273評論 0 0
  • 猴年說猴 開場:猴年到,猴年到,吉星福星高高照!小妹小妹開心笑,來到舞臺樂淘淘。宋小妹給大家拜年了! 祝大家猴年行...
    土家霜妹閱讀 442評論 0 0
  • 中醫(yī)學(xué)認為脾胃是人體的“后天之本”、“氣血生化之源”,是消化、吸收、轉(zhuǎn)化人體所需氣血精微的重要臟腑。 脾胃...
    趙瀾青閱讀 2,622評論 0 1

友情鏈接更多精彩內(nèi)容