ThreadPoolExceutor與ScheduledThreadPoolExecutor原理剖析

《Java并發(fā)編程之美》讀書筆記

第8章 Java并發(fā)包中線程池ThreadPoolExceutor原理探究

介紹

線程池主要解決兩個問題:

  1. 一是當執(zhí)行大量異步任務(wù)時線程池能提供較好的性能,在不適用線程池時,每當需要執(zhí)行異步任務(wù)時直接new一個線程來運行,而線程的創(chuàng)建和銷毀都是需要開銷的。線程池里面的線程是可以復(fù)用的,不需要每次執(zhí)行異步任務(wù)的時候都重新創(chuàng)建和銷毀線程。
  2. 線程池提供了一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)增減線程的等,ThreadPoolExceutor也保留了一些基本的統(tǒng)計參數(shù),比如當前線程池完成的任務(wù)數(shù)目等
    另外,線程池也提供了許多可調(diào)的參數(shù)個可擴展性接口,以滿足不同情況的需要,程序員可以使用更方便的Executors的工程方法,比如newCachedThreadPool(線程池線程個數(shù)最多可達Integer.MAX_VALUE,線程自動回收)newFixedThreadPool(固定大小的線程池)和newSingleThreadExecutor(單個線程)等來創(chuàng)建線程池,當然用戶還可以自定義

類圖介紹

在類圖中,Executors其實是個工具類,里面提供了好多靜態(tài)方法,這些方法更具用戶選擇返回不同的線程池實例。ThreadPoolExceutor繼承了AbstractExecutorService,成員變量ctl是一個Integer的原子變量,用來記錄線程池狀態(tài)和線程池中的線程個數(shù),類似于ReentrantReadWriteLock使用一個變量來保存兩種信息。



這里假設(shè)Integer類型是32位二進制表示,則其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池的線程個數(shù)

    //原子變量ctl高3位用愛表示線程池狀態(tài),低29位用來表示線程個數(shù)
    //默認RUNNING狀態(tài),線程個數(shù)為0
    private final AtomicInteger ctl = new   AtomicInteger(ctlOf(RUNNING, 0));
    //線程個數(shù)掩碼位數(shù),并不是所有平臺的int類型都是32位的,所有的來說,是具體平臺下的Integer的二進制位數(shù)-3的剩余位數(shù)所表示的數(shù)才是線程的個數(shù)
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //線程的最大個數(shù)(低29位)00011111111111111111111;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 線程池的狀態(tài)
    //高3位 運行態(tài)111000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
     //高3位  000000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //高3位 001
    private static final int STOP       =  1 << COUNT_BITS;
    //高3位 010
    privatestatic final int TIDYING    =  2 << COUNT_BITS;
    //高3位 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 獲取高3位
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    //獲取低29位(線程個數(shù))
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    //計算ctl新值(線程狀態(tài)與線程個數(shù))
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  • RUNNING:接受新任務(wù)并且處理阻塞隊列里面的任務(wù)
  • SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列里的任務(wù)
  • STOP:拒絕新任務(wù)并且拋棄阻塞隊列里的任務(wù),同時會中斷正在處理的任務(wù)
  • TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊列里面的任務(wù))后當前線程池活動線程為0,將要調(diào)用terminated方法
  • TERMINATED:終止狀態(tài),terminated方法調(diào)用完成以后的狀態(tài)
  • RUNNING->SHUTDOWN:顯示調(diào)用shutdown()方法,或者隱士調(diào)用了finalize()方法里面的shutdown方法;
  • RUNNING or SHUTDOWN ->STOP:顯示調(diào)用shutdownNow()方法時。
  • SHUTDOWN->tIDYING:當線程池和任務(wù)隊列都為空的時候
  • STOP->TIDYING:當線程池為空的時候
  • TIDYING->TERMINATED:當terminated()hook方法完成時。
    線程池的參數(shù)如下:
  • corePoolSize:線程池核心線程的個數(shù)
  • workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列,比如基于數(shù)組的有界的ArrayBlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個元素的同步隊列SynchronousQueue及優(yōu)先級隊列PriorityBlockingQueue等
  • maximumPoolSize:線程池最大的線程數(shù)量
  • ThreadFactory:創(chuàng)造線程的工廠
  • RejectedExecutionHandler:飽和策略,當隊列滿并且線程個數(shù)達到maximunPollSize后采取的策略,比如AbortPolicy(拋出異常),CallerRunsPolicy(使用調(diào)用者線程來運行任務(wù))、DiscardOldestPolicy(調(diào)用poll丟棄一個任務(wù),執(zhí)行當前任務(wù))及DiscardPolicy(默默丟棄,不拋出異常)
  • keeyAliveTime:存活時間。如果當前線程池中的線程數(shù)量比核心線程數(shù)量多,并且是閑置狀態(tài),則這些閑置的線程能存活的最大時間。
  • TimeUnit:存活時間的時間單位
  • newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE。 keepAliveTime=0說明只要線程個數(shù)比核心線程多并且當前空閑則回收。
  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

