Java - 線程池簡介

簡介

1. 什么是java線程池

一個(gè)管理線程的池子,它幫我們我們管理線程,避免增加創(chuàng)建線程和銷毀線程的資源損耗

2. 線程池的優(yōu)點(diǎn)

重用線程池中的線程:避免因?yàn)榫€程的創(chuàng)建和銷毀所帶來的性能開銷
提高相應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行
提高線程的可管理性:線程是稀缺資源,如果無限制的創(chuàng)建不僅會(huì)消耗系統(tǒng)的資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控

相關(guān)類及架構(gòu)圖

image.png
  • Executor:任務(wù)執(zhí)行者,線程池中幾乎所有的類都直接或者間接的實(shí)現(xiàn)了Executor,它是線程池框架的基礎(chǔ),它提供了一種將“任務(wù)提交”與“任務(wù)執(zhí)行”分離開來的機(jī)制
  • ExecutorServices:它繼承自Executor,它是“執(zhí)行者服務(wù)接口”,添加了一些用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法
  • AbstractExecutorService:是一個(gè)抽象類,實(shí)現(xiàn)了ExecutorService 接口,為ExecutorService中的函數(shù)提供了默認(rèn)實(shí)現(xiàn)
  • ThreadPoolExecutor:線程池的核心類,用來處理被提交的任務(wù)
  • ScheduledExecutorService:一個(gè)接口,它相當(dāng)于提供了“延時(shí)”和“周期執(zhí)行”功能的ExecutorService
  • ScheduledThreadPoolExecutor:一個(gè)實(shí)現(xiàn)類,可以在給定的延遲后執(zhí)行任務(wù),或者定期執(zhí)行命令,比Timer靈活強(qiáng)大
  • Executors:它通過靜態(tài)工廠方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等類的對(duì)象。

核心類ThreadPoolExecutor

1. 構(gòu)造方法及參數(shù)含義
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...
    }

corePoolSize:核心線程數(shù),默認(rèn)情況下會(huì)一直存活,設(shè)置了allowCoreThreadTimeOut屬性為true時(shí),當(dāng)?shù)却龝r(shí)間超過 keepAliveTime時(shí),核心線程數(shù)會(huì)被終止
maximumPoolSize:線程池中最大的線程數(shù),活動(dòng)線程數(shù)達(dá)到這個(gè)數(shù)值后,后續(xù)的新任務(wù)會(huì)被阻塞
keepAliveTime:非核心線程的閑置時(shí)的超時(shí)時(shí)長,超過這個(gè)時(shí)長,非核心線程就會(huì)被回收
unit:超時(shí)時(shí)長的時(shí)間單位
workQueue:任務(wù)隊(duì)列
threadFactory:線程工廠,為線程池提供創(chuàng)建新線程的功能
RejectedExecutionHandler:拒絕策略

2. 關(guān)鍵參數(shù)

轉(zhuǎn)自:https://juejin.im/entry/58fada5d570c350058d3aaad

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ctl是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值,大約是5億。
下面再介紹下線程池的運(yùn)行狀態(tài). 線程池一共有五種狀態(tài), 分別是:

RUNNING:能接受新提交的任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù);
SHUTDOWN:關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)。在線程池處于 RUNNING 狀態(tài)時(shí),調(diào)用 shutdown()方法會(huì)使線程池進(jìn)入到該狀態(tài)。(finalize() 方法在執(zhí)行過程中也會(huì)調(diào)用shutdown()方法進(jìn)入該狀態(tài));
STOP:不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài);
TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)。
TERMINATED:在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài),默認(rèn)terminated()方法中什么也沒有做。
進(jìn)入TERMINATED的條件如下:
線程池不是RUNNING狀態(tài);
線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
workerCount為0;
設(shè)置TIDYING狀態(tài)成功。

image.png

ctl相關(guān)方法

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf:獲取運(yùn)行狀態(tài);
  • workerCountOf:獲取活動(dòng)線程數(shù);
  • ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。
3.線程池初始化執(zhí)行過程
  1. 未達(dá)到核心線程數(shù)時(shí),會(huì)直接啟動(dòng)一個(gè)核心線程執(zhí)行任務(wù)
  2. 線程池中的線程數(shù)已達(dá)到或超過核心線程數(shù),任務(wù)會(huì)被插入到任務(wù)隊(duì)列中排隊(duì)等待執(zhí)行
  3. 如果任務(wù)隊(duì)列已滿,且此時(shí)未達(dá)到線程池規(guī)定的最大值,那此時(shí)立即啟動(dòng)一個(gè)非核心線程來執(zhí)行任務(wù)
  4. 如果線程數(shù)量已達(dá)到線程池規(guī)定的最大值,就拒絕執(zhí)行此任務(wù),執(zhí)行拒絕策略

