Java線程池是如何實現(xiàn)線程復(fù)用的?

前言

沒看本文,面試掛了,別說沒提醒你!
沒看本文,面試掛了,別說沒提醒你!
沒看本文,面試掛了,別說沒提醒你!

相信很多人都接觸過線程池,我們知道線程池有核心線程和非核心線程之分,其中核心線程是一直存活在線程池中的,而非核心線程是在執(zhí)行完任務(wù)之后超時銷毀的。但是大家應(yīng)該都知道一點,當(dāng)Thread執(zhí)行完Runnable任務(wù)之后就會銷毀,而且就算執(zhí)行完任務(wù)之后把線程掛起也沒有辦法再去執(zhí)行其他任務(wù),那線程池是如何做到核心線程復(fù)用的呢?下面就通過閱讀源碼的方法帶大家了解背后的原因。

推薦閱讀之前的文章
深入淺出Java(Android )線程池ThreadPoolExecutor

大家可以對著這個流程圖去學(xué)習(xí)源碼,把這個圖掌握了,線程池原理也就差不多了


線程池工作流程

首先來看一下執(zhí)行線程任務(wù)的方法,里面很簡單,就是根據(jù)工作線程數(shù)量去執(zhí)行不同的策略,里面分成了3種情況,但是都會執(zhí)行addWoker()方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        /*
         * 如果當(dāng)前活躍線程數(shù)小于核心線程數(shù),就會添加一個worker來執(zhí)行任務(wù);
         * 具體來說,新建一個核心線程放入線程池中,并把任務(wù)添加到該線程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序執(zhí)行到這里,說明要么活躍線程數(shù)大于核心線程數(shù);要么addWorker()失敗

        /*
         * 如果當(dāng)前線程池是運行狀態(tài),會把任務(wù)添加到隊列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

       //程序執(zhí)行到這里,說明要么線程狀態(tài)不是RUNNING;要么workQueue隊列已經(jīng)滿了

        //調(diào)用addWorker方法去創(chuàng)建非核心線程,
        //如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

這個方法就和名字一樣,添加一個工人來完成任務(wù);而這個工人就是Thread,任務(wù)就是Runnable。

private boolean addWorker(Runnable firstTask, boolean core) {
       ...省略一些不重要的

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker是實現(xiàn)了Runnable接口的包裝類
            w = new Worker(firstTask);
            //Thread是在Worker構(gòu)造方法創(chuàng)建的
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                  
                    //檢查線程池狀態(tài),分為2種情況
                    //1、線程池處于RUNNING
                   //2、線程池處于SHUTDOWN并且firstTask==null
                   //這2種情況都會創(chuàng)建Worker來執(zhí)行隊列中的任務(wù)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //重新設(shè)置標(biāo)識位
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //啟動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

到這里,我們大概知道是通過創(chuàng)建Worker來執(zhí)行任務(wù)的,而且線程是在Worker內(nèi)部創(chuàng)建的,我們也能猜到Thread需要的Runnable應(yīng)該也在Worker內(nèi)部,所有我們繼續(xù)看一下Worker類。

Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

Worker居然是一個Runnable任務(wù),而且Worker的構(gòu)造方法中創(chuàng)建了Thread對象。這樣的話,在之前的addWorker()方法中調(diào)用t.start();就會執(zhí)行到Worker的run()方法。

繼續(xù)來看看runWorker()方法,這里很關(guān)鍵,一定要仔細(xì)看。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //這個就是addWorker傳進(jìn)來的Runnable
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task不為null或從workQueue中獲取任務(wù)不為null
            //就會一直執(zhí)行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt

                //檢查線程池狀態(tài),如果線程池處于中斷狀態(tài),將調(diào)用interrupt將線程中斷。 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    //中斷線程
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //線程任務(wù)執(zhí)行啦
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這里的關(guān)鍵在于這個while()條件判斷,當(dāng)?shù)谝淮蝿?chuàng)建Worker時就有任務(wù),當(dāng)執(zhí)行完這個任務(wù)后,這個方法并沒有結(jié)束,而是不斷地調(diào)用getTask()方法從阻塞隊列中獲取任務(wù)然后調(diào)用task.run()執(zhí)行任務(wù)。

getTask()獲取任務(wù)

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 1.allowCoreThreadTimeOut表示是否允許核心線程超時銷毀,默認(rèn)是false,也就是說核心線程即使空閑也不會被銷毀
          //當(dāng)然,如果設(shè)置為true,核心線程是會銷毀的
          //這樣的話,只有正在工作的線程數(shù)大于核心線程數(shù)才會為true,否則返回false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //2.如果timed為true,通過poll取任務(wù);如果為false,通過take取任務(wù)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

這個方法是通過一個死循環(huán)取任務(wù),取任務(wù)的話是通過workQueue這個阻塞隊列去完成的,在默認(rèn)不改變allowCoreThreadTimeOut的前提下,如果工作線程數(shù)大于核心線程數(shù),則通過poll()從隊列取任務(wù);否則通過take()從隊列取任務(wù);這2個方法的區(qū)別,

  • take():如果隊列中任務(wù)為空,會調(diào)用Condition.await()阻塞當(dāng)前線程。
  • poll(long timeout, TimeUnit unit):如果隊列中任務(wù)為空,也會阻塞當(dāng)前線程,但是阻塞時長為timeout

    這樣的話,是不是已經(jīng)搞清楚線程池中的核心線程復(fù)用的原因了。

線程的喚醒是在execute時,當(dāng)調(diào)用workQueue.offer()方法,將任務(wù)放入阻塞隊列時,會調(diào)用Condition.signal()方法喚醒一個之前阻塞的線程。這部分不細(xì)講,感興趣的同學(xué)自行查看。

總結(jié)

  • 1、當(dāng)Thread的run方法執(zhí)行完一個任務(wù)之后,會循環(huán)地從阻塞隊列中取任務(wù)來執(zhí)行,這樣執(zhí)行完一個任務(wù)之后就不會立即銷毀了;
  • 2、當(dāng)工作線程數(shù)小于核心線程數(shù),那些空閑的核心線程再去隊列取任務(wù)的時候,如果隊列中的Runnable數(shù)量為0,就會阻塞當(dāng)前線程,這樣線程就不會回收了

感謝以下作者

Java線程池實現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實踐
線程池原理
徹底理解Java線程池原理篇

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容