你了解線程池嗎

前言

如果有人問(wèn)我:“你了解Java線程池嗎”,我不打算回答Java中常用的幾種線程池,也記不住。從線程池的上層API來(lái)看,再多種的線程池,無(wú)非是參數(shù)的不同,讓它們呈現(xiàn)出了不同的特性,那么這些特性到底依賴什么樣的原理實(shí)現(xiàn),就更值得去深究,也是本文的目的。

試著回答以下幾個(gè)問(wèn)題:

  • 線程池如何實(shí)現(xiàn)
  • 非核心線程延遲死亡,如何做到
  • 核心線程為什么不會(huì)死
  • 如何釋放核心線程
  • 非核心線程能成為核心線程嗎
  • Runnable在線程池里如何執(zhí)行
  • 線程數(shù)如何做選擇
  • 常見(jiàn)的不同類型的線程池的功效如何做到

如果以上問(wèn)題回答不出一二三,可以借鑒本文。

基礎(chǔ)知識(shí)

要了解線程池,必然涉及到ThreadPoolExecutor。ThreadPoolExecutors實(shí)現(xiàn)了線程池所需的最小功能集,已能hold住很多場(chǎng)景。常見(jiàn)的線程池類型,通過(guò)Executors提供的API,屏蔽了構(gòu)造參數(shù)細(xì)節(jié)來(lái)創(chuàng)建ThreadPoolExecutors,因?yàn)椴涣私饩唧w參數(shù)含義的話,可能拿到的線程池與設(shè)想的會(huì)有偏差。

構(gòu)造參數(shù)與對(duì)象成員變量

  • corePoolSize:核心線程數(shù),期望保持的并發(fā)狀態(tài)
  • maximumPoolSize:最大線程數(shù),允許超載,雖然期望將并發(fā)狀態(tài)保持在一定范圍,但是在任務(wù)過(guò)多時(shí),增加非核心線程來(lái)處理任務(wù)。非核心線程數(shù) = maximumPoolSize - corePoolSize
  • workQueue:阻塞隊(duì)列,存儲(chǔ)線程任務(wù)Runnable
  • keepAliveTime:在沒(méi)有任務(wù)時(shí),線程存活時(shí)間
  • threadFactory:用來(lái)構(gòu)建線程
  • handler:當(dāng)任務(wù)已滿,并且無(wú)法再增加線程數(shù)時(shí),或拒絕添加任務(wù)時(shí),所執(zhí)行的策略

Worker

線程池中的工作線程以Worker作為體現(xiàn),真正工作的線程為Worker的成員變量,Worker即是Runnable,又是同步器。Worker從工作隊(duì)列中取出任務(wù)來(lái)執(zhí)行,并能通過(guò)Worker控制任務(wù)狀態(tài)。

ctl

ctl用來(lái)控制線程池的狀態(tài),并用來(lái)表示線程池線程數(shù)量。在線程池中,有以下五種狀態(tài)

  • RUNNABLE:運(yùn)行狀態(tài),接受新任務(wù),持續(xù)處理任務(wù)隊(duì)列里的任務(wù)
  • SHUTDOWN:不再接受新任務(wù),但要處理任務(wù)隊(duì)列里的任務(wù)
  • STOP:不接受新任務(wù),不再處理任務(wù)隊(duì)列里的任務(wù),中斷正在進(jìn)行中的任務(wù)
  • TIDYING:表示線程池正在停止運(yùn)作,中止所有任務(wù),銷毀所有工作線程
  • TERMINATED:表示線程池已停止運(yùn)作,所有工作線程已被銷毀,所有任務(wù)已被清空或執(zhí)行完畢

狀態(tài)轉(zhuǎn)換關(guān)系如下圖


狀態(tài)轉(zhuǎn)換.png

