深入理解scheduledthreadpoolexecutor

scheduledthreadpool是JDK自帶的一個(gè)定時(shí)調(diào)度任務(wù)的實(shí)現(xiàn),通過(guò)它可以實(shí)現(xiàn)定時(shí)的循環(huán)調(diào)度,最近在看線程池的源碼,順便也把它看了一下,發(fā)現(xiàn)里面的實(shí)現(xiàn)真的是很精彩,干貨很多。

首先,scheduledthreadpool是繼承自ThreadPoolExecutor,也就是說(shuō)它具有線程池的一些特性,它也正是利用線程池實(shí)現(xiàn)了任務(wù)的調(diào)度。

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

可以發(fā)現(xiàn)ScheduledThreadPoolExecutor只是用了核心線程池,同時(shí)它的任務(wù)隊(duì)列是采用了DelayedWorkQueue去實(shí)現(xiàn)。對(duì)于ThreadPoolExecutor不熟悉的,可以翻翻我之前的筆記,有對(duì)線程池很詳細(xì)的解釋。
接下來(lái)看任務(wù)提交的代碼,以定時(shí)循環(huán)調(diào)度為例:

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

command表示要提交的線程。
initialDelay表示初始化時(shí)的延遲。
period表示調(diào)度周期。
unit為時(shí)間單位。
首先將command等參數(shù)包裝成ScheduledFutureTask類,這個(gè)類是任務(wù)調(diào)度的基本單位。
triggerTime方法主要是用來(lái)計(jì)算下次被調(diào)度的時(shí)間:

    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

