Java線程池原理分析ScheduledThreadPoolExecutor篇

前言

在上一篇線程池的文章《Java線程池原理分析ThreadPoolExecutor篇》中從ThreadPoolExecutor源碼分析了其運(yùn)行機(jī)制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文繼續(xù)從源代碼出發(fā)分析ScheduledThreadPoolExecutor的內(nèi)部原理。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor同ThreadPoolExecutor一樣也可以從 Executors線程池工廠創(chuàng)建,所不同的是它具有定時(shí)執(zhí)行,以周期或間隔循環(huán)執(zhí)行任務(wù)等功能。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

}

public interface ScheduledExecutorService extends ExecutorService {
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,因此它具有ThreadPoolExecutor的所有能力。
通過super方法的參數(shù)可知,核心線程的數(shù)量即傳入的參數(shù),而線程池的線程數(shù)為Integer.MAX_VALUE,幾乎為無上限。
這里采用了DelayedWorkQueue任務(wù)隊(duì)列,也是定時(shí)任務(wù)的核心,留在后面分析。

ScheduledThreadPoolExecutor實(shí)現(xiàn)了ScheduledExecutorService 中的接口:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

延時(shí)執(zhí)行Callable任務(wù)的功能。

 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

延時(shí)執(zhí)行Runnable任務(wù)的功能。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

可以延時(shí)循環(huán)執(zhí)行周期性任務(wù)。

假設(shè)任務(wù)執(zhí)行時(shí)間固定2s,period為1s,因?yàn)槿蝿?wù)的執(zhí)行時(shí)間大于規(guī)定的period,所以任務(wù)會(huì)每隔2s(任務(wù)執(zhí)行時(shí)間)開始執(zhí)行一次。如果任務(wù)執(zhí)行時(shí)間固定為0.5s,period為1s,因?yàn)槿蝿?wù)執(zhí)行時(shí)間小于period,所以任務(wù)會(huì)每隔1s(period)開始執(zhí)行一次。實(shí)際任務(wù)的執(zhí)行時(shí)間即可能是大于period的,也可能小于period,scheduleAtFixedRate的好處就是每次任務(wù)的開始時(shí)間間隔必然大于等于period。

假設(shè)一項(xiàng)業(yè)務(wù)需求每天凌晨3點(diǎn)將數(shù)據(jù)庫備份,然而數(shù)據(jù)庫備份的時(shí)間小于24H,最適合用scheduleAtFixedRate方法實(shí)現(xiàn)。

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可以延時(shí)以相同間隔時(shí)間循環(huán)執(zhí)行任務(wù)。

假設(shè)任務(wù)執(zhí)行的時(shí)間固定為2s,delay為1s,那么任務(wù)會(huì)每隔3s(任務(wù)時(shí)間+delay)開始執(zhí)行一次。

如果業(yè)務(wù)需求本次任務(wù)的結(jié)束時(shí)間與下一個(gè)任務(wù)的開始時(shí)間固定,使用scheduleWithFixedDelay可以方便地實(shí)現(xiàn)業(yè)務(wù)。

ScheduledFuture

四個(gè)執(zhí)行任務(wù)的方法都返回了ScheduledFuture對象,它與Future有什么區(qū)別?

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
    public int compareTo(T o);
}

可以看到ScheduledFuture也繼承了Future,并且繼承了Delayed,增加了getDelay方法,而Delayed繼承自Comparable,所以具有compareTo方法。

四種執(zhí)行定時(shí)任務(wù)的方法

schedule(Runnable command,long delay, TimeUnit unit)

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

這個(gè)方法中出現(xiàn)了幾個(gè)陌生的類,首先是ScheduledFutureTask:

