從github的一行代碼改動(dòng)來(lái)分析線(xiàn)程池原理

引言

最近看了一個(gè)開(kāi)源庫(kù)的改動(dòng),其中里面的一個(gè)代碼改動(dòng)引起了我的好奇


1.jpg

,可以看到作者將阻塞隊(duì)列從LinkedBlockingQueue換成了SynchronousQueue。那么問(wèn)題來(lái)了,作者為什么要進(jìn)行這種更改。對(duì)于線(xiàn)程池的使用,相信大家即使沒(méi)看過(guò)源碼,面試中也會(huì)不可避免的總會(huì)被問(wèn)到一些問(wèn)題,背都背會(huì)了。但是欠下的技術(shù)債遲早哪天是要還的,不明白原理,用起來(lái)心里沒(méi)底。

再比如這個(gè)問(wèn)題

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    Log.e("mandy", "run" + finalI);
                }
            });
        }

打印的結(jié)果是什么,如果你以為是隨機(jī)打印,說(shuō)明你對(duì)線(xiàn)程池的理解還沒(méi)到位,運(yùn)行下代碼就知道結(jié)果。

現(xiàn)在網(wǎng)上都這么多優(yōu)秀的線(xiàn)程池源碼分析文章了,為什么還要去寫(xiě),其實(shí)之所以寫(xiě)這篇關(guān)于線(xiàn)程池的源碼分析,主要有以下兩個(gè)原因:
(1) 證明我看過(guò)線(xiàn)程池的源碼,沒(méi)錯(cuò)就是裝逼用,不是那種背兩個(gè)面試題就說(shuō)自己會(huì)線(xiàn)程池的老鐵
(2) 看別人的文章總是會(huì)有一些理解不了地方,還需要自己去翻源碼進(jìn)一步消化,不如寫(xiě)一篇給自己看的文章,好記性不如爛筆頭。其實(shí)這不是我第一次看線(xiàn)程池的源碼,一段時(shí)間后有些關(guān)鍵實(shí)現(xiàn)總會(huì)想不起來(lái),把自己疑惑的點(diǎn)寫(xiě)出來(lái),將來(lái)直接看自己的文章做到查漏補(bǔ)缺的作用,這才是寫(xiě)這篇文章的主要目的。

線(xiàn)程池類(lèi)之間的關(guān)系

下面進(jìn)入正題,來(lái)看下和線(xiàn)程池有關(guān)的類(lèi)

Executor:最底層的接口,內(nèi)部只包含一個(gè)execute方法
public interface Executor {
    void execute(Runnable command);
}
ExecutorService:接口,繼承自Executor接口,內(nèi)部包含了shutDown,submit等線(xiàn)程池使用的方法,看下它的繼承關(guān)系
public interface ExecutorService extends Executor
AbstractExecutorService:實(shí)現(xiàn)了ExecutorService大部分方法,但沒(méi)有實(shí)現(xiàn)execute方法,該類(lèi)中提供了一系列的submit方法,線(xiàn)程池要提交任務(wù)最終都會(huì)調(diào)用這個(gè)方法,來(lái)隨便看一個(gè)submit方法
public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

通過(guò)newTaskFor生成RunnableFuture,然后調(diào)用execute執(zhí)行任務(wù)。newTaskFor最終會(huì)生成一個(gè)FutureTask類(lèi),關(guān)于FutureTask不就詳細(xì)說(shuō)了,之前寫(xiě)過(guò)一篇文章分析過(guò)Asynctask源碼分析,不止于此,感興趣的可以看看,你可以將FutrueTask當(dāng)成一個(gè)類(lèi)似Runnable的類(lèi)可以執(zhí)行任務(wù)。

所以最核心的還是execute方法的實(shí)現(xiàn),最終的實(shí)現(xiàn)類(lèi)就是ThreadPoolExecutor了。而另一個(gè)和ThreadPoolExecutor相關(guān)的類(lèi)就是Executors,內(nèi)部提供了各種現(xiàn)成的線(xiàn)程池使用,本質(zhì)上就是一個(gè)工具類(lèi)。到此就將Executor,Executors,ExecutorService,AbstractExecutorService,ThreadPoolExecutor之間的關(guān)系表述清楚了。核心還是execute方法。

