Android線程池學(xué)習(xí)筆記(二)

ThreadPoolExecutor

線程池的實(shí)現(xiàn)類。

構(gòu)造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize——最大核心線程數(shù)。核心線程即使空閑也不會(huì)被銷(xiāo)毀,除非調(diào)用allowCoreThreadTimeOut(true)。
  • maximumPoolSize——線程池最多可運(yùn)行的線程數(shù)。該值不小于corePoolSize。
  • keepAliveTime——非核心線程在空閑狀態(tài)的存活時(shí)間。
  • unit——keepAliveTime的時(shí)間單位。
  • workQueue——存放等待執(zhí)行的任務(wù)的隊(duì)列,只有通過(guò)execute(Runnable)提交的任務(wù)才可能進(jìn)入該隊(duì)列。
  • threadFactory——線程池創(chuàng)建通過(guò)該工廠創(chuàng)建線程。
  • handler——在線程池滿載的情況下,提交的任務(wù)交由handler處理。

以上各參數(shù)均有響應(yīng)的setter方法。

ThreadPoolExecutor提供了四種handler:

  1. CallerRunsPolicy——在線程池未關(guān)閉的情況,直接在調(diào)用execute(Runnable)方法的線程執(zhí)行任務(wù)。
  2. AbortPolicy——拋出一個(gè)RejectedExecutionException。
  3. DiscardPolicy——丟棄
  4. DiscardOldestPolicy——丟棄等待隊(duì)列中最早提交的那個(gè)任務(wù),然后重新提交新的任務(wù)。

ThreadPoolExecutor默認(rèn)使用AbortPolicy。

線程池狀態(tài)和線程數(shù)量的指示字段

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一個(gè)32位原子整型,高3位表示線程狀態(tài)(runState),剩余位代碼線程數(shù)量(workerCount)。所以線程池可容納的最大線程數(shù)是(2^29)-1。
wokerCount未必和存活的線程數(shù)一致,比如使用ThreadFactory創(chuàng)建線程失敗時(shí),或者線程終結(jié)前仍然進(jìn)行著某些工作時(shí)。

線程池狀態(tài)有以下幾種:

  • RUNNING(-1)——可接收新的任務(wù),可執(zhí)行隊(duì)列中的任務(wù)。
  • SHUTDOWN(0)——不再接收新的任務(wù),仍可執(zhí)行隊(duì)列中的任務(wù)。
  • STOP(1)——不再接收新的任務(wù),不再執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行的任務(wù)。
  • TIDYING(2)——過(guò)渡狀態(tài)。所有任務(wù)都已終結(jié),workerCount等于0,terminated()方法執(zhí)行之前的狀態(tài)。可以覆寫(xiě)terminated()做一些清理工作。
  • TERMINATED(3)——terminated()執(zhí)行之后的狀態(tài)。

狀態(tài)的更改是遞增的,可能的狀態(tài)改變?nèi)缦拢?/p>

  • RUNNING -> SHUTDOWN——調(diào)用了shutdown()方法,可能是通過(guò)finalize()隱式調(diào)用的。
  • (RUNNING or SHUTDOWN) -> STOP——調(diào)用了shutdownNow()。
  • SHUTDOWN -> TIDYING——線程池和隊(duì)列都為空。
  • STOP -> TIDYING——線程池為空。
  • TIDYING -> TERMINATED——terminated()方法執(zhí)行完畢。

awaitTermination()方法在狀態(tài)為T(mén)ERMINATED時(shí)返回。

public void execute(Runnable command)

提交任務(wù)有幾種情況:

  1. 工作線程數(shù) < 核心線程數(shù)——?jiǎng)?chuàng)建新的核心線程,并處理該任務(wù)。
  2. 工作線程數(shù) >= 核心線程數(shù),隊(duì)列未滿——添加到隊(duì)列。
  3. 隊(duì)列滿,工作線程數(shù) < 最大線程數(shù)——?jiǎng)?chuàng)建非核心線程處理任務(wù)。
  4. 交由handler處理。

public boolean prestartCoreThread()

手動(dòng)啟動(dòng)一個(gè)核心線程,這樣新來(lái)的任務(wù)就可以直接運(yùn)行,從而減少線程啟動(dòng)的時(shí)間。如果所有核心線程都已啟動(dòng),返回false。

public int prestartAllCoreThreads()

手動(dòng)啟動(dòng)所有核心線程。返回啟動(dòng)的核心線程數(shù)。

