Java 線程池之必懂應用-原理篇(上)

前言

線程并發(fā)系列文章:

Java 線程基礎
Java 線程狀態(tài)
Java “優(yōu)雅”地中斷線程-實踐篇
Java “優(yōu)雅”地中斷線程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有誤
Java Unsafe/CAS/LockSupport 應用與原理
Java 并發(fā)"鎖"的本質(一步步實現(xiàn)鎖)
Java Synchronized實現(xiàn)互斥之應用與源碼初探
Java 對象頭分析與使用(Synchronized相關)
Java Synchronized 偏向鎖/輕量級鎖/重量級鎖的演變過程
Java Synchronized 重量級鎖原理深入剖析上(互斥篇)
Java Synchronized 重量級鎖原理深入剖析下(同步篇)
Java并發(fā)之 AQS 深入解析(上)
Java并發(fā)之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 詳解
Java 并發(fā)之 ReentrantLock 深入分析(與Synchronized區(qū)別)
Java 并發(fā)之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(應用篇)
最詳細的圖文解析Java各種鎖(終極篇)
線程池必懂系列

線程池系列文章:

Java 線程池之線程返回值
Java 線程池之必懂應用-原理篇(上)
Java 線程池之必懂應用-原理篇(下)

Java 線程池是面試、工作必須掌握的基礎之一,使用線程池能夠更好地規(guī)劃應用CPU占用率,提高應用運行的流暢度,本篇將來探索線程池的應用與原理。
通過本篇文章,你將了解到:

1、為什么需要線程池
2、自己實現(xiàn)簡單線程池
3、線程池原理
4、總結

1、為什么需要線程池

名詞由來

池,顧名思義:就是裝一堆東西的地方。當有需要的時候,從池子里拿東西,當不用的時候,就往池子里放??梢钥闯龃娣?、取用都很方便。

線程池類比

咱們在打疫苗的時候,若是啥時候想打啥時候過去,到了醫(yī)院,醫(yī)生需要換上防護服,戴上手套,準備疫苗,準備電腦記錄信息等。


image.png

接種者1 到了醫(yī)院,醫(yī)生進行各項準備工作,最后給接種者1打了疫苗后,卸下裝備。此時接種者2也到了醫(yī)院,醫(yī)生又需要換上裝備??梢钥闯鲠t(yī)生的準備工作耗時費力,若是在固定的時間接種疫苗,醫(yī)生就無需頻繁換裝備,既節(jié)約醫(yī)生時間,也縮短了接種者的等待時間。如下圖:


image.png

為什么需要線程池

從上面的例子可知,因為醫(yī)生的準備工作耗時費力,因此盡可能集中打一段時間再換裝備。而對于計算機里線程也是類似的:

1、線程切換需要切換上下文,切換上下文涉及到系統(tǒng)調用,占用系統(tǒng)資源。
2、線程切換需要時間,成功創(chuàng)建線程后才能真正做事。
3、線程開啟后無法有效管理線程。

基于以上原因,需要引入線程池。

2、自己實現(xiàn)簡單線程池

簡單Demo

    private void createThread() {
        Thread thread = new Thread(() -> {
            System.out.println("thread running...");
        });
        thread.start();
    }

該線程執(zhí)行完畢就結束了,現(xiàn)在想讓它不結束:

    private void createThread() {
        Thread thread = new Thread(() -> {
            while (true) {
                System.out.println("thread running...");   
            }
        });
        thread.start();
    }

線程一直在運行著,外部想要提交Runnable 給它運行,那么需要有個共享的變量,選擇隊列作為共享變量:

class ThreadPool {
    public static void main(String args[]) {
        ThreadPool threadPool = new ThreadPool();
        threadPool.createThread();
        threadPool.startRunnable();
    }

    BlockingQueue<Runnable> shareQueue = new LinkedBlockingQueue<>();
    private void createThread() {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    Runnable targetRunnable = shareQueue.take();
                    targetRunnable.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }

    public void startRunnable() {
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("執(zhí)行 runnable " + finalI);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            System.out.println("放入隊列 runnable " + i);
            shareQueue.offer(runnable);
        }
    }
}

上述Demo里只開啟了一個線程,該線程是個死循環(huán)。線程體里從共享隊列shareQueue 取出Runnable,若有元素則拿出來執(zhí)行,若沒有則阻塞等待。
而外部調用者可以往共享隊列里存放Runnable,等待線程執(zhí)行該Runnable。
由此實現(xiàn)了只有一個線程在運行,卻是可以執(zhí)行不同的任務,也避免了每個任務都需要開啟線程執(zhí)行的情況。

