線程池你真的懂了嗎

閱讀任何源碼,我們都應(yīng)該帶著幾個(gè)問(wèn)題去閱讀,從源碼中找出這些問(wèn)題的答案,這樣才能徹底搞明白某個(gè)知識(shí)點(diǎn)。

下面我們就帶著這樣幾個(gè)問(wèn)題,一起看一下ThreadPoolExecutor的源碼

  1. 為什么要用線程池
  2. 為什么不推薦使用juc直接創(chuàng)建的線程池
  3. 線程池的幾個(gè)核心參數(shù)
  4. 線程池是什么時(shí)候創(chuàng)建線程的?
  5. 線程池是如何重復(fù)利用線程的?
  6. 任務(wù)提交的順序和執(zhí)行的順序是一樣的嗎?

1、為什么要使用線程池

這個(gè)其實(shí)可以寫一個(gè)簡(jiǎn)單的程序去跑一下,比如使用線程池去跑1000個(gè)task和開(kāi)1000個(gè)線程去跑這1000個(gè)task,線程池的效率會(huì)高出很多倍,原因是線程池能夠重復(fù)利用線程,沒(méi)有創(chuàng)建和銷毀線程的開(kāi)銷。
其實(shí)池化的技術(shù)在很多地方都會(huì)用到比如數(shù)據(jù)庫(kù)的連接池,字符串常量池,netty的對(duì)象池等等

2、為什么不推薦使用juc直接創(chuàng)建線程池的方式

我們找兩個(gè)Exectors創(chuàng)建線程池的源碼

a、newCachedTreadPool的源碼

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到這里的最大線程數(shù)是0xffff個(gè),這個(gè)最大線程數(shù)在大并發(fā)提交任務(wù)的情況下會(huì)創(chuàng)建大量線程,會(huì)導(dǎo)致CPU100%

b、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

//看一下LinkedBlockingQueue的實(shí)現(xiàn)
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

會(huì)初始化一個(gè)容量為0xffff的隊(duì)列,由于這個(gè)隊(duì)列太大,如果我們提交的任務(wù)數(shù)很多并且自定義的線程里的對(duì)象又很大的話,就很容易發(fā)生oom的問(wèn)題。

從這個(gè)工具類創(chuàng)建線程的參數(shù)我們可以看到底層調(diào)用的都是ThreadPoolExecutor的構(gòu)造方法,所以我們建議根據(jù)具體業(yè)務(wù)的規(guī)模設(shè)置合適的線程池參數(shù)。

new ThreadPoolExecutor()

3、線程池的幾個(gè)核心參數(shù)

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 

corePoolSize:最大線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAiveTime:線程存活時(shí)間
TimeUnit:存活時(shí)間的參數(shù)
BlockingQueue:線程池的任務(wù)隊(duì)列

task投遞到線程池中的整個(gè)過(guò)程如下


image.png

看一下具體的代碼

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        //這里判斷線程數(shù)量是否小于corePoolSize,如過(guò)小于corePoolSize則直接創(chuàng)建worker線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //到這個(gè)if說(shuō)明worker線程數(shù)量大于了corePoolSize了,這里直接添加到任務(wù)隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //到這個(gè)if說(shuō)明任務(wù)加入隊(duì)列失敗,隊(duì)列滿了,則再創(chuàng)建線程worker線程,如果創(chuàng)建失敗則執(zhí)行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

看到這里從宏觀上我們就能看到整個(gè)task投遞到線程池的一個(gè)過(guò)程,其實(shí)這里最主要的方法是addWorker,其實(shí)addworker里才是線程池的精華,里面有如何創(chuàng)建線程及start的邏輯,如何回收過(guò)期的線程,如何重復(fù)利用創(chuàng)建的線程去運(yùn)行task的邏輯

4、線程池是什么時(shí)候創(chuàng)建線程的

線程池的線程不是線程池創(chuàng)建的時(shí)候創(chuàng)建的,線程池的線程是在調(diào)用addWorker方法,并且addWorker執(zhí)行成功才會(huì)創(chuàng)建
下面我們一起分析一下addworker代碼,這個(gè)方法很長(zhǎng),其實(shí)要看明白這個(gè)方法我們要先看一下Worker這個(gè)類,可以看到這個(gè)類實(shí)際上實(shí)現(xiàn)了Runnable接口,其實(shí)線程池中運(yùn)行的runnable任務(wù)都會(huì)被包裝成Worker對(duì)象

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        private static final long serialVersionUID = 6138294804551838833L;
        //當(dāng)前worker會(huì)綁定一個(gè)線程
        final Thread thread;
       
        //當(dāng)前worker處理的第一個(gè)任務(wù)
        Runnable firstTask;
        volatile long completedTasks;
        //創(chuàng)建worker時(shí)就會(huì)生成 一個(gè)線程及賦值firstTask,
        //注意線程池的線程就是在這里被創(chuàng)建的
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //注意看這里,這個(gè)線程創(chuàng)建的時(shí)候傳的是this,也就意味著一會(huì)this.thread.start時(shí),
            //執(zhí)行的是worker對(duì)象的run方法
            //這個(gè)大家想一下,回味一下
            this.thread = getThreadFactory().newThread(this);
        }
}

