Java線程池詳解(一)

什么是線程池

所謂線程池,就是將多個(gè)線程放在一個(gè)池子里面(所謂池化技術(shù)),然后需要線程的時(shí)候不是創(chuàng)建一個(gè)線程,而是從線程池里面獲取一個(gè)可用的線程,然后執(zhí)行我們的任務(wù)。線程池的關(guān)鍵在于它為我們管理了多個(gè)線程,我們不需要關(guān)心如何創(chuàng)建線程,我們只需要關(guān)系我們的核心業(yè)務(wù),然后需要線程來(lái)執(zhí)行任務(wù)的時(shí)候從線程池中獲取線程。任務(wù)執(zhí)行完之后線程不會(huì)被銷毀,而是會(huì)被重新放到池子里面,等待機(jī)會(huì)去執(zhí)行任務(wù)。

我們?yōu)槭裁葱枰€程池呢?首先一點(diǎn)是線程池為我們提高了一種簡(jiǎn)易的多線程編程方案,我們不需要投入太多的精力去管理多個(gè)線程,線程池會(huì)自動(dòng)幫我們管理好,它知道什么時(shí)候該做什么事情,我們只要在需要的時(shí)候去獲取就可以了。其次,我們使用線程池很大程度上歸咎于創(chuàng)建和銷毀線程的代價(jià)是非常昂貴的,甚至我們創(chuàng)建和銷毀線程的資源要比我們實(shí)際執(zhí)行的任務(wù)所花費(fèi)的時(shí)間還要長(zhǎng),這顯然是不科學(xué)也是不合理的,而且如果沒(méi)有一個(gè)合理的管理者,可能會(huì)出現(xiàn)創(chuàng)建了過(guò)多的線程的情況,也就是在JVM中存活的線程過(guò)多,而存活著的線程也是需要銷毀資源的,另外一點(diǎn),過(guò)多的線程可能會(huì)造成線程過(guò)度切換的尷尬境地。

如何使用線程池。

定義2個(gè)線程

public class PlayThread implements Runnable{

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for(int i=0;i<100;i++) {
            System.out.println(Thread.currentThread().getName()+"在玩");
        }
        
    }

}
public class WorkThread implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + "在工作");
        }
    }
}

普通線程池

    // 普通線程池
    public static void fun1() {
        // 1創(chuàng)建一個(gè)線程池 2個(gè)線程
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 2提交任務(wù)
        WorkThread work1 = new WorkThread();
        WorkThread work2 = new WorkThread();
        PlayThread play = new PlayThread();
        // execute()方法沒(méi)有返回值 submit()方法有返回值
        executorService.execute(work1);
        executorService.execute(work2);
        executorService.execute(play);
        // 初始2個(gè)。執(zhí)行3個(gè)線程 前面2個(gè)執(zhí)行完畢了。第三個(gè)才會(huì)執(zhí)行
        // 3停止線程池
        // executorService.shutdownNow();
    }

調(diào)度線程池

    // 調(diào)度線程池
    public static void fun2() {
        // 1創(chuàng)建一個(gè)調(diào)度線程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        // 2提交周期性任務(wù)
        WorkThread work1 = new WorkThread();
        // command:執(zhí)行線程
        // initialDelay:初始化延時(shí)
        // period:兩次開(kāi)始執(zhí)行最小間隔時(shí)間
        // unit:計(jì)時(shí)單位
        scheduledExecutorService.scheduleAtFixedRate(work1, 10, 2, TimeUnit.SECONDS);
        // 3停止線程池
        // scheduledExecutorService.shutdownNow();
    }

可以發(fā)現(xiàn)使用線程池非常簡(jiǎn)單,只需要極少的代碼就可以創(chuàng)建出我們需要的線程池,然后將我們的任務(wù)提交到線程池中去。我們只需要在結(jié)束之時(shí)記得關(guān)閉線程池就可以了。本文的重點(diǎn)并非在于如何使用線程池,而是試圖剖析線程池的實(shí)現(xiàn),比如一個(gè)調(diào)度線程池是怎么實(shí)現(xiàn)的?是靠什么實(shí)現(xiàn)的?為什么能這樣實(shí)現(xiàn)等等問(wèn)題。

