ThreadPoolExecutor源碼學習筆記

轉(zhuǎn)載請附原文鏈接:ThreadPoolExecutor源碼學習筆記
歡迎來我的個人主頁交流:http://extremej.itscoder.com/

大部分分析以注釋形式寫在源碼中

本篇筆記將從 ThreadPoolExecutor 的一次使用上來分析源碼,主要涉及線程池創(chuàng)建,execute 的步驟,任務添加到阻塞隊列,線程從阻塞隊列中拿取任務執(zhí)行,線程的回收,線程池的終止。

涉及到的類有

  • Executors — 獲取線程池

  • ThreadPoolExecutor — 線程池

  • Worker — 工作線程

  • LinkedBlockingQueue — 阻塞隊列

  • RejectedExecutionHandler — 任務拒絕處理器(實在不知道什么翻譯~)

    ?

線程池的獲取

我們知道可以通過 Executors 來獲取不同類型的線程池,那么就從 Executors 來開始看它是如何返回不同類型的線程池的,看看我們常用的一些方法

//獲取一個固定線程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
//獲取只有一個線程的池子
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
//獲取一個緩存線程池,可變
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

從上面的三個方法可以發(fā)現(xiàn)其實都是 new 了一個 ThreadPoolExecutor ,但是傳入的參數(shù)不同,我們進到這個構(gòu)造方法中去一探究竟,看看不同的參數(shù)到底代表了什么

//構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

通過參數(shù)名稱以及注釋可以知道這幾個參數(shù)的作用分別是

  • corePoolSize — 核心線程數(shù),即允許閑置的線程數(shù)目
  • maximumPoolSize — 最大線程數(shù),即這個線程池的容量
  • keepAliveTime — 非核心線程的閑置存活時間
  • unit — 上一個參數(shù)的單位
  • workQueue — 任務隊列(阻塞隊列)
  • threadFacotry — 線程創(chuàng)建工廠
  • handler — 當線程池或者任務隊列容量已滿時用于reject

這里要明白一件事情,核心線程只是通過數(shù)目來判斷,而不是說先創(chuàng)建的線程就是核心線程

在構(gòu)造方法里面初始化了成員變量值,通過構(gòu)造方法應該明白了不同類型的線程獲取的原理。

任務的執(zhí)行

A.狀態(tài)屬性

在看源碼之前先了解一下 ThreadPoolExecutor的幾個狀態(tài)屬性,這對后面的源碼閱讀有很重要的作用,ThreadPoolExecutor 有五種狀態(tài)

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

從上到下依次是

  • RUNNING — 運行狀態(tài),可以添加新任務,也可以處理阻塞隊列中的任務
  • SHUTDOWN — 待關閉狀態(tài),不再接受新的任務,會繼續(xù)處理阻塞隊列中的任務
  • STOP — 停止狀態(tài),此時的線程池不處理任何任務
  • TIDYING — 整理狀態(tài),也可以理解為預終結(jié)狀態(tài),這個時候任務都處理完畢,池中無有效線程
  • TERMINATED — 終止狀態(tài)

B.execute(Runnable command)

當獲取到了一個線程池之后,需要它來執(zhí)行異步任務,也就是 execute(Runnable) ,傳入一個 runnable 對象,在 run 方法中執(zhí)行我們的代碼,那么來看一下 execute() 是怎么工作的,因為源碼的注釋解釋得十分清楚,這里將注釋也貼出來。簡單翻譯一下,當 execute 被調(diào)用時總共有三種情況。

  • 如果當前的有效線程數(shù)小于核心線程數(shù),則試圖創(chuàng)建一個新的 worker 線程
  • 如果上面一步失敗了,則試圖將任務添加到阻塞隊列中,并且要再一次判斷需要不需要回滾隊列,或者說創(chuàng)建線程(后面會詳細說明)
  • 如果上面兩步都失敗了,則會試圖強行創(chuàng)建一個線程來執(zhí)行這個任務,如果還是失敗,扔掉這個任務