public void allowCoreThreadTimeOut(boolean value)

設(shè)置空閑時(shí),核心線程是否允許超時(shí)關(guān)閉。存活時(shí)間同非核心線程。

public boolean allowsCoreThreadTimeOut()

查詢核心線程在空閑時(shí),是否允許超時(shí)關(guān)閉。

public BlockingQueue<Runnable> getQueue()

獲取等待隊(duì)列。

public boolean remove(Runnable task)

從等待隊(duì)列中刪除task。

public void purge()

將等待隊(duì)列中所有已經(jīng)取消的Future任務(wù)立即移除。

public int getActiveCount()

返回正在執(zhí)行任務(wù)的線程數(shù)。

public int getLargestPoolSize()

返回線程池中出現(xiàn)過(guò)的最大的線程數(shù)量,不大于最大線程數(shù)。

public long getTaskCount()

返回線程池總共執(zhí)行過(guò)的以及正在執(zhí)行的任務(wù)數(shù),該值是個(gè)大概值。

public long getCompletedTaskCount()

返回線程池總共執(zhí)行過(guò)的任務(wù)數(shù),該值是個(gè)大概值。


以上是線程池的基本理解和使用,不想深究的話,到這里也就可以了。

下面是需要注意的點(diǎn)。

關(guān)于新建線程

線程池使用ThreadFactory來(lái)創(chuàng)建線程,如果沒(méi)有顯示提供ThreadFactory,則使用默認(rèn)的Executors#DefaultThreadFactory來(lái)創(chuàng)建線程:

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

從以上代碼可以得到如下信息:

  1. 線程名稱均是“pool-XX-thread-XX”形式。
  2. 均是非daemon線程。
  3. 優(yōu)先級(jí)均是Thread.NORM_PRIORITY

關(guān)于隊(duì)列選擇

常用的隊(duì)列選擇策略有以下幾種:

  1. 直接傳遞——使用SynchronousQueue同步隊(duì)列實(shí)現(xiàn)。該隊(duì)列不保存任務(wù),可以理解為一個(gè)單純的管道。其“放入”和“取出”都是阻塞的,每個(gè)“放入”操作都要等待一個(gè)“取出”操作,反之亦然。直接傳遞,通常要求最大線程數(shù)量不受限制,以避免新提交的任務(wù)交由handler處理;但這就會(huì)導(dǎo)致線程數(shù)量不可控。該策略在任務(wù)間有內(nèi)部依賴時(shí),可避免鎖住。
  2. 無(wú)限隊(duì)列——使用一個(gè)無(wú)容量限制的隊(duì)列,比如LinkedBlockingQueue(FIFO隊(duì)列)或者PriorityBlockingQueue(可自定義Comparator)。這樣在核心線程都在工作時(shí),新的任務(wù)會(huì)被添加到隊(duì)列中,最大線程數(shù)不會(huì)超過(guò)核心線程數(shù),也就是說(shuō)maximumPoolSize這個(gè)參數(shù)將不起作用。該策略適合任務(wù)間完全獨(dú)立,相互不影響的情況。所謂無(wú)限,并非是真的無(wú)限,只是容量非常大而已,可能是Integer.MAX_VALUE,也可能是其他值。
  3. 有限隊(duì)列——比如ArrayBlockingQueue??梢员苊赓Y源的過(guò)度消耗,但控制起來(lái)比較復(fù)雜,需要權(quán)衡隊(duì)列容量和最大線程數(shù)的關(guān)系:使用大隊(duì)列和小線程數(shù)量,會(huì)減少CPU的使用,以及操作系統(tǒng)資源的占用,但會(huì)降低吞吐率;使用小隊(duì)列和大線程數(shù),可以充分利用CPU資源,但可能會(huì)加大調(diào)度開(kāi)支,同樣降低吞吐率。

關(guān)于RejectedExecutionHandler

上面介紹了四種默認(rèn)的RejectedExecutionHandler,同樣也可以自己定義。需要注意的是,RejectedExecutionHandler的選擇需要參照線程數(shù)量和隊(duì)列選擇策略。比如無(wú)限隊(duì)列的情況,可以隨意設(shè)置。

關(guān)于覆寫(xiě)ThreadPoolExecutor

ThreadPoolExecutor提供了3個(gè)protected的hook方法。beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)分別在每個(gè)任務(wù)的執(zhí)行前/后調(diào)用,terminated()方法在線程池終結(jié)時(shí)調(diào)用。