ctl類型為AtomicInteger,那用一個(gè)基礎(chǔ)如何表示以上五種狀態(tài)以及線程池工作線程數(shù)量呢?int型變量占用4字節(jié),共32位,因此采用位表示,可以解決上述問(wèn)題。5種狀態(tài)使用5種數(shù)值進(jìn)行表示,需要占用3位,余下的29位就可以用來(lái)表示線程數(shù)。因此,高三位表示進(jìn)程狀態(tài),低29位為線程數(shù)量,代碼如下:

    // 值為29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 高三位全為0,低29位全為1,因此線程數(shù)量的表示范圍為 0 ~ 2^29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    /**
    因?yàn)閏tl分位來(lái)表示狀態(tài)和數(shù)量,下面幾個(gè)狀態(tài)僅看有效位的值
    */
    // 有效值為 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 有效值為 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 有效值為 001
    private static final int STOP       =  1 << COUNT_BITS;
    // 有效值為 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 有效值為 011
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // 默認(rèn)狀態(tài)為RUNNING,線程數(shù)量為0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

既然采用了int分位表示線程池狀態(tài)和線程數(shù)量,那么線程池自然提供了方法來(lái)獲取狀態(tài)與數(shù)量

  • runStateOf(): 獲取線程池狀態(tài)
  • workerCountOf(): 獲取工作線程數(shù)量

兩函數(shù)均為二進(jìn)制操作,代碼不貼,可用下圖說(shuō)明:


ctl結(jié)構(gòu)與操作.png

線程池實(shí)現(xiàn)

添加任務(wù)

線程池可以通過(guò)submit()、execute()提交線程任務(wù),其中,submit()可以通過(guò)Future拿到執(zhí)行結(jié)果,內(nèi)部也是通過(guò)execute()向線程池提交線程任務(wù).

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 獲取當(dāng)前ctl值
        int c = ctl.get();
        // 當(dāng)前線程數(shù)少于最大核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            // 添加核心線程,添加線程任務(wù)
            if (addWorker(command, true))
                return;
            // 上面的過(guò)程期間,ctl可能已被更改,獲取最新值
            c = ctl.get();
        }
        // 線程池狀態(tài)為RUNNABLE,向工作隊(duì)列添加任務(wù)
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次檢查用
            int recheck = ctl.get();
            // 線程不處于RUNNABLE狀態(tài),移除任務(wù)
            if (! isRunning(recheck) && remove(command))
                // 執(zhí)行拒絕任務(wù)策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 執(zhí)行到這里說(shuō)明已沒(méi)有可用的工作線程,創(chuàng)建新的工作現(xiàn)線程
                // ,并從任務(wù)隊(duì)列里取任務(wù)。因?yàn)樵谶@個(gè)時(shí)刻,存在所有工作線程
                // 都被釋放的可能,為了應(yīng)對(duì)這個(gè)線程池“假死”的情況,所以創(chuàng)建
                // 了新的工作線程
                addWorker(null, false);
        }
        // 添加非核心隊(duì)列來(lái)執(zhí)行線程任務(wù)
        else if (!addWorker(command, false))
            // 說(shuō)明線程池達(dá)到飽和,或者線程池shut down,執(zhí)行拒絕策略
            reject(command);
    }

當(dāng)有任務(wù)到來(lái)時(shí),按照如下策略進(jìn)行:

  1. 如果當(dāng)前核心線程數(shù)量沒(méi)達(dá)到最大值corePoolSize,創(chuàng)建新線程來(lái)執(zhí)行此任務(wù)
  2. 如果當(dāng)前核心線程到達(dá)最大,向阻塞隊(duì)列添加任務(wù)
  3. 如果核心線程已滿,阻塞隊(duì)列已滿,嘗試開(kāi)啟非核心線程來(lái)執(zhí)行任務(wù)
  4. 如果線程池不處于RUNNABLE狀態(tài),或者處于飽和狀態(tài),執(zhí)行任務(wù)拒絕策略

線程池是按照上面123的順序來(lái)處理新進(jìn)的任務(wù)的,并且在每一個(gè)過(guò)程中,會(huì)檢查ctl的最新值有效性,因?yàn)樵谔幚磉^(guò)程中線程池的各種狀態(tài)隨時(shí)可能發(fā)生了改變。