了解了這三個步驟,來看看源碼,源碼中調(diào)用了 addworker 方法,這是創(chuàng)建線程的方法,會在后面講到

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
  //1.判斷有效線程數(shù)是否小于核心線程數(shù)
    if (workerCountOf(c) < corePoolSize) {
      //創(chuàng)建新線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
  //2.分開來看,首先判斷當前的池子是否是處于 running 狀態(tài)
  //因為只有 running 狀態(tài)才可以接收新任務
  //接下來判斷能否成功添加到隊列中,如果隊列滿了或者其他情況則會跳到下一步
    if (isRunning(c) && workQueue.offer(command)) {
      //再次檢查池子的狀態(tài),如果進入了非 running 狀態(tài),回滾隊列,扔掉這個任務
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
      //如果處于 running 狀態(tài)則檢查當前的有效線程,如果沒有則創(chuàng)建一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  //3.前兩步失敗了,就強行創(chuàng)建線程,成功會返回true,如果失敗扔掉這個任務
    else if (!addWorker(command, false))
        reject(command);
}

解釋一下第二步,為什么要recheck

當這個任務被添加到了阻塞隊列前,池子處于 RUNNING 狀態(tài),但如果在添加到隊列成功后,池子進入了 SHUTDOWN 狀態(tài)或者其他狀態(tài),這時候是不應該再接收新的任務的,所以需要把這個任務從隊列中移除,并且 reject

同樣,在沒有添加到隊列前,可能有一個有效線程,但添加完任務后,這個線程閑置超時或者因為異常被干掉了,這時候需要創(chuàng)建一個新的線程來執(zhí)行任務

C .addWorker()

前一步把 execute 的流程捋了一遍,里面多次出現(xiàn)了 addWorker() 方法,前文說到這是個創(chuàng)建線程的方法,來看看 addWorker 做了些什么,這個方法代碼比較長,我們拆開來一點一點看

  • 第一部分 — 判斷各種基礎異常
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

      // Check if queue empty only if necessary.
      // 檢查線程池狀態(tài),隊列狀態(tài),以及 firstask ,拆開來看
      // 這段代碼看起來異常的蛋疼,轉(zhuǎn)換一下邏輯即
     //rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
      // 總結(jié)起來就是 當前處于非 Running 狀態(tài),并且這三種情況
      // 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
      // 2. 有新的任務 (因為不能再接收新的任務)
      // 3. 阻塞隊列中已經(jīng)沒有任務 (不需要再創(chuàng)建線程)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        for (;;) {
            int wc = workerCountOf(c);//當前有效線程數(shù)目
           // 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
              // 大于容量 或者指定的線程數(shù),不允許創(chuàng)建
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
  }
  • 第二部分 — 試圖創(chuàng)建線程

創(chuàng)建一個Worker(什么東西?下文會講解,這里把它就當成是一個線程的容器)

boolean workerStarted = false;//標記 worker 開啟狀態(tài)
boolean workerAdded = false;//標記 worker 添加狀態(tài)
Worker w = null;
try {
    w = new Worker(firstTask); //將這個任務作為 worker 的第一個任務傳入
    final Thread t = w.thread;//通過 worker 獲取到一個線程
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());
            
           // running狀態(tài),或者 shutdown 狀態(tài)但是沒有新的任務
            if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // 將這個 worker 添加到線程池中
                workers.add(w); 
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
              //標記worker添加成功
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果 worker 創(chuàng)建成功,開啟線程
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

上面代碼從邏輯層面來看不算難懂,到這里一個任務到達后,ThreadPoolExecutor 的處理就結(jié)束了,那么任務又是怎么被添加到阻塞隊列中,線程是如何從隊列中取出任務,上文中的 Worker 又是什么東西?

一個一個來,先來看看 Worker 到底是什么

D.Worker

Worker 是 ThreadPoolExecutor 的一個內(nèi)部類,實現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個什么鬼???我也不造~可以看看這篇文章

《Java并發(fā)包源碼學習之AQS框架(一)概述》