以下是一個(gè)覆寫(xiě)的例子,添加了pause|resume方法:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(...) { 
        super(...);
     }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) 
                unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

下面學(xué)習(xí)線程池的實(shí)現(xiàn)原理。

線程在哪里

線程池的線程由內(nèi)部類Worker持有。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker類繼承自AbstractQueuedSynchronizer,該父類這里不深究,只需要知道它是一個(gè)鎖的實(shí)現(xiàn)即可。Worker類還實(shí)現(xiàn)了Runnable接口。
由構(gòu)造方法可知,Worker在創(chuàng)建時(shí)會(huì)通過(guò)ThreadFactory.newThread方法創(chuàng)建一個(gè)線程,并將自身作為Runnable對(duì)象傳遞給該線程。
protected修飾的方法是覆寫(xiě)的父類方法,暫且不用管。lock、tryLock、unlock、isLocked是自身鎖的調(diào)用方法。

Worker的run方法中調(diào)用了runWorker方法。如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();//獲取Worker類的thread
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//getTask為阻塞方法
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||//當(dāng)前線程池狀態(tài)至少為STOP,不考慮offset,其值為1。則滿足的狀態(tài)為STOP、TIDYING、TERMINATED。
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//hook方法,可重寫(xiě)
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//hook方法,可重寫(xiě)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;//已完成的任務(wù)數(shù)加1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//Worker退出時(shí)的清理操作
        }
    }

由源碼可知,該方法不斷從隊(duì)列中取出任務(wù),交由該Worker所在的線程執(zhí)行,是執(zhí)行任務(wù)的最終場(chǎng)所。
順騰摸瓜,這里涉及到getTaskprocessWorkerExit兩個(gè)方法。