Java線程池實(shí)現(xiàn)架構(gòu)

Java中與線程池相關(guān)的類有下面一些:

Executor
ExecutorService
ScheduledExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executors

通過(guò)上面一節(jié)中的使用示例,可以發(fā)現(xiàn)Executors類是一個(gè)創(chuàng)建線程池的有用的類,事實(shí)上,Executors類的角色也就是創(chuàng)建線程池,它是一個(gè)工廠類,可以產(chǎn)生不同類型的線程池,而Executor是線程池的鼻祖類,它有兩個(gè)子類是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor則是真正的線程池,我們的任務(wù)將被這兩個(gè)類交由其所管理者的線程池運(yùn)行,可以發(fā)現(xiàn),ScheduledThreadPoolExecutor是一個(gè)集大成者類,下面我們可以看看它的類關(guān)系圖:

ScheduledThreadPoolExecutor的類關(guān)系圖

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,ThreadPoolExecutor實(shí)現(xiàn)了一般的線程池,沒(méi)有調(diào)度功能,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實(shí)現(xiàn),然后增加了調(diào)度功能。
最為原始的Executor只有一個(gè)方法execute,它接受一個(gè)Runnable類型的參數(shù),意思是使用線程池來(lái)執(zhí)行這個(gè)Runnable,可以發(fā)現(xiàn)Executor不提供有返回值的任務(wù)。ExecutorService繼承了Executor,并且極大的增強(qiáng)了Executor的功能,不僅支持有返回值的任務(wù)執(zhí)行,而且還有很多十分有用的方法來(lái)為你提供服務(wù),下面展示了ExecutorService提供的方法:

ExecutorService提供的方法

ScheduledExecutorService繼承了ExecutorService,并且增加了特有的調(diào)度(schedule)功能。關(guān)于Executor、ExecutorService和ScheduledExecutorService的關(guān)系,可以見(jiàn)下圖:

Executor、ExecutorService和ScheduledExecutorService的關(guān)系

總結(jié)一下,經(jīng)過(guò)我們的調(diào)研,可以發(fā)現(xiàn)其實(shí)對(duì)于我們編寫多線程代碼來(lái)說(shuō),最為核心的是Executors類,根據(jù)我們是需要ExecutorService類型的線程池還是ScheduledExecutorService類型的線程池調(diào)用相應(yīng)的工廠方法就可以了,而ExecutorService的實(shí)現(xiàn)表現(xiàn)在ThreadPoolExecutor上,ScheduledExecutorService的實(shí)現(xiàn)則表現(xiàn)在ScheduledThreadPoolExecutor上,下文將分別剖析這兩者,嘗試弄清楚線程池的原理。

ThreadPoolExecutor解析

上文中描述了Java中線程池相關(guān)的架構(gòu),了解了這些內(nèi)容其實(shí)我們就可以使用java的線程池為我們工作了,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼,本節(jié)將分析ThreadPoolExecutor的實(shí)現(xiàn),來(lái)探索線程池的運(yùn)行原理。下面的圖片展示了ThreadPoolExecutor的類圖:

ThreadPoolExecutor的類圖

下面是幾個(gè)比較關(guān)鍵的類成員:

private final BlockingQueue<Runnable> workQueue;  // 任務(wù)隊(duì)列,我們的任務(wù)會(huì)添加到該隊(duì)列里面,線程將從該隊(duì)列獲取任務(wù)來(lái)執(zhí)行
  
private final HashSet<Worker> workers = new HashSet<Worker>();//任務(wù)的執(zhí)行值集合,來(lái)消費(fèi)workQueue里面的任務(wù)
   
private volatile ThreadFactory threadFactory;//線程工廠
       
