ThreadPoolExecutor源碼學(xué)習(xí)筆記

轉(zhuǎn)載請(qǐng)附原文鏈接:ThreadPoolExecutor源碼學(xué)習(xí)筆記
歡迎來我的個(gè)人主頁交流:http://extremej.itscoder.com/

大部分分析以注釋形式寫在源碼中

本篇筆記將從 ThreadPoolExecutor 的一次使用上來分析源碼,主要涉及線程池創(chuàng)建,execute 的步驟,任務(wù)添加到阻塞隊(duì)列,線程從阻塞隊(duì)列中拿取任務(wù)執(zhí)行,線程的回收,線程池的終止。

涉及到的類有

  • Executors — 獲取線程池

  • ThreadPoolExecutor — 線程池

  • Worker — 工作線程

  • LinkedBlockingQueue — 阻塞隊(duì)列

  • RejectedExecutionHandler — 任務(wù)拒絕處理器(實(shí)在不知道什么翻譯~)

    ?

線程池的獲取

我們知道可以通過 Executors 來獲取不同類型的線程池,那么就從 Executors 來開始看它是如何返回不同類型的線程池的,看看我們常用的一些方法

//獲取一個(gè)固定線程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
//獲取只有一個(gè)線程的池子
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
//獲取一個(gè)緩存線程池,可變
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

從上面的三個(gè)方法可以發(fā)現(xiàn)其實(shí)都是 new 了一個(gè) ThreadPoolExecutor ,但是傳入的參數(shù)不同,我們進(jìn)到這個(gè)構(gòu)造方法中去一探究竟,看看不同的參數(shù)到底代表了什么

//構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

通過參數(shù)名稱以及注釋可以知道這幾個(gè)參數(shù)的作用分別是

  • corePoolSize — 核心線程數(shù),即允許閑置的線程數(shù)目
  • maximumPoolSize — 最大線程數(shù),即這個(gè)線程池的容量
  • keepAliveTime — 非核心線程的閑置存活時(shí)間
  • unit — 上一個(gè)參數(shù)的單位
  • workQueue — 任務(wù)隊(duì)列(阻塞隊(duì)列)
  • threadFacotry — 線程創(chuàng)建工廠
  • handler — 當(dāng)線程池或者任務(wù)隊(duì)列容量已滿時(shí)用于reject

這里要明白一件事情,核心線程只是通過數(shù)目來判斷,而不是說先創(chuàng)建的線程就是核心線程

在構(gòu)造方法里面初始化了成員變量值,通過構(gòu)造方法應(yīng)該明白了不同類型的線程獲取的原理。

任務(wù)的執(zhí)行

A.狀態(tài)屬性

在看源碼之前先了解一下 ThreadPoolExecutor的幾個(gè)狀態(tài)屬性,這對(duì)后面的源碼閱讀有很重要的作用,ThreadPoolExecutor 有五種狀態(tài)

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

從上到下依次是

  • RUNNING — 運(yùn)行狀態(tài),可以添加新任務(wù),也可以處理阻塞隊(duì)列中的任務(wù)
  • SHUTDOWN — 待關(guān)閉狀態(tài),不再接受新的任務(wù),會(huì)繼續(xù)處理阻塞隊(duì)列中的任務(wù)
  • STOP — 停止?fàn)顟B(tài),此時(shí)的線程池不處理任何任務(wù)
  • TIDYING — 整理狀態(tài),也可以理解為預(yù)終結(jié)狀態(tài),這個(gè)時(shí)候任務(wù)都處理完畢,池中無有效線程
  • TERMINATED — 終止?fàn)顟B(tài)

B.execute(Runnable command)

當(dāng)獲取到了一個(gè)線程池之后,需要它來執(zhí)行異步任務(wù),也就是 execute(Runnable) ,傳入一個(gè) runnable 對(duì)象,在 run 方法中執(zhí)行我們的代碼,那么來看一下 execute() 是怎么工作的,因?yàn)樵创a的注釋解釋得十分清楚,這里將注釋也貼出來。簡單翻譯一下,當(dāng) execute 被調(diào)用時(shí)總共有三種情況。

  • 如果當(dāng)前的有效線程數(shù)小于核心線程數(shù),則試圖創(chuàng)建一個(gè)新的 worker 線程
  • 如果上面一步失敗了,則試圖將任務(wù)添加到阻塞隊(duì)列中,并且要再一次判斷需要不需要回滾隊(duì)列,或者說創(chuàng)建線程(后面會(huì)詳細(xì)說明)
  • 如果上面兩步都失敗了,則會(huì)試圖強(qiáng)行創(chuàng)建一個(gè)線程來執(zhí)行這個(gè)任務(wù),如果還是失敗,扔掉這個(gè)任務(wù)