//使用自定義線程創(chuàng)建工廠
  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

  • newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1的線程池,并且阻塞隊列的長度為Integer.NAX_VALUE.keepAliveTime=0說明只要線程個數(shù)比核心個數(shù)多并且當前空閑則回收。
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    //使用自己的線程工廠
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • newCachedTheadPool:創(chuàng)建一個按需創(chuàng)建線程的線程池,出事的線程個數(shù)為0,最多線程的個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列。KeepAliveTime=60,表示當前線程在60s內(nèi)空閑則回收。這個類型的特殊之處在于,加入同步隊列的任務(wù)會被馬上執(zhí)行,同步隊列里面最多只有一個任務(wù)。
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    //使用自定義的線程工廠
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

如上的TheadPoolExecutor類圖所示,其中mainLock是獨占鎖,用來控制新增Worker線程操作的原子性,termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時用來存放阻塞的線程。
Worker繼承AQS和Runnable接口,是具體承載任務(wù)的對象,worker繼承了AQS,自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示鎖未被獲取狀態(tài),state=1表示所以就備貨區(qū),state=-1是創(chuàng)建worker默認的狀態(tài),創(chuàng)建時狀態(tài)設(shè)置為-1是為了避免該線程在運行runWorker方法前被中斷,其中變量firstTask記錄該工作線程執(zhí)行的第一個任務(wù),thread是具體執(zhí)行任務(wù)的線程。
DefaultThreadFactory是線程工廠,newThread方法時對線程的一個修飾,其中poolNumber是個靜態(tài)的原子變量,用來統(tǒng)計線程工程的個數(shù),threadNumber用來記錄每個線程工廠創(chuàng)建了多少的線程,這兩個值也作為線程池和線程的名稱的一部分。

源碼分析

public void execute(Runnable command)

execute方法的作用是提交任務(wù)command到線程池進行執(zhí)行,用戶線程提交任務(wù)到線程池的模型圖如下圖所示:


從該圖可以看出,ThreadPoolExecutor的實現(xiàn)實際上是一個生產(chǎn)消費模型,當用戶添加任務(wù)到線程池到相當于生產(chǎn)者生產(chǎn)元素,workers線程工作集中的線程直接執(zhí)行任務(wù)或者從任務(wù)隊列里面獲取任務(wù)時則相當于消費者消費元素。

public void execute(Runnable command) {
        //1.如果任務(wù)為null,則拋出NPE異常
        if (command == null)
            throw new NullPointerException();
        //2獲取當前線程池的狀態(tài)+線程個數(shù)變量的組合值
        int c = ctl.get();
        //3當前線程池中的線程個數(shù)是否小于corePoolSize,小于的話則開啟線程運行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //4如果線程池處于Running狀態(tài),則添加任務(wù)到阻塞隊列
        if (isRunning(c) && workQueue.offer(command)) {
        //4,1二次檢查
            int recheck = ctl.get();
            //4.2如果當前的線程池狀態(tài)不是RUNNING則從隊列中刪除任務(wù),并且執(zhí)行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //4.3否則如果當前線程池為空,則添加一個線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //5如果隊列滿,則新增線程,新增失敗則執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

代碼3判斷如果當前線程池中線程個數(shù)小于corePoolSize,會向workers里面新增一個核心線程(core線程)執(zhí)行該任務(wù)
如果當前線程池中線程個數(shù)大于等于corePoolSize則執(zhí)行代碼4.如果當前線程池處于RUNNING狀態(tài)則添加當前任務(wù)到任務(wù)隊列。這里需要判斷線程池狀態(tài)是因為線程池已經(jīng)處于非RUNNING,而非RUNNING狀態(tài)下是要拋棄新任務(wù)的。
如果向任務(wù)隊列添加任務(wù)成功,則代碼4.2對線程池狀態(tài)進行二次校驗,這是因為添加任務(wù)到任務(wù)隊列后,執(zhí)行代碼4.2之前有可能線程池的狀態(tài)已經(jīng)發(fā)生了變化了,這里進行二次檢驗,如果當前線程池狀態(tài)不是RUNNINGLE則把任務(wù)從任務(wù)隊列里面移除,移除后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行待嗎4.3重新判斷當前線程池里面是否還有線程,如果沒有則新增一個線程。
如果代碼4添加任務(wù)失敗,則說明任務(wù)隊列已滿,那么執(zhí)行代碼5嘗試新開啟線程如圖中中thread3,thread4來執(zhí)行該任務(wù),如果當前線程池中線程個數(shù)>maximunPoolSize則執(zhí)行拒絕策略。
下面分析新增線程addWorker方法:

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 6檢查隊列是否只在必要時為空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;
            //7 循環(huán)CAS增加線程個數(shù)
            for (;;) {
            //7.1如果線程個數(shù)則返回false
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                    //7.2CAS增加線程個數(shù),同時只有一個線程成功
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //7.3到這里說明CAS失敗了,則看線程池狀態(tài)是否變化了,變化則調(diào)到外層循環(huán)重新嘗試獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新CAS
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
             
            }
        }
        //8到這里說明CAS成功了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        //8.1創(chuàng)建worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //8.2加獨占鎖,為了實現(xiàn)workers同步,因為可能多個線程調(diào)用了線程池的execute方法
                mainLock.lock();
                try {
                    // 重新檢查線程池狀態(tài),以避免在獲取鎖調(diào)用了shutdown接口
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //添加任務(wù)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加成功后則啟動任務(wù)
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代碼比較長主要分為兩個部分:

  1. 第一部分雙重循環(huán)的目的是通過CAS操作添加線程數(shù)
  2. 第二部分主要是把并發(fā)安全的任務(wù)添加到workers里面,并且啟動任務(wù)執(zhí)行**
    首先來分析第一部分代碼
 if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)//1
                    || firstTask != null//2
                    || workQueue.isEmpty()))//3
                return false;