private volatile RejectedExecutionHandler handler;//拒絕策略,默認(rèn)會(huì)拋出異異常,還要其他幾種拒絕策略如下:
   
   1、CallerRunsPolicy:在調(diào)用者線程里面運(yùn)行該任務(wù)
   2、DiscardPolicy:丟棄任務(wù)
   3、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù)
   
private volatile int corePoolSize;//最低保活work數(shù)量
  
private volatile int maximumPoolSize;//work上限

我們嘗試執(zhí)行submit方法,下面是執(zhí)行的關(guān)鍵路徑,總結(jié)起來(lái)就是:如果Worker數(shù)量還沒(méi)達(dá)到上限則繼續(xù)創(chuàng)建,否則提交任務(wù)到workQueue,然后讓worker來(lái)調(diào)度運(yùn)行任務(wù)。

    step 1: <ExecutorService>
    Future<?> submit(Runnable task);  
    
    step 2:<AbstractExecutorService>
        public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    step 3:<Executor>
    void execute(Runnable command);
    
    step 4:<ThreadPoolExecutor>
     public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //提交我們的額任務(wù)到workQueue
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界
            reject(command); //還不行?拒絕提交的任務(wù)
    }
    
    step 5:<ThreadPoolExecutor>
    private boolean addWorker(Runnable firstTask, boolean core) 
    
    
    step 6:<ThreadPoolExecutor>
    w = new Worker(firstTask); //包裝任務(wù)
    final Thread t = w.thread; //獲取線程(包含任務(wù))
    workers.add(w);   // 任務(wù)被放到works中
    t.start(); //執(zhí)行任務(wù)

上面的流程是高度概括的,實(shí)際情況遠(yuǎn)比這復(fù)雜得多,但是我們關(guān)心的是怎么打通整個(gè)流程,所以這樣分析問(wèn)題是沒(méi)有太大的問(wèn)題的。觀察上面的流程,我們發(fā)現(xiàn)其實(shí)關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的,那么我們也就大概明白了線程池是怎么工作的了。下面分析一下Worker類。

worker類圖

上面的圖片展示了Worker的類關(guān)系圖,關(guān)鍵在于他實(shí)現(xiàn)了Runnable接口,所以問(wèn)題的關(guān)鍵就在于run方法上。在這之前,我們來(lái)看一下Worker類里面的關(guān)鍵成員:

 final Thread thread; 
 
 Runnable firstTask; //我們提交的任務(wù),可能被立刻執(zhí)行,也可能被放到隊(duì)列里面

thread是Worker的工作線程,上面的分析我們也發(fā)現(xiàn)了在addWorker中會(huì)獲取worker里面的thread然后start,也就是這個(gè)線程的執(zhí)行,而Worker實(shí)現(xiàn)了Runnable接口,所以在構(gòu)造thread的時(shí)候Worker將自己傳遞給了構(gòu)造函數(shù),thread.start執(zhí)行的其實(shí)就是Worker的run方法。下面是run方法的內(nèi)容:


        public void run() {
            runWorker(this);
        }
        
        final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

我們來(lái)分析一下runWorker這個(gè)方法,這就是整個(gè)線程池的核心。首先獲取到了我們剛提交的任務(wù)firstTask,然后會(huì)循環(huán)從workQueue里面獲取任務(wù)來(lái)執(zhí)行,獲取任務(wù)的方法如下:


 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

其實(shí)核心也就一句:

     Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