了解了這三個(gè)步驟,來看看源碼,源碼中調(diào)用了 addworker 方法,這是創(chuàng)建線程的方法,會(huì)在后面講到

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
  //1.判斷有效線程數(shù)是否小于核心線程數(shù)
    if (workerCountOf(c) < corePoolSize) {
      //創(chuàng)建新線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
  //2.分開來看,首先判斷當(dāng)前的池子是否是處于 running 狀態(tài)
  //因?yàn)橹挥?running 狀態(tài)才可以接收新任務(wù)
  //接下來判斷能否成功添加到隊(duì)列中,如果隊(duì)列滿了或者其他情況則會(huì)跳到下一步
    if (isRunning(c) && workQueue.offer(command)) {
      //再次檢查池子的狀態(tài),如果進(jìn)入了非 running 狀態(tài),回滾隊(duì)列,扔掉這個(gè)任務(wù)
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
      //如果處于 running 狀態(tài)則檢查當(dāng)前的有效線程,如果沒有則創(chuàng)建一個(gè)線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  //3.前兩步失敗了,就強(qiáng)行創(chuàng)建線程,成功會(huì)返回true,如果失敗扔掉這個(gè)任務(wù)
    else if (!addWorker(command, false))
        reject(command);
}

解釋一下第二步,為什么要recheck

當(dāng)這個(gè)任務(wù)被添加到了阻塞隊(duì)列前,池子處于 RUNNING 狀態(tài),但如果在添加到隊(duì)列成功后,池子進(jìn)入了 SHUTDOWN 狀態(tài)或者其他狀態(tài),這時(shí)候是不應(yīng)該再接收新的任務(wù)的,所以需要把這個(gè)任務(wù)從隊(duì)列中移除,并且 reject

同樣,在沒有添加到隊(duì)列前,可能有一個(gè)有效線程,但添加完任務(wù)后,這個(gè)線程閑置超時(shí)或者因?yàn)楫惓1桓傻袅?,這時(shí)候需要?jiǎng)?chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)

C .addWorker()

前一步把 execute 的流程捋了一遍,里面多次出現(xiàn)了 addWorker() 方法,前文說到這是個(gè)創(chuàng)建線程的方法,來看看 addWorker 做了些什么,這個(gè)方法代碼比較長,我們拆開來一點(diǎn)一點(diǎn)看

  • 第一部分 — 判斷各種基礎(chǔ)異常
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

      // Check if queue empty only if necessary.
      // 檢查線程池狀態(tài),隊(duì)列狀態(tài),以及 firstask ,拆開來看
      // 這段代碼看起來異常的蛋疼,轉(zhuǎn)換一下邏輯即
     //rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
      // 總結(jié)起來就是 當(dāng)前處于非 Running 狀態(tài),并且這三種情況
      // 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
      // 2. 有新的任務(wù) (因?yàn)椴荒茉俳邮招碌娜蝿?wù))
      // 3. 阻塞隊(duì)列中已經(jīng)沒有任務(wù) (不需要再創(chuàng)建線程)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        for (;;) {
            int wc = workerCountOf(c);//當(dāng)前有效線程數(shù)目
           // 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
              // 大于容量 或者指定的線程數(shù),不允許創(chuàng)建
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
  }
  • 第二部分 — 試圖創(chuàng)建線程

創(chuàng)建一個(gè)Worker(什么東西?下文會(huì)講解,這里把它就當(dāng)成是一個(gè)線程的容器)

