Java線程池如何保證線程池的核心線程存活不被銷(xiāo)毀?execute()的執(zhí)行邏輯

線程池介紹

基本元素

線程池是一個(gè)創(chuàng)建使用線程并能保存使用過(guò)的線程以達(dá)到服用的對(duì)象,他使用workQueue當(dāng)作阻塞隊(duì)列,workers作為線程的封裝操作對(duì)象。

/**
* 用于保留任務(wù)并移交給工作線程(指允許不超過(guò)maximumPoolSize大小的線程)的隊(duì)列。
* 我們不要求workQueue.poll()返回null必然意味著workQueue.isEmpty(),因此僅依靠isEmpty來(lái)查看隊(duì)列是否為空(例如,在決定是否從SHUTDOWN過(guò)渡到TIDYING時(shí)必須這樣做)。
* 這可容納特殊用途的隊(duì)列,例如DelayQueues,允許poll()返回null,即使它在延遲到期后稍后可能返回non-null。
*/
    private final BlockingQueue<Runnable> workQueue;
/**
* 包含池中所有工作線程的集合。只有在持有mainLock鎖時(shí)才能訪問(wèn)。
*/
    private final HashSet<Worker> workers = new HashSet<Worker>();

保證線程安全的元素

/**
* 主池控制狀態(tài)變量ctl包含了兩個(gè)概念字段,workerCount表示有效線程數(shù),runState表示是否正在運(yùn)行,正在關(guān)閉等
* 為了將它們打包為一個(gè)int,我們將workerCount限制為(2 ^ 29)-1(約5億)個(gè)線程,而不是(2 ^ 31)-1(20億)可以表示的線程。如果將來(lái)有問(wèn)題,可以將該變量更改為AtomicLong,并調(diào)整以下移位掩碼常量。但是在需要之前,使用int可以使此代碼更快,更簡(jiǎn)單。
* workerCount是已被允許啟動(dòng)但不允許停止的工人數(shù)。該值可能與活動(dòng)線程的實(shí)際數(shù)量暫時(shí)不同,例如,當(dāng)ThreadFactory在被詢(xún)問(wèn)時(shí)創(chuàng)建線程失敗,并且退出線程仍在終止之前執(zhí)行簿記操作時(shí),該值會(huì)有所不同。用戶(hù)可見(jiàn)的線程池大小是工作集合的當(dāng)前大小。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 鎖定時(shí)要鎖定工人集合和相關(guān)簿記。
* 雖然我們可以使用某種并發(fā)集合,但事實(shí)證明,通常最好使用鎖。 原因之一是,這可以序列化interruptIdleWorkers,從而避免了不必要的中斷風(fēng)暴,尤其是在關(guān)機(jī)期間。 否則,退出線程將同時(shí)中斷那些尚未中斷的線程。 它還簡(jiǎn)化了一些相關(guān)的統(tǒng)計(jì)數(shù)據(jù),如largePoolSize等。我們還在shutdown和shutdownNow上保留mainLock,以確保在單獨(dú)檢查中斷和實(shí)際中斷的權(quán)限時(shí),工人集合是穩(wěn)定的。
*/
private final ReentrantLock mainLock = new ReentrantLock();

execute(Runnable)執(zhí)行邏輯

execute()執(zhí)行邏輯,來(lái)自下方參考鏈接
/*
* 1.如果正在運(yùn)行的線程少于corePoolSize線程,會(huì)創(chuàng)建新線程。對(duì)addWorker方法的調(diào)用從原子上檢查runState和workerCount,并通過(guò)返回false來(lái)表示添加工作線程失敗了。
* 2.如果一個(gè)任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰屑?xì)檢查是否應(yīng)該添加一個(gè)線程(因?yàn)楝F(xiàn)有線程自上次檢查后就死掉了)或自從進(jìn)入該方法以來(lái)該池已關(guān)閉。因此,我們重新檢查狀態(tài),并在必要時(shí)回滾排隊(duì)(如果已停止),或者在沒(méi)有線程的情況下啟動(dòng)新線程。
* 3.如果我們無(wú)法將任務(wù)排隊(duì),則嘗試添加一個(gè)新線程。如果失敗,線程池可能已關(guān)閉或已飽和,因此拒絕該任務(wù)。
*/
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        ////獲取線程池控制狀態(tài)
        int c = ctl.get();
        //通過(guò)workerCountOf計(jì)算出實(shí)際線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //未超過(guò)核心線程數(shù),則新增 Worker 對(duì)象,true表示核心線程,false表示最大線程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //核心線程滿(mǎn)了,如果線程池處于運(yùn)行狀態(tài)則往隊(duì)列中添加任務(wù),
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果不在運(yùn)行狀態(tài)則刪除阻塞隊(duì)列中的任務(wù)并執(zhí)行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果當(dāng)前線程池沒(méi)有任何工作線程在運(yùn)行,再次嘗試添加新的工作線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      //嘗試調(diào)用最非核心線程,失敗則執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