我們?cè)倩仡^看一下execute,其實(shí)我們上面只走了一條邏輯,在execute的時(shí)候,我們的worker的數(shù)量還沒(méi)有到達(dá)我們?cè)O(shè)定的corePoolSize的時(shí)候,會(huì)走上面我們分析的邏輯,而如果達(dá)到了我們?cè)O(shè)定的閾值之后,execute中會(huì)嘗試去提交任務(wù),如果提交成功了就結(jié)束,否則會(huì)拒絕任務(wù)的提交。我們上面還提到一個(gè)成員:maximumPoolSize,其實(shí)線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize,但是我們上面的分析是corePoolSize,這是因?yàn)槲覀兊膒rivate boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來(lái)控制的,core為true則使用corePoolSize來(lái)設(shè)定邊界,否則使用maximumPoolSize來(lái)設(shè)定邊界。直觀的解釋一下,當(dāng)線程池里面的Worker數(shù)量還沒(méi)有到corePoolSize,那么新添加的任務(wù)會(huì)伴隨著產(chǎn)生一個(gè)新的worker,如果Worker的數(shù)量達(dá)到了corePoolSize,那么就將任務(wù)存放在阻塞隊(duì)列中等待Worker來(lái)獲取執(zhí)行,如果沒(méi)有辦法再向阻塞隊(duì)列放任務(wù)了,那么這個(gè)時(shí)候maximumPoolSize就變得有用了,新的任務(wù)將會(huì)伴隨著產(chǎn)生一個(gè)新的Worker,如果線程池里面的Worker已經(jīng)達(dá)到了maximumPoolSize,那么接下來(lái)提交的任務(wù)只能被拒絕策略拒絕了。可以參考下面的描述來(lái)理解:

 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}.

在此需要說(shuō)明一點(diǎn),有一個(gè)重要的成員:keepAliveTime,當(dāng)線程池里面的線程數(shù)量超過(guò)corePoolSize了,那么超出的線程將會(huì)在空閑keepAliveTime之后被terminated??梢詤⒖枷旅娴奈臋n:

 * If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

ScheduledThreadPoolExecutor解析

ScheduledThreadPoolExecutor適用于延時(shí)執(zhí)行,或者周期性執(zhí)行的任務(wù)調(diào)度,ScheduledThreadPoolExecutor在實(shí)現(xiàn)上繼承了ThreadPoolExecutor,所以你依然可以將ScheduledThreadPoolExecutor當(dāng)成ThreadPoolExecutor來(lái)使用,但是ScheduledThreadPoolExecutor的功能要強(qiáng)大得多,因?yàn)镾cheduledThreadPoolExecutor可以根據(jù)設(shè)定的參數(shù)來(lái)周期性調(diào)度運(yùn)行,下面的圖片展示了四個(gè)和周期性相關(guān)的方法:

四個(gè)Scheduled方法
  • 如果你想延時(shí)一段時(shí)間之后運(yùn)行一個(gè)Runnable,那么使用第一個(gè)方法
  • 如果你想延時(shí)一段時(shí)間然后運(yùn)行一個(gè)Callable,那么使用的第二個(gè)方法
  • 如果你想要延時(shí)一段時(shí)間,然后根據(jù)設(shè)定的參數(shù)周期執(zhí)行Runnable,那么可以選擇第三個(gè)和第四個(gè)方法,第三個(gè)方法和第四個(gè)方法的區(qū)別在于:第三個(gè)方法嚴(yán)格按照規(guī)劃的時(shí)間路徑來(lái)執(zhí)行,比如周期為2,延時(shí)為0,那么執(zhí)行的序列為0,2,4,6,8....,而第四個(gè)方法將基于上次執(zhí)行時(shí)間來(lái)規(guī)劃下次的執(zhí)行,也就是在上次執(zhí)行完成之后再次執(zhí)行。比如上面的執(zhí)行序列0,2,4,6,8...,如果第2秒沒(méi)有被調(diào)度執(zhí)行,而在第三秒的時(shí)候才被調(diào)度,那么下次執(zhí)行的時(shí)間不是4,而是5,以此類推。

下面來(lái)看一下這四個(gè)方法的一些細(xì)節(jié):

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    
    
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

通過(guò)上面的代碼我們可以發(fā)現(xiàn),前兩個(gè)方法是類似的,后兩個(gè)方法也是類似的。前兩個(gè)方法屬于一次性調(diào)度,所以period都為0,區(qū)別在于參數(shù)不同,一個(gè)是Runnable,而一個(gè)是Callable,可笑的是,最后都變?yōu)榱薈allable了,見(jiàn)下面的構(gòu)造函數(shù):

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