boolean workerStarted = false;//標(biāo)記 worker 開啟狀態(tài)
boolean workerAdded = false;//標(biāo)記 worker 添加狀態(tài)
Worker w = null;
try {
    w = new Worker(firstTask); //將這個(gè)任務(wù)作為 worker 的第一個(gè)任務(wù)傳入
    final Thread t = w.thread;//通過 worker 獲取到一個(gè)線程
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());
            
           // running狀態(tài),或者 shutdown 狀態(tài)但是沒有新的任務(wù)
            if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // 將這個(gè) worker 添加到線程池中
                workers.add(w); 
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
              //標(biāo)記worker添加成功
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果 worker 創(chuàng)建成功,開啟線程
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

上面代碼從邏輯層面來看不算難懂,到這里一個(gè)任務(wù)到達(dá)后,ThreadPoolExecutor 的處理就結(jié)束了,那么任務(wù)又是怎么被添加到阻塞隊(duì)列中,線程是如何從隊(duì)列中取出任務(wù),上文中的 Worker 又是什么東西?

一個(gè)一個(gè)來,先來看看 Worker 到底是什么

D.Worker

Worker 是 ThreadPoolExecutor 的一個(gè)內(nèi)部類,實(shí)現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個(gè)什么鬼???我也不造~可以看看這篇文章

《Java并發(fā)包源碼學(xué)習(xí)之AQS框架(一)概述》

簡單來說,Worker實(shí)現(xiàn)了 lock 和 unLock 方法來標(biāo)示當(dāng)前線程的狀態(tài)是否為閑置

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個(gè)線程又是 Worker 的成員變量

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個(gè)新的線程,我們知道 Thread 接收一個(gè) Runnable 對(duì)象后 start 運(yùn)行的是 Runnable 的 run 方法,Worker 的 run 方法調(diào)用了 runWorker ,這個(gè)方法里面就是取出任務(wù)執(zhí)行的邏輯

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 獲取到 worker 的第一個(gè)任務(wù)
        w.firstTask = null;
        w.unlock(); // 標(biāo)記為閑置,還沒有開始任務(wù) 允許打斷
        boolean completedAbruptly = true; // 異常退出標(biāo)記
        try {
          // 循環(huán)取出任務(wù),如果第一個(gè)任務(wù)不為空,或者從隊(duì)列中拿到了任務(wù)
          // 只要這兩個(gè)條件滿足,會(huì)一直循環(huán),直到?jīng)]有任務(wù),正常退出,或者異常退出
            while (task != null || (task = getTask()) != null) {
                w.lock();// 該線程標(biāo)記為非閑置
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
              // 翻譯注釋:1.如果線程池STOPPING狀態(tài),需要中斷線程
              // 2.Thread.interrupted()是一個(gè)native方法,返回當(dāng)前線程是否有被等待中斷的請(qǐng)求
              // 3.第二個(gè)條件成立時(shí),檢查線程池狀態(tài),如果為STOP,并且沒有被中斷,則中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
              // 執(zhí)行任務(wù)
                try {
                    beforeExecute(wt, task);// 執(zhí)行前
                    Throwable thrown = null;
                    try {
                        task.run(); // 執(zhí)行任務(wù)
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); // 執(zhí)行結(jié)束
                    }
                } finally {
                    task = null; // 將 worker 的任務(wù)置空
                    w.completedTasks++; 
                    w.unlock(); // 釋放鎖,進(jìn)入閑置狀態(tài)
                }
            }// 循環(huán)結(jié)束
            completedAbruptly = false; // 標(biāo)記為正常退出
        } finally {
          // 干掉 worker
            processWorkerExit(w, completedAbruptly);
        }
    }

這里弄清楚了一件事情,進(jìn)入循環(huán)準(zhǔn)備執(zhí)行任務(wù)時(shí),worker 加鎖標(biāo)記為非閑置,任務(wù)執(zhí)行完畢或者出現(xiàn)異常,worker 釋放鎖,進(jìn)入閑置狀態(tài)。

也就是當(dāng)一個(gè) worker 執(zhí)行任務(wù)前或者執(zhí)行完任務(wù),到取出下一個(gè)任務(wù)期間,都是閑置狀態(tài)可以被打斷

上面取出任務(wù)調(diào)用了 getTask() ,誒~為什么有一個(gè)死循環(huán),別著急,慢慢看來。上面的代碼可以知道如果 getTask 返回任務(wù)則執(zhí)行,如果返回為 null 則 worker 需要被回收