簡單來說,Worker實現(xiàn)了 lock 和 unLock 方法來標示當前線程的狀態(tài)是否為閑置

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個線程又是 Worker 的成員變量

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個新的線程,我們知道 Thread 接收一個 Runnable 對象后 start 運行的是 Runnable 的 run 方法,Worker 的 run 方法調(diào)用了 runWorker ,這個方法里面就是取出任務執(zhí)行的邏輯

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 獲取到 worker 的第一個任務
        w.firstTask = null;
        w.unlock(); // 標記為閑置,還沒有開始任務 允許打斷
        boolean completedAbruptly = true; // 異常退出標記
        try {
          // 循環(huán)取出任務,如果第一個任務不為空,或者從隊列中拿到了任務
          // 只要這兩個條件滿足,會一直循環(huán),直到?jīng)]有任務,正常退出,或者異常退出
            while (task != null || (task = getTask()) != null) {
                w.lock();// 該線程標記為非閑置
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
              // 翻譯注釋:1.如果線程池STOPPING狀態(tài),需要中斷線程
              // 2.Thread.interrupted()是一個native方法,返回當前線程是否有被等待中斷的請求
              // 3.第二個條件成立時,檢查線程池狀態(tài),如果為STOP,并且沒有被中斷,則中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
              // 執(zhí)行任務
                try {
                    beforeExecute(wt, task);// 執(zhí)行前
                    Throwable thrown = null;
                    try {
                        task.run(); // 執(zhí)行任務
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); // 執(zhí)行結(jié)束
                    }
                } finally {
                    task = null; // 將 worker 的任務置空
                    w.completedTasks++; 
                    w.unlock(); // 釋放鎖,進入閑置狀態(tài)
                }
            }// 循環(huán)結(jié)束
            completedAbruptly = false; // 標記為正常退出
        } finally {
          // 干掉 worker
            processWorkerExit(w, completedAbruptly);
        }
    }

這里弄清楚了一件事情,進入循環(huán)準備執(zhí)行任務時,worker 加鎖標記為非閑置,任務執(zhí)行完畢或者出現(xiàn)異常,worker 釋放鎖,進入閑置狀態(tài)。

也就是當一個 worker 執(zhí)行任務前或者執(zhí)行完任務,到取出下一個任務期間,都是閑置狀態(tài)可以被打斷

上面取出任務調(diào)用了 getTask() ,誒~為什么有一個死循環(huán),別著急,慢慢看來。上面的代碼可以知道如果 getTask 返回任務則執(zhí)行,如果返回為 null 則 worker 需要被回收

private Runnable getTask() {
  // 標記取任務是否超時
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊列已經(jīng)為空,回收 wroker
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //獲取當前有效線程數(shù)
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed 用來標記當前的 worker 是否設置超時時間,
        // 還記得獲取線程池的時候 可以設置核心線程超時時間
        //1.允許核心線程超時回收(即所有線程) 2.當前有效線程超過核心線程數(shù)(需要回收)
        // 如果timed == false 則該worker不會被回收,如果沒有取到任務 會一直阻塞
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 回收線程條件
        // 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設置了超時回收并且已經(jīng)超時
        // 2. 有效線程數(shù)大于1或者隊列任務已經(jīng)為空
        // 只有當上面1和2 同時滿足時 則試圖回收線程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          // 如果減少workercount成功 直接回收
            if (compareAndDecrementWorkerCount(c))
                return null;
          // 否則重走循環(huán),從第一個判斷條件處回收
            continue;
        }
        // 取任務
        try {
          // 根據(jù)是否設置超時回收來選擇不同的取任務的方式
          // poll 方法取任務會有超時時間,超過時間則返回null
          // take 方法沒有超時時間,阻塞式方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
          // 如果任務不為空返回任務
            if (r != null)
                return r;
          // 否則標記超時 進入下一次循環(huán)等待回收
            timedOut = true;
        } catch (InterruptedException retry) {
          // 如果出現(xiàn)異常,試圖重試
            timedOut = false;
        }
    }
}

getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個新的方法,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() ,這兩個都是阻塞隊列的方法,來看看它們又各自是怎么實現(xiàn)的

E.LinkedBlockingQueue — 阻塞隊列

ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊列,實現(xiàn)了 BlockingQueue 接口,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口。

因為本篇筆記主要是分析 ThreadPoolExecutor 的原理,所以不會詳細介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法,首先來看一下上文所提到的 take()

public E take() throws InterruptedException {
    E x; // 任務
    int c = -1; // 取出任務后的剩余任務數(shù)量
    final AtomicInteger count = this.count; // 當前任務數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
      // 如果隊列數(shù)量為空,則一直循環(huán),阻塞線程
        while (count.get() == 0) {
            notEmpty.await();
        }
      // 取出任務
        x = dequeue();
      // 任務數(shù)量減一
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();// 標記隊列非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
  //
    if (c == capacity)
        signalNotFull();//標記隊列已滿
    return x;// 返回任務
}