3、線程池原理

線程池基本構成

上面雖然實現(xiàn)了線程池,但是是乞丐版的,很明顯地看出很多缺陷:

1、線程一直在運行,不能停下來。
2、只有一個線程在執(zhí)行任務,其它任務需要排隊。
3、隊列無限膨脹,消耗內存。
4、其它缺點...

作為通用的工具庫,來看看Java 線程池是如何實現(xiàn)的。
線程池的核心圍繞著一個原子變量:ctl

#ThreadPoolExecutor.java
    //初始化狀態(tài):線程池處在運行狀態(tài),當前線程數(shù)為0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //用來表示線程數(shù)的位數(shù)個數(shù),為29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //線程池線程最大個數(shù),(1<<29) - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 線程池的5種狀態(tài)
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //ctl 為int 類型,總共32位,包含了兩個值,高3位表示線程池狀態(tài),低29位表示線程個數(shù)
    //提取線程池狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取線程個數(shù)
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //將狀態(tài)和線程個數(shù)存儲在int 里
    private static int ctlOf(int rs, int wc) { return rs | wc; }
image.png

為什么需要29位表示線程數(shù)呢,因為線程池狀態(tài)有5種,需要3位才能區(qū)分這5種狀態(tài)。

線程池執(zhí)行任務

ThreadPoolExecutor 是線程池最核心的類,實現(xiàn)了Executor 接口,并重寫了execute(Runnable) 方法。
當外部調用者需要線程池執(zhí)行任務時,只需要調用ThreadPoolExecutor.execute(Runnable)方法即可,線程池里的某個線程體里將會執(zhí)行Runnable,也即是執(zhí)行了一次任務。

#ThreadPoolExecutor.java
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取ctl 變量值
        int c = ctl.get();
        //當前正在運行的線程數(shù)沒有超過核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //新加入的任務將會新建一個核心線程來執(zhí)行它
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線程池還在運行,那么就往等待隊列里添加任務
        if (isRunning(c) && workQueue.offer(command)) {
            //再次獲取ctl 變量值
            int recheck = ctl.get();
            //再次檢查線程池是否在運行
            if (! isRunning(recheck) && remove(command))
                //若沒有運行了,那么將任務從隊列里移出,移出成功則執(zhí)行拒絕策略(類似定義的回調)
                reject(command);
            //若是當前沒有線程在運行
            else if (workerCountOf(recheck) == 0)
                //新增非核心線程執(zhí)行新的任務,注意此時任務是放在隊列里的,因此
                //這里的第一個參數(shù)為null
                addWorker(null, false);
        }
        //加入隊列失敗后,嘗試直接新建非核心線程執(zhí)行該任務
        else if (!addWorker(command, false))
            //執(zhí)行拒絕任務
            reject(command);
    }

可以看出有多次判斷ctl的值,這是因為可能存在多個線程同時操作ctl的值,而ctl是AtomicInteger類型,底層使用的是CAS,我們知道CAS是無鎖的,因此需要循環(huán)判斷其狀態(tài)。
線程池很多地方都是依賴CAS,可以說理解了CAS就大部分讀懂了線程池。
CAS 可移步查看:Java Unsafe/CAS/LockSupport 應用與原理

上述代碼用流程圖表示如下:


image.png

線程池幾個概念

上述流程涉及到幾個概念:

1、核心線程、核心線程數(shù)
2、非核心線程
3、最大線程數(shù)
3、等待隊列

計算機世界很多時候可以在現(xiàn)實世界找到類比關系,還是以打疫苗為例:

1、某個接種點只有4位醫(yī)生可以打疫苗,這四位醫(yī)生可類比4個線程,因為他們是常駐該接種點,因此核心醫(yī)生-->核心線程。
2、4位醫(yī)生同時只能接種4個人,當同時來的接種人數(shù)超過4人,那么就需要按順序排隊等候,這個等候隊伍稱為等待隊列。
3、某天該接種點突然來了很多待接種者,隊伍排得很長,實在排不下去了,于是需要其他地方的醫(yī)生支援。這些趕過來的醫(yī)生并非常駐此地,他們支援完成后就要回去了,因此可類比為:非核心線程。
4、核心線程 + 非核心線程 == 最大線程數(shù) (類似接種點里最多接種醫(yī)生的人數(shù))。
5、當然,有些接種點醫(yī)生資源比較緊張,隊伍太長了也沒其他地方的醫(yī)生過來支援,一天都打不完,于是剩下的待接種者被告知,今天打不了了你們回去吧,這個過程可類比:拒絕策略。即使有其他醫(yī)生來支援,但是待接種者還是很多,超過了接種點的接種能力,這個時候新來的待接種者也是不能接種的,還是會被拒絕。