ThreadPoolExecutor成員字段

要想看明白execute方法的源碼,就得先來(lái)理解下ThreadPoolExecutor中的一些成員變量以及成員字段的含義,等理解之后再看execute方法。看下關(guān)鍵的成員字段

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

線(xiàn)程池有5種狀態(tài),分別為RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,所以需要3個(gè)比特位才能表示。一個(gè)Integer32個(gè)比特長(zhǎng)度,線(xiàn)程池中通過(guò)一個(gè)Integer長(zhǎng)度來(lái)表示工作線(xiàn)程數(shù)和線(xiàn)程池的狀態(tài),高3位表示狀態(tài)后29位表示工作線(xiàn)程數(shù)量,這種方式類(lèi)似于android中的MeasureSpec使用方式。

runStateOf和workerCountOf就是通過(guò)位運(yùn)算得到線(xiàn)程池狀態(tài)和工作線(xiàn)程數(shù)量。ctlOf就是將狀態(tài)和數(shù)量整合成一個(gè)Integer??梢钥吹匠跏蓟臅r(shí)候,ctl為AtomicInteger(ctlOf(RUNNING, 0));,表示線(xiàn)程池處于運(yùn)行狀態(tài),此時(shí)工作線(xiàn)程為0個(gè)。

Worker

上面介紹了線(xiàn)程池中有關(guān)的成員,接下來(lái)介紹下線(xiàn)程池中一個(gè)很重要的成員變量Worker,來(lái)看下源碼

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        ......
}

thread成員變量:每一個(gè)Worker都可以看成一個(gè)thread,當(dāng)Worker被創(chuàng)建時(shí)就會(huì)生成一個(gè)對(duì)應(yīng)的thread保存到這個(gè)變量中。

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

從Worker的繼承關(guān)系可以看出它是一個(gè)AQS的子類(lèi),關(guān)于AQS可以說(shuō)是實(shí)現(xiàn)并發(fā)的核心基礎(chǔ),這里不展開(kāi)說(shuō),網(wǎng)上有很多優(yōu)秀的分析文章,這里只需要知道AQS可以實(shí)現(xiàn)同步操作即可。Worker構(gòu)造函數(shù)通過(guò)調(diào)用setState(-1)將標(biāo)志位state設(shè)置為-1,在runWorker方法中又通過(guò)unlock操作重新將state設(shè)置為0。這么做的目的和shutDownNow有一定的關(guān)系。

來(lái)看下shutDownNow源碼

 public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

里面調(diào)用了 interruptWorkers();關(guān)閉所有的Worker,而interruptWorkers源碼就是通過(guò)worker的state來(lái)決定是否調(diào)用中斷操作。在worker還沒(méi)真正運(yùn)行起來(lái)之前state為-1,那么interruptWorkers也就不需要中斷worker,而如果運(yùn)行起來(lái)后state設(shè)置為0滿(mǎn)足關(guān)閉條件。

Worker中的另一個(gè)成員變量firstTask,從命名可以看出表示是否屬于Worker的第一個(gè)task。Worker運(yùn)行起來(lái)后獲取的task有兩個(gè)途徑,其一就是在Worker創(chuàng)建的時(shí)候通過(guò)構(gòu)造函數(shù)自帶過(guò)來(lái),另一個(gè)途徑就是從阻塞隊(duì)列中去取。如果從第二種途徑獲取到那么firstTask即為null。

execute源碼

搞明白線(xiàn)程池的成員字段和Worker的作用之后,再來(lái)看execute源碼就比較清楚了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看出道格.利大佬在寫(xiě)代碼的時(shí)候真的就是言簡(jiǎn)意賅,每個(gè)函數(shù)都顯得十分的短小精悍,這點(diǎn)在看ReentrantLock和AQS源碼的時(shí)候也體現(xiàn)的非常明顯?;氐缴鲜鲈创a中,首先通過(guò)ctl獲取到當(dāng)前線(xiàn)程池的狀態(tài),然后通過(guò)workerCountOf判斷worker的工作數(shù)量,如果小于corePoolSize則說(shuō)明worker數(shù)量少于指定核心線(xiàn)程數(shù),通過(guò)addWorker再去啟動(dòng)線(xiàn)程。