其中主要方法是addWorker()。

private boolean addWorker(Runnable firstTask, boolean core) {

        //這部分主要是對(duì)運(yùn)行狀態(tài)的操作,嘗試通過(guò)原子操作增加工作線程數(shù),如果成功則跳出循環(huán),否則重新獲取ctl的值并重新檢查運(yùn)行狀,略過(guò)。。。

        boolean workerStarted = false;//線程是否啟動(dòng)
        boolean workerAdded = false;//線程是否添加
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 在鎖的保護(hù)下重新檢查運(yùn)行狀態(tài)rs。
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 線程是否正在執(zhí)行任務(wù)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        //更新最大線程池大小largestPoolSize。 
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //線程啟動(dòng)!
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

經(jīng)過(guò)一系列邏輯運(yùn)行后,終于 t.start() 了!,然后他會(huì)調(diào)用Worker類(lèi)重寫(xiě)的run()方法,而里面只有runWorker()方法調(diào)用。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //獲取要執(zhí)行的任務(wù)
        Runnable task = w.firstTask;
        w.firstTask = null;
        //釋放鎖以允許中斷。
        w.unlock(); 
        //設(shè)置一個(gè)布爾變量completedAbruptly,用于標(biāo)記任務(wù)是否突然完成。 
        boolean completedAbruptly = true;
        try {
            //完成初始任務(wù)后,繼續(xù)等待獲取新任務(wù)執(zhí)行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //檢查線程池是否正在停止,如果是,則確保線程被中斷;如果不是,則確保線程沒(méi)有被中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//方法里面是空的,需要自定義實(shí)現(xiàn)
                    Throwable thrown = null;
                    try {
                        //執(zhí)行我們傳入的代碼
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//方法里面是空的,需要自定義實(shí)現(xiàn)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //最后執(zhí)行線程退出前的操作
            processWorkerExit(w, completedAbruptly);
        }
    }

這個(gè)方法的主要邏輯是通過(guò)循環(huán)不停調(diào)用getTask()獲取任務(wù)并執(zhí)行,直到getTask()返回為空。

/**
* 根據(jù)當(dāng)前配置設(shè)置執(zhí)行阻塞或定時(shí)等待任務(wù),或者線程由于以下任何原因而必須退出,則返回null:
* 1. 超過(guò)最大線程數(shù)。
* 2.線程池已停止。
* 3.線程池被關(guān)閉并且隊(duì)列已經(jīng)空了。
* 4.線程超時(shí)等待任務(wù),并且在定時(shí)等待之前和之后都將終止線程(即{@code allowCoreThreadTimeOut || workerCount> corePoolSize}),并且如果隊(duì)列為非空,此工作程序不是線程池中的最后一個(gè)線程。
*/
private Runnable getTask() {
        //聲明一個(gè)布爾變量timedOut,用于記錄上一次poll()是否超時(shí)。
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 檢查線程池狀態(tài)以及阻塞隊(duì)列是否為空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 允許核心線程超時(shí)或者實(shí)際線程數(shù)大于核心線程則為true,否則為false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //poll():使當(dāng)前線程等待,直到發(fā)出信號(hào)或中斷它,或者經(jīng)過(guò)指定的等待時(shí)間。
                //take():使當(dāng)前線程等待,直到發(fā)出信號(hào)或被中斷為止。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

核心線程和非核心線程的邏輯區(qū)分

特別說(shuō)明下核心線程和非核心線程的區(qū)別,面試容易問(wèn)到。
核心線程如果不設(shè)置屬性 allowCoreThreadTimeOut 為 true,那么創(chuàng)建后永遠(yuǎn)不會(huì)被關(guān)閉中斷,會(huì)在 getTask() 方法中的 workQueue.take() 處 阻塞等待任務(wù);
如果核心線程設(shè)置屬性 allowCoreThreadTimeOut 為 true或者是非核心線程,那么就會(huì)調(diào)用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法,指定時(shí)間內(nèi)獲取不到任務(wù),就會(huì)跳出 runWorker(Worker w) 方法中的while循環(huán)而關(guān)閉線程。

參考鏈接

Java線程池是如何保證核心線程不被銷(xiāo)毀的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容