也就是說代碼6在下面幾種情況下會返回false

  • 當前線程池狀態(tài)為STOP TIDYING TERMINATED
  • 當前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有第一個任務(wù)
  • 當前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊列為空
    內(nèi)層循環(huán)的作用是使用CAS操作增加線程數(shù),代碼7.1如果線程個數(shù)超限則返回false,否則執(zhí)行代碼7.2CAS操作設(shè)置線程個數(shù),CAS成功則退出雙循環(huán),CAS失敗則執(zhí)行代碼7.3看當前線程池的狀態(tài)是否變化了,如果變了,則再次進入外層循環(huán)重新獲取線程池狀態(tài),否則進入內(nèi)存循環(huán)繼續(xù)進行CAS嘗試。
    執(zhí)行到了第二部分的代碼8是說明使用CAS成功的增加了線程個數(shù),但是現(xiàn)在任務(wù)還沒有開始執(zhí)行。這里要使用全局的獨占鎖把新增的Worker添加到工作集workers中。代碼8.1創(chuàng)建了一個工作線程Worker。
    代碼8.2獲取了獨占鎖,代碼8.3重新檢查線程池狀態(tài),這是為了避免在獲取鎖之前其他線程調(diào)用了shutdown關(guān)閉了線程池,如果線程池已經(jīng)被關(guān)閉,則釋放鎖,新增線程失敗,否則執(zhí)行代碼8.4天假工作線程到線程工作集,然后釋放鎖,代碼8.5判斷如果新增工作線程成功,則啟動工作線程。

工作線程Worker的執(zhí)行

用戶線程提交任務(wù)到線程池后,由worker來執(zhí)行。先看下worker的構(gòu)造函數(shù)。

  Worker(Runnable firstTask) {
            setState(-1); // 在調(diào)用runworker前禁止中斷
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//創(chuàng)建一個線程
        }