對(duì)于后兩個(gè)方法,區(qū)別僅僅在于period的,scheduleWithFixedDelay對(duì)參數(shù)進(jìn)行了操作,將原來(lái)的時(shí)間變?yōu)樨?fù)數(shù)了,而后面在計(jì)算下次被調(diào)度的時(shí)間的時(shí)候會(huì)根據(jù)這個(gè)參數(shù)的正負(fù)值來(lái)分別處理,正數(shù)代表scheduleAtFixedRate,而負(fù)數(shù)代表了scheduleWithFixedDelay。
一個(gè)需要被我們注意的細(xì)節(jié)是,以上四個(gè)方法最后都會(huì)調(diào)用一個(gè)方法: delayedExecute(t),下面看一下這個(gè)方法:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

大概的意思就是先判斷線程池是否被關(guān)閉了,如果被關(guān)閉了,則拒絕任務(wù)的提交,否則將任務(wù)加入到任務(wù)隊(duì)列中去等待被調(diào)度執(zhí)行。最后的ensurePrestart的意思是需要確保線程池已經(jīng)被啟動(dòng)起來(lái)了。下面是這個(gè)方法:

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

主要是增加了一個(gè)沒(méi)有任務(wù)的worker,有什么用呢?我們還記得Worker的邏輯嗎?addWorker方法的執(zhí)行,會(huì)觸發(fā)Worker的run方法的執(zhí)行,然后runWorker方法就會(huì)被執(zhí)行,而runWorker方法是循環(huán)從workQueue中取任務(wù)執(zhí)行的,所以確保線程池被啟動(dòng)起來(lái)是重要的,而只需要簡(jiǎn)單的執(zhí)行addWorker便會(huì)觸發(fā)線程池的啟動(dòng)流程。對(duì)于調(diào)度線程池來(lái)說(shuō),只要執(zhí)行了addWorker方法,那么線程池就會(huì)一直在后臺(tái)周期性的調(diào)度執(zhí)行任務(wù)。
到此,似乎我們還是沒(méi)有鬧明白ScheduledThreadPoolExecutor是如何實(shí)現(xiàn)周期性的,上面講到四個(gè)scheduled方法時(shí),我們沒(méi)有提一個(gè)重要的類:ScheduledFutureTask,對(duì),所有神奇的事情將會(huì)發(fā)生在這個(gè)類中,下面來(lái)分析一下這個(gè)類。

ScheduledFutureTask類圖

看上面的類圖,貌似這個(gè)類非常復(fù)雜,還好,我們發(fā)現(xiàn)他實(shí)現(xiàn)了Runnable接口,那么必然會(huì)有一個(gè)run方法,而這個(gè)run方法必然是整個(gè)類的核心,下面來(lái)看一下這個(gè)run方法的內(nèi)容:

     public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
    }

首先,判斷是否是周期性的任務(wù),如果不是,則直接執(zhí)行(一次性),否則執(zhí)行,然后設(shè)置下次執(zhí)行的時(shí)間,然后重新調(diào)度,等待下次執(zhí)行。這里有一個(gè)方法需要注意,也就是setNextRunTime,上面我們提到scheduleAtFixedRate和scheduleWithFixedDelay在傳遞參數(shù)時(shí)不一樣,后者將delay值變?yōu)榱素?fù)數(shù),所以下面的處理正好印證了前文所述。

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

下面來(lái)看一下reExecutePeriodic方法是如何做的,他的目標(biāo)是將任務(wù)再次被調(diào)度執(zhí)行,下面的代碼展示了這個(gè)功能的實(shí)現(xiàn):

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

可以看到,這個(gè)方法就是將我們的任務(wù)再次放到了workQueue里面,那這個(gè)參數(shù)是什么?在上面的run方法中我們調(diào)用了reExecutePeriodic方法,參數(shù)為outerTask,而這個(gè)變量是什么?看下面的代碼:

  /** The actual task to be re-enqueued by reExecutePeriodic */
  RunnableScheduledFuture<V> outerTask = this;

