線程池原理

image

核心屬性

  1. corePoolSize :核心線程數,一般情況下,該數量的核心線程創(chuàng)建好之后,會常駐在線程池中,不會應空閑而關閉,可以設置allowCoreThreadTimeOut=true使核心線程空閑關閉
  2. maximumPoolSize :最大線程數,>核心線程數。
  3. keepAliveTime : 空閑時間,當線程獲取任務時,超過keepAliveTime仍然獲取不到任務,那么線程執(zhí)行完所有邏輯后,自動消亡,workerSet也會移除該worker對象
  4. unit : 空閑事件keepAliveTime 的單位
  5. BlockingQueue<Runnable> workQueue : 任務的阻塞隊列,當前提交任務時,工作線程已經>= 核心線程數, 則會將任務 推入阻塞隊列中。如果阻塞隊列達到最大長度,則會在工作線程數 不超過最大線程數maximumPoolSize的情況下,繼續(xù)創(chuàng)建空閑線程來處理任務。
  6. ThreadFactory threadFactory: 創(chuàng)建線程的工廠
  7. RejectedExecutionHandler handler :任務的拒絕策略。當線程數任務阻塞隊列滿了,且工作線程數 大于等于 最大線程數了, 則 線程池無法調度線程則處理任務,調用構造方法傳入的RejectedExecutionHandler實例的rejectedExecution()方法來拒絕任務。
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

線程池狀態(tài)

ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數量

狀態(tài) value 說明
RUNNING(當線程池創(chuàng)建出來的初始狀態(tài)) 111 能接受任務,能執(zhí)行阻塞任務
SHUTDOWN(調用shutdown方法) 000 不接受新任務,能執(zhí)行阻塞任務 肯定可以 執(zhí)行正在執(zhí)行的任務
STOP(調用shutDownNow) 001 不接受新任務,打斷正在執(zhí)行的任務,丟棄阻塞任務
TIDYING(中間狀態(tài)) 010 任務全部執(zhí)行完,活動線程也沒了
TERMINATED(終結狀態(tài)) 011 線程池終結

常用api

execute()

執(zhí)行任務,無返回值

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 則創(chuàng)建新增核心線程,并執(zhí)行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
    // 如果 阻塞隊列還沒有滿,則是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
    // 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
    else if (!addWorker(command, false))
        // 第四 : 種情況如果 >= maximumPoolSize,執(zhí)行拒絕策略
        reject(command);
}

submit(Runable)

會返回Future對象,調用Future對象的get(),會阻塞,直到拿到返回值返回

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

doInvokeAny

返回最快執(zhí)行完的任務的結果,集合中其他正在執(zhí)行的線程會被關閉

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)

invokeAll

執(zhí)行所有任務,返回List<Future<T>>

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

shutdown()

不會接收新的任務,但是已經運行和在隊列中的任務會執(zhí)行完,然后在關閉線程

線程狀態(tài)變成SHUTDOWN

 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

awaitTermination(long timeout, TimeUnit unit)

等待線程池關閉,會提前也會超時

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

shutdownNow

打斷所有所有正在執(zhí)行的任務,返回隊列中的任務

線程狀態(tài)變成STOP

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 打斷正在工作的線程
            interruptWorkers();
            // 從隊列中取出等待的任務
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        // 返回隊列中等待的任務
        return tasks;
    }

常用的線程池配置

jdk的Executors類提供了4個創(chuàng)建線程池的配置方法, 通過之前的原理,我們來分析下這些線程池的不同

1. newFixedThreadPool

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

創(chuàng)建一個 通過操作一個共享的無界隊列來復用固定數量的線程的線程池。

首先看構造方法,核心線程數和最大線程數是一樣的 ,說明不存在線程池擴容的情況

空閑有效時間為0 毫秒, 由于只存在核心線程,所以不存在 線程被注銷的情況

LinkedBlockingQueue 是一個無界隊列,默認大小為int的最大值,所以不會出現(xiàn) 隊列長度不夠而導致 創(chuàng)建空閑線程的情況,也就不會出現(xiàn) 拒絕策略。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

總結 :

  1. 線程數固定,
  2. 沒有多余線程線程回收,
  3. 不會出現(xiàn)因線程不夠,隊列裝不下而拒絕任務的情況

2.newSingleThreadExecutor

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

只有一個線程,無界隊列

3.newCachedThreadPool

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大線程數很大,沒有核心線程,但是空閑時間 較長有1分鐘, 說明適用于 任務周期較短, 很多線程都可以快速處理完任務,并被復用,超過一分鐘線程就被注銷

