線程池從入門到精通

什么是線程池

在 Java 中,如果每個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程, 創(chuàng)建和銷毀線程花費(fèi)的時(shí)間和消耗的系統(tǒng)資源都相當(dāng)大,甚至可能要比在處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源要多的多。如果在一個(gè) Jvm 里創(chuàng)建太多的線程,可能會(huì)使系統(tǒng)由于過度消耗內(nèi)存或“切換過度”而導(dǎo)致系統(tǒng)資源不足。
為了解決這個(gè)問題,就有了線程池的概念,線程池的核心邏輯是提前創(chuàng)建好若干個(gè)線程放在一個(gè)容器中。如果有任務(wù)需要處理,則將任務(wù)直接分配給線程池中的線程來執(zhí)行就行,任務(wù)處理完以后這個(gè)線程不會(huì)被銷毀,而是等待后續(xù)分配任務(wù)。同時(shí)通過線程池來重復(fù)管理線程還可以避免創(chuàng)建大量線程增加開銷。

線程池的優(yōu)勢(shì)

合理的使用線程池,可以帶來一些好處

  1. 降低創(chuàng)建線程和銷毀線程的性能開銷
  2. 提高響應(yīng)速度,當(dāng)有新任務(wù)需要執(zhí)行是不需要等待線程創(chuàng)建就可以立馬執(zhí)行
  3. 合理的設(shè)置線程池大小可以避免因?yàn)榫€程數(shù)超過硬件資源瓶頸帶來的問題

Java 中提供的線程池 API

線程池的使用

要了解一個(gè)技術(shù),我們?nèi)匀皇菑氖褂瞄_始。 JDK 為我們提供了幾種不同的線程池實(shí)現(xiàn)。我們先來通過一個(gè)簡(jiǎn)單的案例來引入線程池的基本使用在 Java 中怎么創(chuàng)建線程池呢?下面這段代碼演示了創(chuàng)建三個(gè)固定線程數(shù)的線程池

public class ThreadPoolTest implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }

    static ExecutorService service = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            service.execute(new ThreadPoolTest());
        }
        service.shutdown();
    }
}

Java中提供的線程池Api

Executors里面提供了幾個(gè)線程池的工廠方法,這樣,很多新手就不需要了解太多關(guān)于ThreadPoolExecutor的知識(shí)了,他們只需要直接使用Executors 的工廠方法,就可以使用線程池:
newFixedThreadPool: 該方法返回一個(gè)固定數(shù)量的線程池,線程數(shù)不變,當(dāng)有一個(gè)任務(wù)提交時(shí),若線程池中空閑,則立即執(zhí)行,若沒有,則會(huì)被暫緩在一個(gè)任務(wù)隊(duì)列中,等待有空閑的線程去執(zhí)行。
newSingleThreadExecutor: 創(chuàng)建一個(gè)線程的線程池,若空閑則執(zhí)行,若沒有空閑線程則暫緩在任務(wù)隊(duì)列中。
newCachedThreadPool: 返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程個(gè)數(shù)的線程池,不限制最大線程數(shù)量,若用空閑的線程則執(zhí)行任務(wù),若無任務(wù)則不創(chuàng)建線程。并且每一個(gè)空閑線程會(huì)在 60 秒后自動(dòng)回收。
newScheduledThreadPool: 創(chuàng)建一個(gè)可以指定線程的數(shù)量的線程池,但是這個(gè)線程池還帶有 延遲和周期性執(zhí)行任務(wù)的功能,類似定時(shí)器。

ThreadpoolExecutor

上面提到的四種線程池的構(gòu)建,都是基于 ThreadpoolExecutor 來構(gòu)建的,ThreadPoolThread 有哪些構(gòu)造參數(shù)

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

ThreadpoolExecutor有多個(gè)重載的構(gòu)造方法,我們可以基于它最完整的構(gòu)造方法來分析先來解釋一下每個(gè)參數(shù)的作用,稍后我們?cè)诜治鲈创a的過程中再來詳細(xì)了解參數(shù)的意義。

    public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)量
                              int maximumPoolSize, //最大線程數(shù)
                              long keepAliveTime, //超時(shí)時(shí)間,超出核心線程數(shù)量以外的線程空余存活時(shí)間
                              TimeUnit unit, //存活時(shí)間單位
                              BlockingQueue<Runnable> workQueue, //保存執(zhí)行任務(wù)的隊(duì)列
                              ThreadFactory threadFactory,//創(chuàng)建新線程使用的工廠
                              RejectedExecutionHandler handler //當(dāng)任務(wù)無法執(zhí)行的時(shí)候的處理方式
    )

線程池初始化以后做了什么事情
線程池初始化時(shí)是沒有創(chuàng)建線程的, 線程池里的線程的初始化與其他線程一樣,但是在完成任務(wù)以后,該線程不會(huì)自行銷毀,而是以掛起的狀態(tài)返回到線程池。直到應(yīng)用程序再次向線程池發(fā)出請(qǐng)求時(shí),線程池里掛起的線程就會(huì)再度激活執(zhí)行任務(wù)。這樣既節(jié)省了建立線程所造
成的性能損耗,也可以讓多個(gè)任務(wù)反復(fù)重用同一線程,從而在應(yīng)用程序生存期內(nèi)節(jié)約大量開銷。

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool 的核心線程數(shù)和最大線程數(shù)都是指定值,也就是說當(dāng)線程池中的線程數(shù)超過核心線程數(shù)后,任務(wù)都會(huì)被放到阻塞隊(duì)列中。 另外keepAliveTime為 0,也就是超出核心線程數(shù)量以外的線程空余存活時(shí)間,而這里選用的阻塞隊(duì)列是 LinkedBlockingQueue,使用的是默認(rèn)容量 Integer.MAX_VALUE,相當(dāng)于沒有上限,這樣會(huì)有問題后面在具體分析。
這個(gè)線程池執(zhí)行任務(wù)的流程如下:

  1. 線程數(shù)少于核心線程數(shù),也就是設(shè)置的線程數(shù)時(shí),新建線程執(zhí)行任務(wù)。
  2. 線程數(shù)等于核心線程數(shù)后,將任務(wù)加入阻塞隊(duì)列。
  3. 由于隊(duì)列容量非常大, 可以一直添加。
  4. 執(zhí)行完任務(wù)的線程反復(fù)去隊(duì)列中取任務(wù)執(zhí)行。
    用途: FixedThreadPool 用于負(fù)載比較大的服務(wù)器,為了資源的合理利用,需要限制當(dāng)前線程數(shù)量。

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

CachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程; 并且沒有核心線程,非核心線程數(shù)無上限,但是每個(gè)空閑的時(shí)間只有 60 秒,超過后就會(huì)被回收。
它的執(zhí)行流程如下:

  1. 沒有核心線程,直接向 SynchronousQueue 中提交任務(wù)
  2. 如果有空閑線程,就去取出任務(wù)執(zhí)行;如果沒有空閑線程,就新建一個(gè)
  3. 執(zhí)行完任務(wù)的線程有 60 秒生存時(shí)間,如果在這個(gè)時(shí)間內(nèi)可以接到新任務(wù),就可以繼續(xù)活下去,否則就被回收。

newSingleThreadExecutor

創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。

線程池的實(shí)現(xiàn)原理分析

線程池的基本使用我們都清楚了,接下來我們來了解一下線程池的實(shí)現(xiàn)原理ThreadPoolExecutor 是線程池的核心,提供了線程池的實(shí)現(xiàn)。
ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor,并另外提供一些調(diào)度方法以支持定時(shí)和周期任務(wù)。Executers 是工具類,主要用來創(chuàng)建線程池對(duì)象我們把一個(gè)任務(wù)提交給線程池去處理的時(shí)候,線程池的處理過程是什么樣的呢?首先直接來看看定義。

線程池原理分析(FixedThreadPool)

線程池執(zhí)行順序.png

源碼分析

execute

基于源碼入口進(jìn)行分析,先看 execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//1.當(dāng)前池中線程比核心數(shù)少,新建一個(gè)線程執(zhí)行任務(wù)
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
            int recheck = ctl.get();//任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
            if (!isRunning(recheck) && remove(command))
                reject(command);//如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
            else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個(gè) 線程
                addWorker(null, false);
        } else if (!addWorker(command, false)) //3.核心池已滿,隊(duì)列已滿,試著創(chuàng)建一個(gè)新 線程
            reject(command); //如果創(chuàng)建新線程失敗了,說明線程池被關(guān)閉或者線程池完全滿了, 拒絕任務(wù)
    }

ctl 的作用

在線程池中, ctl貫穿在線程池的整個(gè)生命周期中
ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,
0));

它是一個(gè)原子類,主要作用是用來保存線程數(shù)量和線程池的狀態(tài)。我們來分析一下這段代碼,其實(shí)比較有意思,他用到了位運(yùn)算一個(gè) int 數(shù)值是 32 個(gè) bit 位,這里采用高 3 位來保存運(yùn)行狀態(tài),低 29 位來保存線程數(shù)量。我們來分析默認(rèn)情況下,也就是 ctlOf(RUNNING)運(yùn)行狀態(tài),調(diào)用了 ctlOf(int rs,int wc)方法;

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位. -1 的二進(jìn)制是 32 個(gè) 1(1111 1111 11111111 1111 1111 1111 1111)

-1 的二進(jìn)制計(jì)算方法
原碼是 1000…001 . 高位 1 表示符號(hào)位。
然后對(duì)原碼取反,高位不變得到 1111…110
然后對(duì)反碼進(jìn)行+1 ,也就是補(bǔ)碼操作, 最后得到 1111…1111

那么-1 <<左移 29 位, 也就是 【111】 表示; rs | wc 。二進(jìn)制的 111 | 000 。得到的結(jié)果仍然是 111

& 運(yùn)算規(guī)則:0&0=0; 0&1=0; 1&0=0; 1&1=1;
| 運(yùn)算規(guī)則:0|0=0; 0|1=1; 1|0=1; 1|1=1;
^ 運(yùn)算規(guī)則:0^0=0; 0^1=1; 1^0=1; 1^1=0;
~ 運(yùn)算規(guī)則:~1=0; ~0=1;

那么同理可得其他的狀態(tài)的 bit 位表示

private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將 1 的二進(jìn)制向左位移 29 位,再減 1 表示最大線程容量
//運(yùn)行狀態(tài)保存在 int 值的高 3 位 (所有數(shù)值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任務(wù),并執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務(wù),但是執(zhí)行隊(duì)列中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;// 不接收新任務(wù),不執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行中的任務(wù)
private static final int TIDYING = 2 << COUNT_BITS; //所有的任務(wù)都已結(jié)束,線程數(shù)量為 0,處于該狀態(tài)的線程池即將調(diào)用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法執(zhí)行完成

更多關(guān)于ctl
https://www.dazhuanlan.com/2019/12/25/5e0296f9dcf20/

狀態(tài)轉(zhuǎn)化

狀態(tài)轉(zhuǎn)換.png

addWorker