這個(gè)變量指向了自己,而this的類型是什么?是ScheduledFutureTask,也就是可以被調(diào)度的task,這樣就實(shí)現(xiàn)了循環(huán)執(zhí)行任務(wù)了。
上面的分析已經(jīng)到了循環(huán)執(zhí)行,但是ScheduledThreadPoolExecutor的功能是周期性執(zhí)行,所以我們接著分析ScheduledThreadPoolExecutor是如何根據(jù)我們的參數(shù)走走停停的。這個(gè)時(shí)候,是應(yīng)該看一下ScheduledThreadPoolExecutor的構(gòu)造函數(shù)了,我們來(lái)看一個(gè)最簡(jiǎn)單的構(gòu)造函數(shù):

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

我們知道ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,所以這里的super其實(shí)是ThreadPoolExecutor的構(gòu)造函數(shù),我們發(fā)現(xiàn)其中有一個(gè)參數(shù)DelayedWorkQueue,看名字貌似是一個(gè)延遲隊(duì)列的樣子,進(jìn)一步跟蹤代碼,發(fā)現(xiàn)了下面的一行代碼(構(gòu)造函數(shù)中):

     this.workQueue = workQueue;

所以在ScheduledThreadPoolExecutor中,workQueue是一個(gè)DelayedWorkQueue類型的隊(duì)列,我們暫且認(rèn)為DelayedWorkQueue是一種具備延遲功能的隊(duì)列吧,那么,到此我們便可以想明白了,上面的分析我們明白了ScheduledThreadPoolExecutor是如何循環(huán)執(zhí)行任務(wù)的,而這里我們明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue來(lái)達(dá)到延遲的目標(biāo),所以組合起來(lái),就可以實(shí)現(xiàn)ScheduledThreadPoolExecutor周期性執(zhí)行的目標(biāo)。下面我們來(lái)看一下DelayedWorkQueue是如何做到延遲的吧,上文中提到一個(gè)方法:getTask,這個(gè)方法的作用是從workQueue中取出任務(wù)來(lái)執(zhí)行,而在ScheduledThreadPoolExecutor里面,getTask方法是從DelayedWorkQueue中取任務(wù)的,而取任務(wù)無(wú)非兩個(gè)方法:poll或者take,下面我們對(duì)DelayedWorkQueue的take方法來(lái)分析一下:


 public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在for循環(huán)里面,首先從queue中獲取第一個(gè)任務(wù),然后從任務(wù)中取出延遲時(shí)間,而后使用available變量來(lái)實(shí)現(xiàn)延遲效果。這里面需要幾個(gè)點(diǎn)需要探索一下:

這個(gè)queue是什么東西?
延遲時(shí)間的來(lái)龍去脈?
available變量的來(lái)龍去脈?
對(duì)于第一個(gè)問(wèn)題,看下面的代碼:

   private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

它是一個(gè)RunnableScheduledFuture類型的數(shù)組,下面是RunnableScheduledFuture類的類關(guān)系圖:

RunnableScheduledFuture類關(guān)系

數(shù)組里面保存了我們的RunnableScheduledFuture,對(duì)queue的操作,主要來(lái)看一下增加元素和消費(fèi)元素的操作。首先,假設(shè)使用add方法來(lái)增加RunnableScheduledFuture到queue,調(diào)用的鏈路如下:


        public boolean add(Runnable e) {
            return offer(e);
        }
        
        
         public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

解釋一下,add方法直接轉(zhuǎn)到了offer方法,該方法中,首先判斷數(shù)組的容量是否足夠,如果不夠則grow,增長(zhǎng)的策略如下:

 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%

每次增長(zhǎng)50%,入戲下去。增長(zhǎng)完成后,如果這是第一個(gè)元素,則放在坐標(biāo)為0的位置,否則,使用siftUp操作,下面是該方法的內(nèi)容:

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

這個(gè)數(shù)組實(shí)現(xiàn)了堆這種數(shù)據(jù)結(jié)構(gòu),使用對(duì)象比較將最需要被調(diào)度執(zhí)行的RunnableScheduledFuture放到數(shù)組的前面,而這得力于compareTo方法,下面是RunnableScheduledFuture類的compareTo方法的實(shí)現(xiàn),主要是通過(guò)延遲時(shí)間來(lái)做比較。

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