上面的代碼可以知道 take 方法會一直阻塞直到隊列有新的任務為止

接下來是 poll 方法,可以看到幾乎與 take 方法相同,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時間判斷,如果超時則直接返回為空,不會一直阻塞下去

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null; // 存放的任務
    int c = -1; 
    long nanos = unit.toNanos(timeout); // 超時時間
    final AtomicInteger count = this.count; // 隊列中的數(shù)量
    final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
    takeLock.lockInterruptibly();
    try {
      // 如果隊列為空,則不斷的循環(huán)
        while (count.get() == 0) {
          // 如果當?shù)褂嫊r小于0 即超時時間到 則返回空
            if (nanos <= 0)
                return null;
          // 讓線程等待
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue(); // 取出一個任務
        c = count.getAndDecrement(); // 取出后的隊列數(shù)量
        if (c > 1)
            notEmpty.signal(); // 標記非空
    } finally {
        takeLock.unlock(); // 釋放鎖
    }
    if (c == capacity)
        signalNotFull(); // 標記隊列已滿
    return x; // 返回任務
}

線程池的回收及終止

前一節(jié)分析了任務的執(zhí)行流程及原理,也留下了一個問題,worker 是如何被回收的呢?線程池該如何管理呢?回到上一節(jié)的 runWorker() 方法中,還記得最后調(diào)用了一個方法

processWorkerExit(w, completedAbruptly);

這個方法傳入了兩個參數(shù),第一個是當前的 Woker ,第二個是標記異常退出的標識

首先判斷是否為異常退出,如果是異常退出的話需要手動調(diào)整線程數(shù)量,如果是正?;厥盏模琯etTask 方法里面已經(jīng)手動調(diào)整過了,不記得的小伙伴可以看看前文的代碼,找找 decrementWorkerCount(),

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    // 記錄線程池完成的任務總數(shù),從 workers 中移除該 worker
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    tryTerminate();//嘗試關閉池子

    int c = ctl.get();
  // 以下的代碼是判斷需不需要給線程池創(chuàng)建一個新的線程
  // 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進一步判斷需不需要創(chuàng)建
    if (runStateLessThan(c, STOP)) {
      // 如果為異常退出直接創(chuàng)建,如果不是異常退出進入判斷
        if (!completedAbruptly) {
          // 獲取線程池應該存在的最小線程數(shù) 如果設置了超時 則是0,否則是核心線程數(shù)
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
          // 如果 min 是0 但是隊列又不為空,則 min 應該是1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
          //如果當前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要創(chuàng)建
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
      // 創(chuàng)建線程
        addWorker(null, false);
    }
}

上面的代碼中調(diào)用了 tryTerminate() 方法,這個方法是用于終止線程池的,又是一個 for 循環(huán),從代碼結(jié)構(gòu)來看是異常情況的重試機制。還是老方法,慢慢來看總共做了幾件事情

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
      // 如果處于這三種情況不需要關閉線程池
      // 1. Running 狀態(tài)
      // 2. SHUTDOWN 狀態(tài)并且任務隊列不為空,不能終止
      // 3. TIDYING 或者 TERMINATE 狀態(tài),說明已經(jīng)在關閉了 不需要重復關閉
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
      // 進入到關閉線程池的代碼,如果線程池中還有線程,則需要打斷線程
        if (workerCountOf(c) != 0) { // Eligible to terminate 可以關閉池子
          // 打斷閑置線程,只打斷一個
            interruptIdleWorkers(ONLY_ONE);
            return;
          // 如果有兩個以上怎么辦?只打斷一個?
          // 這里只打斷一個是因為 worker 回收的時候都會進入到該方法中來,可以回去再看看
          // runWorker方法最后的代碼
        }
        
        // 線程已經(jīng)回收完畢,準備關閉線程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();// 加鎖
        try {
          //  將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // 終止線程池
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
          // 如果終止失敗會重試
        }
        // else retry on failed CAS
    }
}

嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶,我們是不是應該看看如何打斷閑置線程,以及 terminated 中做了什么呢?來吧,繼續(xù)裝逼

先來看打斷線程

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖~
    try {
      // 遍歷線程池中的 wroker
        for (Worker w : workers) {
            Thread t = w.thread;
          // 如果線程沒有被中斷,并且能夠獲取到 worker的鎖(說明是閑置線程)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();// 中斷線程
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
          // 只中斷一個 worker 跳出循環(huán),否則會將所有的閑置線程都中斷
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();// 釋放鎖
    }
}

有同學開始裝逼了,說我們是好奇寶寶,t.interrupt() 方法也應該看,嗯~沒錯,但這里是調(diào)用了 native 方法,會 c 的可以去看看裝逼,我就算了~

好了,再來看看 terminate, 是不是很坑爹? terminated 里面神!馬!也!沒!干!。。。淡定,其實這個方法類似于 Activity 的生命周期方法,允許你在被終止時做一些事情,默認的線程池沒有什么要做的事情,當然什么也沒寫啦~

/**
 * Method invoked when the Executor has terminated.  Default
 * implementation does nothing. Note: To properly nest multiple
 * overridings, subclasses should generally invoke
 * {@code super.terminated} within this method.
 */
protected void terminated() { }

異常處理

還記得前面講到,出現(xiàn)各種異常情況,添加隊列失敗等等,只是籠統(tǒng)的說了一句扔掉,當然代碼實現(xiàn)不可能是簡單一句扔掉就完了?;氐?execute() 方法中找到 reject() 任務,看看究竟是怎么處理的

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

還記得在創(chuàng)建線程池的時候,初始化了一個 handler — RejectedExecutionHandler

這是一個接口,只有一個方法,接收兩個參數(shù)

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

既然是一個接口,那么肯定有他的實現(xiàn)類,我們先不急著看所有實現(xiàn)類,先來看看這里的 handler 可能是什么,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時候并沒有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應該會有一個默認的 handler

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

默認 handler 是 AbortPolicy ,這個類實現(xiàn)了 rejectedExecution() 方法,拋了一個 Runtime 異常,也就是說當任務添加失敗,就會拋出異常。這個類在 AsyncTask 引發(fā)了一場血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯,這里就不細說啦,會在下一篇 AsyncTask 的筆記中分析。

實際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實現(xiàn)了三種不同類型的 handler

  • CallerRunsPolicy — 在 線程池沒有 shutdown 的前提下,會直接在執(zhí)行 execute 方法的線程里執(zhí)行這個任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • DiscardPolicy — 啥也不干,默默地丟掉任務~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy — 丟棄掉隊列中未執(zhí)行的,最老的任務,也就是任務隊列排頭的任務,然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

總結(jié)

其實我不想做任何概念性的總結(jié),原因是我之前沒有開始學習源碼的時候也看過很多源碼分析的文章,大部分文章都會總結(jié)一些概念,這些概念本身可能是沒有錯的,起碼是作者自己對源碼的理解,但是文字所傳達的思想真的是有限的,有時候因為概念的模糊,反而會被帶入一個誤區(qū),并且長時間的無法轉(zhuǎn)變。

我自己一開始對線程池的理解其實是有偏差的,宏觀上可能沒有大的問題,但在細節(jié)上有很大的誤區(qū),通過自己耐心的閱讀源碼分析后學習到了很多東西。

非要總結(jié)的話就給一點我閱讀源碼的小思路吧:

  • 一定要使用過,起碼能完整的使用。如果沒有用過很難把流程捋清楚
  • 從使用的角度作為突破口,一步步的去尋找線索
  • 一開始看不需要每一句都弄得很清楚,比如一個方法,應該先搞清楚這個方法里面做了幾件事,核心的邏輯是什么
  • 在捋清了整體邏輯后,再去看細節(jié)上的實現(xiàn)
  • 實在無法理解的內(nèi)容,再看看別人的文章,因為有了源碼的基礎,再看別人的文章能夠有自己的思路
  • 與你的好基友探討,你會發(fā)現(xiàn)每個人有不同的角度去理解源碼,找到最合適你的那一種

以上是我的一點拙見。

最后感謝我的好基友 — 用語,在與他的探討中我走出了誤區(qū)并有了很多新的理解。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容