代碼:
execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * clt記錄著runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值,表示當(dāng)前活動(dòng)的線程數(shù);
     * 如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize,則新建一個(gè)線程放入線程池中;
     * 并把任務(wù)添加到該線程中。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二個(gè)參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷;
         * 如果為true,根據(jù)corePoolSize來判斷;
         * 如果為false,則根據(jù)maximumPoolSize來判斷
         */
        if (addWorker(command, true))
            return;
        /*
         * 如果添加失敗,則重新獲取ctl值
         */
        c = ctl.get();
    }
    /*
     * 如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊(duì)列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷線程池的運(yùn)行狀態(tài),如果不是運(yùn)行狀態(tài),由于之前已經(jīng)把command添加到workQueue中了,
        // 這時(shí)需要移除該command
        // 執(zhí)行過后通過handler使用拒絕策略對(duì)該任務(wù)進(jìn)行處理,整個(gè)方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法
         * 這里傳入的參數(shù)表示:
         * 1. 第一個(gè)參數(shù)為null,表示在線程池中創(chuàng)建一個(gè)線程,但不去啟動(dòng);
         * 2. 第二個(gè)參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時(shí)根據(jù)maximumPoolSize來判斷;
         * 如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會(huì)在將來的某個(gè)時(shí)刻被執(zhí)行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 如果執(zhí)行到這里,有兩種情況:
     * 1. 線程池已經(jīng)不是RUNNING狀態(tài);
     * 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。
     * 這時(shí),再次調(diào)用addWorker方法,但第二個(gè)參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
     * 如果失敗則拒絕該任務(wù)
     */
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法
addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize,代碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取運(yùn)行狀態(tài)
        int rs = runStateOf(c);

        /*
         * 這個(gè)if判斷
         * 如果rs >= SHUTDOWN,則表示此時(shí)不再接收新任務(wù);
         * 接著判斷以下3個(gè)條件,只要有1個(gè)不滿足,則返回false:
         * 1. rs == SHUTDOWN,這時(shí)表示關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)
         * 2. firsTask為空
         * 3. 阻塞隊(duì)列不為空
         * 
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會(huì)接受新提交的任務(wù),所以在firstTask不為空的時(shí)候會(huì)返回false;
         * 然后,如果firstTask為空,并且workQueue也為空,則返回false,
         * 因?yàn)殛?duì)列中已經(jīng)沒有任務(wù)了,不需要再添加線程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取線程數(shù)
            int wc = workerCountOf(c);
            // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進(jìn)制是29個(gè)1),返回false;
            // 這里的core是addWorker方法的第二個(gè)參數(shù),如果為true表示根據(jù)corePoolSize來比較,
            // 如果為false則根據(jù)maximumPoolSize來比較。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount,如果成功,則跳出第一個(gè)for循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失敗,則重新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據(jù)firstTask來創(chuàng)建Worker對(duì)象
        w = new Worker(firstTask);
        // 每一個(gè)Worker對(duì)象都會(huì)創(chuàng)建一個(gè)線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態(tài);
                // 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程。
                // 因?yàn)樵赟HUTDOWN時(shí)不會(huì)在添加新的任務(wù),但還是會(huì)執(zhí)行workQueue中的任務(wù)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個(gè)HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

注意一下這里的t.start()這個(gè)語句,啟動(dòng)時(shí)會(huì)調(diào)用Worker類中的run方法,Worker本身實(shí)現(xiàn)了Runnable接口,所以一個(gè)Worker類型的對(duì)象也是一個(gè)線程。

Worker類
線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組Worker對(duì)象,看一下Worker的定義:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。

在調(diào)用構(gòu)造方法時(shí),需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個(gè)線程,newThread方法傳入的參數(shù)是this,因?yàn)閃orker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè)Worker對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用Worker類中的run方法。

Worker繼承了AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用ReentrantLock來實(shí)現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的

runWorker方法
在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取第一個(gè)任務(wù)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允許中斷
    w.unlock(); // allow interrupts
    // 是否因?yàn)楫惓M顺鲅h(huán)
    boolean completedAbruptly = true;
    try {
        // 如果task為空,則通過getTask來獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這里說明一下第一個(gè)if判斷,目的是:

如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài);
如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài);
這里要考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會(huì)把狀態(tài)設(shè)置為STOP,回顧一下STOP狀態(tài):

不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài)。

STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態(tài)時(shí)線程是非中斷狀態(tài)的,因?yàn)門hread.interrupted()方法會(huì)復(fù)位中斷的狀態(tài)。

總結(jié)一下runWorker方法的執(zhí)行過程:

while循環(huán)不斷地通過getTask()方法獲取任務(wù);
getTask()方法從阻塞隊(duì)列中取任務(wù);
如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài);
調(diào)用task.run()執(zhí)行任務(wù);
如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;
runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實(shí)現(xiàn)。

completedAbruptly變量來表示在執(zhí)行任務(wù)過程中是否出現(xiàn)了異常,在processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷。

getTask方法
getTask方法用來從阻塞隊(duì)列中取任務(wù),代碼如下:

private Runnable getTask() {
    // timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 如果線程池狀態(tài)rs >= SHUTDOWN,也就是非RUNNING狀態(tài),再進(jìn)行以下判斷:
         * 1. rs >= STOP,線程池是否正在stop;
         * 2. 阻塞隊(duì)列是否為空。
         * 如果以上條件滿足,則將workerCount減1并返回null。
         * 因?yàn)槿绻?dāng)前線程池狀態(tài)的值是SHUTDOWN或以上時(shí),不允許再向阻塞隊(duì)列中添加任務(wù)。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed變量用于判斷是否需要進(jìn)行超時(shí)控制。
        // allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時(shí);
        // wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
        // 對(duì)于超過核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * wc > maximumPoolSize的情況是因?yàn)榭赡茉诖朔椒▓?zhí)行階段同時(shí)執(zhí)行了setMaximumPoolSize方法;
         * timed && timedOut 如果為true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制,并且上次從阻塞隊(duì)列中獲取任務(wù)發(fā)生了超時(shí)
         * 接下來判斷,如果有效線程數(shù)量大于1,或者阻塞隊(duì)列是空的,那么嘗試將workerCount減1;
         * 如果減1失敗,則返回重試。
         * 如果wc == 1時(shí),也就說明當(dāng)前線程是線程池中唯一的一個(gè)線程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根據(jù)timed來判斷,如果為true,則通過阻塞隊(duì)列的poll方法進(jìn)行超時(shí)控制,如果在keepAliveTime時(shí)間內(nèi)沒有獲取到任務(wù),則返回null;
             * 否則通過take方法,如果這時(shí)隊(duì)列為空,則take方法會(huì)阻塞直到隊(duì)列不為空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,說明已經(jīng)超時(shí),timedOut設(shè)置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回循環(huán)重試
            timedOut = false;
        }
    }
}

這里重要的地方是第二個(gè)if判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時(shí),則可以增加工作線程,但這時(shí)如果超時(shí)沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。

什么時(shí)候會(huì)銷毀?當(dāng)然是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收。

getTask方法返回null時(shí),在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行processWorkerExit方法。

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值為true,則說明線程執(zhí)行時(shí)出現(xiàn)了異常,需要將workerCount減1;
    // 如果線程執(zhí)行時(shí)沒有出現(xiàn)異常,說明在getTask()方法中已經(jīng)已經(jīng)對(duì)workerCount進(jìn)行了減1操作,這里就不必再減了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統(tǒng)計(jì)完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示著從線程池中移除了一個(gè)工作線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
    tryTerminate();

    int c = ctl.get();
    /*
     * 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時(shí),如果worker是異常結(jié)束,那么會(huì)直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待隊(duì)列有任務(wù),至少保留一個(gè)worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個(gè)工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入processWorkerExit方法,整個(gè)線程結(jié)束,如圖所示:


【2,3部分轉(zhuǎn)自:https://juejin.im/entry/58fada5d570c350058d3aaad

4 線程池如何實(shí)現(xiàn)復(fù)用的

線程重用的核心是,我們知道,Thread.start()只能調(diào)用一次,一旦這個(gè)調(diào)用結(jié)束,則該線程就到了stop狀態(tài),不能再次調(diào)用start。
則要達(dá)到復(fù)用的目的,則必須從Runnable接口的run()方法上入手,可以這樣設(shè)計(jì)這個(gè)Runnable.run()方法(就叫外面的run()方法):
它本質(zhì)上是個(gè)無限循環(huán),跑的過程中不斷檢查我們是否有新加入的子Runnable對(duì)象(就叫內(nèi)部的runnable:run()吧,它就是用來實(shí)現(xiàn)我們自己的任務(wù)),有就調(diào)一下我們的run(),其實(shí)就一個(gè)大run()把其它小run()#1,run()#2,...給串聯(lián)起來了,基本原理就這么簡單

詳細(xì)請(qǐng)看:https://www.cnblogs.com/myseries/p/10895078.html

5 java 中自帶的幾個(gè)線程池
  • FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定線程數(shù)的線程池:最大線程數(shù)和核心線程數(shù)相同,在默認(rèn)設(shè)置時(shí),線程不受keepAliveTime影響;使用的無界隊(duì)列,則表示運(yùn)行中不會(huì)拒絕任務(wù),由于newFixedThreadPool只有核心線程,并且這些線程都不會(huì)被回收,也就是它能夠更快速的響應(yīng)外界請(qǐng)求

  • SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

單一任務(wù)線程池:保證任務(wù)按順序執(zhí)行,其他的參數(shù)和Fix 無異.這一個(gè)任務(wù)處于活動(dòng)狀態(tài)時(shí),其他任務(wù)都會(huì)在任務(wù)隊(duì)列中排隊(duì)等候依次執(zhí)行,所以在這個(gè)任務(wù)執(zhí)行之間我們不需要處理線程同步的問題。

  • CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

緩沖線程池:是一個(gè)根據(jù)需求創(chuàng)建新線程的線程池,max 是無界的,提供了一個(gè)沒有容量的隊(duì)列,如果主線程提供任務(wù)的速度大于線程處理的速度,則會(huì)不斷的創(chuàng)建線程,極端情況會(huì)耗盡cpu和內(nèi)存資源,所以建議執(zhí)行好事少的任務(wù)

  • SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
  • ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

定期執(zhí)行任務(wù)的線程池:可以根據(jù)給定的時(shí)間定期的執(zhí)行任務(wù)

使用方法

ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.schedule(new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName()+"延遲三秒執(zhí)行");
    }
}, 3, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName()+"延遲三秒后每隔2秒執(zhí)行");
    }
}, 3, 2, TimeUnit.SECONDS);

schedule(Runnable command, long delay, TimeUnit unit):延遲一定時(shí)間后執(zhí)行Runnable任務(wù);
schedule(Callable callable, long delay, TimeUnit unit):延遲一定時(shí)間后執(zhí)行Callable任務(wù);
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):延遲一定時(shí)間后,以間隔period時(shí)間的頻率周期性地執(zhí)行任務(wù);
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit):與scheduleAtFixedRate()方法很類似,但是不同的是scheduleWithFixedDelay()方法的周期時(shí)間間隔是以上一個(gè)任務(wù)執(zhí)行結(jié)束到下一個(gè)任務(wù)開始執(zhí)行的間隔,而scheduleAtFixedRate()方法的周期時(shí)間間隔是以上一個(gè)任務(wù)開始執(zhí)行到下一個(gè)任務(wù)開始執(zhí)行的間隔,也就是這一些任務(wù)系列的觸發(fā)時(shí)間都是可預(yù)知的。

線程池的使用技巧

需要針對(duì)具體情況而具體處理,不同的任務(wù)類別應(yīng)采用不同規(guī)模的線程池,任務(wù)類別可劃分為CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。(N代表CPU個(gè)數(shù))

CPU密集型任務(wù):線程池中線程個(gè)數(shù)應(yīng)盡量少,如配置N+1個(gè)線程的線程池。
IO密集型任務(wù):由于IO操作速度遠(yuǎn)低于CPU速度,那么在運(yùn)行這類任務(wù)時(shí),CPU絕大多數(shù)時(shí)間處于空閑狀態(tài),那么線程池可以配置盡量多些的線程,以提高CPU利用率,如2*N。
混合型任務(wù):可以拆分為CPU密集型任務(wù)和IO密集型任務(wù),當(dāng)這兩類任務(wù)執(zhí)行時(shí)間相差無幾時(shí),通過拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距,那么沒有拆分的意義。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ThreadPoolExecutor解析 Java里線程池的基本接口是 Executor: 實(shí)現(xiàn)線程池的類是Thr...
    Cris_Ma閱讀 707評(píng)論 0 0
  • 轉(zhuǎn)自http://www.cnblogs.com/dolphin0520/p/3932921.html Java并...
    Allen_cyn閱讀 2,005評(píng)論 0 4
  • Java并發(fā)編程:線程池的使用 在前面的文章中,我們使用線程的時(shí)候就去創(chuàng)建一個(gè)線程,這樣實(shí)現(xiàn)起來非常簡便,但是就會(huì)...
    逗逼程序員閱讀 522評(píng)論 0 2
  • 因?yàn)橹暗穆犈笥汛致缘慕榻B和自己粗枝大葉的吸收,古詩背的目的不明章法不分。今天從識(shí)字營重新開始。 中午回家坐陽臺(tái)看...
    夏更幽閱讀 118評(píng)論 0 0
  • 持續(xù)分享570天,2019年2月3日,張紅。 家里清理干凈,今天就自己老老實(shí)實(shí)的看了一天電視。 挺好!都是一些勵(lì)志...
    啊呦a7_94閱讀 186評(píng)論 0 1

友情鏈接更多精彩內(nèi)容