深入理解Java線程池:ScheduledThreadPoolExecutor

博客鏈接:http://www.ideabuffer.cn/2017/04/14/深入理解Java線程池:ScheduledThreadPoolExecutor/


介紹

自JDK1.5開始,JDK提供了ScheduledThreadPoolExecutor類來支持周期性任務的調(diào)度。在這之前的實現(xiàn)需要依靠Timer和TimerTask或者其它第三方工具來完成。但Timer有不少的缺陷:

  • Timer是單線程模式;
  • 如果在執(zhí)行任務期間某個TimerTask耗時較久,那么就會影響其它任務的調(diào)度;
  • Timer的任務調(diào)度是基于絕對時間的,對系統(tǒng)時間敏感;
  • Timer不會捕獲執(zhí)行TimerTask時所拋出的異常,由于Timer是單線程,所以一旦出現(xiàn)異常,則線程就會終止,其他任務也得不到執(zhí)行。

ScheduledThreadPoolExecutor繼承ThreadPoolExecutor來重用線程池的功能,它的實現(xiàn)方式如下:

  • 將任務封裝成ScheduledFutureTask對象,ScheduledFutureTask基于相對時間,不受系統(tǒng)時間的改變所影響;
  • ScheduledFutureTask實現(xiàn)了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有兩個重要的方法:compareTo和getDelay。compareTo方法用于比較任務之間的優(yōu)先級關系,如果距離下次執(zhí)行的時間間隔較短,則優(yōu)先級高;getDelay方法用于返回距離下次任務執(zhí)行時間的時間間隔;
  • ScheduledThreadPoolExecutor定義了一個DelayedWorkQueue,它是一個有序隊列,會通過每個任務按照距離下次執(zhí)行時間間隔的大小來排序;
  • ScheduledFutureTask繼承自FutureTask,可以通過返回Future對象來獲取執(zhí)行的結(jié)果。

通過如上的介紹,可以對比一下Timer和ScheduledThreadPoolExecutor:

Timer ScheduledThreadPoolExecutor
單線程 多線程
單個任務執(zhí)行時間影響其他任務調(diào)度 多線程,不會影響
基于絕對時間 基于相對時間
一旦執(zhí)行任務出現(xiàn)異常不會捕獲,其他任務得不到執(zhí)行 多線程,單個任務的執(zhí)行不會影響其他線程

所以,在JDK1.5之后,應該沒什么理由繼續(xù)使用Timer進行任務調(diào)度了。

ScheduledThreadPoolExecutor的使用

下面用一個具體的例子來說明ScheduledThreadPoolExecutor的使用:

public class ScheduledThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        // 創(chuàng)建大小為5的線程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

        for (int i = 0; i < 3; i++) {
            Task worker = new Task("task-" + i);
            // 只執(zhí)行一次
//          scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
            // 周期性執(zhí)行,每5秒執(zhí)行一次
            scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS);
        }

        Thread.sleep(10000);

        System.out.println("Shutting down executor...");
        // 關閉線程池
        scheduledThreadPool.shutdown();
        boolean isDone;
        // 等待線程池終止
        do {
            isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("awaitTermination...");
        } while(!isDone);

        System.out.println("Finished all threads");
    }


}


class Task implements Runnable {

    private String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("name = " + name + ", startTime = " + new Date());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("name = " + name + ", endTime = " + new Date());
    }

}

下面就來具體分析一下ScheduledThreadPoolExecutor的實現(xiàn)過程。

ScheduledThreadPoolExecutor的實現(xiàn)

ScheduledThreadPoolExecutor的類結(jié)構(gòu)

看下ScheduledThreadPoolExecutor內(nèi)部的類圖:

不要被這么多類嚇到,這里只不過是為了更清楚的了解ScheduledThreadPoolExecutor有關調(diào)度和隊列的接口。

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現(xiàn)了ScheduledExecutorService接口,該接口定義了schedule等任務調(diào)度的方法。

同時ScheduledThreadPoolExecutor有兩個重要的內(nèi)部類:DelayedWorkQueue和ScheduledFutureTask??梢钥吹?,DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,并且實現(xiàn)了Delayed接口。有關FutureTask的介紹請參考另一篇文章:FutureTask源碼解析。