如果工作線程數(shù)小于核心線程數(shù)的話,會(huì)調(diào)用 addWorker,顧名思義,其實(shí)就是要?jiǎng)?chuàng)建一個(gè)工作線程。我們來看看源碼的實(shí)現(xiàn)源碼比較長(zhǎng),看起來比較唬人,其實(shí)就做了兩件事。
1)才用循環(huán) CAS 操作來將線程數(shù)加 1;
2)新建一個(gè)線程并啟用。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //goto 語句,避免死循環(huán)
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
// Check if queue empty only if necessary.
//            如果線程處于非運(yùn)行狀態(tài),并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
//            workQueue 為空,直接返回 false(表示不可添加 work 狀態(tài))
//            1. 線程池已經(jīng) shutdown 后,還要添加新的任務(wù),拒絕
//            2. (第二個(gè)判斷)SHUTDOWN 狀態(tài)不接受新任務(wù),但仍然會(huì)執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任
//            務(wù),所以當(dāng)進(jìn)入 SHUTDOWN 狀態(tài),而傳進(jìn)來的任務(wù)為空,并且任務(wù)隊(duì)列不為空的時(shí)候,是允許添加
//            新線程的, 如果把這個(gè)條件取反,就表示不允許添加 worker
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                return false;
            for (; ; ) { //自旋
                int wc = workerCountOf(c);//獲得 Worker 工作線程數(shù)
//如果工作線程數(shù)大于默認(rèn)容量大小或者大于核心線程數(shù)大小,則直接返回 false 表示不能再添加 worker。
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//通過 cas 來增加工作線程數(shù),
                    如果 cas 失敗,則直接重試
                break retry;
                c = ctl.get(); // Re-read ctl //再次獲取 ctl 的值
                if (runStateOf(c) != rs) //這里如果不想等,說明線程的狀態(tài)發(fā)生了變化, 繼續(xù)重試
                    continue retry;
// else CAS failed due to workerCount change; retry inner loop
            }
        }//上面這段代碼主要是對(duì) worker 數(shù)量做原子+1 操作,下面的邏輯才是正式構(gòu)建一個(gè) worker
        boolean workerStarted = false; //工作線程是否啟動(dòng)的標(biāo)識(shí)
        boolean workerAdded = false; //工作線程是否已經(jīng)添加成功的標(biāo)識(shí)
        Worker w = null;
        try {
            w = new Worker(firstTask); //構(gòu)建一個(gè) Worker,這個(gè) worker 是什么呢?我們可以看到構(gòu)造方法里面?zhèn)魅肓艘粋€(gè) Runnable 對(duì)象
            final Thread t = w.thread; //從 worker 對(duì)象中取出線程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //這里有個(gè)重入鎖,避免并發(fā)問題
                try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
//只有當(dāng)前線程池是正在運(yùn)行狀態(tài), [或是 SHUTDOWN 且 firstTask 為空],才
                    能添加到 workers 集合中
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
//任務(wù)剛封裝到 work 里面,還沒 start,你封裝的線程就是 alive,幾個(gè)意思?肯定是要拋異常出去的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); //將新創(chuàng)建的 Worker 添加到 workers 集合中
                        int s = workers.size();
//如果集合中的工作線程數(shù)大于最大線程數(shù),這個(gè)最大線程數(shù)表示線程池曾經(jīng)出現(xiàn)過的最大線程數(shù)
                        if (s > largestPoolSize)
                            largestPoolSize = s; //更新線程池出現(xiàn)過的最大線程數(shù)
                        workerAdded = true;//表示工作線程創(chuàng)建成功了
                    }
                } finally {
                    mainLock.unlock(); //釋放鎖}
                    if (workerAdded) {//如果 worker 添加成功
                        t.start();//啟動(dòng)線程
                        workerStarted = true;
                    }
                }
            } finally{
                if (!workerStarted)
                    addWorkerFailed(w); //如果添加失敗,就需要做一件事,就是遞減實(shí)際工作線程數(shù)(還記得我們最開始的時(shí)候增加了工作線程數(shù)嗎)
            }
            return workerStarted;//返回結(jié)果
        }
    }

Worker 類說明

我們發(fā)現(xiàn) addWorker方法只是構(gòu)造了一個(gè) Worker,并且把 firstTask封裝到 worker 中, 它是做什么的呢?我們來看看

  1. 每個(gè) worker,都是一條線程,同時(shí)里面包含了一個(gè)firstTask,即初始化時(shí)要被首先執(zhí)行的任務(wù).
  2. 最終執(zhí)行任務(wù)的,是 runWorker()方法,為什么這樣說,我們可以看到
this.thread = getThreadFactory().newThread(this);

在調(diào)用構(gòu)造方法時(shí),需要傳入任務(wù),這里通過getThreadFactory().newThread(this);來新建一個(gè)線程, newThread方法傳入的參數(shù)是this,因?yàn)?Worker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè) Worker對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run方法。Worker 類繼承了 AQS,并實(shí)現(xiàn)了 Runnable接口,注意其中的 firstTaskthread 屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。Worker 繼承了 AQS,使用 AQS 來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock來實(shí)現(xiàn)呢?可以看到 tryAcquire方法,它是不允許重入的,而 ReentrantLock是允許重入的:
lock 方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中; 那么它會(huì)有以下幾個(gè)作用

  1. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;
  2. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù),這時(shí)可以對(duì)該線程進(jìn)行中斷;
  3. 線程池在執(zhí)行shutdown 方法或tryTerminate方法時(shí)會(huì)調(diào)用 interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài)
  4. 之所以設(shè)置為不可重入,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像 setCorePoolSize這樣的線程池控制方法時(shí)重新獲取鎖,這樣會(huì)中斷正在運(yùn)行的線程
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        /**
         * Thread this worker is running in. Null if factory fails.
         */
        final Thread thread; //注意了,這才是真正執(zhí)行 task 的線程,從構(gòu)造函數(shù)可知是由ThreadFactury 創(chuàng)建的
        /**
         * Initial task to run. Possibly null.
         */
        Runnable firstTask; //這就是需要執(zhí)行的 task
        /**
         * Per-thread task counter
         */
        volatile long completedTasks; //完成的任務(wù)數(shù),用于線程池統(tǒng)計(jì)

        Worker(Runnable firstTask) {
            setState(-1); //初始狀態(tài) -1,防止在調(diào)用 runWorker(),也就是真正執(zhí)行 task前中斷 thread。
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock() {
            acquire(1);
        }

        public boolean tryLock() {
            return tryAcquire(1);
        }

        public void unlock() {
            release(1);
        }

        public boolean isLocked() {
            return isHeldExclusively();
        }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null
                    && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

addWorkerFailed

addWorker 方法中,如果添加 Worker并且啟動(dòng)線程失敗,則會(huì)做失敗后的處理。這個(gè)方法主要做兩件事

  1. 如果worker已經(jīng)構(gòu)造好了,則從 workers 集合中移除這個(gè) worker
  2. 原子遞減核心線程數(shù)(因?yàn)樵?addWorker 方法中先做了原子增加)
  3. 嘗試結(jié)束線程池
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

