面試必備的線程池知識-線程池的原理

@TOC

前言

上一篇我們介紹了線程池的使用,這一篇我們接著分析下線程池的實(shí)現(xiàn)原理。首先從創(chuàng)建線程池的核心類ThreadPoolExecutor類說起。

ThreadPoolExecutor類的常量

    //用來存放工作線程數(shù)量和線程池狀態(tài)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //運(yùn)行狀態(tài),可以執(zhí)行任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //不能接受新任務(wù),但是可以執(zhí)行完正在執(zhí)行任務(wù)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //不能接受新任務(wù),也不能執(zhí)行已有的任務(wù)
    private static final int STOP       =  1 << COUNT_BITS;
    //所有任務(wù)都終止,工作線程數(shù)歸零
    private static final int TIDYING    =  2 << COUNT_BITS;
    //終止?fàn)顟B(tài)執(zhí)行完成
    private static final int TERMINATED =  3 << COUNT_BITS;
        
    //獲取線程池的狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取工作線程的數(shù)量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl 變量主要是為了把工作線程數(shù)量和線程池狀態(tài)放在一個(gè)整型變量存儲而設(shè)置的一個(gè)原子類型的變量。在ctl中,低位的29位表示工作線程的數(shù)量,高位用來表示RUNNING,SHUTDOWN,STOP等線程池狀態(tài)。上面定義的三個(gè)方法只是為了計(jì)算得到線程池的狀態(tài)和工作線程的數(shù)量以及得到ctl。
下面是一段線程池的測試代碼,定義線程池,并調(diào)用execute方法添加任務(wù),并執(zhí)行任務(wù)。

public class ExectorTest {
    public static void main(String[] args) {
       //給線程設(shè)置一個(gè)自定義名稱
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("測試線程-%d").build();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                6,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                threadFactory
//                , new ThreadPoolExecutor.CallerRunsPolicy()
        );
        for (int i=0;i<20;i++) {
            executorService.execute(()->{
               //模擬耗時(shí)的任務(wù)
                System.out.println(Thread.currentThread().getName()+" 開始執(zhí)行任務(wù)");
                int j = 10000 * 10000;
                while (j >0) {
                    j--;
                }

                System.out.println(Thread.currentThread().getName()+" 執(zhí)行結(jié)束");

            });
        }
    }
}

利用debug模式得到的調(diào)試棧如下:

在這里插入圖片描述

提交任務(wù)execute方法是整個(gè)線程池的執(zhí)行入口,下面我就從它開始分析。

execute方法

    public void execute(Runnable command) {
    //如果任務(wù)為空,則拋出NPE異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         //獲取線程池狀態(tài)
        int c = ctl.get();
        //1.如果工作線程的數(shù)量小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //調(diào)用addWorker增加一個(gè)新線程,并執(zhí)行一個(gè)任務(wù)
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線程池的狀態(tài)是運(yùn)行狀態(tài),并且任務(wù)加入到了工作隊(duì)列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //雙重檢查,再次檢查線程池的狀態(tài)。
            int recheck = ctl.get();
            //如果線程池的狀態(tài)不是運(yùn)行狀態(tài)并且移除任務(wù)成功則調(diào)用拒絕策略
            if (! isRunning(recheck) && remove(command))
                //調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
                 reject(command);
            //如果工作線程的數(shù)量為0,說明工作隊(duì)列中可能有任務(wù)沒有線程執(zhí)行,此時(shí)則新建一個(gè)線程來執(zhí)行任務(wù),由于執(zhí)行的是隊(duì)列中已經(jīng)堆積的任務(wù),所以沒有傳入具體的任務(wù)。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果前面的新增work,放入隊(duì)列都失敗,則會繼續(xù)新增worker,此時(shí)線程池中的工作線程數(shù)達(dá)到corePoolSize,阻塞隊(duì)列任務(wù)已滿,只能基于maximumPoolSize來繼續(xù)增加work,如果還是失敗
        else if (!addWorker(command, false))
           //如果還是失敗,則調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
            reject(command);
    }

從上代碼中,我們可以總結(jié)出execute方法主要有如下三個(gè)流程

  1. 如果線程池中當(dāng)前工作線程數(shù)小于核心線程數(shù)(corePoolSize),則創(chuàng)建一個(gè)新線程來執(zhí)行傳入的任務(wù)(執(zhí)行這一步驟需要獲取全局鎖)
  2. 如果工作線程數(shù)大于等于核心線程數(shù),并且線程池是運(yùn)行狀態(tài),則將傳入的任務(wù)加入到工作隊(duì)列(BlockingQueue)中。
  3. 如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來處理任務(wù)(執(zhí)行這一步驟需要獲取全局鎖)
  4. 如果創(chuàng)建新線程將使得當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
    運(yùn)行的流程圖如下:
    在這里插入圖片描述

    execute()方法可以看到新增線程并且執(zhí)行任務(wù)核心邏輯在addWorker方法中。