ScheduledThreadPoolExecutor的構(gòu)造方法

ScheduledThreadPoolExecutor有3中構(gòu)造方法:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

因為ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以這里都是調(diào)用的ThreadPoolExecutor類的構(gòu)造方法。有關ThreadPoolExecutor可以參考深入理解Java線程池:ThreadPoolExecutor。

這里注意傳入的阻塞隊列是DelayedWorkQueue類型的對象。后面會詳細介紹。

schedule方法

在上文的例子中,使用了schedule方法來進行任務調(diào)度,schedule方法的代碼如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}


public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

首先,這里的兩個重載的schedule方法只是傳入的第一個參數(shù)不同,可以是Runnable對象或者Callable對象。會把傳入的任務封裝成一個RunnableScheduledFuture對象,其實也就是ScheduledFutureTask對象,decorateTask默認什么功能都沒有做,子類可以重寫該方法:

/**
 * 修改或替換用于執(zhí)行 runnable 的任務。此方法可重寫用于管理內(nèi)部任務的具體類。默認實現(xiàn)只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

/**
 * 修改或替換用于執(zhí)行 callable 的任務。此方法可重寫用于管理內(nèi)部任務的具體類。默認實現(xiàn)只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

然后,通過調(diào)用delayedExecute方法來延時執(zhí)行任務。
最后,返回一個ScheduledFuture對象。

scheduleAtFixedRate方法

該方法設置了執(zhí)行周期,下一次執(zhí)行時間相當于是上一次的執(zhí)行時間加上period,它是采用已固定的頻率來執(zhí)行任務:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

scheduleWithFixedDelay方法

該方法設置了執(zhí)行周期,與scheduleAtFixedRate方法不同的是,下一次執(zhí)行時間是上一次任務執(zhí)行完的系統(tǒng)時間加上period,因而具體執(zhí)行時間不是固定的,但周期是固定的,是采用相對固定的延遲來執(zhí)行任務:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

注意這里的unit.toNanos(-delay));,這里把周期設置為負數(shù)來表示是相對固定的延遲執(zhí)行。

scheduleAtFixedRate和scheduleWithFixedDelay的區(qū)別在setNextRunTime方法中就可以看出來:

private void setNextRunTime() {
    long p = period;
    // 固定頻率,上次執(zhí)行時間加上周期時間
    if (p > 0)
        time += p;
    // 相對固定延遲執(zhí)行,使用當前系統(tǒng)時間加上周期時間
    else
        time = triggerTime(-p);
}

setNextRunTime方法會在run方法中執(zhí)行完任務后調(diào)用。

triggerTime方法

triggerTime方法用于獲取下一次執(zhí)行的具體時間:

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}


long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

這里的delay < (Long.MAX_VALUE >> 1是為了判斷是否要防止Long類型溢出,如果delay的值小于Long類型最大值的一半,則直接返回delay,否則需要進行防止溢出處理。

overflowFree方法

該方法的作用是限制隊列中所有節(jié)點的延遲時間在Long.MAX_VALUE之內(nèi),防止在compareTo方法中溢出。

private long overflowFree(long delay) {
    // 獲取隊列中的第一個節(jié)點
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        // 獲取延遲時間
        long headDelay = head.getDelay(NANOSECONDS);
        // 如果延遲時間小于0,并且 delay - headDelay 超過了Long.MAX_VALUE
        // 將delay設置為 Long.MAX_VALUE + headDelay 保證delay小于Long.MAX_VALUE
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}

當一個任務已經(jīng)可以執(zhí)行出隊操作,但還沒有執(zhí)行,可能由于線程池中的工作線程不是空閑的。具體分析一下這種情況:

  • 為了方便說明,假設Long.MAX_VALUE=1023,也就是11位,并且當前的時間是100,調(diào)用triggerTime時并沒有對delay進行判斷,而是直接返回了now() + delay,也就是相當于100 + 1023,這肯定是溢出了,那么返回的時間是-925;
  • 如果頭節(jié)點已經(jīng)可以出隊但是還沒有執(zhí)行出隊,那么頭節(jié)點的執(zhí)行時間應該是小于當前時間的,假設是95;
  • 這時調(diào)用offer方法向隊列中添加任務,在offer方法中會調(diào)用siftUp方法來排序,在siftUp方法執(zhí)行時又會調(diào)用ScheduledFutureTask中的compareTo方法來比較執(zhí)行時間;
  • 這時如果執(zhí)行到了compareTo方法中的long diff = time - x.time;時,那么計算后的結(jié)果就是-925 - 95 = -1020,那么將返回-1,而正常情況應該是返回1,因為新加入的任務的執(zhí)行時間要比頭結(jié)點的執(zhí)行時間要晚,這就不是我們想要的結(jié)果了,這會導致隊列中的順序不正確。
  • 同理也可以算一下在執(zhí)行compareTo方法中的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);時也會有這種情況;
  • 所以在triggerTime方法中對delay的大小做了判斷,就是為了防止這種情況發(fā)生。

如果執(zhí)行了overflowFree方法呢,這時headDelay = 95 - 100 = -5,然后執(zhí)行delay = 1023 + (-5) = 1018,那么triggerTime會返回100 + 1018 = -930,再執(zhí)行compareTo方法中的long diff = time - x.time;時,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023,沒有溢出,符合正常的預期。

所以,overflowFree方法中把已經(jīng)超時的部分時間給減去,就是為了避免在compareTo方法中出現(xiàn)溢出情況。

(說實話,這段代碼看的很痛苦,一般情況下也不會發(fā)生這種情況,誰會傳一個Long.MAX_VALUE呢。要知道Long.MAX_VALUE的納秒數(shù)換算成年的話是292年,誰會這么無聊。。。)

ScheduledFutureTask的getDelay方法

public long getDelay(TimeUnit unit) {
    // 執(zhí)行時間減去當前系統(tǒng)時間
    return unit.convert(time - now(), NANOSECONDS);
}

ScheduledFutureTask的構(gòu)造方法

ScheduledFutureTask繼承自FutureTask并實現(xiàn)了RunnableScheduledFuture接口,具體可以參考上文的類圖,構(gòu)造方法如下:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

/**
 * Creates a periodic action with given nano time and period.
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

這里面有幾個重要的屬性,下面來解釋一下:

  • time:下次任務執(zhí)行時的時間;
  • period:執(zhí)行周期;
  • sequenceNumber:保存任務被添加到ScheduledThreadPoolExecutor中的序號。

在schedule方法中,創(chuàng)建完ScheduledFutureTask對象之后,會執(zhí)行delayedExecute方法來執(zhí)行任務。

delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果線程池已經(jīng)關閉,使用拒絕策略拒絕任務
    if (isShutdown())
        reject(task);
    else {
        // 添加到阻塞隊列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 確保線程池中至少有一個線程啟動,即使corePoolSize為0
            // 該方法在ThreadPoolExecutor中實現(xiàn)
            ensurePrestart();
    }
}

說一下這里的第二個if判斷:

  1. 如果不是SHUTDOWN狀態(tài),執(zhí)行else,否則執(zhí)行步驟2;
  2. 如果在當前線程池運行狀態(tài)下可以執(zhí)行任務,執(zhí)行else,否則執(zhí)行步驟3;
  3. 從阻塞隊列中刪除任務,如果失敗,執(zhí)行else,否則執(zhí)行步驟4;
  4. 取消任務,但不中斷執(zhí)行中的任務。

對于步驟2,可以通過setContinueExistingPeriodicTasksAfterShutdownPolicy方法設置在線程池關閉時,周期任務繼續(xù)執(zhí)行,默認為false,也就是線程池關閉時,不再執(zhí)行周期任務。

ensurePrestart方法在ThreadPoolExecutor中定義:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

調(diào)用了addWorker方法,可以在深入理解Java線程池:ThreadPoolExecutor中查看addWorker方法的介紹,線程池中的工作線程是通過該方法來啟動并執(zhí)行任務的。

ScheduledFutureTask的run方法

回顧一下線程池的執(zhí)行過程:當線程池中的工作線程啟動時,不斷地從阻塞隊列中取出任務并執(zhí)行,當然,取出的任務實現(xiàn)了Runnable接口,所以是通過調(diào)用任務的run方法來執(zhí)行任務的。

這里的任務類型是ScheduledFutureTask,所以下面看一下ScheduledFutureTask的run方法:

public void run() {
    // 是否是周期性任務
    boolean periodic = isPeriodic();
    // 當前線程池運行狀態(tài)下如果不可以執(zhí)行任務,取消該任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果不是周期性任務,調(diào)用FutureTask中的run方法執(zhí)行
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任務,調(diào)用FutureTask中的runAndReset方法執(zhí)行
    // runAndReset方法不會設置執(zhí)行結(jié)果,所以可以重復執(zhí)行任務
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 計算下次執(zhí)行該任務的時間
        setNextRunTime();
        // 重復執(zhí)行任務
        reExecutePeriodic(outerTask);
    }
}

有關FutureTask的run方法和runAndReset方法,可以參考FutureTask源碼解析。

分析一下執(zhí)行過程:

  1. 如果當前線程池運行狀態(tài)不可以執(zhí)行任務,取消該任務,然后直接返回,否則執(zhí)行步驟2;
  2. 如果不是周期性任務,調(diào)用FutureTask中的run方法執(zhí)行,會設置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
  3. 如果是周期性任務,調(diào)用FutureTask中的runAndReset方法執(zhí)行,不會設置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4和步驟5;
  4. 計算下次執(zhí)行該任務的具體時間;
  5. 重復執(zhí)行任務。

ScheduledFutureTask的reExecutePeriodic方法

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

該方法和delayedExecute方法類似,不同的是:

  1. 由于調(diào)用reExecutePeriodic方法時已經(jīng)執(zhí)行過一次周期性任務了,所以不會reject當前任務;
  2. 傳入的任務一定是周期性任務。

onShutdown方法

onShutdown方法是ThreadPoolExecutor中的鉤子方法,在ThreadPoolExecutor中什么都沒有做,參考深入理解Java線程池:ThreadPoolExecutor,該方法是在執(zhí)行shutdown方法時被調(diào)用:

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // 獲取在線程池已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有延遲任務
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 獲取在線程池已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有定期任務
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果在線程池已 shutdown 的情況下不繼續(xù)執(zhí)行延遲任務和定期任務
    // 則依次取消任務,否則則根據(jù)取消狀態(tài)來判斷
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                // 如果有在 shutdown 后不繼續(xù)的延遲任務或周期任務,則從隊列中刪除并取消任務
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實現(xiàn)阻塞的工作隊列,是因為ScheduledThreadPoolExecutor要求的工作隊列有些特殊。

DelayedWorkQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時任務的時候,每個任務的執(zhí)行時間都不同,所以DelayedWorkQueue的工作就是按照執(zhí)行時間的升序來排列,執(zhí)行時間距離當前時間越近的任務在隊列的前面(注意:這里的順序并不是絕對的,堆中的排序只保證了子節(jié)點的下次執(zhí)行時間要比父節(jié)點的下次執(zhí)行時間要大,而葉子節(jié)點之間并不一定是順序的,下文中會說明)。

堆結(jié)構(gòu)如下圖所示:

堆結(jié)構(gòu)

可見,DelayedWorkQueue是一個基于最小堆結(jié)構(gòu)的隊列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性:

假設,索引值從0開始,子節(jié)點的索引值為k,父節(jié)點的索引值為p,則:

  1. 一個節(jié)點的左子節(jié)點的索引為:k = p * 2 + 1;
  2. 一個節(jié)點的右子節(jié)點的索引為:k = (p + 1) * 2;
  3. 一個節(jié)點的父節(jié)點的索引為:p = (k - 1) / 2。

為什么要使用DelayedWorkQueue呢?

定時任務執(zhí)行時需要取出最近要執(zhí)行的任務,所以任務在隊列中每次出隊時一定要是當前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列。

DelayedWorkQueue是一個優(yōu)先級隊列,它可以保證每次出隊的任務都是當前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復雜度是 O(logN)。

DelayedWorkQueue的屬性

// 隊列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根據(jù)初始容量創(chuàng)建RunnableScheduledFuture類型的數(shù)組
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

// leader線程
private Thread leader = null;
// 當較新的任務在隊列的頭部可用時,或者新線程可能需要成為leader,則通過該條件發(fā)出信號
private final Condition available = lock.newCondition();

注意這里的leader,它是Leader-Follower模式的變體,用于減少不必要的定時等待。什么意思呢?對于多線程的網(wǎng)絡模型來說:

所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態(tài):proccesser。它的基本原則就是,永遠最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產(chǎn)生一個Leader負責等待網(wǎng)絡IO事件,當有一個事件產(chǎn)生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網(wǎng)絡事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。

參考自:http://blog.csdn.net/goldlevi/article/details/7705180

具體leader的作用在分析take方法時再詳細介紹。

offer方法

既然是阻塞隊列,入隊的操作如add和put方法都調(diào)用了offer方法,下面查看一下offer方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // queue是一個RunnableScheduledFuture類型的數(shù)組,如果容量不夠需要擴容
        if (i >= queue.length)
            grow();
        size = i + 1;
        // i == 0 說明堆中還沒有數(shù)據(jù)
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
        // i != 0 時,需要對堆進行重新排序
            siftUp(i, e);
        }
        // 如果傳入的任務已經(jīng)是隊列的第一個節(jié)點了,這時available需要發(fā)出信號
        if (queue[0] == e) {
            // leader設置為null為了使在take方法中的線程在通過available.signal();后會執(zhí)行available.awaitNanos(delay);
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

有關Condition的介紹請參考深入理解AbstractQueuedSynchronizer(三)

這里的重點是siftUp方法。

siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 找到父節(jié)點的索引
        int parent = (k - 1) >>> 1;
        // 獲取父節(jié)點
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果key節(jié)點的執(zhí)行時間大于父節(jié)點的執(zhí)行時間,不需要再排序了
        if (key.compareTo(e) >= 0)
            break;
        // 如果key.compareTo(e) < 0,說明key節(jié)點的執(zhí)行時間小于父節(jié)點的執(zhí)行時間,需要把父節(jié)點移到后面
        queue[k] = e;
        // 設置索引為k
        setIndex(e, k);
        k = parent;
    }
    // key設置為排序后的位置中
    queue[k] = key;
    setIndex(key, k);
}

代碼很好理解,就是循環(huán)的根據(jù)key節(jié)點與它的父節(jié)點來判斷,如果key節(jié)點的執(zhí)行時間小于父節(jié)點,則將兩個節(jié)點交換,使執(zhí)行時間靠前的節(jié)點排列在隊列的前面。

假設新入隊的節(jié)點的延遲時間(調(diào)用getDelay()方法獲得)是5,執(zhí)行過程如下:

  1. 先將新的節(jié)點添加到數(shù)組的尾部,這時新節(jié)點的索引k為7:
  2. 計算新父節(jié)點的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的時間間隔值為8,因為 5 < 8 ,將執(zhí)行queue[7] = queue[3]:
  3. 這時將k設置為3,繼續(xù)循環(huán),再次計算parent為1,queue[1]的時間間隔為3,因為 5 > 3 ,這時退出循環(huán),最終k為3:

可見,每次新增節(jié)點時,只是根據(jù)父節(jié)點來判斷,而不會影響兄弟節(jié)點。

另外,setIndex方法只是設置了ScheduledFutureTask中的heapIndex屬性:

private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        ((ScheduledFutureTask)f).heapIndex = idx;
}

take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                // 計算當前時間到執(zhí)行時間的時間間隔
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // leader不為空,阻塞線程
                if (leader != null)
                    available.await();
                else {
                    // leader為空,則把leader設置為當前線程,
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞到執(zhí)行時間
                        available.awaitNanos(delay);
                    } finally {
                        // 設置leader = null,讓其他線程執(zhí)行available.awaitNanos(delay);
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader不為空,則說明leader的線程正在執(zhí)行available.awaitNanos(delay);
        // 如果queue[0] == null,說明隊列為空
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

take方法是什么時候調(diào)用的呢?在深入理解Java線程池:ThreadPoolExecutor中,介紹了getTask方法,工作線程會循環(huán)地從workQueue中取任務。但定時任務卻不同,因為如果一旦getTask方法取出了任務就開始執(zhí)行了,而這時可能還沒有到執(zhí)行的時間,所以在take方法中,要保證只有在到指定的執(zhí)行時間的時候任務才可以被取走。

再來說一下leader的作用,這里的leader是為了減少不必要的定時等待,當一個線程成為leader時,它只等待下一個節(jié)點的時間間隔,但其它線程無限期等待。 leader線程必須在從take()或poll()返回之前signal其它線程,除非其他線程成為了leader。

舉例來說,如果沒有l(wèi)eader,那么在執(zhí)行take時,都要執(zhí)行available.awaitNanos(delay),假設當前線程執(zhí)行了該段代碼,這時還沒有signal,第二個線程也執(zhí)行了該段代碼,則第二個線程也要被阻塞。多個這時執(zhí)行該段代碼是沒有作用的,因為只能有一個線程會從take中返回queue[0](因為有l(wèi)ock),其他線程這時再返回for循環(huán)執(zhí)行時取的queue[0],已經(jīng)不是之前的queue[0]了,然后又要繼續(xù)阻塞。

所以,為了不讓多個線程頻繁的做無用的定時等待,這里增加了leader,如果leader不為空,則說明隊列中第一個節(jié)點已經(jīng)在等待出隊,這時其它的線程會一直阻塞,減少了無用的阻塞(注意,在finally中調(diào)用了signal()來喚醒一個線程,而不是signalAll())。

poll方法

下面看下poll方法,與take類似,但這里要提供超時功能:

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                // 如果delay <= 0,說明已經(jīng)到了任務執(zhí)行的時間,返回。
                if (delay <= 0)
                    return finishPoll(first);
                // 如果nanos <= 0,說明已經(jīng)超時,返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // nanos < delay 說明需要等待的時間小于任務要執(zhí)行的延遲時間
                // leader != null 說明有其它線程正在對任務進行阻塞
                // 這時阻塞當前線程nanos納秒
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 這里的timeLeft表示delay減去實際的等待時間
                        long timeLeft = available.awaitNanos(delay);
                        // 計算剩余的等待時間
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

finishPoll方法

當調(diào)用了take或者poll方法能夠獲取到任務時,會調(diào)用該方法進行返回:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 數(shù)組長度-1
    int s = --size;
    // 取出最后一個節(jié)點
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    // 長度不為0,則從第一個元素開始排序,目的是要把最后一個節(jié)點放到合適的位置上
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

siftDown方法

siftDown方法使堆從k開始向下調(diào)整:

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // 根據(jù)二叉樹的特性,數(shù)組長度除以2,表示取有子節(jié)點的索引
    int half = size >>> 1;
    // 判斷索引為k的節(jié)點是否有子節(jié)點
    while (k < half) {
        // 左子節(jié)點的索引
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        // 右子節(jié)點的索引
        int right = child + 1;
        // 如果有右子節(jié)點并且左子節(jié)點的時間間隔大于右子節(jié)點,取時間間隔最小的節(jié)點
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 如果key的時間間隔小于等于c的時間間隔,跳出循環(huán)
        if (key.compareTo(c) <= 0)
            break;
        // 設置要移除索引的節(jié)點為其子節(jié)點
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    // 將key放入索引為k的位置
    queue[k] = key;
    setIndex(key, k);
}

siftDown方法執(zhí)行時包含兩種情況,一種是沒有子節(jié)點,一種是有子節(jié)點(根據(jù)half判斷)。例如:

沒有子節(jié)點的情況:

假設初始的堆如下:

假設 k = 3 ,那么 k = half ,沒有子節(jié)點,在執(zhí)行siftDown方法時直接把索引為3的節(jié)點設置為數(shù)組的最后一個節(jié)點:

有子節(jié)點的情況:

假設 k = 0 ,那么執(zhí)行以下步驟:

  1. 獲取左子節(jié)點,child = 1 ,獲取右子節(jié)點, right = 2 :


  2. 由于 right < size ,這時比較左子節(jié)點和右子節(jié)點時間間隔的大小,這里 3 < 7 ,所以 c = queue[child] ;
  3. 比較key的時間間隔是否小于c的時間間隔,這里不滿足,繼續(xù)執(zhí)行,把索引為k的節(jié)點設置為c,然后將k設置為child,;


  4. 因為 half = 3 ,k = 1 ,繼續(xù)執(zhí)行循環(huán),這時的索引變?yōu)椋?/p>

  5. 這時再經(jīng)過如上判斷后,將k的值為3,最終的結(jié)果如下:


  6. 最后,如果在finishPoll方法中調(diào)用的話,會把索引為0的節(jié)點的索引設置為-1,表示已經(jīng)刪除了該節(jié)點,并且size也減了1,最后的結(jié)果如下:


可見,siftdown方法在執(zhí)行完并不是有序的,但可以發(fā)現(xiàn),子節(jié)點的下次執(zhí)行時間一定比父節(jié)點的下次執(zhí)行時間要大,由于每次都會取左子節(jié)點和右子節(jié)點中下次執(zhí)行時間最小的節(jié)點,所以還是可以保證在take和poll時出隊是有序的。

remove方法

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;

        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // 從i開始向下調(diào)整
            siftDown(i, replacement);
            // 如果queue[i] == replacement,說明i是葉子節(jié)點
            // 如果是這種情況,不能保證子節(jié)點的下次執(zhí)行時間比父節(jié)點的大
            // 這時需要進行一次向上調(diào)整
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

假設初始的堆結(jié)構(gòu)如下:

這時要刪除8的節(jié)點,那么這時 k = 1,key為最后一個節(jié)點:

這時通過上文對siftDown方法的分析,siftDown方法執(zhí)行后的結(jié)果如下:

這時會發(fā)現(xiàn),最后一個節(jié)點的值比父節(jié)點還要小,所以這里要執(zhí)行一次siftUp方法來保證子節(jié)點的下次執(zhí)行時間要比父節(jié)點的大,所以最終結(jié)果如下:

總結(jié)

本文詳細分析了ScheduedThreadPoolExecutor的實現(xiàn),主要介紹了以下方面:

  • 與Timer執(zhí)行定時任務的比較,相比Timer,ScheduedThreadPoolExecutor有什么優(yōu)點;
  • ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在于,自己實現(xiàn)了優(yōu)先工作隊列DelayedWorkQueue;
  • ScheduedThreadPoolExecutor實現(xiàn)了ScheduledExecutorService,所以就有了任務調(diào)度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同時注意他們之間的區(qū)別;
  • 內(nèi)部類ScheduledFutureTask繼承自FutureTask,實現(xiàn)了任務的異步執(zhí)行并且可以獲取返回結(jié)果。同時也實現(xiàn)了Delayed接口,可以通過getDelay方法獲取將要執(zhí)行的時間間隔;
  • 周期任務的執(zhí)行其實是調(diào)用了FutureTask類中的runAndReset方法,每次執(zhí)行完不設置結(jié)果和狀態(tài)。參考FutureTask源碼解析;
  • 詳細分析了DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個基于最小堆結(jié)構(gòu)的優(yōu)先隊列,并且每次出隊時能夠保證取出的任務是當前隊列中下次執(zhí)行時間最小的任務。同時注意一下優(yōu)先隊列中堆的順序,堆中的順序并不是絕對的,但要保證子節(jié)點的值要比父節(jié)點的值要大,這樣就不會影響出隊的順序。

總體來說,ScheduedThreadPoolExecutor的重點是要理解下次執(zhí)行時間的計算,以及優(yōu)先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 深入理解Java線程池 線程池初探 所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術(shù)),然后需要線程的時候...
    程序員七哥閱讀 1,372評論 0 16
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,022評論 1 19
  • 前言:線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,合理的使用線程池對線程進行統(tǒng)一...
    SDY_0656閱讀 853評論 0 1
  • 文/靜仁 小雪 一種漸冷的姿勢 拽落了秋葉的繁華 瑟縮著靈魂的更迭 以節(jié)氣的名義守望著天堂 蟲兒 一種生命的炫耀 ...
    趙靜仁閱讀 374評論 1 4
  • 記得上一次寫和媽媽有關的東西是初二,語文老師布置了一篇作文,題目叫什么我忘了,我寫媽媽很辛苦很認真的工作。這篇...
    傻傻的堅持閱讀 581評論 0 0

友情鏈接更多精彩內(nèi)容