不過(guò)是通過(guò)添加核心或是通過(guò)添加非核心線程來(lái)執(zhí)行任務(wù),都是通過(guò)addWorker()來(lái)完成,下面是代碼

   private boolean addWorker(Runnable firstTask, boolean core) {
        // 這個(gè)是類似 goto 的語(yǔ)法,代碼有效片段是下面第一for循環(huán)
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取程序狀態(tài)
            int rs = runStateOf(c);

            /**
             這一個(gè)條件需要仔細(xì)理解。
             1. 當(dāng)線程處于STOP、TIDYING、TERMINATED時(shí),線程池是拒絕執(zhí)行任務(wù)的
             因此不需要任務(wù),也不添加線程
             2. 當(dāng)線程處于SHUTDOWN狀態(tài)時(shí),線程池需要把任務(wù)處理完,才會(huì)到達(dá)后面的
             TIDYING、TERMINATED狀態(tài)。因此,如果阻塞隊(duì)列還有任務(wù)的話,繼續(xù)添加
             線程來(lái)加快處理。
            */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 獲取線程數(shù)
                int wc = workerCountOf(c);
                // 線程數(shù)超過(guò)或等于能表示的上限
                // 或 比較 核心線程數(shù)達(dá)到上限,或比較線程池允許的最大線程數(shù),取決于core
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS操作增加線程數(shù),跳出循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 上面的CAS操作沒(méi)成功,檢查線程池狀態(tài)與開(kāi)始是否一致,
                // 如果一致,繼續(xù)執(zhí)行此for循環(huán),否則重新執(zhí)行retry代碼塊,
                // 自旋以期CAS成功,后續(xù)才能添加線程
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 將線程任務(wù)加入Worker,新增了Worker,就是新增了線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    // 再次檢查線程池狀態(tài)
                    // 1. 處于RUNNABLE狀態(tài),繼續(xù)添加線程執(zhí)行任務(wù)
                    // 2. 處于SHUTDOWN狀態(tài),到這里說(shuō)明隊(duì)列里還有任務(wù)要執(zhí)行
                    // 增加線程期望讓任務(wù)執(zhí)行快一點(diǎn)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 這里說(shuō)明發(fā)生了意外狀況,新建的線程不可用
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        // 添加worker進(jìn)集合
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize可以表示線程池達(dá)到的最大并發(fā)
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 添加線程成功    
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 啟動(dòng)新添加的線程
                    t.start();
                    // 線程啟動(dòng)成功
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 線程啟動(dòng)失敗,移除work,銷毀線程
                addWorkerFailed(w);
        }
        return workerStarted;
    }

以上代碼做了如下幾件事:

  1. 線程池處于 RUNNBALE 或者處于 SHUTDOWN 并在阻塞隊(duì)列里還有任務(wù)時(shí),需要添加新線程。自旋確保 CAS 成功,然后添加新線程
  2. 線程存于Worker,線程池存有Worker信息,就能訪問(wèn)線程
  3. 線程啟動(dòng)失敗,則移除Worker,銷毀線程

addWorkerFailed()操作就不進(jìn)去看了,首先是將Worker移除,然后通過(guò)CAS操作更新ctl,最后調(diào)用tryTerminate()操作嘗試中止線程池。

執(zhí)行任務(wù)

之前的代碼開(kāi)啟了新線程并讓線程執(zhí)行,但是沒(méi)有看到有Runnable提交。之前說(shuō)過(guò)Worker本身為Runnable,并且存有為T(mén)hread類型的成員變量。線程池執(zhí)行的任務(wù)的線程,也就是Workder里的Thread。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        Worker(Runnable firstTask) {
            setState(-1);
            // firstTask就是addWorker()帶來(lái)的Runnable
            this.firstTask = firstTask;
            // 通過(guò)ThreadFactory創(chuàng)建線程,將自己作為Runnable提交
            this.thread = getThreadFactory().newThread(this);
        }
        ......
    }

因此線程執(zhí)行后,執(zhí)行的是Worker.run(),run()則調(diào)用了ThreadPoolExecutor.runWorker()

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // task一開(kāi)始是firstTask, 后面就通過(guò)getTask()從阻塞隊(duì)列里拿任務(wù)
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 線程池狀態(tài)檢查
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 這個(gè)方法子類可以重寫(xiě),在任務(wù)執(zhí)行前有回調(diào)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執(zhí)行任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 這個(gè)方法子類可以重寫(xiě),在任務(wù)執(zhí)行后有回調(diào)
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 線程池已沒(méi)有任務(wù)了,工作線程達(dá)到了可退出的狀態(tài),Worker退出
            processWorkerExit(w, completedAbruptly);
        }
    }