runWorker 方法

前面已經(jīng)了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作線程,而Worker 簡(jiǎn)單理解其實(shí)就是一個(gè)線程,里面重新了 run方法,這塊是線程池中執(zhí)行任務(wù)的真正處理邏輯,也就是runWorker方法,這個(gè)方法主要做幾件事

  1. 如果 task 不為空,則開始執(zhí)行 task
  2. 如果task 為空,則通過getTask()再去取任務(wù),并賦值給 task,如果取到的 Runnable不為空,則執(zhí)行該任務(wù)
  3. 執(zhí)行完畢后,通過 while 循環(huán)繼續(xù) getTask()取任務(wù)
  4. 如果 getTask()取到的任務(wù)依然是空,那么整個(gè) runWorker()方法執(zhí)行完畢
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
//        unlock,表示當(dāng)前 worker 線程允許中斷,因?yàn)?new Worker 默認(rèn)的 state = -1, 此處是調(diào)用 Worker 類的 tryRelease() 方法,將 state 置為 0,
//        而 interruptIfStarted () 中只有 state>=0 才允許調(diào)用中斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
//注意這個(gè) while 循環(huán),在這里實(shí)現(xiàn)了 [線程復(fù)用] // 如果 task 為空,則通過getTask 來獲取任務(wù)
            while (task != null || (task = getTask()) != null) {
                //上鎖,不是為了防止并發(fā)執(zhí)行任務(wù),為了在 shutdown()時(shí)不終止正在運(yùn)行的 worker線程池為 stop 狀態(tài)時(shí)不接受新任務(wù),不執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任務(wù),還中斷正在執(zhí)行的任務(wù)
                w.lock();
//所以對(duì)于 stop 狀態(tài)以上是要中斷線程的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)確保線程中斷標(biāo)志位為 true 且是 stop 狀態(tài)以上,接著清除了中斷標(biāo)志
//!wt.isInterrupted()則再一次檢查保證線程需要設(shè)置中斷標(biāo)志位
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    //這里默認(rèn)是沒有實(shí)現(xiàn)的,在一些特定的場(chǎng)景中我們可以自己繼承 ThreadpoolExecutor 自己重寫
                    Throwable thrown = null;
                    try {
                        task.run(); //執(zhí)行任務(wù)中的 run 方法
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //這里默認(rèn)默認(rèn)而也是沒有實(shí)現(xiàn)
                    }
                } finally {
//置空任務(wù)(這樣下次循環(huán)開始時(shí),task 依然為 null,需要再通過 getTask() +記錄該 Worker 完成任務(wù)數(shù)量 +解鎖
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
//1.將入?yún)?worker 從數(shù)組 workers 里刪除掉;
//2.根據(jù)布爾值 allowCoreThreadTimeOut 來決定是否補(bǔ)充新的 Worker 進(jìn)數(shù)組workers
        }
    }

getTask

worker線程會(huì)從阻塞隊(duì)列中獲取需要執(zhí)行的任務(wù),這個(gè)方法不是簡(jiǎn)單的take 數(shù)據(jù),我們來分析下他的源碼實(shí)現(xiàn)你也許好奇是怎樣判斷線程有多久沒有活動(dòng)了,是不是以為線程池會(huì)啟動(dòng)一個(gè)監(jiān)控線程,專門監(jiān)控哪個(gè)線程正在偷懶?想太多,其實(shí)只是在線程從工作隊(duì)列 poll任務(wù)時(shí),加上了超時(shí)限制,如果線程在keepAliveTime 的時(shí)間內(nèi) poll不到任務(wù),那我就認(rèn)為這條線程沒事做,可以干掉了,看看這個(gè)代碼片段你就清楚了.

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (; ; ) {//自旋
            int c = ctl.get();
            int rs = runStateOf(c);
  /*對(duì)線程池狀態(tài)的判斷,兩種情況會(huì) workerCount -1,并且返回 null
            1. 線程池狀態(tài)為 shutdown,且 workQueue 為空(反映了 shutdown 狀態(tài)的線程池還是
            要執(zhí)行 workQueue 中剩余的任務(wù)的)
            2. 線程池狀態(tài)為 stop(shutdownNow() 會(huì)導(dǎo)致變成 STOP)(此時(shí)不用考慮 workQueue
            的情況)*/
// Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;//返回 null,則當(dāng)前 worker 線程會(huì)退出
            }
            int wc = workerCountOf(c);