4.newScheduledThreadPool

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

定時任務線程池

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
延遲執(zhí)行
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
重復執(zhí)行

自定義

schedule(){

        // dosomething
// 遞歸
        schedule();

}

api

scheduleWithFixedDelay

執(zhí)行完任務再計算延遲時間

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 * @throws IllegalArgumentException   {@inheritDoc}
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
scheduleAtFixedRate

從任務開始執(zhí)行就計算延遲時間

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
案例 每周三22點執(zhí)行
public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
    // 計算當前時間距離目標時間還有多久
    //  初始化延遲時間 = 目標時間 - 當前時間
    // 周期  = 7天
    int period = 7;
    scheduled.scheduleAtFixedRate(() -> {
        //doSomething
    }, 初始化延遲時間, period, TimeUnit.DAYS)
}

源碼解析

流程圖

image

一 、任務的執(zhí)行以及線程的創(chuàng)建 : execute(Runnable task):

傳入Runnable 對象作為要執(zhí)行的任務。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 則創(chuàng)建新增核心線程,并執(zhí)行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
    // 如果 阻塞隊列還沒有滿,則是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 判斷工作線程是否 = 0,這種情況是 核心線程數為0的時候,需要創(chuàng)建空閑線程來處理隊列中的任務,比如CachedThreadPool,第一次進來
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
    // 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
    else if (!addWorker(command, false))
        // 第四 : 種情況如果 >= maximumPoolSize,執(zhí)行拒絕策略
        reject(command);
}

1. 當前工作線程小于核心線程數,則創(chuàng)建Worker對象,加入到workerSet中

image
addWorker(command, true)

創(chuàng)建Worker對象(持有線程),并調用持有線程的start方法,在run方法中執(zhí)行runnable

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取當前工作線程數
            int wc = workerCountOf(c);
            //  如果當前工作線程數 大于最大線程數 2 ^ 29 -1 ,或者大于 (根據當前添加工作線程的類型) 核心線程數 還是 線程池最大線程數
            // 核心線程 判斷是否 > corePoolSize,空閑線程,判斷 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // cas,工作線程數+1,退出循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 下面走創(chuàng)建線程的邏輯
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 線程一個worker, Worker 是Thread的子類,傳入runnable對象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加鎖
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 不是關閉狀態(tài)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加到工作線程集合中
                    workers.add(w);
                    // 當前工作線程集合大小
                    int s = workers.size();
                    // 更新 線程池 線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 執(zhí)行task
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker.Run方法
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 執(zhí)行任務
    public void run() {
        runWorker(this);
    }
}

在run方法中調用runWorker方法,傳入當前對象

  1. 該worker對象第一次執(zhí)行任務時,w.firstTask是!= null的,所以可以進入while的循環(huán)體, 執(zhí)行Runnable的run方法
  2. 第二次進來則從阻塞隊列中拿任務。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被創(chuàng)建時,構造方法傳入了Runnable對象,所以現(xiàn)在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次進來是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 無限循環(huán),當前有任務未執(zhí)行 或者 阻塞隊列中有任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行runnable的run方法執(zhí)行業(yè)務邏輯
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允許核心線程超時關閉 或者 當前工作線程數 > 核心線程數
        // 線程關閉前,從worderSet中移除worker對象
        processWorkerExit(w, completedAbruptly);
    }
}

2. 當前工作線程數已達到核心線程數了,但是阻塞隊列還沒滿

則會往workQueue 存入Runnable對象

如果 隊列長度還沒達到上限,則offer方法會成功存入Runnable對象,返回true

如果 隊列長度已達到上限,則返回false,說明當前工作線程從隊列中拿task,處理task的速度還不夠,會創(chuàng)建非工作線程。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 則創(chuàng)建新增核心線程,并執(zhí)行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
    // 如果 阻塞隊列還沒有滿,則是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
    // 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
    else if (!addWorker(command, false))
        // 如果 >= maximumPoolSize,執(zhí)行拒絕策略
        reject(command);
}

3. 阻塞隊列已滿,添加task失敗,就會嘗試創(chuàng)建空閑線程

創(chuàng)建空閑線程的方法和創(chuàng)建核心線程的方法都是addWorker(runnable,boolean core),只不過傳入的core參數是false,表示是空閑線程

如果是空閑線程創(chuàng)建,則會判斷當前工作線程數是否 > 最大線程數maximumPoolSize, 如果是創(chuàng)建核心線程則判斷的是 核心線程數