在構(gòu)造函數(shù)內(nèi)首先設(shè)置worker的狀態(tài)為1,這是為了避免當前worker在調(diào)用runworker方法前被中斷(當其他線程調(diào)用了線程池的shutdownNow時候,如果worker代碼中狀態(tài)>=0則會中斷該線程。這里設(shè)置線程的狀態(tài)為-1,所以該線程就不會中斷了,咋子如下的runworker代碼中,運行代碼9時會調(diào)用unlock方法,該方法把status設(shè)置了為0,所以這時候調(diào)用shutDownNow會中斷worker線程。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts9將state設(shè)置為0,允許中斷
        boolean completedAbruptly = true;
        try {
        //10
            while (task != null || (task = getTask()) != null) {        //10.1
                w.lock();
                ...
                try {
                //10.2執(zhí)行任務(wù)前干一些事情
                    beforeExecute(wt, task);
                    try {
                        task.run();//10.3執(zhí)行任務(wù)
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                   //10.5 統(tǒng)計當前worker完成了多少任務(wù)
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        //11執(zhí)行清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }

在如上的代碼10中,如果當前task==null或者調(diào)用getTask()從任務(wù)隊列獲取的任務(wù)返回null,則跳轉(zhuǎn)到代碼11執(zhí)行,如果task不為null則執(zhí)行任務(wù)10.1獲取線程內(nèi)部持有的獨占鎖,然后執(zhí)行擴展接口代碼10.2在具體任務(wù)之前做些事情,代碼10.3具體執(zhí)行任務(wù),代碼10。5統(tǒng)計當前worker完成了多少個任務(wù),并釋放鎖。
這里在執(zhí)行具體的任務(wù)期間加鎖,是為了避免在任務(wù)運行的期間,其他線程調(diào)用了shutdown后正在執(zhí)行的任務(wù)被中斷(shutdown只會中斷當前被阻塞掛起的線程)
代碼11執(zhí)行清理任務(wù),其代碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        //11.1統(tǒng)計整個線程池完成的任務(wù)個數(shù),并從工作集里面刪除當前的worker
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //11.2嘗試設(shè)置線程池的狀態(tài)為Terminated,如果當前是shutdown狀態(tài)并工作隊列為空
        //或者當前是stop狀態(tài),當前線程池里沒有活動線程。
        tryTerminate();
        int c = ctl.get();
        //11.3如果當前線程個數(shù)小于核心個數(shù),則增加
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

在如上的代碼中,代碼11.1統(tǒng)計線程池完成任務(wù)的個數(shù),并且在統(tǒng)計前加了全局所,把在當前工作線程中完成的任務(wù)累加到全局計數(shù)器,然后從工作集中刪除當前的worker。
代碼11.2判斷如果線程池的狀態(tài)是SHUTDOWN并且工作隊列為空,或者當前線程池狀態(tài)是STOP并且當前線程池里面沒有活動線程,則設(shè)置線程池狀態(tài)為TERMINATED。如果設(shè)置為了TERMINATED狀態(tài),則還需要調(diào)用條件變量termination的signalAll()方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程。
代碼11.3則判斷當前線程池個數(shù)是否小于核心線程個數(shù),如果是則在新增一個線程。

shutdown操作

調(diào)用shutdown方法之后,線程系就不會在接受新的任務(wù)了,但是工作隊列里面的任務(wù)還是需要執(zhí)行的。該方法會立刻返回,并不等待隊列任務(wù)完成在返回。

 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        //12 權(quán)限檢查
            checkShutdownAccess();
            //13 設(shè)置當前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是了SHUTDOWN則直接返回。
            advanceRunState(SHUTDOWN);
            //設(shè)置中斷標志
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //15 嘗試將狀態(tài)變?yōu)門ERMINATED
        tryTerminate();
    }

在如上的代碼中,代碼12檢查看是否設(shè)置了安全管理器,是則看點前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有則還要看調(diào)用線程是否有中斷線程工作線程的權(quán)限,如果沒有權(quán)限則拋出SecurityException或者NullPointerException
其中代碼13的內(nèi)從如下,如果當前線程池狀態(tài)>=
SHUTDOWN則直接返回,否則是指為SHUTDOWN狀態(tài)。

private void advanceRunState(int targetState) {
        // assert targetState == SHUTDOWN || targetState == STOP;
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

代碼14的內(nèi)容如下,其設(shè)置所有空閑線程的中斷標志。這里首先加了全局鎖,同時只有一個線程可以調(diào)用shutdown方法設(shè)置中斷標志,然后嘗試獲取worker自己的鎖,獲取成功則設(shè)置中斷標志。由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒有被中斷。這里中斷的是阻塞到getTask()方法并企圖從隊列里面獲取任務(wù)的線程,也就是空閑線程。

  private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //如果工作線程沒有被中斷,并且沒有正在運行則設(shè)置中斷標志
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
 final void tryTerminate() {
        for (;;) {
        ..
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {//設(shè)置當前線程池狀態(tài)為TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                    //設(shè)置當前線程池狀態(tài)為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
     }
    }

在如上代碼中,首先使用CAS設(shè)置當前線程池狀態(tài)為TIDYING,如果設(shè)置成功則執(zhí)行擴展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設(shè)置當前線程值得狀態(tài)為TERMINATED。最后調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程。

shutdownNow操作

調(diào)用shutdownNow方法后,線程池就不會再接受新的任務(wù)了,并且會丟棄工作隊列里面的任務(wù),正在執(zhí)行的任務(wù)會被中斷,該方法會立刻返回,并不等待激活的任務(wù)執(zhí)行完成。返回值為這時候隊列里面被丟棄的任務(wù)列表。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//16權(quán)限檢查
            advanceRunState(STOP);//17設(shè)置線程池的狀態(tài)為stop
            interruptWorkers();//18中斷所有線程
            tasks = drainQueue();//19將隊列任務(wù)全部移動到tasks里面
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

在如上的代碼中,首先調(diào)用代碼16檢查權(quán)限,然后調(diào)用代碼17設(shè)置當前線程池的狀態(tài)為stop,隨后執(zhí)行代碼18中斷所有工作線程,這里需要注意的是,中斷的所有線程包含空閑線程和正在執(zhí)行任務(wù)的線程。

private void interruptWorkers() {
        // assert mainLock.isHeldByCurrentThread();
        for (Worker w : workers)
            w.interruptIfStarted();
    }

然后執(zhí)行代碼19將任務(wù)隊列里面的任務(wù)移動到tasks列表

awaitTermination操作

當線程調(diào)用awaitTermination方法后,當前線程會阻塞,直到線程池狀態(tài)變?yōu)門ermination才返回,或者等待時間超時才返回。整個過程中獨占鎖的代碼:

 public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            while (runStateLessThan(ctl.get(), TERMINATED)) {
                if (nanos <= 0L)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

如上代碼首先獲取獨占鎖,然后再無限循環(huán)內(nèi)部判斷當前狀態(tài)池是否至少是Termination狀態(tài),如果是則直接返回,否則說明當前線程池還有線程在執(zhí)行,則看設(shè)置的超時時間nanos是否小于0,小于0則說明不需要等待,那就直接返回,如果大于0則調(diào)用條件變量termination的awaitNanos方法等待nanos時間,期望在這段時間內(nèi)線程池的狀態(tài)變?yōu)門ERMINATED
在講shutdown方法提到過,當線程池狀態(tài)變?yōu)門ERMINATED的時候,會調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程,所以如果在調(diào)用awaitTermination方法之后調(diào)用shutdown方法,并且在shutdown內(nèi)部將線程池狀態(tài)設(shè)置為TERMINATED,則termination.awaitNanos方法會返回。
另外在工作線程中worker的runworker方法內(nèi),當工作線程運行結(jié)束后,會低啊用processWorkerExit方法,在processWorkerExit方法內(nèi)部也會調(diào)用trytREMINATE方法測試當前時候應(yīng)該把線程池狀態(tài)設(shè)置為TERMINATED,如果是,則也會調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程。
而且當?shù)却龝r間超時后,terminate.awaitNanos也會返回,這時候會重現(xiàn)檢查當前線程池狀態(tài)是否為TERMINATED;如果是則世界返回,否則繼續(xù)阻塞掛起自己。

總結(jié)

線程池巧妙的使用一個Integer類型的原子變量來記錄線程池狀態(tài)和線程池中線程個數(shù),通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個Worker線程可以處理多個任務(wù),線程池通過線程的復(fù)用減少了線程的創(chuàng)建和銷毀的開銷。

Java并發(fā)包中ScheduledThreadPoolExecutor原理探究

ThreadPollExecutor只是Executors工具類的一部分功能,而ScheduledThreadPoolExecutor是一個可以在指定一定延遲時間后或者定時進行任務(wù)調(diào)度執(zhí)行的線程池。

類圖介紹

Executors其實是個工具類,它提供了好多靜態(tài)方法,可根據(jù)用戶的選擇返回不同的線程池實例。ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現(xiàn)了ScheduledExecutorService接口,線程池隊列是DelayedWorkQueue,其和DelayedQueue類似,是一個延遲隊列。ScheduledFutureTask是具有返回值得任務(wù),繼承自FutureTask。FutureTask的內(nèi)部用一個變量state用來表示任務(wù)的狀態(tài),一開始狀態(tài)為new,所有狀態(tài)為

  private volatile int state;
    private static final int NEW          = 0;//初始狀態(tài)
    private static final int COMPLETING   = 1;//執(zhí)行中狀態(tài)
    private static final int NORMAL       = 2;//正常運行結(jié)束狀態(tài)
    private static final int EXCEPTIONAL  = 3;//運行中異常
    private static final int CANCELLED    = 4;//任務(wù)被取消
    private static final int INTERRUPTING = 5;//任務(wù)正在被中斷
    private static final int INTERRUPTED  = 6;//任務(wù)已經(jīng)被中斷

可能的任務(wù)狀態(tài)的轉(zhuǎn)換路徑為
new->completing-normal 初始態(tài)->執(zhí)行中-正常結(jié)束
new->completing->exception 初始態(tài)->執(zhí)行中->執(zhí)行異常
new->cancelled 初始態(tài)->任務(wù)取消
new->interrupting-interrupted初始狀態(tài)->被中斷中->被中斷
ScheduledFutureTask內(nèi)部還有一個變量period用來表示任務(wù)的類型,任務(wù)的類型如下:

  • period=0說明當前任務(wù)時一次性的,執(zhí)行完畢后就退出了
  • period為附屬,說明當前任務(wù)為fixed-delay任務(wù),是固定延遲的定時可重復(fù)執(zhí)行的任務(wù)
  • period為正數(shù),說明當前任務(wù)為fixed-rate任務(wù),是固定頻率的定時可重復(fù)執(zhí)行任務(wù)
    ScheduledThreadPoolExecutor的一個構(gòu)造函數(shù)如下,由構(gòu)造函數(shù)可知線程池隊列是DelayedWorkQueue
  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

原理剖析

  • schedule(Runnable command,long delay,TimeUnit unit)
  • scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
  • scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

schedule(Runnable command,long delay,TimeUnit unit)

這份方法的作用是提交一個延遲執(zhí)行的任務(wù),任務(wù)從提交時間算起延遲單位為unit的delay的時間開始執(zhí)行。提交的任務(wù)不是周期性任務(wù),任務(wù)只會執(zhí)行一次

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
    //1參數(shù)校驗
        if (command == null || unit == null)
            throw new NullPointerException();
    //2任務(wù)轉(zhuǎn)換
        RunnableScheduledFuture<Void> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit),
                                          sequencer.getAndIncrement()));
    //3添加任務(wù)到延遲隊列
        delayedExecute(t);
        return t;
    }