addWorker的方法

首先第一段代碼,這段代碼有兩個(gè)死循環(huán),外層的死循環(huán)主要是檢查線程池的狀態(tài),更新線程池的狀態(tài)。內(nèi)層的死循環(huán),是檢查工作線程的數(shù)量,并且通過CAS的方式在ctl中更新工作線程的數(shù)量。

 for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //檢查線程池的狀態(tài)是否是運(yùn)行狀態(tài),并且隊(duì)列不為空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //獲取工作線程數(shù)
                int wc = workerCountOf(c);
                //工作線程數(shù)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    //通過CAS的方式來在ctl中增加工作線程的數(shù)量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //再次獲取狀態(tài)
                c = ctl.get();  // Re-read ctl
                //如果狀態(tài)更新失敗,則循環(huán)更新
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

分析完了前置的一些檢查工作的代碼,接下來,來看下主流程的代碼:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //1. 新建一個(gè)工作線程,Work后面會說
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //再次獲取線程池的狀態(tài),在新建線程或者釋放鎖時(shí),都會重新檢查。
                    int rs = runStateOf(ctl.get());
                    //如果線程池的狀態(tài)不是關(guān)閉狀態(tài),則進(jìn)入下面的分支
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //檢查新建的線程是否是可運(yùn)行狀態(tài)
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將工作線程添加到HashSet類型的集合中
                        workers.add(w);
                        int s = workers.size();
                        //如果工作線程的集合數(shù)大于largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //新建工作線程成功之后,將操作標(biāo)志workerAdded設(shè)為true,表示新增工作線程成功,后續(xù)流程用
                        workerAdded = true;
                    }
                } finally {
                    //釋放鎖
                    mainLock.unlock();
                }
                //如果新建工作線程成功,則調(diào)用start() 方法啟動(dòng)線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果workerStarted為false,表示新建工作線程失敗
            if (! workerStarted)
                //移除已經(jīng)創(chuàng)建的工作線程
                addWorkerFailed(w);
        }
        return workerStarted;

如上,該主流程的代碼邏輯也是比較清晰的,首先是新建一個(gè)工作線程,然后就是在同步代碼塊中檢查線程池的狀態(tài),如果不是SHUTDOWN狀態(tài),則將新增的線程放在HashSet類型線程的集合中,放入成功之后,將創(chuàng)建work的標(biāo)識workerAdded改成true,然后釋放鎖。接著就是調(diào)用start()方法使得線程可以執(zhí)行任務(wù)。接下來就來看看Worker的結(jié)構(gòu)

Work 類

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;   //傳入的任務(wù)
            this.thread = getThreadFactory().newThread(this);  //創(chuàng)建一個(gè)新線程
        }
       public void run() {
            runWorker(this);
        }

    }

Worker類是ThreadPoolExecutor類的一個(gè)私有內(nèi)部不變類,其實(shí)現(xiàn)了Runnable接口,內(nèi)部的run()方法里面調(diào)用的runWorker()方法。所以,任務(wù)的最終執(zhí)行時(shí)通過runWorker()方法的。 在介紹runWorker()之前,我們先看看創(chuàng)建線程的邏輯。

ThreadFactoryBuilder類

按照前面調(diào)用棧我們接著分析下ThreadFactoryBuilder。ThreadFactoryBuilder類用于生成ThreadFactory并且設(shè)置一些參數(shù),比如線程名,線程的等級,是否是后臺線程等信息。這里設(shè)置信息用到了建造者模式。代碼如下:

 public ThreadFactory build() {
    return build(this);
  }

  private static ThreadFactory build(ThreadFactoryBuilder builder) {
    final String nameFormat = builder.nameFormat;
    final Boolean daemon = builder.daemon;
    final Integer priority = builder.priority;
    final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
    //沒有指定ThreadFactory實(shí)現(xiàn)類的話默認(rèn)就是Executors.defaultThreadFactory()
    final ThreadFactory backingThreadFactory =
        (builder.backingThreadFactory != null)
            ? builder.backingThreadFactory
            : Executors.defaultThreadFactory();
    final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
   //匿名內(nèi)部類
   return new ThreadFactory() {
      @Override
      public Thread newThread(Runnable runnable) {
        //調(diào)用 backingThreadFactory.newThread得到生成的工作線程
        Thread thread = backingThreadFactory.newThread(runnable);
        //重置線程名
        if (nameFormat != null) {
          thread.setName(format(nameFormat, count.getAndIncrement()));
        }
        //重置是否是后臺線程
        if (daemon != null) {
          thread.setDaemon(daemon);
        }
        //重置線程的等級
        if (priority != null) {
          thread.setPriority(priority);
        }
        if (uncaughtExceptionHandler != null) {
          thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        }
        return thread;
      }
    };
  }

