源碼篇-ThreadPoolExecutor

一、執(zhí)行任務(wù)

public void execute(Runnable command) {
    // 如果任務(wù)為空,直接拋異常
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    
    // 獲取線程池運行狀態(tài)
    int c = ctl.get();
    // 如果運行的線程數(shù)小于核心數(shù),添加worker
    if (workerCountOf(c) < corePoolSize) {
        // 添加worker,并將core設(shè)為true,表示是核心線程
        if (addWorker(command, true))
            return;
        // 如果添加失敗重新獲取線程池運行狀態(tài)
        c = ctl.get();
    }
    // 如果線程池在運行且隊列未滿
    if (isRunning(c) && workQueue.offer(command)) {     
        int recheck = ctl.get();
        // 如果線程池不在運行且刪除任務(wù)成功,執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池工作線程為0,添加空的任務(wù)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 走到這里,說明核心線程數(shù)用完且任務(wù)隊列已滿,那么啟用非核心線程數(shù),如果失敗,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}
  • 首先用核心線程執(zhí)行任務(wù),如果核心線程已滿,將任務(wù)添加到任務(wù)隊列;如果隊列也滿了,那么用非核心線程執(zhí)行任務(wù)
  • addWorker(Runnable firstTask, boolean core)第一個參數(shù)是執(zhí)行的任務(wù),第二個參數(shù)如果為true,表示用的是核心線程,false表示用的是非核心線程
  • 成員變量ctl是AtomicInteger類型,用來表示線程運行狀態(tài)和線程數(shù),高3位表示運行狀態(tài),低29位表示運行線程數(shù)
private boolean addWorker(Runnable firstTask, boolean core) {
    
    //retry用來判斷是否可以添加任務(wù),并更新線程數(shù)    
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果線程池已經(jīng)關(guān)閉且沒有任務(wù),直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取工作線程數(shù)
            int wc = workerCountOf(c);
            // 如果工作線程數(shù)大于等于最大線程數(shù),直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 更新線程數(shù),如果成功,跳出retry
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到這,說明更新線程數(shù)失敗了,重新獲取線程池狀態(tài)
            c = ctl.get();  // Re-read ctl
            // 如果線程池狀態(tài)變化了,從retry重新執(zhí)行;如果線程池狀態(tài)沒有變化,繼續(xù)for循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 創(chuàng)建Worker對象
        w = new Worker(firstTask);
        // 獲取線程
        final Thread t = w.thread;
        // 如果線程不為空,啟動線程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                
                // 再次獲取狀態(tài)
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 因為線程還沒啟動,所以這里線程是alive,說明是不正常的
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將新創(chuàng)建的worker加入到workers集合
                    workers.add(w);
                    int s = workers.size();
                    // 更新largestPoolSize值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // workerAdded標記為true
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果任務(wù)添加完成,啟動線程且將workerStarted標記為true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果啟動線程失敗,從workers刪除新創(chuàng)建的任務(wù),且執(zhí)行tryTerminate
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 最后返回線程是否啟動成功
    return workerStarted;
}
  • 首先去更新工作的線程數(shù)
  • 創(chuàng)建Worker對象,此時會創(chuàng)建Thread類型的成員變量thread,Worker對象會傳入到該線程,因為Worker對象實現(xiàn)了Runnable方法,所以啟動線程thread時,會執(zhí)行Worker的run方法
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

二、執(zhí)行任務(wù)

1. 發(fā)起任務(wù)執(zhí)行

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // 用來表示是否正常執(zhí)行任務(wù),true表示被打斷了,false表示未被打斷
    boolean completedAbruptly = true;
    try {
        // 如果worker對象有傳入了任務(wù)或者任務(wù)隊列有任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            
            // 如果線程池狀態(tài)是停止以上的級別
            // 或者
            // 線程已經(jīng)被中斷且狀態(tài)是停止以上的級別且當前線程還不是打斷狀態(tài)
            // 那么中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 方法執(zhí)行前,空方法,留給子類實現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執(zhí)行完后,空方法,留給子類實現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {               
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 被中斷標記設(shè)為false
        completedAbruptly = false;
    } finally {
        // 任務(wù)完成后的操作
        processWorkerExit(w, completedAbruptly);
    }
}        
  • 首先執(zhí)行傳入worker對象里的任務(wù),如果為空,則從任務(wù)隊列里獲取任務(wù)
  • 判斷是否需要打斷線程
  • 執(zhí)行任務(wù)
  • 任務(wù)完成相關(guān)的操作
2. 獲取任務(wù)
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // 獲取線程池狀態(tài)
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        
        
        // 如果線程池狀態(tài)關(guān)閉且任務(wù)隊列為空
        // 或者
        // 線程池狀態(tài)是停止
        // 那么將線程數(shù)減1,返回空任務(wù)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取線程數(shù)
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        
        // 如果allowCoreThreadTimeOut為真,表示核心線程也有超時時間,一般默認為false
        // 或者
        // 工作線程數(shù)超過核心線程數(shù)
        // 那么將timed設(shè)為真,設(shè)為真的目的是為了沒任務(wù)的時候,減少工作線程的數(shù)量
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果工作線程大于最大線程數(shù)或者超時了
        // 且
        // 工作線程數(shù)大于1或者任務(wù)隊列不為空
        // 那么工作線程減1
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果有超時設(shè)置,那么帶有超時時間獲取任務(wù),否則就阻塞獲取任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 如果任務(wù)不為空,返回任務(wù)
            if (r != null)
                return r;
            // 如果任務(wù)為空,將超時設(shè)置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • 這部分代碼能說明線程復(fù)用,超時未獲取到任務(wù)減少線程的原理
  • 如果設(shè)置了核心線程有超時時間或者線程數(shù)超過了核心線程數(shù),那么采用帶超時的方式獲取任務(wù),如果沒有獲取到任務(wù),那么線程數(shù)會減1;如果不采用帶超時的方式獲取任務(wù),那么一直等待,知道從任務(wù)隊列里獲取了任務(wù)

3. 退出任務(wù)執(zhí)行

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 從任務(wù)集合中刪除任務(wù)
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 嘗試終止任務(wù)
    tryTerminate();

    int c = ctl.get();
    // 如果線程池狀態(tài)小于STOP,說明還需要工作線程
    if (runStateLessThan(c, STOP)) {
        // 如果任務(wù)執(zhí)行被中斷了,保證至少還有1個工作線程在執(zhí)行
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 相當于線程還在工作
        addWorker(null, false);
    }
}
  • 從任務(wù)集合中刪除當前任務(wù)
  • 嘗試終止線程池
  • 如果線程池狀態(tài)是小于STOP,保證有工作線程在工作
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // isRunning(c) 表示在運行,不能停止
        // runStateAtLeast(c, TIDYING)表示已經(jīng)停止了,沒必要停止
        // (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())表示關(guān)閉但是有任務(wù),不能停止
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
       // 如果工作線程數(shù)不等于0,停止1個空閑的工作線程,通過tryLock判斷是否空閑
       if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 走到這,說明工作線程是0了,將狀態(tài)改為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 終止線程池,空方法,留給子類實現(xiàn)
                    terminated();
                } finally {
                    // 最后將狀態(tài)改為TERMINATED,通知等待線程池終止的線程
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
  • 如果線程池狀態(tài)不在運行,且工作線程數(shù)為0,那么最終將線程池狀態(tài)改為TERMINATED

三、拒絕策略

1. 直接拋異常(默認策略)
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
2. 調(diào)用者的線程執(zhí)行任務(wù)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
3. 丟掉最早的任務(wù)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
4. 空的策略
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

四、線程工廠

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        // 設(shè)置線程組
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        // 設(shè)置線程前綴名
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        // 創(chuàng)建線程
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

五、常見四種線程池

1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 可自定義線程數(shù)和線程工廠,核心線程數(shù)與最大線程數(shù)相等,這樣的話線程一旦創(chuàng)建,就會一直運行
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • 只可以自定義線程工廠,核心線程數(shù)為0,最大線程數(shù)為Integer最大值
  • 相當于沒有限制工作線程數(shù),任務(wù)量大的時候,會影響機器性能
  • 空任務(wù)的時候,會有1個線程在運行
3.newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}    
  • 只創(chuàng)建一個工作線程,可自定義線程工廠,可以定時執(zhí)行任務(wù)
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}    
  • 可定義線程工廠和核心工作線程數(shù),最大工作線程數(shù)為Integer.MAX_VALUE
  • 可定時執(zhí)行任務(wù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容