private Runnable getTask() {
  // 標(biāo)記取任務(wù)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊(duì)列已經(jīng)為空,回收 wroker
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //獲取當(dāng)前有效線程數(shù)
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed 用來標(biāo)記當(dāng)前的 worker 是否設(shè)置超時(shí)時(shí)間,
        // 還記得獲取線程池的時(shí)候 可以設(shè)置核心線程超時(shí)時(shí)間
        //1.允許核心線程超時(shí)回收(即所有線程) 2.當(dāng)前有效線程超過核心線程數(shù)(需要回收)
        // 如果timed == false 則該worker不會(huì)被回收,如果沒有取到任務(wù) 會(huì)一直阻塞
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 回收線程條件
        // 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設(shè)置了超時(shí)回收并且已經(jīng)超時(shí)
        // 2. 有效線程數(shù)大于1或者隊(duì)列任務(wù)已經(jīng)為空
        // 只有當(dāng)上面1和2 同時(shí)滿足時(shí) 則試圖回收線程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          // 如果減少workercount成功 直接回收
            if (compareAndDecrementWorkerCount(c))
                return null;
          // 否則重走循環(huán),從第一個(gè)判斷條件處回收
            continue;
        }
        // 取任務(wù)
        try {
          // 根據(jù)是否設(shè)置超時(shí)回收來選擇不同的取任務(wù)的方式
          // poll 方法取任務(wù)會(huì)有超時(shí)時(shí)間,超過時(shí)間則返回null
          // take 方法沒有超時(shí)時(shí)間,阻塞式方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
          // 如果任務(wù)不為空返回任務(wù)
            if (r != null)
                return r;
          // 否則標(biāo)記超時(shí) 進(jìn)入下一次循環(huán)等待回收
            timedOut = true;
        } catch (InterruptedException retry) {
          // 如果出現(xiàn)異常,試圖重試
            timedOut = false;
        }
    }
}

getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個(gè)新的方法,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() ,這兩個(gè)都是阻塞隊(duì)列的方法,來看看它們又各自是怎么實(shí)現(xiàn)的

E.LinkedBlockingQueue — 阻塞隊(duì)列

ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊(duì)列,實(shí)現(xiàn)了 BlockingQueue 接口,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口。

因?yàn)楸酒P記主要是分析 ThreadPoolExecutor 的原理,所以不會(huì)詳細(xì)介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法,首先來看一下上文所提到的 take()

public E take() throws InterruptedException {
    E x; // 任務(wù)
    int c = -1; // 取出任務(wù)后的剩余任務(wù)數(shù)量
    final AtomicInteger count = this.count; // 當(dāng)前任務(wù)數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
      // 如果隊(duì)列數(shù)量為空,則一直循環(huán),阻塞線程
        while (count.get() == 0) {
            notEmpty.await();
        }
      // 取出任務(wù)
        x = dequeue();
      // 任務(wù)數(shù)量減一
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();// 標(biāo)記隊(duì)列非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
  //
    if (c == capacity)
        signalNotFull();//標(biāo)記隊(duì)列已滿
    return x;// 返回任務(wù)
}

上面的代碼可以知道 take 方法會(huì)一直阻塞直到隊(duì)列有新的任務(wù)為止

接下來是 poll 方法,可以看到幾乎與 take 方法相同,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時(shí)間判斷,如果超時(shí)則直接返回為空,不會(huì)一直阻塞下去

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null; // 存放的任務(wù)
    int c = -1; 
    long nanos = unit.toNanos(timeout); // 超時(shí)時(shí)間
    final AtomicInteger count = this.count; // 隊(duì)列中的數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
      // 如果隊(duì)列為空,則不斷的循環(huán)
        while (count.get() == 0) {
          // 如果當(dāng)?shù)褂?jì)時(shí)小于0 即超時(shí)時(shí)間到 則返回空
            if (nanos <= 0)
                return null;
          // 讓線程等待
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue(); // 取出一個(gè)任務(wù)
        c = count.getAndDecrement(); // 取出后的隊(duì)列數(shù)量
        if (c > 1)
            notEmpty.signal(); // 標(biāo)記非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
    if (c == capacity)
        signalNotFull(); // 標(biāo)記隊(duì)列已滿
    return x; // 返回任務(wù)
}

線程池的回收及終止