image.png

核心線程和非核心線程有啥區(qū)別呢?

1、在沒有達到最大核心線程數(shù)的時候,有新的任務到來都會嘗試新建核心線程來執(zhí)行新任務。
2、核心線程常駐線程池,除非設置了超時銷毀,否則一直等待執(zhí)行新的任務。
3、非核心線程執(zhí)行完畢后,不管是否設置了超時銷毀,只要沒有任務執(zhí)行了,就會退出線程執(zhí)行。

實際上線程池里淡化了線程本身的概念,只關注任務是否得到執(zhí)行,而不關注任務被哪個線程執(zhí)行,因此核心線程、非核心線程最主要的區(qū)別在于是否常駐線程池。
核心線程、等待隊列、非核心線程 三者執(zhí)行任務順序:


image.png

線程池管理線程

上面提到過一個重要的方法:addWorker(xx)。

#ThreadPoolExecutor.java
    private boolean addWorker(Runnable firstTask, boolean core) {
        //跳轉標記,java里一般很少用,c/c++用得比較多
        retry://--------------------(1)
        for (;;) {
            //死循環(huán),主要是用來判斷線程池狀態(tài)以及線程個數(shù)
            int c = ctl.get();
            //取出線程池狀態(tài)
            int rs = runStateOf(c);

            //如果線程池已經(jīng)關閉/關閉中,則無需再往里邊添加了
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //取出線程個數(shù)
                int wc = workerCountOf(c);
                //當前需要開啟核心線程數(shù),而核心線程數(shù)已經(jīng)滿了(其它線程成功添加了任務到核心線程)
                //或者需要開啟非核心線程,但是超出了總的線程數(shù)
                //這兩種情況下,將不能再添加任務到線程池
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS 修改ctl,增加線程個數(shù)計數(shù)
                //如果不成功(被別人改了),則繼續(xù)循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //成功增加計數(shù)
                c = ctl.get();  // Re-read ctl
                //狀態(tài)發(fā)生改變了,則再重試
                if (runStateOf(c) != rs)
                    //跳轉到開頭的標記
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //創(chuàng)建Worker,將任務添加到Worker里,并創(chuàng)建線程(new Thread)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //線程創(chuàng)建成功
                final ReentrantLock mainLock = this.mainLock;//-----------------(2)
                //上鎖
                mainLock.lock();
                try {
                    //獲取線程池狀態(tài)
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        //添加worker到HashSet里
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //標記添加成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //正式開啟線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //線程池添加任務成功的標記
        return workerStarted;
    }

該方法有兩個參數(shù),第一個參數(shù)為Runnable,第二個為指示要添加的線程是否為核心線程。
上述標記了兩個重點:
(1)
為啥需要死循環(huán)?因為ctl變量可能存在多個線程修改的情況,而ctl 為AtomicInteger,底層為CAS,因此需要多次判斷直至條件滿足為止。

(2)
為啥需要鎖?因為worker聲明如下:

    private final HashSet<Worker> workers = new HashSet<>();

該集合可能被多個線程操作,因此其添加(add)、刪除(remove)需要上鎖后才能安全操作。
Worker 繼承自AQS,利用AQS 實現(xiàn)了自己的一套鎖(類似ReentrantLock)。
其存放的主要信息如下:


image.png

線程對象存儲在workers里,而線程池通過管理workers來控制核心/非核心線程數(shù)。

線程池管理任務

線程已經(jīng)被創(chuàng)建,接下來看看如何來執(zhí)行任務。
Worker 實現(xiàn)了Runnable接口,因此必須要重寫run()方法,當構造Thread 對象時傳入了Worker作為Thread的Runnable,因此當Thread.start()后實際執(zhí)行的Runnable為Worker里的run()方法,而該方法調用了runWorker(xx)。
來看看其源碼:

#ThreadPoolExecutor.java
    final void runWorker(Worker w) {
        //當前線程
        Thread wt = Thread.currentThread();
        //取出任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //兩個條件------------>(1)
            while (task != null || (task = getTask()) != null) {
                //worker 上鎖
                w.lock();------------>(2)
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    //如果線程池已經(jīng)被停止運行,并且該線程沒有被中斷過
                    //那么中斷該線程
                    wt.interrupt();
                try {
                    //執(zhí)行任務前的回調(鉤子)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正執(zhí)行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //執(zhí)行任務后的回調
                        afterExecute(task, thrown);
                    }
                } finally {
                    //置空
                    task = null;
                    //該線程完成任務計數(shù)
                    w.completedTasks++;
                    //釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //線程退出執(zhí)行
            processWorkerExit(w, completedAbruptly);
        }
    }

上面標記出了兩個重點:
(1)
在最開始的自己實現(xiàn)線程池Demo里,我們有演示過如何讓線程一直在運行,而此處有兩個判斷條件:

1、當前worker關聯(lián)的第一個任務是否存在,若是則取出運行。這種判斷對應于新增核心線程/非核心線程(非存放在隊列里)的場景。也就是說,當線程開啟后,若是關聯(lián)了任務則拿出執(zhí)行。
2、當線程沒有關聯(lián)到第一個任務,這種判斷對應于線程不是第一次執(zhí)行或是線程第一次執(zhí)行(有任務在隊列里),此時從隊列里取出任務執(zhí)行。

getTask()方法如下:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        //死循環(huán)為了判斷線程池狀態(tài)
        for (;;) {
            int c = ctl.get();
            //取出狀態(tài)
            int rs = runStateOf(c);
            //線程池關閉或是沒有任務在等待
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //減少線程計數(shù)
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            //判斷是否需要超時等待
            //1、allowCoreThreadTimeOut 外部設置了是否允許超時關閉核心線程
            //2、當前線程數(shù)是否大于核心線程數(shù)(也就是開啟了非核心線程數(shù))
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //總共有四種組合判斷
            //timeout 指的是獲取隊列元素是否已經(jīng)發(fā)生超時
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                //若是超時發(fā)生,或者隊列為空,那么嘗試減少線程計數(shù)
                //修改計數(shù)成功,則返回null
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //決定是否超時獲取元素
                //若是設定了超時獲取元素,那么當超時時間耗盡后,poll方法將會返回,底層使用LockSupport
                //若沒有設定超時,那么一直阻塞直到隊列有元素為止
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                //走到這超時了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

可以看出:

1、借助于阻塞隊列的特性,在沒有設置超時的情況下,核心線程將一直會阻塞在BlockingQueue.take()方法,進而線程一直被阻塞于此,直到有新的任務被放入線程池請求執(zhí)行。這也是為什么核心線程能夠常駐線程池的原因。
2、當線程數(shù)超過了核心線程數(shù),那么timed=true,也就是此時線程是獲取隊列元素時設置了超時的,若超時過了沒有獲取到元素,那么就會退出線程執(zhí)行。這也是為什么非核心線程不能常駐線程池的原因。

(2)
此處為什么需要鎖?
前面提到過操作wokers集合需要鎖:mainLock,它的目的是為了多線程對集合進行添加/刪除操作的安全性進行考慮。每個woker同時只能由一個線程操作,似乎沒有加鎖的必要,其實Worker鎖更重要的作用體現(xiàn)在:

用來巧妙地判斷線程是否在運行中(忙碌中)。
當有任務執(zhí)行的時候,Runnable.run()方法被上鎖,執(zhí)行結束后釋放鎖。因此當判斷Woker沒有獲取鎖時,表明它正在等待獲取隊列的元素,此時它是空閑的。

判斷了線程的空閑與否可以用來給外部中斷線程池的執(zhí)行提供依據(jù)。

4、總結

至此,我們了解線程池的核心優(yōu)勢:不頻繁重復創(chuàng)建線程。依靠阻塞隊列特性使得線程常駐,用圖表示如下:


image.png

由于篇幅原因,剩下的線程池關閉、線程池一些重要的運行時狀態(tài)、簡單的創(chuàng)建線程池的幾種方式等將在下篇分析,敬請關注!

演示代碼 若是有幫助,給github 點個贊唄~

您若喜歡,請點贊、關注,您的鼓勵是我前進的動力

持續(xù)更新中,和我一起步步為營系統(tǒng)、深入學習Android/Java

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容