關(guān)于addWorker的大致邏輯就是生成一個(gè)Woker對(duì)象,然后將worker內(nèi)部的thread啟動(dòng)起來(lái)去處理task,詳細(xì)源碼等下分析,這里知道個(gè)大概即可。如果worker數(shù)量大于corePoolSize那么直接進(jìn)入

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

之前說(shuō)過(guò)線(xiàn)程池有5種狀態(tài),只有在running狀態(tài)并且阻塞隊(duì)列可以存儲(chǔ)元素的時(shí)候才會(huì)執(zhí)行內(nèi)部邏輯,在內(nèi)部邏輯當(dāng)中又會(huì)進(jìn)行一次isRunning判斷,這種雙重判斷和單例的doublecheck是不是有些類(lèi)似,如果發(fā)現(xiàn)線(xiàn)程池不是running狀態(tài)則直接調(diào)用拒絕策略對(duì)該command進(jìn)行處理。否則進(jìn)行workerCountOf(recheck) == 0的判斷。這里可以看出只有在worker一個(gè)都沒(méi)啟動(dòng)的時(shí)候才會(huì)執(zhí)行addWorker操作,否則就是塞到阻塞隊(duì)列就完事了。

這段代碼比較關(guān)鍵,可以看出只要worker啟動(dòng)了一個(gè)那么addWorker就不會(huì)被執(zhí)行到。換句話(huà)說(shuō)核心線(xiàn)程都已經(jīng)啟動(dòng)的情況下,只要阻塞隊(duì)列還能容納command,那么永遠(yuǎn)不會(huì)addWorker去啟動(dòng)一個(gè)臨時(shí)線(xiàn)程,或者說(shuō)線(xiàn)程池不允許有核心線(xiàn)程,那么只會(huì)啟動(dòng)一個(gè)臨時(shí)線(xiàn)程。直到阻塞隊(duì)列offer失敗才會(huì)進(jìn)入到最后的那段邏輯,而阻塞隊(duì)列是否能offer成功就和具體的阻塞隊(duì)列實(shí)現(xiàn)有關(guān)系了。

 else if (!addWorker(command, false))
            reject(command);

分析完execute之后就會(huì)發(fā)現(xiàn)addWorker才是問(wèn)題關(guān)鍵,execute的三個(gè)分支都調(diào)用到了該方法,如果留心的話(huà)會(huì)發(fā)現(xiàn)一些地方調(diào)用的是addWorker(null)而另一些地方調(diào)用的是addWorker(command)那么接下來(lái)看下addWorker是如何啟動(dòng)一個(gè)線(xiàn)程,并源源不斷的去執(zhí)行task。

addWorker

先說(shuō)下addWorker(null)和addWorker(command)的區(qū)別,如果阻塞隊(duì)列還能容納下的話(huà),那么worker從隊(duì)列取task執(zhí)行即可,此時(shí)調(diào)用addWorker(null),如果隊(duì)列滿(mǎn)了或者說(shuō)直接啟動(dòng)核心線(xiàn)程那么首個(gè)task就不會(huì)從隊(duì)列去取,此時(shí)就需要調(diào)用addWorker(command)傳入需要被執(zhí)行的task。這也是Worker中firstTask的含義。搞明白這些區(qū)別后再看addWorker的源碼

retry:
for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

源碼中會(huì)首先對(duì)線(xiàn)程池的狀態(tài)進(jìn)行一些判斷,如果線(xiàn)程池調(diào)用了shutDown關(guān)閉了,那么直接返回,shutdown之后submit的任何task都不會(huì)被執(zhí)行。如果沒(méi)有shutdown那么繼續(xù)下面的邏輯判斷確保wc是正確的范圍之內(nèi),然后通過(guò)CAS操作將worker的數(shù)量加1,表示將有一個(gè)新的線(xiàn)程馬上要被啟動(dòng)了。最后判斷下線(xiàn)程池的狀態(tài)是否正確。一切正常之后接下來(lái)進(jìn)行啟動(dòng)線(xiàn)程的操作。

 boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