線程首個(gè)任務(wù)為firstTask,之后通過(guò)getTask()就從阻塞隊(duì)列里任務(wù)。線程池提供了beforeExecute()和afterExecute()通知子類任務(wù)執(zhí)行前后的回調(diào),讓子類有時(shí)機(jī)能執(zhí)行自己的事情。如果線程池已沒(méi)有任務(wù)了,工作線程達(dá)到了可退出的狀態(tài),則將線程退出。

主要看getTask() 和 processWorkerExit()

    private Runnable getTask() {
        // 超時(shí)標(biāo)志
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 檢查線程池和阻塞隊(duì)列狀態(tài)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 減少線程數(shù)
                decrementWorkerCount();
                return null;
            }
            
            // 獲取線程數(shù)
            int wc = workerCountOf(c);

            // 線程等待方式標(biāo)志位判斷依據(jù)
            // allowCoreThreadTimeOut代表核心線程是不是能退出,如果核心線程能退出,就更別說(shuō)非核心線程了
            // 另一個(gè)則是看是否存在非核心線程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // 超時(shí),或者并且線程超標(biāo)超標(biāo),返回null,讓上一層函數(shù)退出線程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 如果timed為true,則使用poll等待最多keepAliveTime時(shí)間獲取任務(wù)
                // 如果timed為false,使用take()獲取任務(wù),阻塞線程,直到可以從阻塞隊(duì)列拿到任務(wù)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 超時(shí)
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

線程池里的線程從阻塞隊(duì)列里拿任務(wù),如果存在非核心線程,假設(shè)阻塞隊(duì)列里沒(méi)有任務(wù),那么非核心線程也要在等到keepAliveTime時(shí)間后才會(huì)釋放。如果當(dāng)前僅有核心線程存在,如果允許釋放核心線程的話,也就和非核線程的處理方式一樣,反之,則通過(guò)take()一直阻塞直到拿到任務(wù),這也就是線程池里的核心線程為什么不死的原因。

從之前的代碼一直看到這,并沒(méi)有發(fā)現(xiàn)有明顯的標(biāo)志來(lái)標(biāo)志核心線程與非核心線程,而是以線程數(shù)來(lái)表達(dá)線程身份。0 ~ corePoolSize 表示線程池里只有核心線程,corePoolSize ~ maximumPoolSize 表示線程池里核心線程滿,存在非核心線程。然后,根據(jù)區(qū)間狀態(tài)做有差異的處理??梢源竽懖聹y(cè),線程池實(shí)際并不區(qū)分核心線程與非核心線程,是根據(jù)當(dāng)前的總體并發(fā)狀態(tài)來(lái)決定怎樣處理線程任務(wù)。corePoolSize是線程池希望達(dá)到并保持的并發(fā)狀態(tài),而corePoolSize ~ maximumPoolSize則是線程池允許的并發(fā)的超載狀態(tài),不希望長(zhǎng)期保持。

釋放線程

在線程沒(méi)有拿到任務(wù)后,退出線程,通過(guò)processWorkerExit()可以證實(shí)上述所言。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 到這里說(shuō)明線程中斷,先通過(guò)decrementWorkerCount()減少線程數(shù)值
        // 否則,說(shuō)明是線程沒(méi)有從阻塞隊(duì)列獲取到線程
        if (completedAbruptly) 
                decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount記錄線程池總共完成的任務(wù)
            // w.completedTasks則是線程完成的任務(wù)數(shù)
            completedTaskCount += w.completedTasks;
            // 移除Worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
        // 線程池狀態(tài)改變,嘗試中止線程池
        tryTerminate();

        int c = ctl.get();
        // 檢查線程池狀態(tài),線程池處于RUNNABLE或者SHUTDOWN則進(jìn)入
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // 線程池最小數(shù)量,取決于是否能釋放核心線程
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果任務(wù)隊(duì)列還有線程,最起碼都要有一個(gè)線程來(lái)處理任務(wù)
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; 
            }
            // 因?yàn)榫€程中斷,可能導(dǎo)致沒(méi)有線程來(lái)執(zhí)行阻塞隊(duì)列里的任務(wù)
            // 因此嘗試創(chuàng)建線程去執(zhí)行任務(wù)
            addWorker(null, false);
        }
    }