// timed 變量用于判斷是否需要進(jìn)行超時(shí)控制。
// allowCoreThreadTimeOut 默認(rèn)是 false,也就是核心線程不允許進(jìn)行超時(shí);
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
// 對(duì)于超過核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /*1. 線程數(shù)量超過 maximumPoolSize 可能是線程池在運(yùn)行時(shí)被調(diào)用了 setMaximumPoolSize ()
            被改變了大小,否則已經(jīng) addWorker () 成功不會(huì)超過 maximumPoolSize
            2. timed && timedOut 如果為 true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制,并且上次從阻塞隊(duì)列中
            獲取任務(wù)發(fā)生了超時(shí).其實(shí)就是體現(xiàn)了空閑線程的存活時(shí)間*/
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                /*根據(jù) timed 來判斷,如果為 true,則通過阻塞隊(duì)列 poll 方法進(jìn)行超時(shí)控制,如果在
                keepaliveTime 時(shí)間內(nèi)沒有獲取到任務(wù),則返回 null.
                        否則通過 take 方法阻塞式獲取隊(duì)列中的任務(wù)*/
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)//如果拿到的任務(wù)不為空,則直接返回給 worker 進(jìn)行處理
                    return r;
                timedOut = true;//如果 r==null,說明已經(jīng)超時(shí)了,設(shè)置 timedOut=true,在下次自旋的時(shí)候進(jìn)行回收
            } catch (InterruptedException retry) {
                timedOut = false;// 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷,則設(shè)置 timedOut 為false 并返回循環(huán)重試
            }
        }
    }

這里重要的地方是第二個(gè)if 判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize 且小于maximumPoolSize,并且 workQueue已滿時(shí),則可以增加工作線程,但這時(shí)如果超時(shí)沒有獲取到任務(wù),也就是 timedOuttrue的情況,說明workQueue 已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于 corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。什么時(shí)候會(huì)銷毀?當(dāng)然是 runWorker 方法執(zhí)行完之后,也就是Worker 中的 run方法執(zhí)行完,由 JVM 自動(dòng)回收。
getTask方法返回 null 時(shí),在 runWorker方法中會(huì)跳出while 循環(huán),然后會(huì)執(zhí)行processWorkerExit 方法。

processWorkerExit

runWorkerwhile 循環(huán)執(zhí)行完畢以后,在finally中會(huì)調(diào)用processWorkerExit, 來銷毀工作線程。
到目前為止,我們已經(jīng)從 execute方法中輸入了 worker 線程的創(chuàng)建到執(zhí)行以及最后到銷毀的全部過程。那么我們繼續(xù)回到 execute 方法.我們只分析完addWorker這段邏輯,繼續(xù)來看后面的判斷

execute 后續(xù)邏輯分析

如果核心線程數(shù)已滿,說明這個(gè)時(shí)候不能再創(chuàng)建核心線程了,于是走第二個(gè)判斷,第二個(gè)判斷邏輯比較簡(jiǎn)單,如果線程池處于運(yùn)行狀態(tài)并且任務(wù)隊(duì)列沒有滿,則將任務(wù)添加到隊(duì)列中
第三個(gè)判斷,核心線程數(shù)滿了,隊(duì)列也滿了,那么這個(gè)時(shí)候創(chuàng)建新的線程也就是(非核心線程)如果非核心線程數(shù)也達(dá)到了最大線程數(shù)大小,則直接拒絕任務(wù)

        if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
            int recheck = ctl.get();
//任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
            if (!isRunning(recheck) && remove(command))
                reject(command);//如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
            else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個(gè)線程
                addWorker(null, false);
        } else if (!addWorker(command, false)) //3.核心池已滿,隊(duì)列已滿,試著創(chuàng)建一個(gè)新線程
            reject(command); //如果創(chuàng)建新線程失敗了,說明線程池被關(guān)閉或者線程池完全滿了,拒絕任務(wù)

拒絕策略

1、 AbortPolicy:直接拋出異常,默認(rèn)策略;
2、 CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
3、 DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、 DiscardPolicy:直接丟棄任務(wù);當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù).

線程池的注意事項(xiàng)

分析完線程池以后,我們?cè)賮砹私庖幌戮€程池的注意事項(xiàng)

阿里開發(fā)手冊(cè)不建議使用線程池

線程池的構(gòu)建不允許使用Executors去創(chuàng)建,而是通過 ThreadPoolExecutor 的方式。分析完原理以后,大家自己一定要有一個(gè)答案。我來簡(jiǎn)單分析下,用Executors 使得用戶不需要關(guān)心線程池的參數(shù)配置,意味著大家對(duì)于線程池的運(yùn)行規(guī)則也會(huì)慢慢的忽略。這會(huì)導(dǎo)致一個(gè)問題,比如我們用 newFixdThreadPool 或者singleThreadPool.允許的隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,如果使用不當(dāng)會(huì)導(dǎo)致大量請(qǐng)求堆積到隊(duì)列中導(dǎo)致 OOM 的風(fēng)險(xiǎn),而 newCachedThreadPool,允許創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,也可能會(huì)導(dǎo)致大量線程的創(chuàng)建出現(xiàn) CPU 使用過高或者 OOM 的問題
而如果我們通過 ThreadPoolExecutor 來構(gòu)造線程池的話,我們勢(shì)必要了解線程池構(gòu)造中每個(gè)參數(shù)的具體含義,使得開發(fā)者在配置參數(shù)的時(shí)候能夠更加謹(jǐn)慎。

如何合理配置線程池的大小