前一節(jié)分析了任務(wù)的執(zhí)行流程及原理,也留下了一個(gè)問題,worker 是如何被回收的呢?線程池該如何管理呢?回到上一節(jié)的 runWorker() 方法中,還記得最后調(diào)用了一個(gè)方法

processWorkerExit(w, completedAbruptly);

這個(gè)方法傳入了兩個(gè)參數(shù),第一個(gè)是當(dāng)前的 Woker ,第二個(gè)是標(biāo)記異常退出的標(biāo)識(shí)

首先判斷是否為異常退出,如果是異常退出的話需要手動(dòng)調(diào)整線程數(shù)量,如果是正?;厥盏模琯etTask 方法里面已經(jīng)手動(dòng)調(diào)整過了,不記得的小伙伴可以看看前文的代碼,找找 decrementWorkerCount(),

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    // 記錄線程池完成的任務(wù)總數(shù),從 workers 中移除該 worker
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    tryTerminate();//嘗試關(guān)閉池子

    int c = ctl.get();
  // 以下的代碼是判斷需不需要給線程池創(chuàng)建一個(gè)新的線程
  // 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進(jìn)一步判斷需不需要?jiǎng)?chuàng)建
    if (runStateLessThan(c, STOP)) {
      // 如果為異常退出直接創(chuàng)建,如果不是異常退出進(jìn)入判斷
        if (!completedAbruptly) {
          // 獲取線程池應(yīng)該存在的最小線程數(shù) 如果設(shè)置了超時(shí) 則是0,否則是核心線程數(shù)
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
          // 如果 min 是0 但是隊(duì)列又不為空,則 min 應(yīng)該是1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
          //如果當(dāng)前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要?jiǎng)?chuàng)建
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
      // 創(chuàng)建線程
        addWorker(null, false);
    }
}

上面的代碼中調(diào)用了 tryTerminate() 方法,這個(gè)方法是用于終止線程池的,又是一個(gè) for 循環(huán),從代碼結(jié)構(gòu)來看是異常情況的重試機(jī)制。還是老方法,慢慢來看總共做了幾件事情

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
      // 如果處于這三種情況不需要關(guān)閉線程池
      // 1. Running 狀態(tài)
      // 2. SHUTDOWN 狀態(tài)并且任務(wù)隊(duì)列不為空,不能終止
      // 3. TIDYING 或者 TERMINATE 狀態(tài),說明已經(jīng)在關(guān)閉了 不需要重復(fù)關(guān)閉
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
      // 進(jìn)入到關(guān)閉線程池的代碼,如果線程池中還有線程,則需要打斷線程
        if (workerCountOf(c) != 0) { // Eligible to terminate 可以關(guān)閉池子
          // 打斷閑置線程,只打斷一個(gè)
            interruptIdleWorkers(ONLY_ONE);
            return;
          // 如果有兩個(gè)以上怎么辦?只打斷一個(gè)?
          // 這里只打斷一個(gè)是因?yàn)?worker 回收的時(shí)候都會(huì)進(jìn)入到該方法中來,可以回去再看看
          // runWorker方法最后的代碼
        }
        
        // 線程已經(jīng)回收完畢,準(zhǔn)備關(guān)閉線程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();// 加鎖
        try {
          //  將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // 終止線程池
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
          // 如果終止失敗會(huì)重試
        }
        // else retry on failed CAS
    }
}

嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶,我們是不是應(yīng)該看看如何打斷閑置線程,以及 terminated 中做了什么呢?來吧,繼續(xù)裝逼

先來看打斷線程

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖~
    try {
      // 遍歷線程池中的 wroker
        for (Worker w : workers) {
            Thread t = w.thread;
          // 如果線程沒有被中斷,并且能夠獲取到 worker的鎖(說明是閑置線程)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();// 中斷線程
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
          // 只中斷一個(gè) worker 跳出循環(huán),否則會(huì)將所有的閑置線程都中斷
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();// 釋放鎖
    }
}

有同學(xué)開始裝逼了,說我們是好奇寶寶,t.interrupt() 方法也應(yīng)該看,嗯~沒錯(cuò),但這里是調(diào)用了 native 方法,會(huì) c 的可以去看看裝逼,我就算了~