這個(gè)類的邏輯比較簡單,主要是兩步

  1. 獲取ThreadFactory的具體實(shí)現(xiàn)類
  2. 調(diào)用ThreadFactory的newThread方法,并重置線程名信息。
    接下來我們看看Executors類的內(nèi)部靜態(tài)類DefaultThreadFactory類。

DefaultThreadFactory類

 static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            //直接new一個(gè)工作線程
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            //是否是后臺線程
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

上面DefaultThreadFactory的代碼比較簡單,就是new一個(gè)工作線程并設(shè)置工作線程的默認(rèn)名。說完了創(chuàng)建工作線程的邏輯,接下來,我們來看看執(zhí)行任務(wù)的runWorker方法的邏輯。

runWorker

  final void runWorker(Worker w) {
  //獲取當(dāng)前線程
        Thread wt = Thread.currentThread();
        //獲取當(dāng)前的任務(wù)
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //一般情況下,task都不會為空(特殊情況上面注釋就是前面execute方法說的)或者可以從工作隊(duì)列中取到任務(wù),會直接進(jìn)入循環(huán)體中。
            while (task != null || (task = getTask()) != null) {
               //加鎖
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果線程池正在停止,確保線程是被中斷,
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
             //該方法是個(gè)空的實(shí)現(xiàn),如果有需要用戶可以自己繼承該類進(jìn)行實(shí)現(xiàn)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                        //調(diào)用任務(wù)的run方法,真正的任務(wù)執(zhí)行邏輯
                        task.run();
                      .....省略部分代碼
                      finally {
                      //該方法是個(gè)空的實(shí)現(xiàn),如果有需要用戶可以自己繼承該類進(jìn)行實(shí)現(xiàn)
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
                //當(dāng)指定任務(wù)執(zhí)行完成,阻塞隊(duì)列中也取不到可執(zhí)行任務(wù)時(shí),會進(jìn)入這里,做一些善后工作,比如在corePoolSize跟maximumPoolSize之間的woker會進(jìn)行回收
            processWorkerExit(w, completedAbruptly);
        }
    }

work線程的執(zhí)行流程就是首先執(zhí)行初始化分配給的任務(wù),執(zhí)行完成之后會嘗試從阻塞中獲取可執(zhí)行的任務(wù),如果指定時(shí)間內(nèi)仍然沒有任務(wù)可以執(zhí)行,則進(jìn)入銷毀邏輯,這里只會回收corePoolSize與maxmumPoolSize之間的那部分worker。

getTask方法

這里getTask方法的實(shí)現(xiàn)更我們構(gòu)造參數(shù)設(shè)置存活時(shí)間有關(guān),我們都知道構(gòu)造參數(shù)設(shè)置的時(shí)間代表了線程池中的線程,即worker線程的存活時(shí)間,如果到期則回收worker線程,這個(gè)邏輯的實(shí)現(xiàn)就在getTask中。來不及執(zhí)行的任務(wù),線程池會放入一個(gè)阻塞隊(duì)列(工作隊(duì)列),getTask方法就是去工作隊(duì)列中取任務(wù),用戶設(shè)置的存活時(shí)間,就是從這個(gè)阻塞隊(duì)列中取任務(wù)等待的最大時(shí)間,如果getTask返回null,意思就是worker等待了指定時(shí)間仍然沒有取到任務(wù),此時(shí)就會跳過循環(huán)體,進(jìn)入worker線程銷毀邏輯。

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //線程池如果是SHUTDOWN或者STOP狀態(tài),則將work移除。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 對于allowCoreThreadTimeOut為true(設(shè)置了核心線程的存活時(shí)間),或者是在corePoolSize與maxmumPoolSize之間的那部分worker
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果timed為true,則需要在keepAliveTime時(shí)間內(nèi)取任務(wù),否則沒有存活時(shí)間的限制
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

總結(jié)

本文對線程池添加任務(wù),執(zhí)行任務(wù)的源碼做了重點(diǎn)解析,內(nèi)部用到了很多設(shè)計(jì)模式,比如創(chuàng)建線程用到了工廠模式,設(shè)置線程的屬性用到了建造者模式。同時(shí)還用到了鎖等知識。了解其實(shí)現(xiàn)原理對我們更好的使用線程池大有好處。

參考

Java線程池總結(jié)
面試題|關(guān)于Java線程池一篇文章就夠了

全網(wǎng)同名【碼農(nóng)飛哥】。不積跬步,無以至千里,享受分享的快樂
我是碼農(nóng)飛哥,再次感謝您讀完本文。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容