釋放工作線程也并沒(méi)有區(qū)分核心與非核心,也是隨機(jī)進(jìn)行的。所謂隨機(jī),就是在前面所說(shuō)的區(qū)間范圍內(nèi),根據(jù)釋放策略,哪個(gè)線程先達(dá)到獲取不到任務(wù)的狀態(tài),就釋放哪個(gè)線程。

文中多次出現(xiàn)tryTerminate(),但不深入去看了。里邊最主要的操作是,發(fā)現(xiàn)可以中止線程池時(shí),中止,并調(diào)用terminated()進(jìn)行通知。如果線程池處于RUNNABLE狀態(tài),什么也不做,否則嘗試中斷一個(gè)線程。 中斷線程則是通過(guò)interruptIdleWorker()操作,就不展開(kāi)了。

到這里就能能明白線程池的原理的,如下圖


線程池工作原理(水滴篇).png

線程池里有容納一定的Worker,Worker中的線程就是線程池中用來(lái)執(zhí)行任務(wù)的線程。當(dāng)有任務(wù)加入線程時(shí),根據(jù)線程池狀態(tài)的不同,有不同的步驟。當(dāng)核心線程未滿時(shí),創(chuàng)建新線程來(lái)執(zhí)行;否則將任務(wù)加入到阻塞隊(duì)列;否則創(chuàng)建非核心線程來(lái)執(zhí)行。而線程獲取任務(wù)的方式有兩種,根據(jù)線程池容量區(qū)間,以及是否可以釋放核心線程來(lái)使用take()或者poll()來(lái)獲取任務(wù),其中poll()在一定時(shí)間內(nèi)獲取不到任務(wù),則當(dāng)前線程會(huì)被釋放。

當(dāng)然,在addWorker()方法來(lái)有任務(wù)添加失敗的策略,也就是RejectedExecutionHandler。ThreadPoolExecutor實(shí)現(xiàn)了四種策略來(lái)進(jìn)行處理,簡(jiǎn)單了解即可:

  • CallerRunsPolicy: 如果線程池沒(méi)有SHUTODOWN的話,直接執(zhí)行任務(wù)
  • AbortPolicy: 拋出異常,說(shuō)明當(dāng)前情況的線程池不希望得到接收不了任務(wù)的狀態(tài)
  • DiscardOldestPolicy: 丟棄阻塞隊(duì)列最舊的任務(wù)
  • DiscardPolicy: 什么也不做

需要注意的是,默認(rèn)情況下策略為AbortPolicy。

總結(jié)

做個(gè)總結(jié):

  1. 線程池傾向于使用核心線程來(lái)處理任務(wù),從任務(wù)的添加策略可以看出,先考慮創(chuàng)建核心線程處理,再考慮放到阻塞隊(duì)列,再考慮創(chuàng)建非核心線程處理。以上都不行,則使用任務(wù)拒絕策略
  2. 通過(guò)向阻塞隊(duì)列取任務(wù)的不同操作,能確保線程的存活,take()保證核心線程不死,poll()保證非核心線程存活等待一定時(shí)間
  3. 線程池不區(qū)分核心線程和非核心線程,線程池是期望達(dá)到corePoolSize的并發(fā)狀態(tài),并允許在不得已情況下超載,達(dá)到corePoolSize ~ maximumPoolSize 的并發(fā)狀態(tài)
  4. 線程池狀態(tài)和線程數(shù)量用ctl表示,高三位為狀態(tài),低29位為當(dāng)前線程池?cái)?shù)量
  5. 線程池對(duì)狀態(tài)的檢測(cè)非常苛刻,幾乎在所有稍微耗時(shí)或影響下一步操作正確性的代碼前都校驗(yàn)ctl

線程池中有很多值得學(xué)習(xí)的東西,線程容量調(diào)整的設(shè)計(jì)、ctl的設(shè)計(jì)、任務(wù)調(diào)度的設(shè)計(jì)等。也有需要更深的儲(chǔ)備才能看懂的實(shí)現(xiàn),這里點(diǎn)出,以備近一步學(xué)習(xí),如同步器的使用,并發(fā)場(chǎng)景的考慮與應(yīng)用等。