我們一會(huì)兒在回頭看ScheduledFutureTask,先看任務(wù)的提交delayedExecute(t):

    /**
     * Main execution method for delayed or periodic tasks.  If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it.  (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet,) If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

這里主要做了一個(gè)任務(wù)入隊(duì)的操作, super.getQueue().add(task);這個(gè)Queue就是我們剛才看到的DelayedWorkQueue,接下來(lái)ensurePrestart()是判斷當(dāng)前核心線程池的大小,如果過(guò)少,那么增加核心線程。保證任務(wù)及時(shí)運(yùn)行。
程序很短,但是我們并沒(méi)有看到任務(wù)是怎么跑起來(lái),怎么被調(diào)度的。那這里其實(shí)有個(gè)前提的知識(shí)就是,當(dāng)任務(wù)添加到Queue之中后,那么線程池中的線程會(huì)不斷的去Queue中獲取任務(wù),那么我們看一下Queue的offer方法和take方法,是怎么放入、怎么取出的:

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//擴(kuò)容
                size = i + 1;
                if (i == 0) {//如果是第一個(gè)元素
                    queue[0] = e;
                    setIndex(e, 0);
                } else {//如果不是,那么進(jìn)行上濾操作,保證堆有序
                    siftUp(i, e);
                }
                //如果當(dāng)前新增加的元素在堆頂,那么可能是最新要執(zhí)行的
                if (queue[0] == e) {
                    leader = null;
                    available.signal();//發(fā)一個(gè)信號(hào),通知take的時(shí)候的線程,趕緊檢測(cè)新加入的task
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

這是DelayedWorkQueue的offer方法,DelayedWorkQueue里面是使用數(shù)組去維護(hù)任務(wù)隊(duì)列的,那么數(shù)組是怎么保證任務(wù)有序呢?
其實(shí)仔細(xì)看代碼,我們能發(fā)現(xiàn),這里的實(shí)現(xiàn)是用一個(gè)二叉堆去對(duì)數(shù)組元素進(jìn)行排序。確切的說(shuō)是小頂堆。
首先判斷容量,如果容量不夠就擴(kuò)容,接著判斷是不是第一個(gè)元素,如果是,那么直接放在index為0的位置,不是的話進(jìn)行上濾操作。接下來(lái)判斷添加的元素是不是在堆頂,如果是那么需要進(jìn)行優(yōu)先調(diào)度,那么進(jìn)行signal。
其實(shí)這里又引申出一個(gè)問(wèn)題,那么就是是靠什么排序的?這個(gè)時(shí)候我們看一下任務(wù)的實(shí)體ScheduledFutureTask,它復(fù)寫了compareTo方法:

        //比較方法
        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                //根據(jù)time去比較,time是在創(chuàng)建任務(wù)的時(shí)候計(jì)算出來(lái)的,指下一次運(yùn)行的時(shí)間
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

里面用time去判斷大小,time便是下一次調(diào)度的時(shí)間點(diǎn),那么顯然越小的離現(xiàn)在越近,越要放在前面。
看了offer方法我們?cè)倏纯磘ake方法,這個(gè)方法是用來(lái)獲取任務(wù)的:

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();//阻塞
            //隊(duì)列的存儲(chǔ)采用數(shù)組,優(yōu)先級(jí)排序采用二叉堆實(shí)現(xiàn)。
            try {
                for (;;) {
                    //最大的一個(gè)是最先應(yīng)該被執(zhí)行的
                    RunnableScheduledFuture first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        //獲取第一個(gè)任務(wù),是距離當(dāng)前最近的任務(wù),可能會(huì)有一點(diǎn)延遲
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)//要立即執(zhí)行
                            return finishPoll(first);
                        else if (leader != null)
                            available.await();//拿不到leader的線程全部await
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);//如果此時(shí)線程喚醒了,那么其他線程將不能進(jìn)入同步塊
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();//喚醒所有的await線程
                lock.unlock();
            }
        }

毫無(wú)疑問(wèn),take中直接獲取queue[0],它是距離目前最近的要被執(zhí)行的任務(wù),先檢測(cè)一下還有多長(zhǎng)時(shí)間,任務(wù)會(huì)被執(zhí)行,如果小于0,那么立刻彈出,并且做一個(gè)下濾操作,重新找出堆頂元素。如果不小于0,那么證明時(shí)間還沒(méi)到,那么available.awaitNanos(delay);等到delay時(shí)間后自動(dòng)喚醒,或者因?yàn)樘砑恿艘粋€(gè)更加緊急的任務(wù)即offer中的signal被調(diào)用了,那么喚醒,重新循環(huán)獲取最優(yōu)先執(zhí)行的任務(wù),如果delay小于0,那么直接彈出任務(wù)。
至此任務(wù)調(diào)度的邏輯分析完了,但是還有周期執(zhí)行是怎么實(shí)現(xiàn)的呢?其實(shí)是在ScheduledFutureTask的run中實(shí)現(xiàn)的:

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();//判斷是不是定時(shí)周期調(diào)度的
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {//設(shè)置當(dāng)前狀態(tài)是可重復(fù)執(zhí)行的
                setNextRunTime();//計(jì)算下一次執(zhí)行時(shí)期
                reExecutePeriodic(outerTask);//加入隊(duì)列
            }
        }

判斷是不是周期調(diào)度的任務(wù),如果是等待執(zhí)行完畢之后,重新設(shè)置下一次執(zhí)行時(shí)間,并且將此任務(wù)重新offer到queue中,這樣就實(shí)現(xiàn)了周期調(diào)度。

問(wèn)題:
其實(shí)這里是有一個(gè)問(wèn)題的,就是如果當(dāng)核心線程池比較少,但是執(zhí)行的任務(wù)又有很多阻塞性的任務(wù),那么就會(huì)導(dǎo)致在任務(wù)到期改執(zhí)行的時(shí)候,而沒(méi)有線程去執(zhí)行任務(wù)。這樣就會(huì)導(dǎo)致任務(wù)調(diào)度時(shí)間不準(zhǔn),同時(shí)后面的任務(wù)也可能被影響,所以在設(shè)置的時(shí)候可以將自己的核心線程池調(diào)大一點(diǎn),避免這種問(wèn)題。

整體的流程可以參考下面這個(gè)圖,描述的十分清楚:

Paste_Image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 深入理解Java線程池 線程池初探 所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面(所謂池化技術(shù)),然后需要線程的時(shí)候...
    程序員七哥閱讀 1,372評(píng)論 0 16
  • 從哪說(shuō)起呢? 單純講多線程編程真的不知道從哪下嘴。。 不如我直接引用一個(gè)最簡(jiǎn)單的問(wèn)題,以這個(gè)作為切入點(diǎn)好了 在ma...
    Mr_Baymax閱讀 2,911評(píng)論 1 17
  • NSThread 第一種:通過(guò)NSThread的對(duì)象方法 NSThread *thread = [[NSThrea...
    攻城獅GG閱讀 952評(píng)論 0 3
  • 這里陸陸續(xù)續(xù)添加一些自己總結(jié)或借鑒的erlang代碼規(guī)約 [強(qiáng)制] [推薦] [建議] =============...
    kamfon閱讀 1,974評(píng)論 0 0
  • 沒(méi)有星星,沒(méi)有月亮,天黑得仿佛不會(huì)再天亮了…… 日子一天天飛過(guò),平靜得似乎被定格,伴著歡笑、伴著打鬧,小奇和伙伴們...
    Victorfy閱讀 484評(píng)論 4 4

友情鏈接更多精彩內(nèi)容