1.如上代碼1進行參數(shù)校驗,如果command或者unit為null,則拋出空指針異常
2。代碼2執(zhí)行裝飾任務(wù),把提交的command任務(wù)轉(zhuǎn)換為ScheduledFutureTask。ScheduledFutureTask是具體放入延遲隊列的東西,由于是延遲任務(wù),所以ScheduledFutureTask實現(xiàn)了long getDelay(TimeUnit unit)和int compareTo(Dealyed other)方法。triggerTime方法將延遲時間轉(zhuǎn)為絕對時間,也就是把當前時間的那描述加上延遲的納秒數(shù)的long值,ScheduledFutureTask的構(gòu)造函數(shù)如下。

ScheduledFutureTask(Runnable r, V result, long triggerTime,
                            long sequenceNumber) {
            //調(diào)用父類FutureTask構(gòu)造函數(shù)
            super(r, result);
            this.time = triggerTime;
            this.period = 0;//period為0,說明為一次性任務(wù)
            this.sequenceNumber = sequenceNumber;
        }

在構(gòu)造函數(shù)內(nèi)部首先調(diào)用了父類FutureTask的構(gòu)造函數(shù),父類FutureTask的構(gòu)造函數(shù)的代碼如下

//通過適配器將runnable轉(zhuǎn)換為callable
 public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // 設(shè)置當前任務(wù)狀態(tài)為new
        }