下面回答開(kāi)篇提出的問(wèn)題。

問(wèn)答

線程池如何實(shí)現(xiàn)
總結(jié)就是這個(gè)問(wèn)題的答案

非核心線程延遲死亡,如何實(shí)現(xiàn)
通過(guò)阻塞隊(duì)列poll(),讓線程阻塞等待一段時(shí)間,如果沒(méi)有取到任務(wù),則線程死亡

核心線程為什么不死
通過(guò)阻塞隊(duì)列take(),讓線程一直等待,直到獲取到任務(wù)

如何釋放核心線程
將allowCoreThreadTimeOut設(shè)置為true。可用下面代碼實(shí)驗(yàn)

// 偽代碼
{
    // 允許釋放核心線程,等待時(shí)間為100毫秒
    es.allowCoreThreadTimeOut(true);
    for(......){
        // 向線程池里添加任務(wù),任務(wù)內(nèi)容為打印當(dāng)前線程池線程數(shù)
        Thread.currentThread().sleep(200);
    }
}

線程數(shù)會(huì)一直為1。 如果allowCoreThreadTimeOut為false,線程數(shù)會(huì)逐漸達(dá)到飽和,然后大家一起阻塞等待。

非核心線程能成為核心線程嗎
線程池不區(qū)分核心線程于非核心線程,只是根據(jù)當(dāng)前線程池容量狀態(tài)做不同的處理來(lái)進(jìn)行調(diào)整,因此看起來(lái)像是有核心線程于非核心線程,實(shí)際上是滿足線程池期望達(dá)到的并發(fā)狀態(tài)。

Runnable在線程池里如何執(zhí)行
線程執(zhí)行Worker,Worker不斷從阻塞隊(duì)列里獲取任務(wù)來(lái)執(zhí)行。如果任務(wù)加入線程池失敗,則在拒絕策略里,還有處理機(jī)會(huì)。

線程數(shù)如何做選擇
這就要看任務(wù)類型是計(jì)算密集型任務(wù)還是IO密集型任務(wù)了,區(qū)別在于CPU占用率。計(jì)算密集型任務(wù)涉及內(nèi)存數(shù)據(jù)的存取,CPU處于忙綠狀態(tài),因此并發(fā)數(shù)相應(yīng)要低一些。而IO密集型任務(wù),因?yàn)橥獠吭O(shè)備速度不匹配問(wèn)題,CPU更多是處于等待狀態(tài),因此可以把時(shí)間片分給其他線程,因此并發(fā)數(shù)可以高一些。

常見(jiàn)的不同類型的線程池的功效如何做到
常見(jiàn)的線程池有:

  • CachedThreadPool:適合異步任務(wù)多,但周期短的場(chǎng)景
  • FixedThreadPool: 適合有一定異步任務(wù),周期較長(zhǎng)的場(chǎng)景,能達(dá)到有效的并發(fā)狀態(tài)
  • SingleThreadExecutor: 適合任務(wù)串行的場(chǎng)景
  • ScheduledThreadPool: 適合周期性執(zhí)行任務(wù)的場(chǎng)景

對(duì)于如何選擇線程池就要看具體的場(chǎng)景,其中的差異通過(guò)構(gòu)造參數(shù)可以到達(dá)效果,通過(guò)之前的分析,就能知道參數(shù)的具體作用以及為什么能達(dá)到效果。取FixedThreadPool來(lái)看,拋磚引玉。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

nThreads個(gè)數(shù)量核心線程持續(xù)并發(fā)任務(wù),沒(méi)有非核心線程,如果沒(méi)有任務(wù),則通過(guò)take()阻塞等待,不允許核心線程死亡。并且阻塞隊(duì)列為L(zhǎng)inkedBlockingQueue,容量為Integer.MAX_VALUE,可以視為無(wú)界隊(duì)列,更難走到拒絕添加線程邏輯。

參考

線程池原理
徹底理解Java線程池原理篇
Java線程池---ThreadPoolExecutor中的ctl變量
JUC鎖框架_AbstractQueuedSynchronizer詳細(xì)分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容