先看processWorkerExit

    /**
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();//由異常引起的,需要ctl變量中存儲(chǔ)的工作線程數(shù)量減1。

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//workers是一個(gè)Set,存儲(chǔ)了所有活動(dòng)的Worker。
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//該方法留意一下,下面會(huì)介紹

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//線程池的狀態(tài)低于STOP,包括RUNNING、SHUTDOWN
            if (!completedAbruptly) {//非異常引起的死亡,比如空閑狀態(tài)超過(guò)了存活時(shí)間。
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);//重新添加一個(gè)Worker。
        }
    }

該方法為死掉的Worker做一些清理工作。首先將死亡的Worker已經(jīng)完成的任務(wù)數(shù)添加到線程池已完成的任務(wù)數(shù)中,緊接著從線程池的Worker集合中刪除該Worker,然后調(diào)用tryTerminate,最后根據(jù)線程池的狀態(tài)等,判斷是否需要重新添加Worker到Worker集合中。

getTask方法從隊(duì)列中取出一個(gè)任務(wù)并返回。在Worker由于一些原因退出時(shí),返回null。這些原因包括:

  1. 線程數(shù)量超過(guò)maximumPoolSize(比如通過(guò)setMaximumPoolSize修改了最大線程數(shù)量)。
  2. 線程池被stop。
  3. 線程池被關(guān)閉,并且隊(duì)列為空。
  4. 等待取出任務(wù)超時(shí),而超時(shí)的Worker需要終結(jié)。
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//線程池狀態(tài)

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//對(duì)應(yīng)情況2、3
            //該條件等價(jià)于if (rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty()))
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);//當(dāng)前線程數(shù)量

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//當(dāng)前Worker是否允許超時(shí)關(guān)閉

            //對(duì)應(yīng)情況1、4
            if ((wc > maximumPoolSize || (timed && timedOut))//線程數(shù)量大于maximumPoolSize或者達(dá)到超時(shí)條件
                && (wc > 1 || workQueue.isEmpty())) {//并且此時(shí)隊(duì)列為空,或者線程數(shù)大于1(在隊(duì)列非空時(shí),至少需要保留一個(gè)Worker來(lái)執(zhí)行任務(wù))
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://允許超時(shí)關(guān)閉則調(diào)用poll超時(shí)阻塞方法
                    workQueue.take();//否則調(diào)用take,一直阻塞下去,直到新任務(wù)到來(lái)
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

在processWorkerExit方法中,我們接觸到了addWorker方法。
該方法根據(jù)線程池的狀態(tài)的最大線程數(shù)量,決定是否向線程池添加新的Worker。遇到以下條件之一時(shí),添加失敗,返回false:

  1. 線程池已經(jīng)stop或者達(dá)到shutdown的條件。
  2. 當(dāng)前線程數(shù)達(dá)到上限。
  3. ThreadFactory創(chuàng)建線程失敗。

Worker創(chuàng)建失敗時(shí),會(huì)回滾一些數(shù)據(jù)。

    /**
     * @param firstTask 新Worker需要執(zhí)行的第一個(gè)任務(wù),可為null
     * @param core 是否是核心線程
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry://添加一個(gè)標(biāo)簽
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//對(duì)應(yīng)情況1
            //等價(jià)于if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
            //之所以有firstTask != null是因?yàn)镾HUTDOWN后不再接收新任務(wù)。
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||//CAPACITY為線程池所能容納的最大線程數(shù)量2^29-1
                    wc >= (core ? corePoolSize : maximumPoolSize))//對(duì)應(yīng)情況2
                    return false;
                if (compareAndIncrementWorkerCount(c))//ctl字段中的Worker數(shù)量加1,此時(shí)Worker還沒(méi)有被實(shí)際創(chuàng)建,
                //也證實(shí)了講ctl字段時(shí)提到的“wokerCount未必和存活的線程數(shù)一致”
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//線程池狀態(tài)發(fā)生變化時(shí),重新執(zhí)行前面的邏輯
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {//線程創(chuàng)建成功
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN后不再接收新任務(wù)
                        if (t.isAlive()) // precheck that t is startable
                            //t.isAlive()返回true,則表明t.start已被調(diào)用過(guò),而正常來(lái)說(shuō),此時(shí)還未調(diào)用t.start
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)//largestPoolSize是線程池中出現(xiàn)過(guò)的最大線程的數(shù)量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//執(zhí)行數(shù)據(jù)回滾
        }
        return workerStarted;
    }

接下來(lái)看addWorkerFailed方法。

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();//講ctl字段的workerCount減1
            tryTerminate();//查看是否需要終結(jié)線程池
        } finally {
            mainLock.unlock();
        }
    }

processWorkerExit和addWorkerFailed都涉及到了tryTerminate方法。
僅tryTerminate方法會(huì)將線程池狀態(tài)置為T(mén)IDYING和TERMINATED,且需要滿足以下條件之一:

  1. 當(dāng)前狀態(tài)為SHUTDOWN,并且線程池和隊(duì)列都為空。
  2. 當(dāng)前狀態(tài)為STOP,并且隊(duì)列為空。

在執(zhí)行了可能導(dǎo)致線程池TERMINATED的操作后,必須調(diào)用該方法,比如減少了worker的數(shù)量,或者shutdown之后從隊(duì)列中移除了任務(wù)。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||//線程池正在運(yùn)行中
                runStateAtLeast(c, TIDYING) ||//線程池正在terminate
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//或者不滿足條件1
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);//workerCount不為0時(shí),中斷一個(gè)空閑的worker,將中斷信號(hào)傳遞下去(如何傳遞請(qǐng)往下看)
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();//hook方法,可重寫(xiě)
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));//此處可以看出TIDYING只是一個(gè)過(guò)渡態(tài)。
                        termination.signalAll();//用于喚醒a(bǔ)waitTermination阻塞方法
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

至此我們就見(jiàn)到了可重寫(xiě)的3個(gè)hook方法:beforeExecute、afterExecuteterminate。

termination.signalAll()這一語(yǔ)句,簡(jiǎn)單看一下awaitTermination

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            while (!runStateAtLeast(ctl.get(), TERMINATED)) {
                if (nanos <= 0L)
                    return false;
                nanos = termination.awaitNanos(nanos);//狀態(tài)不是TERMINATED,則阻塞下去,等待termination.signalAll()喚醒
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

tryTerminate方法中調(diào)用了interruptIdleWorkers,僅此處調(diào)用傳遞的onlyOne參數(shù)為true:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

前面說(shuō)了,調(diào)用interruptIdleWorkers方法是為了將終結(jié)信號(hào)傳遞下去,那究竟是如何傳遞的呢?

  1. interruptIdleWorkers會(huì)中斷一個(gè)Worker。
  2. Worker中斷,則runWorker方法就會(huì)調(diào)用finally塊中的processWorkerExit方法,參數(shù)completedAbruptly為true。
  3. processWorkerExit方法中會(huì)再次調(diào)用tryTerminate方法,從而完成終結(jié)信號(hào)的傳遞。

至此我們就學(xué)習(xí)了線程池的實(shí)現(xiàn)原理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容