如果大于 最大線程數maximumPoolSize 就會創(chuàng)建線程失敗

 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取當前工作線程,
            int wc = workerCountOf(c);
            // 如果當前工作線程數 大于最大線程數 2 ^ 29 -1 ,或者大于 (根據當前添加工作線程的類型) 核心線程數 還是 線程池最大線程數
            // 核心線程 判斷是否 > corePoolSize,空閑線程,判斷 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 后面創(chuàng)建線程Worker對象和創(chuàng)建核心線程是一模一樣的
 }

4. 任務拒絕:阻塞隊列滿了,并且 工作線程數已經達到最大線程數了, 則嘗試創(chuàng)建空閑線程會失敗,走任務的拒絕策略。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 則創(chuàng)建新增核心線程,并執(zhí)行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
    // 如果 阻塞隊列還沒有滿,則是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
    // 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
    else if (!addWorker(command, false))
        // 如果 >= maximumPoolSize,執(zhí)行拒絕策略
        reject(command);
}

jdk提供的拒絕策略類 :

image
  1. AbortPolicy 拋異常

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
    
        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
  2. DiscardPolicy 丟棄 = 啥也不干

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
    
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
  3. DiscardOldestPolicy 推出并忽略阻塞隊列中的第一個任務,嘗試執(zhí)行當前任務

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
         // poll出阻塞隊列中的第一個任務,并忽略掉
            e.getQueue().poll();
            // 以執(zhí)行當前任務
            e.execute(r);
        }
    }
}

................................................

二 、線程的維護

線程池中的作用維護線程,避免頻繁創(chuàng)建,銷毀線程而帶來系統(tǒng)資源的浪費。

核心線程默認(可以配置allowCoreThreadTimeOut = true 來設置 注銷核心線程 )是不會在執(zhí)行完某一個任務后被注銷的

空閑線程 在空閑時間達到keepAliveTime 后, 會自動注銷(執(zhí)行完run方法)。

1. 線程的阻塞 :

Worker.runWorker(Worker w)

線程的執(zhí)行方法中,用while的方式,判斷 當前是否有任務(第一次被創(chuàng)建出來) 或者 從阻塞隊列中拿任務

  1. 判斷 當前是否有任務(第一次被創(chuàng)建出來)
  2. 阻塞隊列中有任務 :execute任務時,當工作線程數 > 大于核心線程數時且 阻塞隊列沒有滿時, 會把任務存入阻塞隊列
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被創(chuàng)建時,構造方法傳入了Runnable對象,所以現(xiàn)在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次進來是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 無限循環(huán),當前有任務未執(zhí)行 或者 阻塞隊列中有任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行runnable的run方法執(zhí)行業(yè)務邏輯
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允許核心線程超時關閉 或者 當前工作線程數 > 核心線程數
        // 線程關閉前,從worderSet中移除worker對象
        processWorkerExit(w, completedAbruptly);
    }
}

如果可以不停的獲取任務,處理任務,這種情況下 所有線程都不會被注銷,因為無法退出while循環(huán)

2. 線程的注銷

但是當沒有任務提交時,也就是當前任務沒有,阻塞隊列里也拿不到任務,線程則處于空閑狀態(tài),空閑線程 空閑狀態(tài)下的時間達到keepAliveTime ,則會退出while循環(huán),結束線程。

而核心線程則會在getTask中(如果沒配置allowCoreThreadTimeOut=true) 阻塞住, 不返回結果,直到阻塞隊列中可以獲取到任務, 再進入while循環(huán)體。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling? 
        // 是否允許核心線程超時關閉 或者 當 工作線程數 > 核心線程數了
        // 當 線程中 只剩下核心線程的時候, wc > corePoolSize 就不會返回true,則會workQueue.take()阻塞住
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果允許核心線程超時注銷 或者 當前工作線程數 > 核心線程數, 則調阻塞隊列的 poll,超時返回null
            // 否則調take()方法,一直拿不到就一直阻塞
            
            // 這就說明,只有允許核心線程超時注銷,或者 當 當前工作線程數 > 核心線程數時,才會調 阻塞隊列會超時的poll方法,
            // runWorker方法才會退出while循環(huán)體, 結束線程
            
            // 如果allowCoreThreadTimeOut被設置為true,則所有線程從隊列中拿任務調用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有線程在poll超時之后,仍然沒獲取到任務,則會返回 null ,退出循環(huán)體, 結束線程
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

從workers移除線程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 從workers移除線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允許核心線程超時關閉,則為0,否則為corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果當前工作線程數 > 最小的線程數量
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 小于最小的線程數量,添加worker
        addWorker(null, false);
    }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容