如何合理配置線程池大小。線程池大小不是靠猜,也不是說越多越好。
在遇到這類問題時(shí),先冷靜下來分析

  1. 需要分析線程池執(zhí)行的任務(wù)的特性: CPU 密集型還是 IO 密集型
  2. 每個(gè)任務(wù)執(zhí)行的平均時(shí)長(zhǎng)大概是多少,這個(gè)任務(wù)的執(zhí)行時(shí)長(zhǎng)可能還跟任務(wù)處理邏輯是否涉及到網(wǎng)絡(luò)傳輸以及底層系統(tǒng)資源依賴有關(guān)系
    如果是 CPU 密集型, 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時(shí)間很快, cpu 一直在運(yùn)行,這種任務(wù) cpu的利用率很高,那么線程數(shù)的配置應(yīng)該根據(jù) CPU 核心數(shù)來決定, CPU 核心數(shù)=最大同時(shí)執(zhí)行線程數(shù),加入 CPU 核心數(shù)為 4,那么服務(wù)器最多能同時(shí)執(zhí)行 4 個(gè)線程。過多的線程會(huì)導(dǎo)致上下文切換反而使得效率降低。那線程池的最大線程數(shù)可以配置為 cpu 核心數(shù)+1
    如果是 IO 密集型, 主要是進(jìn)行 IO 操作,執(zhí)行 IO 操作的時(shí)間較長(zhǎng),這是 cpu 出于空閑狀態(tài),導(dǎo)致 cpu 的利用率不高,這種情況下可以增加線程池的大小。這種情況下可以結(jié)合線程的等待時(shí)長(zhǎng)來做判斷,等待時(shí)間越高,那么線程數(shù)也相對(duì)越多。一般可以配置 cpu 核心數(shù)的 2 倍。
    一個(gè)公式:線程池設(shè)定最佳線程數(shù)目 = ((線程池設(shè)定的線程等待時(shí)間+線程 CPU 時(shí)間) / 線程 CPU 時(shí)間 ) * CPU 數(shù)目
    這個(gè)公式的線程 cpu 時(shí)間是預(yù)估的程序單個(gè)線程在 cpu 上運(yùn)行的時(shí)間(通常使用 loadrunner測(cè)試大量運(yùn)行次數(shù)求出平均值)

線程池中的線程初始化

默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。在實(shí) 際中如果需要 線程池創(chuàng)建之 后立即創(chuàng)建線 程,可以通過 以下兩個(gè)方法 辦到:
prestartCoreThread():初始化一個(gè)核心線程;
prestartAllCoreThreads():初始化所有核心線程

ThreadPoolExecutor tpe = (ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();

線程池的關(guān)閉

ThreadPoolExecutor提 供 了 兩 個(gè) 方 法 , 用 于 線 程 池 的 關(guān) 閉 , 分 別 是 shutdown()shutdownNow(),其中:
shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中
的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù).

線程池容量的動(dòng)態(tài)調(diào)整

ThreadPoolExecutor提 供 了 動(dòng) 態(tài) 調(diào) 整 線 程 池 容 量 大 小 的 方 法 :setCorePoolSize()setMaximumPoolSize()
1.setCorePoolSize:設(shè)置核心池大小
2.setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
任務(wù)緩存隊(duì)列及排隊(duì)策略
在前面我們多次提到了任務(wù)緩存隊(duì)列,即 workQueue,它用來存放等待執(zhí)行的任務(wù)。workQueue 的類型為 BlockingQueue,通常可以取下面三種類型:
1.ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建必須指定大小
2.LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE
3.SynchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)

線程池的監(jiān)控

如果在項(xiàng)目中大規(guī)模的使用了線程池,那么必須要有一套監(jiān)控體系,來指導(dǎo)當(dāng)前線程池的狀態(tài),當(dāng)出現(xiàn)問題的時(shí)候可以快速定位到問題。而線程池提供了相應(yīng)的擴(kuò)展方法,我們通過重寫線程池的beforeExecute、 afterExecute 和 shutdown還有線程的異常處理 等方式就可以實(shí)現(xiàn)對(duì)線程的監(jiān)控,簡(jiǎn)單給大家演示一個(gè)案例

public class ThreadPoolMonitor extends ThreadPoolExecutor {
    // 保存任務(wù)開始執(zhí)行的時(shí)間,當(dāng)任務(wù)結(jié)束時(shí),用任務(wù)結(jié)束時(shí)間減去開始時(shí)間計(jì)算任務(wù)執(zhí)行時(shí)間
    private ConcurrentHashMap<String, Date> startTimes;

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long
            keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
    }

    @Override
    public void shutdown() {
        System.out.println("已經(jīng)執(zhí)行的任務(wù)數(shù):" + this.getCompletedTaskCount() +
                ", 當(dāng)前活動(dòng)線程數(shù):" + this.getActiveCount() +
                ", 當(dāng)前排隊(duì)線程數(shù):" + this.getQueue().size());
        System.out.println();
        super.shutdown();
    }

    //任務(wù)開始之前記錄任務(wù)開始時(shí)間
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
// 統(tǒng)計(jì)任務(wù)耗時(shí)、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、
// 已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、
// 池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止
        System.out.print("任務(wù)耗時(shí):" + diff + "\n");
        System.out.print("初始線程數(shù):" + this.getPoolSize() + "\n");
        System.out.print("核心線程數(shù):" + this.getCorePoolSize() + "\n");
        System.out.print("正在執(zhí)行的任務(wù)數(shù)量:" + this.getActiveCount() + "\n");
        System.out.print("已經(jīng)執(zhí)行的任務(wù)數(shù):" + this.getCompletedTaskCount() + "\n ");
        System.out.print("任務(wù)總數(shù):" + this.getTaskCount() + "\n");
        System.out.print("最大允許的線程數(shù):" + this.getMaximumPoolSize() + "\n");
        System.out.print("線程空閑時(shí)間:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS) + "\n ");
        System.out.println();
        super.afterExecute(r, t);
    }

    public static ExecutorService newCachedThreadPool() {
        //1.實(shí)現(xiàn)一個(gè)自己的線程池工廠
        ThreadFactory factory = (Runnable r) -> {
            //創(chuàng)建一個(gè)線程
            Thread t = new Thread(r);
            //給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對(duì)象 里面實(shí)現(xiàn)異常的默認(rèn)邏輯
            Thread.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
                System.out.println("線程工廠設(shè)置的exceptionHandler" + e.getMessage());
            });
            return t;
        };
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
                SynchronousQueue<>(), factory);
    }
}
public class Task implements Runnable {
    private static ExecutorService es = ThreadPoolMonitor.newCachedThreadPool();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            es.execute(new Task());
        }
        es.shutdown();
    }
}

Callable/Future 使用及原理分析