好了,再來看看 terminate, 是不是很坑爹? terminated 里面神!馬!也!沒!干!。。。淡定,其實(shí)這個(gè)方法類似于 Activity 的生命周期方法,允許你在被終止時(shí)做一些事情,默認(rèn)的線程池沒有什么要做的事情,當(dāng)然什么也沒寫啦~

/**
 * Method invoked when the Executor has terminated.  Default
 * implementation does nothing. Note: To properly nest multiple
 * overridings, subclasses should generally invoke
 * {@code super.terminated} within this method.
 */
protected void terminated() { }

異常處理

還記得前面講到,出現(xiàn)各種異常情況,添加隊(duì)列失敗等等,只是籠統(tǒng)的說了一句扔掉,當(dāng)然代碼實(shí)現(xiàn)不可能是簡單一句扔掉就完了?;氐?execute() 方法中找到 reject() 任務(wù),看看究竟是怎么處理的

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

還記得在創(chuàng)建線程池的時(shí)候,初始化了一個(gè) handler — RejectedExecutionHandler

這是一個(gè)接口,只有一個(gè)方法,接收兩個(gè)參數(shù)

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

既然是一個(gè)接口,那么肯定有他的實(shí)現(xiàn)類,我們先不急著看所有實(shí)現(xiàn)類,先來看看這里的 handler 可能是什么,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時(shí)候并沒有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應(yīng)該會(huì)有一個(gè)默認(rèn)的 handler

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

默認(rèn) handler 是 AbortPolicy ,這個(gè)類實(shí)現(xiàn)了 rejectedExecution() 方法,拋了一個(gè) Runtime 異常,也就是說當(dāng)任務(wù)添加失敗,就會(huì)拋出異常。這個(gè)類在 AsyncTask 引發(fā)了一場血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯,這里就不細(xì)說啦,會(huì)在下一篇 AsyncTask 的筆記中分析。

實(shí)際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實(shí)現(xiàn)了三種不同類型的 handler

  • CallerRunsPolicy — 在 線程池沒有 shutdown 的前提下,會(huì)直接在執(zhí)行 execute 方法的線程里執(zhí)行這個(gè)任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • DiscardPolicy — 啥也不干,默默地丟掉任務(wù)~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy — 丟棄掉隊(duì)列中未執(zhí)行的,最老的任務(wù),也就是任務(wù)隊(duì)列排頭的任務(wù),然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

總結(jié)

其實(shí)我不想做任何概念性的總結(jié),原因是我之前沒有開始學(xué)習(xí)源碼的時(shí)候也看過很多源碼分析的文章,大部分文章都會(huì)總結(jié)一些概念,這些概念本身可能是沒有錯(cuò)的,起碼是作者自己對(duì)源碼的理解,但是文字所傳達(dá)的思想真的是有限的,有時(shí)候因?yàn)楦拍畹哪:?,反而?huì)被帶入一個(gè)誤區(qū),并且長時(shí)間的無法轉(zhuǎn)變。

我自己一開始對(duì)線程池的理解其實(shí)是有偏差的,宏觀上可能沒有大的問題,但在細(xì)節(jié)上有很大的誤區(qū),通過自己耐心的閱讀源碼分析后學(xué)習(xí)到了很多東西。

非要總結(jié)的話就給一點(diǎn)我閱讀源碼的小思路吧:

  • 一定要使用過,起碼能完整的使用。如果沒有用過很難把流程捋清楚
  • 從使用的角度作為突破口,一步步的去尋找線索
  • 一開始看不需要每一句都弄得很清楚,比如一個(gè)方法,應(yīng)該先搞清楚這個(gè)方法里面做了幾件事,核心的邏輯是什么
  • 在捋清了整體邏輯后,再去看細(xì)節(jié)上的實(shí)現(xiàn)
  • 實(shí)在無法理解的內(nèi)容,再看看別人的文章,因?yàn)橛辛嗽创a的基礎(chǔ),再看別人的文章能夠有自己的思路
  • 與你的好基友探討,你會(huì)發(fā)現(xiàn)每個(gè)人有不同的角度去理解源碼,找到最合適你的那一種

以上是我的一點(diǎn)拙見。

最后感謝我的好基友 — 用語,在與他的探討中我走出了誤區(qū)并有了很多新的理解。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容