private class ScheduledFutureTask<V>  extends FutureTask<V> implements RunnableScheduledFuture<V> {
            ...
            ScheduledFutureTask(Runnable r, V result, long ns) {
              super(r, result);
              this.time = ns;
              this.period = 0;
              this.sequenceNumber = sequencer.getAndIncrement();
           }
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

這個(gè)類是ScheduledThreadPoolExecutor的內(nèi)部類,繼承自FutureTask實(shí)現(xiàn)了RunnableScheduledFuture接口。RunnableScheduledFuture有些復(fù)雜,繼承自RunnableFuture和ScheduledFuture接口。可見ScheduledThreadPoolExecutor身兼多職。這個(gè)類既可以作為Runnable被線程執(zhí)行,又可以作為FutureTask用于獲取Callable任務(wù)call方法返回的結(jié)果。

在FutureTask的構(gòu)造方法中傳入Runnable對象會(huì)將其轉(zhuǎn)換為返回值為null的Callable對象。

/**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param <V> the type of the task's result
     * @return a task that can execute the runnable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

從decorateTask的字面意義判斷它將具體的RunnableScheduledFuture實(shí)現(xiàn)類向上轉(zhuǎn)型為RunnableScheduledFuture接口。從它的方法描述和實(shí)現(xiàn)看出它只是簡單的將ScheduledFutureTask向上轉(zhuǎn)型為RunnableScheduledFuture接口,由protected 修飾符可知設(shè)計(jì)者希望子類擴(kuò)展這個(gè)方法的實(shí)現(xiàn)。

之所以向上轉(zhuǎn)型為RunnableScheduledFuture接口,設(shè)計(jì)者也是希望將具體與接口分離。

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            //情況一
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                //情況二
                task.cancel(false);
            else
                //情況三
                ensurePrestart();
        }
    }

    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

delayedExecute方法負(fù)責(zé)執(zhí)行延時(shí)任務(wù)。
情況一 : 先判斷線程池是否關(guān)閉,若關(guān)閉則拒絕任務(wù)。
情況二:線程池未關(guān)閉,將任務(wù)添加到父類的任務(wù)隊(duì)列,即DelayedWorkQueue中。下面再次判斷線程池是否關(guān)閉,并且判斷canRunInCurrentRunState方法的返回值是否為false。因?yàn)閭魅隦unnable參數(shù),task.isPeriodic()為false,所以isRunningOrShutdown返回true。所以這里不會(huì)執(zhí)行到。
情況三:任務(wù)成功添加到任務(wù)隊(duì)列,執(zhí)行ensurePrestart方法。

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

addWorker已經(jīng)在ThreadPoolExecutor篇分析過,該方法負(fù)責(zé)同步將線程池?cái)?shù)量+1,并且創(chuàng)建Worker對象添加到HashSet中,最后開啟Worker對象中的線程。因?yàn)镽unnableScheduledFuture對象已經(jīng)被添加到任務(wù)隊(duì)列,Worker中的線程通過getTask方法自然會(huì)取到DelayedWorkQueue中的RunnableScheduledFuture任務(wù)并執(zhí)行它的run方法。

這里需要注意的是addWorker方法只在核心線程數(shù)未達(dá)上限或者沒有線程的情況下執(zhí)行,并不像ThreadPoolExecutor那樣可以同時(shí)存在多個(gè)非核心線程,ScheduledThreadPoolExecutor最多只支持一個(gè)非核心線程,除非它終止了不會(huì)再創(chuàng)建新的非核心線程。

schedule(Callable<V> callable, long delay, TimeUnit unit)

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

     ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

與schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通過ScheduledFutureTask的get方法得到返回值外沒有區(qū)別。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

與上述兩個(gè)方法的區(qū)別在于ScheduledFutureTask的構(gòu)造函數(shù)多了參數(shù)period,即任務(wù)執(zhí)行的最小周期:

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

與scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的區(qū)別是參數(shù)delay傳入到ScheduledFutureTask的構(gòu)造方法中是以負(fù)數(shù)的形式。

小結(jié)

四種延時(shí)啟動(dòng)任務(wù)的方法除了構(gòu)造ScheduledFutureTask的參數(shù)不同外,運(yùn)行機(jī)制是相同的。先將任務(wù)添加到DelayedWorkQueue 中,然后創(chuàng)建Worker對象,啟動(dòng)內(nèi)部線程輪詢DelayedWorkQueue 中的任務(wù)。

那么DelayedWorkQueue的add方法是如何實(shí)現(xiàn)的,線程輪詢DelayedWorkQueue 調(diào)用的poll和take方法又如何實(shí)現(xiàn)?

回顧getTask方法獲取任務(wù)時(shí)的代碼片段:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

如果我們設(shè)置ScheduledThreadPoolExecutor的核心線程數(shù)量為0,則執(zhí)行poll方法。而對于核心線程則執(zhí)行take方法。

下面分析DelayedWorkQueue 的具體實(shí)現(xiàn)。

DelayedWorkQueue

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
}

首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的靜態(tài)內(nèi)部類。它的內(nèi)部有一個(gè)RunnableScheduledFuture數(shù)組,且初始容量為16.這里提前說明下,queue 數(shù)組儲(chǔ)存的其實(shí)是二叉樹結(jié)構(gòu)的索引,這個(gè)二叉樹其實(shí)就是最小堆。

add方法

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

在執(zhí)行add方法時(shí)內(nèi)部執(zhí)行的是offer方法,添加RunnableScheduledFuture任務(wù)到隊(duì)列時(shí)先通過內(nèi)部的ReentrantLock加鎖,因此在多線程調(diào)用schedule(Runnable command,long delay, TimeUnit unit)添加任務(wù)時(shí)也能保證同步。

接下來先判斷隊(duì)列是否已滿,若已滿就先通過grow方法擴(kuò)容。擴(kuò)容算法是將現(xiàn)有容量*1.5,然后將舊的數(shù)組復(fù)制到新的數(shù)組。(左移一位等于除以2)。

然后判斷插入的是否為第一個(gè)任務(wù),如果是就將RunnableScheduledFuture向下轉(zhuǎn)型為ScheduledFutureTask,并將其heapIndex 屬性設(shè)置為0.

如果不是第一個(gè)任務(wù),則執(zhí)行siftUp方法。該方法先找到父親RunnableScheduledFuture對象節(jié)點(diǎn),將要插入的RunnableScheduledFuture節(jié)點(diǎn)與之compareTo比較,若父親RunnableScheduledFuture對象的啟動(dòng)時(shí)間小于當(dāng)前要插入的節(jié)點(diǎn)的啟動(dòng)時(shí)間,則將節(jié)點(diǎn)插入到末尾。反之會(huì)對二叉樹以啟動(dòng)時(shí)間升序重新排序RunnableScheduledFuture接口的實(shí)現(xiàn)其實(shí)是ScheduledFutureTask類:

    new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);

    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    final long now() {
        return System.nanoTime();
    }

第三個(gè)參數(shù)triggerTime方法返回的就是任務(wù)延時(shí)的時(shí)間加上當(dāng)前時(shí)間。

在ScheduledFutureTask內(nèi)部實(shí)現(xiàn)了compareTo方法:

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

比較的兩個(gè)任務(wù)的啟動(dòng)時(shí)間。所以DelayedWorkQueue內(nèi)部的二叉樹是以啟動(dòng)時(shí)間早晚排序的。

poll方法

        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null) {
                        //情況一 空隊(duì)列
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //情況二 已到啟動(dòng)時(shí)間  
                            return finishPoll(first);
                        if (nanos <= 0)
                            //情況三 未到啟動(dòng)時(shí)間,但是線程等待超時(shí)
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

非核心線程會(huì)通過poll方法同步獲取任務(wù)隊(duì)列中的RunnableScheduledFuture,如果隊(duì)列為空或者在timeout內(nèi)還等不到任務(wù)的啟動(dòng)時(shí)間,都將返回null。如果任務(wù)隊(duì)列不為空,并且首個(gè)任務(wù)已到啟動(dòng)時(shí)間線程就能夠獲取RunnableScheduledFuture任務(wù)并執(zhí)行run方法。

take方法

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

與非核心線程執(zhí)行的poll方法相比,核心線程執(zhí)行的take方法并不會(huì)超時(shí),在獲取到首個(gè)將要啟動(dòng)的任務(wù)前,核心線程會(huì)一直阻塞。

finishPoll方法

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

在成功獲取任務(wù)后,DelayedWorkQueue的finishPoll方法會(huì)將任務(wù)移除隊(duì)列,并以啟動(dòng)時(shí)間升序重排二叉樹。

小結(jié)

DelayedWorkQueue內(nèi)部維持了一個(gè)以任務(wù)啟動(dòng)時(shí)間升序排序的二叉樹數(shù)組,啟動(dòng)時(shí)間最靠前的任務(wù)即數(shù)組的首個(gè)位置上的任務(wù)。核心線程通過take方法一直阻塞直到獲取首個(gè)要啟動(dòng)的任務(wù)。非核心線程通過poll方法會(huì)在timeout時(shí)間內(nèi)阻塞嘗試獲取首個(gè)要啟動(dòng)的任務(wù),如果超過timeout未得到任務(wù)不會(huì)繼續(xù)阻塞。

這里要特別說明要啟動(dòng)的任務(wù)指的是RunnableScheduledFuture內(nèi)部的time減去當(dāng)前時(shí)間小于等于0,未滿足條件的任務(wù)不會(huì)被take或poll方法返回,這也就保證了未到指定時(shí)間任務(wù)不會(huì)執(zhí)行。

執(zhí)行ScheduledFutureTask

前面已經(jīng)分析了schedule方法如何將RunnableScheduledFuture插入到DelayedWorkQueue,Worker內(nèi)的線程如何獲取定時(shí)任務(wù)。下面分析任務(wù)的執(zhí)行過程,即ScheduledFutureTask的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

如果執(zhí)行的是非周期型任務(wù),調(diào)用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父類FutureTask的run方法。FutureTask的run方法已經(jīng)在ThreadPoolExecutor篇分析過,這里不再多說。

如果執(zhí)行的是周期型任務(wù),則執(zhí)行ScheduledFutureTask.super.runAndReset():

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

這個(gè)方法同run方法比較的區(qū)別是call方法執(zhí)行后不設(shè)置結(jié)果,因?yàn)橹芷谛腿蝿?wù)會(huì)多次執(zhí)行,所以為了讓FutureTask支持這個(gè)特性除了發(fā)生異常不設(shè)置結(jié)果。

執(zhí)行完任務(wù)后通過setNextRunTime方法計(jì)算下一次啟動(dòng)時(shí)間:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                //情況一
                time += p;
            else
                //情況二
                time = triggerTime(-p);
        }

        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }

還記得ScheduledThreadPoolExecutor執(zhí)行定時(shí)任務(wù)的后兩種scheduleAtFixedRate和scheduleWithFixedDelay。
scheduleAtFixedRate會(huì)執(zhí)行到情況一,下一次任務(wù)的啟動(dòng)時(shí)間最早為上一次任務(wù)的啟動(dòng)時(shí)間加period。
scheduleWithFixedDelay會(huì)執(zhí)行到情況二,這里很巧妙的將period參數(shù)設(shè)置為負(fù)數(shù)到達(dá)這段代碼塊,在此又將負(fù)的period轉(zhuǎn)為正數(shù)。情況二將下一次任務(wù)的啟動(dòng)時(shí)間設(shè)置為當(dāng)前時(shí)間加period。

然后將任務(wù)再次添加到任務(wù)隊(duì)列:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

ScheduledFuture的get方法

既然ScheduledFuture的實(shí)現(xiàn)是ScheduledFutureTask,而ScheduledFutureTask繼承自FutureTask,所以ScheduledFuture的get方法的實(shí)現(xiàn)就是FutureTask的get方法的實(shí)現(xiàn),F(xiàn)utureTask的get方法的實(shí)現(xiàn)分析在ThreadPoolExecutor篇已經(jīng)寫過,這里不再敘述。要注意的是ScheduledFuture的get方法對于非周期任務(wù)才是有效的。

ScheduledThreadPoolExecutor總結(jié)

  • ScheduledThreadPoolExecutor是實(shí)現(xiàn)自ThreadPoolExecutor的線程池,構(gòu)造方法中傳入?yún)?shù)n,則最多會(huì)有n個(gè)核心線程工作,空閑的核心線程不會(huì)被自動(dòng)終止,而是一直阻塞在DelayedWorkQueue的take方法嘗試獲取任務(wù)。構(gòu)造方法傳入的參數(shù)為0,ScheduledThreadPoolExecutor將以非核心線程工作,并且最多只會(huì)創(chuàng)建一個(gè)非核心線程,參考上文中ensurePrestart方法的執(zhí)行過程。而這個(gè)非核心線程以poll方法獲取定時(shí)任務(wù)之所以不會(huì)因?yàn)槌瑫r(shí)就被回收,是因?yàn)槿蝿?wù)隊(duì)列并不為空,只有在任務(wù)隊(duì)列為空時(shí)才會(huì)將空閑線程回收,詳見ThreadPoolExecutor篇的runWorker方法,之前我以為空閑的非核心線程超時(shí)就會(huì)被回收是不正確的,還要具備任務(wù)隊(duì)列為空這個(gè)條件。
  • ScheduledThreadPoolExecutor的定時(shí)執(zhí)行任務(wù)依賴于DelayedWorkQueue,其內(nèi)部用可擴(kuò)容的數(shù)組實(shí)現(xiàn)以啟動(dòng)時(shí)間升序的二叉樹。
  • 工作線程嘗試獲取DelayedWorkQueue的任務(wù)只有在任務(wù)到達(dá)指定時(shí)間才會(huì)成功,否則非核心線程會(huì)超時(shí)返回null,核心線程一直阻塞。
  • 對于非周期型任務(wù)只會(huì)執(zhí)行一次并且可以通過ScheduledFuture的get方法阻塞得到結(jié)果,其內(nèi)部實(shí)現(xiàn)依賴于FutureTask的get方法。
  • 周期型任務(wù)通過get方法無法獲取有效結(jié)果,因?yàn)镕utureTask對于周期型任務(wù)執(zhí)行的是runAndReset方法,并不會(huì)設(shè)置結(jié)果。周期型任務(wù)執(zhí)行完畢后會(huì)重新計(jì)算下一次啟動(dòng)時(shí)間并且再次添加到DelayedWorkQueue中。

在源代碼的分析過程中發(fā)現(xiàn)分析DelayedWorkQueue還需要有二叉樹的升序插入算法的知識(shí),一開始也沒有認(rèn)出來這種數(shù)據(jù)結(jié)構(gòu),后來又看了別人的文章才了解。這里比較難理解,有興趣的同學(xué)可以參考《深度解析Java8 – ScheduledThreadPoolExecutor源碼解析》。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 使用線程池能夠提高線程的復(fù)用率,避免不必要的創(chuàng)建線程,能夠節(jié)約內(nèi)存空間和CPU運(yùn)行時(shí)間。除此之外用線程池作為...
    Mars_M閱讀 2,930評論 0 11
  • 博客鏈接:http://www.ideabuffer.cn/2017/04/14/深入理解Java線程池:Sche...
    閃電是只貓閱讀 64,319評論 17 95
  • 前言:線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,合理的使用線程池對線程進(jìn)行統(tǒng)一...
    SDY_0656閱讀 850評論 0 1
  • 晨讀材料 今天的晨讀材料可以概括為兩句話,明確目標(biāo)、過程管控。 明確目標(biāo)很重要,不管是一個(gè)公司、一個(gè)團(tuán)隊(duì)或者一個(gè)個(gè)...
    做一個(gè)更好的普通人閱讀 335評論 0 0
  • 元宵佳節(jié),本是冷門的廣場因?yàn)闊魰?huì)的到來,立刻變得熱鬧非凡起來,五彩繽紛的燈在廣場中央,引游人駐目。
    起始之點(diǎn)閱讀 268評論 0 0

友情鏈接更多精彩內(nèi)容