線程池的執(zhí)行任務(wù)有兩種方法,一種是 submit、一種是 execute;
這兩個(gè)方法是有區(qū)別的,那么基于這個(gè)區(qū)別我們?cè)賮砜纯础?/p>

execute 和 submit 區(qū)別

  1. execute 只可以接收一個(gè) Runnable 的參數(shù)
  2. execute 如果出現(xiàn)異常會(huì)拋出
  3. execute 沒有返回值
  4. submit 可以接收 Runable 和 Callable 這兩種類型的參數(shù),
  5. 對(duì)于 submit 方法,如果傳入一個(gè) Callable,可以得到一個(gè) Future 的返回值
  6. submit 方法調(diào)用不會(huì)拋異常,除非調(diào)用 Future.get,這里,我們重點(diǎn)了解一下 Callable/Future,可能很多人知道它是一個(gè)帶返回值的線程,但是具體的實(shí)現(xiàn)可能不清楚。

Callable/Future 案例演示

Callable/Future 和 Thread 之類的線程構(gòu)建最大的區(qū)別在于,能夠很方便的獲取線程執(zhí)行完以后的結(jié)果。首先來看一個(gè)簡(jiǎn)單的例子

public class CallableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
//Thread.sleep(3000);//阻塞案例演示
        return "hello world";
    }

    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask futureTask = new FutureTask(callableDemo);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

想一想我們?yōu)槭裁葱枰褂没卣{(diào)呢?那是因?yàn)榻Y(jié)果值是由另一線程計(jì)算的,當(dāng)前線程是不知道結(jié)果值什么時(shí)候計(jì)算完成,所以它傳遞一個(gè)回調(diào)接口給計(jì)算線程,當(dāng)計(jì)算完成時(shí),調(diào)用這個(gè)回調(diào)接口,回傳結(jié)果值。這個(gè)在很多地方有用到,比如 Dubbo 的異步調(diào)用,比如消息中間件的異步通信等等…利用 FutureTask、 Callable、 Thread 對(duì)耗時(shí)任務(wù)(如查詢數(shù)據(jù)庫(kù))做預(yù)處理,在需要計(jì)算結(jié)果之前就啟動(dòng)計(jì)算。
所以我們來看一下 Future/Callable 是如何實(shí)現(xiàn)的.

Callable/Future 原理分析

在剛剛實(shí)現(xiàn)的 demo 中,我們用到了兩個(gè) api,分別是 CallableFutureTask。
Callable 是一個(gè)函數(shù)式接口,里面就只有一個(gè) call 方法。子類可以重寫這個(gè)方法,并且這個(gè)方法會(huì)有一個(gè)返回值

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

FutureTask

FutureTask 的類關(guān)系圖如下,它實(shí)現(xiàn) RunnableFuture 接口,那么這個(gè) RunnableFuture 接口的作用是什么呢。
在講解 FutureTask 之前,先看看 Callable, Future, FutureTask 它們之間的關(guān)系圖,如下:


FutureTask關(guān)系圖.png
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture 是一個(gè)接口,它繼承了 RunnableFuture這兩個(gè)接口, Runnable 太熟悉了, 那么 Future是什么呢?
Future表示一個(gè)任務(wù)的生命周期,并提供了相應(yīng)的方法來判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    // 當(dāng)前的 Future 是否被取消,返回 true 表示已取消
    boolean isCancelled();

    // 當(dāng)前 Future 是否已結(jié)束。包括運(yùn)行完成、拋出異常以及取消,都表示當(dāng)前 Future 已結(jié)束
    boolean isDone();

    // 獲取 Future 的結(jié)果值。如果當(dāng)前 Future 還沒有結(jié)束,那么當(dāng)前線程就等待,
// 直到 Future 運(yùn)行結(jié)束,那么會(huì)喚醒等待結(jié)果值的線程的。
    V get() throws InterruptedException, ExecutionException;// 獲取 Future 的結(jié)果值。與 get()相比較多了允許設(shè)置超時(shí)時(shí)間

    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

分析到這里我們其實(shí)有一些初步的頭緒了, FutureTaskRunnableFuture 的結(jié)合,如果我們把 Runnable比作是生產(chǎn)者,Future 比作是消費(fèi)者,那么 FutureTask是被這兩者共享的,生產(chǎn)者運(yùn)行 run方法計(jì)算結(jié)果,消費(fèi)者通過 get方法獲取結(jié)果。
作為生產(chǎn)者消費(fèi)者模式,有一個(gè)很重要的機(jī)制,就是如果生產(chǎn)者數(shù)據(jù)還沒準(zhǔn)備的時(shí)候,消費(fèi)者會(huì)被阻塞。當(dāng)生產(chǎn)者數(shù)據(jù)準(zhǔn)備好了以后會(huì)喚醒消費(fèi)者繼續(xù)執(zhí)行。
這個(gè)有點(diǎn)像我們上次可分析的阻塞隊(duì)列,那么在 FutureTask里面是基于什么方式實(shí)現(xiàn)的呢?

state 的含義

表示 FutureTask 當(dāng)前的狀態(tài),分為七種狀態(tài)

    private static final int NEW = 0; // NEW 新建狀態(tài),表示這個(gè) FutureTask還沒有開始運(yùn)行
    // COMPLETING 完成狀態(tài), 表示 FutureTask 任務(wù)已經(jīng)計(jì)算完畢了
    // 但是還有一些后續(xù)操作,例如喚醒等待線程操作,還沒有完成。
    private static final int COMPLETING = 1;
    // FutureTask 任務(wù)完結(jié),正常完成,沒有發(fā)生異常
    private static final int NORMAL = 2;
    // FutureTask 任務(wù)完結(jié),因?yàn)榘l(fā)生異常。
    private static final int EXCEPTIONAL = 3;
    // FutureTask 任務(wù)完結(jié),因?yàn)槿∠蝿?wù)
    private static final int CANCELLED = 4;
    // FutureTask 任務(wù)完結(jié),也是取消任務(wù),不過發(fā)起了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
    private static final int INTERRUPTING = 5;
    // FutureTask 任務(wù)完結(jié),也是取消任務(wù),已經(jīng)完成了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
    private static final int INTERRUPTED = 6;