上面是生產(chǎn)元素,下面來(lái)看一下消費(fèi)數(shù)據(jù)。在上面我們提到的take方法中,使用了一個(gè)方法如下:

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

這個(gè)方法中調(diào)用了一個(gè)方法siftDown,這個(gè)方法如下:

        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }

對(duì)其的解釋就是:

Replaces first element with last and sifts it down.  Call only when holding lock.

總結(jié)一下,當(dāng)我們向queue插入任務(wù)的時(shí)候,會(huì)發(fā)生siftUp方法的執(zhí)行,這個(gè)時(shí)候會(huì)把任務(wù)盡量往根部移動(dòng),而當(dāng)我們完成任務(wù)調(diào)度之后,會(huì)發(fā)生siftDown方法的執(zhí)行,與siftUp相反,siftDown方法會(huì)將任務(wù)盡量移動(dòng)到queue的末尾??傊?,大概的意思就是queue通過(guò)compareTo實(shí)現(xiàn)了類似于優(yōu)先級(jí)隊(duì)列的功能。
下面我們來(lái)看一下第二個(gè)問(wèn)題:延遲時(shí)間的來(lái)龍去脈。在上面的take方法里面,首先獲取了delay,然后再使用available來(lái)做延遲效果,那這個(gè)delay從哪里來(lái)的呢?通過(guò)上面的類圖RunnableScheduledFuture的類圖我們知道,RunnableScheduledFuture類實(shí)現(xiàn)了Delayed接口,而Delayed接口里面的唯一方法是getDelay,我們到RunnableScheduledFuture里面看一下這個(gè)方法的具體實(shí)現(xiàn):

       public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }

time是我們?cè)O(shè)定的下次執(zhí)行的時(shí)間,所以延遲就是(time - now()),沒(méi)毛??!

第三個(gè)問(wèn)題:available變量的來(lái)龍去脈,至于這個(gè)問(wèn)題,我們看下面的代碼:


        /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         */
        private final Condition available = lock.newCondition();

這是一個(gè)條件變量,take方法里面使用這個(gè)變量來(lái)做延遲效果。Condition可以在多個(gè)線程間做同步協(xié)調(diào)工作,更為具體細(xì)致的關(guān)于Condition的內(nèi)容,可以參考更多的資料來(lái)學(xué)習(xí),本文對(duì)此知識(shí)點(diǎn)點(diǎn)到為止。
到此為止,我們梳理了ScheduledThreadPoolExecutor是如何實(shí)現(xiàn)周期性調(diào)度的,首先分析了它的循環(huán)性,然后分析了它的延遲效果,本文到此也就結(jié)束了,對(duì)于線程池的學(xué)習(xí)現(xiàn)在才剛剛起步,需要更多更專業(yè)的知識(shí)類幫我理解更為底層的內(nèi)容,當(dāng)然,為了更進(jìn)一步理解線程池的實(shí)現(xiàn)細(xì)節(jié),首先需要對(duì)線程間通信有足夠的把握,其次是要對(duì)各種數(shù)據(jù)結(jié)構(gòu)有清晰的認(rèn)識(shí),比如隊(duì)列、優(yōu)先級(jí)隊(duì)列、堆等高級(jí)的數(shù)據(jù)結(jié)構(gòu),以及java語(yǔ)言對(duì)于這些數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn),更為重要的是要結(jié)合實(shí)際情況分析問(wèn)題,在工作和平時(shí)的學(xué)習(xí)中不斷總結(jié),不斷迭代對(duì)于線程、線程池的認(rèn)知。

作者:一字馬胡
鏈接:http://www.itdecent.cn/p/5d5198b434a2
來(lái)源:簡(jiǎn)書
簡(jiǎn)書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請(qǐng)聯(lián)系作者獲得授權(quán)并注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容