FutureTask中的任務(wù)被轉(zhuǎn)換為Callable類型后,被保存到了變量this.callable里面,并設(shè)置FutureTask的任務(wù)為NEW
然后再ScheduledFutureTask構(gòu)造函數(shù)內(nèi)部設(shè)置time為上面說的絕對時間,需要注意,這里的period的值為0,這說明當前任務(wù)為一次性的任務(wù),不是定時反復(fù)執(zhí)行任務(wù)。其中 long getDealy(TimeUnit unit)方法的代碼如下(剛方法是用來計算當前任務(wù)還有多少時間就過期了)

public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.nanoTime(), NANOSECONDS);
        }
 public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

compareTo的作用是假如元素到延遲隊列后,在內(nèi)部建立或者調(diào)整堆的時候回使用鈣元素的compareTo方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候往隊列里面添加元素,隊首的元素都是最快要過期的元素

 private void delayedExecute(RunnableScheduledFuture<?> task) {
 //4 如果線程池關(guān)閉了,則執(zhí)行線程池拒絕策略
        if (isShutdown())
            reject(task);
        else {
        //5添加任務(wù)到延遲隊列
            super.getQueue().add(task);
            //6再次檢查線程池狀態(tài)
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
            //7確保至少一個線程處理任務(wù)
                ensurePrestart();
        }
    }

4.代碼4首先判斷當前線程池是否已經(jīng)關(guān)閉了,如果已經(jīng)關(guān)閉則中線程池的拒絕策略沒否則實行代碼5將任務(wù)添加到延遲隊列。添加完畢后還要重新檢查線程池是否被關(guān)閉了,如果已經(jīng)關(guān)閉了則從延遲隊列里面刪除剛才添加的任務(wù),但是此時有可能線程池中的線程已經(jīng)從任務(wù)隊列里面移除了該任務(wù),也就是該任務(wù)已經(jīng)在執(zhí)行了,所以還需要調(diào)用任務(wù)的cancel方法取消任務(wù)
5如果代碼6判斷的結(jié)果為false,則會執(zhí)行代碼7確保至少有一個線程在處理任務(wù),即使核心線程數(shù)corePoolSize被設(shè)置為0,ensureOrestart的代碼如下

void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
        //增加核心線程數(shù)
            addWorker(null, true);
        //如果初始化corePoolsize==0,則也增加一個線程
        else if (wc == 0)
            addWorker(null, false);
    }

如上代碼首先獲取線程池中的線程個數(shù),如果線程個數(shù)小于核心線程數(shù)則核心線程數(shù)新增一個線程,否則如果當前線程數(shù)為0則新增一個線程。
上面我們分析了如何向延遲隊列里面添加任務(wù),接下來我們來見線程池里面的線程如何獲取并執(zhí)行任務(wù),在前面講解ThreadPoolExecutor時有提及,具體執(zhí)行任務(wù)的線程是worker線程,worker線程調(diào)用具體任務(wù)的run方法來執(zhí)行,由于這里的任務(wù)是ScheduledFutureTask,所以我們具體來看看ScheduledFutureTask的run方法

public void run() {
            //8是否只執(zhí)行一次
            boolean periodic=isPeriodic()
            //9取消任務(wù)
            if (!canRunInCurrentRunState(this))
                cancel(false);
            //10 只執(zhí)行一次,調(diào)用schedule方法時候
            else if (!isPeriodic())
                super.run();
                //11定時執(zhí)行
            else if (super.runAndReset()) {
                //11.1設(shè)置time=time+period
                setNextRunTime();
                //11.2重新加入該任務(wù)到delay隊列
                reExecutePeriodic(outerTask);
            }
        }

代碼8中isPeriodic的作用是判斷當前任務(wù)是一次性任務(wù)還是可重復(fù)執(zhí)行的任務(wù)

 public boolean isPeriodic() {
            return period != 0;
        }