就不一行行分析了,都能看懂,核心就是t.start的調(diào)用,啟動(dòng)了一個(gè)新的線(xiàn)程。新啟動(dòng)的線(xiàn)程會(huì)執(zhí)行runnable中的run方法,而worker就是一個(gè)runnable實(shí)現(xiàn),所以最終調(diào)用到了worker的runworker方法。

runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

首先將firstTask復(fù)制給task,關(guān)于firstTask前面都已經(jīng)分析了,不再累述,unlock方法將原本的-1重置回了0,已經(jīng)在文章分析過(guò)了。核心就是啟動(dòng)的這個(gè)線(xiàn)程通過(guò)一個(gè)while循環(huán)去不斷的執(zhí)行task,task來(lái)源有兩個(gè)地方,firstTask或者getTask方法。getTask等下分析,如果task為null則直接調(diào)用processWorkerExit結(jié)束該線(xiàn)程,內(nèi)部將worker數(shù)量減1。

重點(diǎn)看下task不為null的情況,如果線(xiàn)程池調(diào)用了shutDownNow方法,那么會(huì)將該線(xiàn)程設(shè)置為中斷狀態(tài),所以接下來(lái)的task.run被執(zhí)行時(shí)養(yǎng)成良好的習(xí)慣,判斷下線(xiàn)程是否被中斷才是正確的處理方式。每次while循環(huán)執(zhí)行一遍后都會(huì)重新通過(guò)getTask去獲取新的task,這也是線(xiàn)程池為什么能高效利用線(xiàn)程的關(guān)鍵。

getTask

getTask實(shí)際上就是一個(gè)可以阻塞的方法,有task直接返回,沒(méi)有的話(huà)就會(huì)被阻塞住。阻塞的方法有兩種分別為阻塞隊(duì)列的take和poll,如果是核心線(xiàn)程阻塞那么調(diào)用take,該方法會(huì)一直阻塞,這也是核心線(xiàn)程為什么不會(huì)消亡的原因,而如果是臨時(shí)線(xiàn)程則調(diào)用poll方法阻塞,該方法傳入阻塞時(shí)間的參數(shù),一定時(shí)間后沒(méi)有獲取到task就返回null,這也是臨時(shí)線(xiàn)程為什么有存活時(shí)間的原理。j然后你再去看getTask的源碼是不是就豁然開(kāi)朗了。

到此線(xiàn)程池的主要源碼就分析完畢了,現(xiàn)在再去看創(chuàng)建線(xiàn)程池的那幾個(gè)參數(shù)是不是印象更加深刻了,最后再回到文章開(kāi)頭的兩段代碼中,去好好理解下CC作者為什么會(huì)將LinkedBlockingQueue改成SynchronousQueue,以及下面那段代碼的打印順序?yàn)槭裁词琼樞虻摹H绻€不能解釋的話(huà),只能說(shuō)明你在看源碼的時(shí)候沒(méi)有好好思考了。

shutDown和shutDownNow

最后說(shuō)下shutDown和shutDonwNow的區(qū)別,前者是調(diào)用之后不再接受新的task,而原有線(xiàn)程池中的task還會(huì)繼續(xù)被執(zhí)行。而如果調(diào)用的是shutDownNow方法,不僅不會(huì)接受新的task,包括線(xiàn)程池原有的task執(zhí)行都會(huì)被中斷掉。這些不難理解,結(jié)合上述分析再看一次源碼就明白了。

最后再說(shuō)一點(diǎn),在我們通過(guò)線(xiàn)程池submit任務(wù)的時(shí)候,去判斷下線(xiàn)程的isInterrupted,盡可能的避免被中斷的線(xiàn)程執(zhí)行多余的邏輯不失為一個(gè)好的習(xí)慣。AQS對(duì)于這塊的處理就相當(dāng)?shù)挠眯摹,F(xiàn)在自己也寫(xiě)了一篇關(guān)于線(xiàn)程池的分析文章,以后想回顧下線(xiàn)程池的原理,再也不用到處去找別人的文章慢慢啃了。

堅(jiān)持寫(xiě)作不易,如果對(duì)你有幫助點(diǎn)個(gè)贊就是最大的支持

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容