[懷舊并發(fā)09]分析Java延遲與周期任務(wù)的實現(xiàn)原理

Java并發(fā)編程源碼分析系列:

延遲或周期執(zhí)行任務(wù)可以使用Timer或者ScheduledThreadPoolExecutor,前者可以拋棄,后者是今天的主角。

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,對應(yīng)執(zhí)行任務(wù)變成ScheduledFutureTask。本文會在前三篇分析線程池原理的基礎(chǔ)上,分析ScheduledThreadPoolExecutor的實現(xiàn)原理,最后介紹下為什么不用Timer了。

ScheduledThreadPoolExecutor的創(chuàng)建

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor();

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor的創(chuàng)建可以使用Executors,也可以自己傳參構(gòu)建。上面的構(gòu)造函數(shù)是參數(shù)最全的版本,可以設(shè)置線程目標(biāo)數(shù)量、線程工廠和飽和策略。至于等待隊列,使用內(nèi)部類DelayedWorkQueue,看后文分析。

ScheduledFutureTask

ScheduledFutureTask的構(gòu)造函數(shù)沒什么特別,保存了三個參數(shù)。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • time:任務(wù)執(zhí)行時間;
  • period:任務(wù)周期執(zhí)行間隔;
  • sequenceNumber:自增的任務(wù)序號。

Callable默認(rèn)period=0,表示任務(wù)不是周期執(zhí)行,因為只有Runnable可以周期執(zhí)行。想想也是,Callable目的是獲得執(zhí)行結(jié)果,沒有必要重復(fù)調(diào)用。

圖1

ScheduledFutureTask繼承了我們熟悉的FutureTask,這個不用多說。圖1是它實現(xiàn)的接口,比較陌生的是Delayed,而Delayed又繼承了Comparable。

public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

這兩個接口的存在很容易理解,ScheduledFutureTask在等待隊列里調(diào)度不再按照FIFO,而是按照執(zhí)行時間,誰即將執(zhí)行,誰就排在前面。在這里也可以看到sequenceNumber的作用,當(dāng)執(zhí)行時間相同時,按照序號排序。

添加延遲任務(wù)

對ScheduledThreadPoolExecutor使用通用的execute或者submit提交任務(wù),最終調(diào)用schedule方法,默認(rèn)馬上執(zhí)行。如果需要延遲執(zhí)行,需要直接使用schedule,傳遞時間參數(shù)。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

Runnable和Callable包裝成ScheduledFutureTask實例,保存了延遲信息,然后執(zhí)行delayedExecute。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}

如果線程池已經(jīng)關(guān)閉,直接調(diào)用飽和策略,否則將任務(wù)加入等待隊列。加入之后,需要再判斷線程池的狀態(tài),和當(dāng)前任務(wù)是否能運行。如果不能繼續(xù)執(zhí)行,將任務(wù)移出隊列并取消任務(wù)。

canRunInCurrentRunState處理任務(wù)加入等待隊列后,又未執(zhí)行就發(fā)生線程池關(guān)閉的情況,它通過預(yù)設(shè)的兩個變量判斷任務(wù)到底能不能執(zhí)行。

  • 延遲任務(wù)用executeExistingDelayedTasksAfterShutdown
  • 周期任務(wù)用continueExistingPeriodicTasksAfterShutdown
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

最后調(diào)用到ensurePrestart,使用addWorkder增加工作線程,這在ThreadPoolExecutor解釋過了

添加周期任務(wù)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,long period,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (period <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (delay <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
                                     unit.toNanos(-delay));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

執(zhí)行周期任務(wù)有上面兩個方法,具體作用方法名寫得很清楚:

  • scheduleAtFixedRate:按固定的頻率執(zhí)行,不受執(zhí)行時長影響,到點就執(zhí)行;
  • scheduleWithFixedDelay:任務(wù)執(zhí)行完后,按固定的延后時間再執(zhí)行。

兩個方法幾乎一樣,不同的是構(gòu)建ScheduledFutureTask時,period一個傳正數(shù),另一個傳負(fù)數(shù)。不用懷疑,區(qū)分兩種情況就是用正負(fù)。

等待隊列

線程池的等待隊列使用了內(nèi)部類DelayedWorkQueue,和普通線程池等待隊列最大的不同是它的任務(wù)是按照目標(biāo)執(zhí)行時間進(jìn)行排序。

入隊的offer被重寫了,add和put方法也是調(diào)用offer,具體BlockingQueue的實現(xiàn)邏輯不在這里討論,重點是看offer里的siftUp方法。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftUp根據(jù)任務(wù)的compareTo,將任務(wù)移動到隊列中指定的位置,就是這樣。

對應(yīng)地,出隊take方法,根據(jù)任務(wù)的delay時間,小于等于0時將任務(wù)出隊,否則等待。

任務(wù)執(zhí)行

當(dāng)線程池從等待隊列取出一個任務(wù)時,會執(zhí)行它的run方法。

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

方法有三個分支,第一個if判斷任務(wù)在當(dāng)前線程池狀態(tài)下是否能執(zhí)行,canRunInCurrentRunState已經(jīng)講解過。第二個if是判斷是否周期任務(wù),不是的話直接執(zhí)行,不需要多余的操作。重點來看第三個if,也就是周期執(zhí)行任務(wù)。

  1. runAndReset:任務(wù)執(zhí)行完重置為初始狀態(tài),等待下一次執(zhí)行;
  2. setNextRunTime:計算下次執(zhí)行時間;
  3. reExecutePeriodic:再調(diào)度任務(wù)。
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

計算下次執(zhí)行時間,period根據(jù)正負(fù)有不同的計算邏輯,負(fù)的時間也會先改正,很明顯對應(yīng)上文的scheduleAtFixedRate和scheduleWithFixedDelay兩個方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

將任務(wù)重新加入等待隊列,中間幾個方法都解釋過了。

Timer的缺陷

自從知道ScheduledThreadPoolExecutor,再沒有使用Timer,因為它有幾個缺陷:

  • 多任務(wù)在單線程里執(zhí)行,一個任務(wù)結(jié)束,另一個任務(wù)才能開始,時間間隔不準(zhǔn);
  • 出現(xiàn)異常會導(dǎo)致全部任務(wù)停止;
  • 絕對時間,受系統(tǒng)時間影響。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

Timer的代碼很簡單,主要數(shù)據(jù)結(jié)構(gòu)是一個任務(wù)隊列和一個執(zhí)行線程。新增的任務(wù)會加入任務(wù)隊列,到達(dá)時間后,由執(zhí)行線程執(zhí)行。只有一個線程,很容易理解上面講的缺陷。

ScheduledThreadPoolExecutor每個任務(wù)都有對應(yīng)的執(zhí)行線程,時間使用相對時間計算,也就沒有上面的缺陷,所以沒有理由使用Timer了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容