其內(nèi)部是通過period的值來判斷,由于轉(zhuǎn)換任務(wù)在創(chuàng)建ScheduledFutureTask時傳遞的period的值為0,所以這里isPeriodic
返回false.
6.代碼9判斷當前任務(wù)是否應(yīng)該被取消,

 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
        return isRunningOrShutdown(preiodic?continueExistingPeriodicTasksAfterShutdown:
executeExistingDelayedTasksAfterShutdown);
            }

這里傳遞的preiodic的值為false,所以isRunningOrShutdown的參數(shù)為executeExistingDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown默認為true,表示當其他線程調(diào)用了shutdown命令關(guān)閉了線程后,當前任務(wù)還是要執(zhí)行,否則如果為false,則當前任務(wù)要取消。
7.由于periodic的值為false,所以執(zhí)行代碼10父類FutureTask的方法執(zhí)行具體執(zhí)行任務(wù),F(xiàn)utureTask的run方法代碼如下。

  public void run() {
        //12
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        //13
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //13.1
                    setException(ex);
                }
                //13.2
                if (ran)
                    set(result);
            }
        } finally {
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

代碼12判斷如果任務(wù)狀態(tài)不是new則直接返回,或者如果當前任務(wù)狀態(tài)為new但是使用CAS設(shè)置當前任務(wù)的持有者為當前線程失敗則直接返回,代碼13具體調(diào)用callable的call方法執(zhí)行任務(wù)。這里在調(diào)用前又判斷了任務(wù)的狀態(tài)是否為new,是為了避免在執(zhí)行代碼12后其他線程不該了任務(wù)的狀態(tài)(比如取消了該任務(wù))
如果任務(wù)執(zhí)行成功則執(zhí)行代碼13.2修改任務(wù)的狀態(tài),set方法的代碼如下

  protected void set(V v) {
        //如果當前任務(wù)的狀態(tài)為new,則設(shè)置為COMPLETING
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            //設(shè)置當前任務(wù)的狀態(tài)為normal,也就是任務(wù)正常結(jié)束
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

如上的代碼首先使用CAS將當前任務(wù)的狀態(tài)從NEW轉(zhuǎn)換為COMPLETING。這里當有多個線程調(diào)用時只有一個線程會成功。成功的線程在此通過unsafe.putOrderInt設(shè)置任務(wù)的狀態(tài)為正常結(jié)束狀態(tài),這里沒有使用CAS時因為對于同一個任務(wù)只可能有一個線程運行到這里。在這里使用putOrderInt比使用CAS或者putLongvolatile效率更高,并且這里的場景不要求其他線程馬上對設(shè)置的狀態(tài)值可見。
思考個問題,在什么時候多個線程會同時執(zhí)行CAS將當前任務(wù)的狀態(tài)從NEW轉(zhuǎn)換到COMPLETING?其實當同一個command被多次提交到線程池就會存在這樣的情況,因為痛一個任務(wù)共享一個狀態(tài)值state。
如果任務(wù)執(zhí)行失敗,則執(zhí)行代碼13.1,setException的代碼如下,可見與set函數(shù)類似。

protected void setException(Throwable t) {
    //如果當前任務(wù)的狀態(tài)為new,則設(shè)置為COMPLETING
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            //設(shè)置當前任務(wù)的狀態(tài)為EXCEPTIONAL,也就是任務(wù)非正常結(jié)束
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

到這里代碼10的邏輯執(zhí)行完畢,一次性任務(wù)也就執(zhí)行完畢了

scheduleWithFixedDelay

**** 該方法的作用是,當任務(wù)執(zhí)行完畢后,讓其延遲固定時間后再次運行(fixed-delay)。其中initialDelay表示提交任務(wù)后延遲多少時間可以執(zhí)行command任務(wù),delay表示當任務(wù)執(zhí)行完畢后延長多少時間后再次運行command任務(wù),unit是delay和initialDelay時間單位,任務(wù)會一直重復(fù)運行中直到運行中拋出異常,被取消了,或者關(guān)閉了線程池。

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
//14參數(shù)校驗                                                 TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0L)
            throw new IllegalArgumentException();
        //15任務(wù)轉(zhuǎn)換,注意這里是period=-delay<0
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          -unit.toNanos(delay),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //16 添加任務(wù)到隊列
        delayedExecute(t);
        return t;
    }

代碼14進行參數(shù)校驗,校驗失敗則拋出異常,代碼15將command任務(wù)轉(zhuǎn)化為ScheduledFutureTask,這里需要注意的是,傳遞給ScheduledFutureTask的period變量的值為-delay,period<0說明該任務(wù)時可重復(fù)執(zhí)行的任務(wù)。然后代碼16添加任務(wù)到延遲隊列后返回。
將任務(wù)添加到延遲隊列后線程池線程會從隊列里面獲取任務(wù),然后調(diào)用ScheduledFutureTask的run方法執(zhí)行,優(yōu)質(zhì)這里的period<0,所以isPeriodic返回true,所以執(zhí)行代碼11。

 protected boolean runAndReset() {
 //17
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return false;
            //18
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;//19
    }

這個代碼和FutureTask的run方法類似,只是任務(wù)正常執(zhí)行完畢后不會設(shè)置任務(wù)的狀態(tài),這樣做是為了讓任務(wù)成為可重復(fù)執(zhí)行的任務(wù),這里多了代碼19,這段代碼判斷當前任務(wù)正常執(zhí)行完畢并且任務(wù)狀態(tài)為NEW則返回true,否則返回false。如果true則執(zhí)行代碼11.1的setNextRunTime方法設(shè)置該任務(wù)下次一的執(zhí)行時間。setNextRunTime的代碼如下

 private void setNextRunTime() {
            long p = period;
            if (p > 0)//fixed-rate類型任務(wù)
                time += p;
            else//fixed-delay類型任務(wù)
                time = triggerTime(-p);
        }

這里p<0說明當前任務(wù)為fixed-delay類型任務(wù)。然后設(shè)置time為當前時間加上-p的時間,也就是延遲-p時間后再次執(zhí)行
總結(jié):fixed-delay類型的任務(wù)的執(zhí)行原理為:當添加一個人任務(wù)到延遲隊列后,等待initialDelay時間,任務(wù)就會過期,過去的任務(wù)就會被從隊列移除,并執(zhí)行,執(zhí)行完畢后,會重新設(shè)置任務(wù)的延遲時間,然后再把任務(wù)放入延遲隊列,循環(huán)往復(fù),需要注意的是,如果一個任務(wù)在執(zhí)行中拋出了異常,那么這個任務(wù)就結(jié)束了,但是不影響其他任務(wù)的執(zhí)行

scheduleAtFixedRate

這個方法相對其實時間點以固定頻率調(diào)用指定的任務(wù)(fixed-rate)。當把任務(wù)提交到線程池并延遲initialDelay時間,時間單位為(unit)后開始執(zhí)行任務(wù)command。然后從initialDelay+period時間點再次執(zhí)行,而后在initialDelay+2*period時間點再次執(zhí)行,循環(huán)往復(fù),直到拋出異?;蛘哒{(diào)用了任務(wù)的cancel取消了任務(wù),或者關(guān)閉了線程池,scheduleAtFixedRate和scheduleWithFixedDelay類似。

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        ...
        //裝飾任務(wù)類,注意period=period》0,不是負的
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period),
        return t;
    }

在如上代碼中,在fixed-rate類型的任務(wù)command轉(zhuǎn)換為ScheduledFutureTask是設(shè)置period=period,不再是-period
所以當前任務(wù)執(zhí)行完畢后,調(diào)用setNextRunTime設(shè)置任務(wù)下次執(zhí)行的時間時執(zhí)行的是time+=p而不再是time=triggerTime(-p)
總結(jié):相對于fixed-delay任務(wù)來說,fixed-rate方法執(zhí)行規(guī)則為,時間為initdelday+nperiod時啟動任務(wù),但是如果當前任務(wù)還沒有執(zhí)行完,下一次要執(zhí)行任務(wù)的時間到了,則不會并發(fā)執(zhí)行,下次要執(zhí)行的任務(wù)會延遲執(zhí)行,要等到當前任務(wù)執(zhí)行完畢后在執(zhí)行*

總結(jié)

ScheduledThreadPoolExecutor的內(nèi)部使用了DelayedQueue來存放具體任務(wù)。任務(wù)分為三種,其中一次性執(zhí)行任務(wù)執(zhí)行完畢后就結(jié)束了,fixed-delay任務(wù)保證同一個任務(wù)在多次執(zhí)行期間間隔固定時間,fixed-rate任務(wù)保證按照固定的頻率執(zhí)行。任務(wù)類型使用period的值來劃分。


Xnip2019-08-25_10-10-59.jpg

參考資料:
《Java并發(fā)編程之美》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ??一個任務(wù)通常就是一個程序,每個運行中的程序就是一個進程。當一個程序運行時,內(nèi)部可能包含了多個順序執(zhí)行流,每個順...
    OmaiMoon閱讀 1,804評論 0 12
  • 線程池中有一定數(shù)量的工作線程,工作線程會循環(huán)從任務(wù)隊列中獲取任務(wù),并執(zhí)行這個任務(wù)。那么怎么去停止這些工作線程呢?這...
    wo883721閱讀 1,750評論 0 14
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,018評論 1 19
  • 我說東方舞是奢侈品相信同行的小伙伴沒有任何人會否定! 這兩天都在閉關(guān)學(xué)習,通過蘇董蘇仲平老師培訓(xùn),我覺得東方舞是投...
    小青的2019閱讀 373評論 0 0
  • Tomcat官網(wǎng)下載: 設(shè)置環(huán)境變量:
    sh0rk閱讀 341評論 0 0

友情鏈接更多精彩內(nèi)容