下面在具體分析一下addWorker是如何start線程及重復(fù)利用線程運(yùn)行任務(wù)的

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //獲取worker數(shù)量
            int c = ctl.get();
            //獲取線程池狀態(tài)
            int rs = runStateOf(c);

            // 如果線程池狀態(tài)為SHUTDOWN就不接收任務(wù)了直接returnfalse
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //這里會(huì)判斷一下worker數(shù)量
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        //如果代碼運(yùn)行到了這里,就表示線程池狀態(tài)正常,任務(wù)達(dá)到了創(chuàng)建worker線程的條件
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //這里會(huì)創(chuàng)建一個(gè)worker對(duì)象,
            //結(jié)合上面的代碼,worker對(duì)象中會(huì)包含一個(gè)線程和一個(gè)firstTask
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //下面這一部分是用來(lái)判斷需不需要將worker線程進(jìn)行緩存,給其他的任務(wù)使用
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //注意看這個(gè)workers對(duì)象,這是一個(gè)hashset,用來(lái)緩存創(chuàng)建好的worker對(duì)象
                        //注意在強(qiáng)調(diào)一下這個(gè)worker對(duì)象包含一個(gè)Thread引用及Runnable引用
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }

                //緩存好worker線程后,這里會(huì)執(zhí)行線程的start邏輯
                //注意線程池的任務(wù)就是在這里開(kāi)啟start使任務(wù)真正進(jìn)入runnable狀態(tài)的
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面我們看一下start的具體邏輯:
剛剛我們已經(jīng)看到了,thread是worker對(duì)象的一個(gè)屬性,實(shí)際上創(chuàng)建線程時(shí),傳的參數(shù)就是worker對(duì)象本身,所以線程start執(zhí)行的邏輯就是worker對(duì)象的run方法

//worker的run方法很簡(jiǎn)單,下面我們看一下runWorker方法
public void run() {
            runWorker(this);
        }

看完這個(gè)我們就基本上看完了線程池的核心邏輯了,但是還有一些細(xì)節(jié)沒(méi)有仔細(xì)看,后面我會(huì)提到,留給大家思考

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //獲取一下fisrtTask
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //這個(gè)task什么時(shí)候不為空?
           //注意這里會(huì)有一個(gè)隱含的問(wèn)題,我們提交的任務(wù)存放的順序是 核心worker->隊(duì)列->最大線程worker
          //這里的getTask是從隊(duì)列里獲取的任務(wù)
          //這里task的執(zhí)行順序就變成了,核心worker->最大線程worker->隊(duì)列
          //不知道大家有沒(méi)有g(shù)et到我的點(diǎn),可以做一個(gè)實(shí)驗(yàn)就是為task編一個(gè)號(hào),比如安1-10的順序提交任務(wù),最終執(zhí)行的結(jié)果卻是 1,2,3,4,8,9,10,5,6,7這種順序,這里就是出現(xiàn)這種提交順序和執(zhí)行順序不一樣的原理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //這里會(huì)最終調(diào)用task的run方法,到此線程池的創(chuàng)建到運(yùn)行任務(wù)就看結(jié)束了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {

            //這里會(huì)銷毀線程,在隊(duì)列為空的時(shí)候,會(huì)銷毀線程,讓線程數(shù)量停留在corePoolsize范圍內(nèi),什么時(shí)候會(huì)執(zhí)行到這,當(dāng)task為空的時(shí)候,什么時(shí)候task為空,看getTask的邏輯,getTask會(huì)判斷隊(duì)列是否為空及活躍線程的時(shí)間來(lái)返回task的值,這里就不細(xì)講了
            processWorkerExit(w, completedAbruptly);
        }
    }

5、線程池是如何重復(fù)利用線程的

看完上面的分析,其實(shí)就能回答這個(gè)問(wèn)題了,當(dāng)任務(wù)提交時(shí)會(huì)創(chuàng)建worker對(duì)象,這個(gè)對(duì)象里會(huì)有綁定一個(gè)線程,同時(shí)會(huì)將worker對(duì)象放入workers Set中,創(chuàng)建成功后,會(huì)立馬調(diào)用worker.thread.start方法啟動(dòng)線程,這個(gè)線程的run方法進(jìn)行了包裝,首先判斷fisrtTask是否為空如果不為空則直接運(yùn)行,否則會(huì)while循環(huán)拿緩存隊(duì)列中的任務(wù),知道緩存隊(duì)列為空,或者空閑線程超過(guò)了keepalive時(shí)間就會(huì)銷毀線程,以保證線程維持在corePoolSize的大小

6、任務(wù)提交的順序和執(zhí)行的順序是一樣的嗎?

不一樣,提交順序是 核心worker線程->隊(duì)列->非核心worker線程,
但是執(zhí)行的順序卻是核心worker任務(wù)->非核心worker任務(wù)->隊(duì)列任務(wù)
如果能理解這個(gè),線程池就真的理解的差不多了

7、其他

上面幾個(gè)問(wèn)題有些是我在看源碼時(shí)的困惑點(diǎn),有些是我看完源碼之后的一些想法,除了這些問(wèn)題外,線程池還有很多精妙的地方比如,
a、線程池的狀態(tài)和核心線程數(shù)其實(shí)是用一個(gè)4個(gè)字節(jié)int表示的,為什么要這么表示
b、線程池中使用到的設(shè)計(jì)模式有哪些
c、線程池本身就是并發(fā)場(chǎng)景下提交任務(wù)的,那它自己的安全性是如何保證的,execute方法是如何保證安全性的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容