run 方法

    public void run() {
        // 如果狀態(tài) state 不是 NEW,或者設(shè)置 runner 值失敗// 表示有別的線程在此之前調(diào)用 run 方法,并成功設(shè)置了 runner 值
        // 保證了只有一個(gè)線程可以運(yùn)行 try 代碼塊中的代碼。
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {/只有 c 不為 null 且狀態(tài) state 為 NEW 的情
                    況
                V result;
                boolean ran;
                try {
                    result = c.call(); //調(diào)用 callable 的 call 方法,并獲得返回結(jié)果
                    ran = true;//運(yùn)行成功
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //設(shè)置異常結(jié)果,
                }
                if (ran)
                    set(result);//設(shè)置結(jié)果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

其實(shí)run方法作用非常簡(jiǎn)單,就是調(diào)用 callablecall方法返回結(jié)果值result,根據(jù)是否發(fā)生異常,調(diào)用 set(result)setException(ex)方法表示 FutureTask 任務(wù)完結(jié)。
不過因?yàn)?FutureTask任務(wù)都是在多線程環(huán)境中使用,所以要注意并發(fā)沖突問題。注意在 run方法中,我們沒有使用synchronized 代碼塊或者 Lock 來解決并發(fā)問題,而是使用了 CAS 這個(gè)樂觀鎖來實(shí)現(xiàn)并發(fā)安全,保證只有一個(gè)線程能運(yùn)行FutureTask 任務(wù)

get方法

get 方法就是阻塞獲取線程執(zhí)行結(jié)果,這里主要做了兩個(gè)事情

  1. 判斷當(dāng)前的狀態(tài),如果狀態(tài)小于等于 COMPLETING,表示 FutureTask 任務(wù)還沒有完結(jié),所以調(diào)用 awaitDone 方法,讓當(dāng)前線程等待。
  2. report 返回結(jié)果值或者拋出異常
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

awaitDone

如果當(dāng)前的結(jié)果還沒有被執(zhí)行完,把當(dāng)前線程插入到等待隊(duì)列

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() +
                nanos : 0L;
        WaitNode q = null;
        boolean queued = false; // 節(jié)點(diǎn)是否已添加
        for (; ; ) {
            // 如果當(dāng)前線程中斷標(biāo)志位是 true,
            // 那么從列表中移除節(jié)點(diǎn) q,并拋出 InterruptedException 異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) { // 當(dāng)狀態(tài)大于 COMPLETING 時(shí),表示 FutureTask 任務(wù)已結(jié)束。
                if (q != null)
                    q.thread = null; // 將節(jié)點(diǎn) q 線程設(shè)置為 null,因?yàn)榫€程沒有阻塞等待
                return s;
            }// 表示還有一些后序操作沒有完成,那么當(dāng)前線程讓出執(zhí)行權(quán)
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
                //表示狀態(tài)是 NEW,那么就需要將當(dāng)前線程阻塞等待。
                // 就是將它插入等待線程鏈表中,
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                // 使用 CAS 函數(shù)將新節(jié)點(diǎn)添加到鏈表中,如果添加失敗,那么 queued 為 false,
                // 下次循環(huán)時(shí),會(huì)繼續(xù)添加,直到成功。
                queued = UNSAFE.compareAndSwapObject(this,
                        waitersOffset,
                        q.next =
                                waiters, q);
            else if (timed) {// timed 為 true 表示需要設(shè)置超時(shí)
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos); // 讓當(dāng)前線程等待 nanos 時(shí)間
            } else
                LockSupport.park(this);
        }
    }

被阻塞的線程,會(huì)等到 run 方法執(zhí)行結(jié)束之后被喚醒

report

report方法就是根據(jù)傳入的狀態(tài)值 s,來決定是拋出異常,還是返回結(jié)果值。 這個(gè)兩種情況都表示 FutureTask完結(jié)了.

    private V report(int s) throws ExecutionException {
        Object x = outcome;//表示 call 的返回值
        if (s == NORMAL) // 表示正常完結(jié)狀態(tài),所以返回結(jié)果值
            return (V) x;
        // 大于或等于 CANCELLED,都表示手動(dòng)取消 FutureTask 任務(wù),
        // 所以拋出 CancellationException 異常
        if (s >= CANCELLED)
            throw new CancellationException();
        // 否則就是運(yùn)行過程中,發(fā)生了異常,這里就拋出這個(gè)異常
        throw new ExecutionException((Throwable) x);
    }

線程池對(duì)于 Future/Callable 的執(zhí)行

我們現(xiàn)在再來看線程池里面的 submit 方法,就會(huì)很清楚了。

    public class CallableDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            //Thread.sleep(3000);//阻塞案例演示
            return "hello world";
        }

        public static void main(String[] args) throws ExecutionException,
                InterruptedException {
            ExecutorService es = Executors.newFixedThreadPool(1);
            CallableDemo callableDemo = new CallableDemo();
            Future future = es.submit(callableDemo);
            System.out.println(future.get());
        }
    }

AbstractExecutorService.submit

調(diào)用抽象類中的 submit 方法,這里其實(shí)相對(duì)于 execute 方法來說,只多做了一步操作,就是封裝了一個(gè) RunnableFuture.

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

ThreadpoolExecutor.execute

然后調(diào)用 execute 方法,這里面的邏輯前面分析過了,會(huì)通過 worker 線程來調(diào)用過 ftask 的run 方法。而這個(gè) ftask 其實(shí)就是 FutureTask 里面最終實(shí)現(xiàn)的邏輯.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容