Android線程池ThreadPoolExecutor詳解

???????傳統(tǒng)的多線程是通過(guò)繼承Thread類及實(shí)現(xiàn)Runnable接口來(lái)實(shí)現(xiàn)的,每次創(chuàng)建及銷毀線程都會(huì)消耗資源、響應(yīng)速度慢,且線程缺乏統(tǒng)一管理,容易出現(xiàn)阻塞的情況,針對(duì)以上缺點(diǎn),線程池就出現(xiàn)了。

一.簡(jiǎn)介

a.定義

???????線程池是一個(gè)創(chuàng)建使用線程并能保存使用過(guò)的線程以達(dá)到復(fù)用的對(duì)象,簡(jiǎn)單的說(shuō)就是一塊緩存了一定數(shù)量線程的區(qū)域。

b.作用

???????1.復(fù)用線程:線程執(zhí)行完不會(huì)立刻退出,繼續(xù)執(zhí)行其他線程;
???????2.管理線程:統(tǒng)一分配、管理、控制最大并發(fā)數(shù);

c.優(yōu)點(diǎn)

???????1.降低因頻繁創(chuàng)建&銷毀線程帶來(lái)的性能開(kāi)銷,復(fù)用緩存在線程池中的線程;
???????2.提高線程執(zhí)行效率&響應(yīng)速度,復(fù)用線程:響應(yīng)速度;管理線程:優(yōu)化線程執(zhí)行順序,避免大量線程搶占資源導(dǎo)致阻塞現(xiàn)象;
???????3.提高對(duì)線程的管理度;

二.使用流程

???????線程池的使用也比較簡(jiǎn)單,流程如下:

a.創(chuàng)建線程池,通過(guò)配置線程池的參數(shù),從而實(shí)現(xiàn)自己所需的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, 
                                                               MAXIMUM_POOL_SIZE, 
                                                               KEEP_ALIVE_SECONDS, 
                                                               TimeUnit.SECONDS,
                                                               sPoolWorkQueue, 
                                                               sThreadFactory);
b.向線程池提交任務(wù):execute()
threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                ... 
                //執(zhí)行任務(wù)
            }
        });
c.關(guān)閉線程池shutdown()
threadPoolExecutor.shutdown();

三.工作原理

???????接下來(lái)通過(guò)源碼來(lái)介紹一下ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)及工作原理。

a.參數(shù)介紹
參數(shù) 含義 備注
corePoolSize 核心線程數(shù) 默認(rèn)情況下,核心線程會(huì)一直存活
maximumPoolSize 線程池容納的最大線程數(shù) 當(dāng)活動(dòng)線程數(shù)大于該值時(shí),后續(xù)的新任務(wù)會(huì)阻塞
keepAliveTime 非核心線程閑置超時(shí)時(shí)間 超過(guò)該時(shí)間后,非核心線程會(huì)被回收,當(dāng)設(shè)置allowCoreThreadTimeOut(true),核心線程也會(huì)被回收
unit 時(shí)間單位 常用TimeUnit.SECONDS
workQueue 任務(wù)隊(duì)列 執(zhí)行execute()后,如果核心線程滿了,會(huì)將Runnable加入到該參數(shù)內(nèi)
threadFactory 線程工廠 為線程池創(chuàng)建新線程
b.源碼分析

???????線程池的最終實(shí)現(xiàn)類是ThreadPoolExecutor,通過(guò)實(shí)現(xiàn)可以一步一步的看到,父接口為Executor:

public interface Executor {
    void execute(Runnable command);
}

???????其他的繼承及實(shí)現(xiàn)關(guān)系就不一一列舉了,直接通過(guò)以下圖來(lái)看一下:


image.png
c.ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)

???????從構(gòu)造方法開(kāi)始看:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

???????通過(guò)以上可以看到,在創(chuàng)建ThreadPoolExecutor時(shí),對(duì)傳入的參數(shù)是有要求的:corePoolSize不能小于0;maximumPoolSize需要大于0,且需要大于等于corePoolSize;keepAliveTime大于0;workQueue、threadFactory都不能為null。
???????在創(chuàng)建完后就需要執(zhí)行Runnable了,看以下execute()方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        .....
        int c = ctl.get();
        //--------------分析1-----------------
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //--------------分析2-----------------
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //--------------分析3-----------------
        else if (!addWorker(command, false))
            reject(command);
    }

???????在execute()內(nèi)部主要執(zhí)行的邏輯如下:
???????分析點(diǎn)1:如果當(dāng)前線程數(shù)未超過(guò)核心線程數(shù),則將runnable作為參數(shù)執(zhí)行addWorker(),true表示核心線程,false表示非核心線程;
???????分析點(diǎn)2:核心線程滿了,如果線程池處于運(yùn)行狀態(tài)則往workQueue隊(duì)列中添加任務(wù),接下來(lái)判斷是否需要拒絕或者執(zhí)行addWorker();
???????分析點(diǎn)3:以上都不滿足時(shí)[corePoolSize=0且沒(méi)有運(yùn)行的線程,或workQueue已經(jīng)滿了],執(zhí)行addWorker()添加runnable,失敗則執(zhí)行拒絕策略;
???????總結(jié)一下:線程池對(duì)線程創(chuàng)建的管理,流程圖如下:

image.png

???????以上可以看到,核心線程數(shù)量或非核心線程隊(duì)列不滿時(shí),就執(zhí)行addWorker(),否則執(zhí)行reject(),接下來(lái)看一下addWorker()執(zhí)行邏輯:

    private boolean addWorker(Runnable firstTask, boolean core) {
        ......
        //對(duì)添加時(shí)一些狀態(tài)進(jìn)行判斷,提前判斷是否成功
        ......

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //--------分析1----------
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //----分析2-----
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

???????在執(zhí)行addWorker時(shí),主要做了以下兩件事:
???????分析點(diǎn)1:將runnable作為參數(shù)創(chuàng)建Worker對(duì)象w,然后獲取w內(nèi)部的變量thread;
???????分析點(diǎn)2:調(diào)用start()來(lái)啟動(dòng)thread;
???????在addWorker()內(nèi)部會(huì)將runnable作為參數(shù)傳給Worker,然后從Worker內(nèi)部讀取變量thread,看一下Worker類的實(shí)現(xiàn):

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        ......

        final Thread thread;
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        .......
        .......
    }

???????Worker實(shí)現(xiàn)了Runnable接口,在Worker內(nèi)部,進(jìn)行了賦值及創(chuàng)建操作,先將execute()時(shí)傳入的runnable賦值給內(nèi)部變量firstTask,然后通過(guò)ThreadFactory.newThread(this)創(chuàng)建Thread,上面講到在addWorker內(nèi)部執(zhí)行t.start()后,會(huì)執(zhí)行到Worker內(nèi)部的run()方法,接著會(huì)執(zhí)行runWorker(this),一起看一下:

    final void runWorker(Worker w) {
         //-------------分析1--------------
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //-------------分析2-------------------------
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //----分析3-----
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

???????前面可以看到,runWorker是執(zhí)行在子線程內(nèi)部,主要執(zhí)行了三件事:
???????分析1:獲取當(dāng)前線程,當(dāng)執(zhí)行shutdown()時(shí)需要將線程interrupt(),接下來(lái)從Worker內(nèi)部取到firstTask,即execute傳入的runnable,接下來(lái)會(huì)執(zhí)行;
???????分析2:while循環(huán),task不空直接執(zhí)行;否則執(zhí)行g(shù)etTask()去獲取,不為空直接執(zhí)行;
???????分析3:對(duì)有效的task執(zhí)行run(),由于是在子線程中執(zhí)行,因此直接run()即可,不需要start();
???????前面看到,在while內(nèi)部有執(zhí)行g(shù)etTask(),一起看一下:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //-------------------------分析1---------------------------
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //-------------------------分析2---------------------------
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

???????getTask()是從workQueue內(nèi)部獲取接下來(lái)需要執(zhí)行的runnable,內(nèi)部主要做了兩件事:
???????分析1:先獲取到當(dāng)前正在執(zhí)行工作的線程數(shù)量wc,通過(guò)判斷allowCoreThreadTimeOut[在創(chuàng)建ThreadPoolExecutor時(shí)可以進(jìn)行設(shè)置]及wc > corePoolSize來(lái)確定timed值;
???????分析2:通過(guò)timed值來(lái)決定執(zhí)行poll()或者take(),如果WorkQueue中有未執(zhí)行的線程時(shí),兩者作用是相同的,立刻返回線程;如果WorkQueue中沒(méi)有線程時(shí),poll()有超時(shí)返回,take()會(huì)一直阻塞;如果allowCoreThreadTimeOut為true,則核心線程在超時(shí)時(shí)間沒(méi)有使用的話,是需要退出的;wc > corePoolSize時(shí),非核心線程在超時(shí)時(shí)間沒(méi)有使用的話,是需要退出的;
???????allowCoreThreadTimeOut是可以通過(guò)以下方式進(jìn)行設(shè)置的:

threadPoolExecutor.allowCoreThreadTimeOut(true);
public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

???????如果沒(méi)有進(jìn)行設(shè)置,那么corePoolSize數(shù)量的核心線程會(huì)一直存在。
???????總結(jié)一下:ThreadPoolExecutor內(nèi)部的核心線程如何確保一直存在,不退出?
???????上面分析已經(jīng)回答了這個(gè)問(wèn)題,每個(gè)線程在執(zhí)行時(shí)會(huì)執(zhí)行runWorker(),而在runWorker()內(nèi)部有while()循環(huán)會(huì)判斷getTask(),在getTask()內(nèi)部會(huì)對(duì)當(dāng)前執(zhí)行的線程數(shù)量及allowCoreThreadTimeOut進(jìn)行實(shí)時(shí)判斷,如果工作數(shù)量大于corePoolSize且workQueue中沒(méi)有未執(zhí)行的線程時(shí),會(huì)執(zhí)行poll()超時(shí)退出;如果工作數(shù)量不大于corePoolSize且workQueue中沒(méi)有未執(zhí)行的線程時(shí),會(huì)執(zhí)行take()進(jìn)行阻塞,確保有corePoolSize數(shù)量的線程阻塞在runWorker()內(nèi)部的while()循環(huán)不退出。
???????如果需要關(guān)閉線程池,需要如何操作呢,看一下shutdown()方法:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

???????以上可以看到,關(guān)閉線程池的原理:a. 遍歷線程池中的所有工作線程;b. 逐個(gè)調(diào)用線程的interrupt()中斷線程(注:無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止)
???????也可調(diào)用shutdownNow()來(lái)關(guān)閉線程池,二者區(qū)別:
???????shutdown():設(shè)置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程;
???????shutdownNow():設(shè)置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表;
???????使用建議:一般調(diào)用shutdown()關(guān)閉線程池;若任務(wù)不一定要執(zhí)行完,則調(diào)用shutdownNow();
???????總結(jié)一下:ThreadPoolExecutor在執(zhí)行execute()及shutdown()時(shí)的調(diào)用關(guān)系,流程圖如下:

image.png

???????1.客戶端在創(chuàng)建完線程池后,調(diào)用execute()來(lái)執(zhí)行一個(gè)runnable任務(wù);
???????2.在execute()內(nèi)部會(huì)執(zhí)行addWorker()來(lái)創(chuàng)建一個(gè)Worker對(duì)象,然后調(diào)用線程的start()方法,即執(zhí)行Worker內(nèi)部的run()方法;
???????3.Worker內(nèi)部的run()方法中會(huì)調(diào)用到runWorker()方法;
???????4.runWorker()方法內(nèi)部會(huì)在while()循環(huán)內(nèi)執(zhí)行g(shù)etTask()來(lái)不斷的從workQueue中獲取未執(zhí)行的runnable然后執(zhí)行;
???????5.getTask()內(nèi)部會(huì)實(shí)時(shí)判斷當(dāng)前正在執(zhí)行的Worker數(shù)量與corePoolSize進(jìn)行比較,如果數(shù)量不大于corePoolSize且workQueue為空,會(huì)執(zhí)行task()進(jìn)行阻塞,確保corePoolSize的線程不退出,即核心線程不退出;
???????6.執(zhí)行shutdown()來(lái)中斷那些沒(méi)有在執(zhí)行的線程;

四.線程池類型

???????線程池可以通過(guò)Executors來(lái)進(jìn)行不同類型的創(chuàng)建,具體分為四種不同的類型,如下:

a.newCachedThreadPool

???????可緩存線程池:不固定線程數(shù)量,且支持最大為Integer.MAX_VALUE的線程數(shù)量:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

???????1、線程數(shù)無(wú)限制
???????2、有空閑線程則復(fù)用空閑線程,若無(wú)空閑線程則新建線程
???????3、一定程度上減少頻繁創(chuàng)建/銷毀線程,減少系統(tǒng)開(kāi)銷

b.newFixedThreadPool

???????固定線程數(shù)量的線程池:定長(zhǎng)線程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

???????1、可控制線程最大并發(fā)數(shù)(同時(shí)執(zhí)行的線程數(shù))
???????2、超出的線程會(huì)在隊(duì)列中等待。

c.newSingleThreadExecutor

???????單線程化的線程池:可以理解為線程數(shù)量為1的FixedThreadPool

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

???????1、有且僅有一個(gè)工作線程執(zhí)行任務(wù)
???????2、所有任務(wù)按照指定順序執(zhí)行,即遵循隊(duì)列的入隊(duì)出隊(duì)規(guī)則

d.newScheduledThreadPool

???????定時(shí)以指定周期循環(huán)執(zhí)行任務(wù)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

五.隊(duì)列類型

???????一般來(lái)說(shuō),等待隊(duì)列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 與 SynchronousQueue 。
???????假設(shè)向線程池提交任務(wù)時(shí),核心線程都被占用的情況下:
???????ArrayBlockingQueue :基于數(shù)組的阻塞隊(duì)列,初始化需要指定固定大小。
???????當(dāng)使用此隊(duì)列時(shí),向線程池提交任務(wù),會(huì)首先加入到等待隊(duì)列中,當(dāng)?shù)却?duì)列滿了之后,再次提交任務(wù),嘗試加入隊(duì)列就會(huì)失敗,這時(shí)就會(huì)檢查如果當(dāng)前線程池中的線程數(shù)未達(dá)到最大線程,則會(huì)新建線程執(zhí)行新提交的任務(wù)。所以最終可能出現(xiàn)后提交的任務(wù)先執(zhí)行,而先提交的任務(wù)一直在等待。
???????LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,初始化可以指定大小,也可以不指定。
???????當(dāng)指定大小后,行為就和 ArrayBlockingQueue一致。而如果未指定大小,則會(huì)使用默認(rèn)的 Integer.MAX_VALUE 作為隊(duì)列大小。這時(shí)候就會(huì)出現(xiàn)線程池的最大線程數(shù)參數(shù)無(wú)用,因?yàn)闊o(wú)論如何,向線程池提交任務(wù)加入等待隊(duì)列都會(huì)成功。最終意味著所有任務(wù)都是在核心線程執(zhí)行。如果核心線程一直被占,那就一直等待。
???????SynchronousQueue :無(wú)容量的隊(duì)列。
???????使用此隊(duì)列意味著希望獲得最大并發(fā)量。因?yàn)闊o(wú)論如何,向線程池提交任務(wù),往隊(duì)列提交任務(wù)都會(huì)失敗。而失敗后如果沒(méi)有空閑的非核心線程,就會(huì)檢查如果當(dāng)前線程池中的線程數(shù)未達(dá)到最大線程,則會(huì)新建線程執(zhí)行新提交的任務(wù)。完全沒(méi)有任何等待,唯一制約它的就是最大線程數(shù)的個(gè)數(shù)。因此一般配合Integer.MAX_VALUE就實(shí)現(xiàn)了真正的無(wú)等待。
???????但是需要注意的是,進(jìn)程的內(nèi)存是存在限制的,而每一個(gè)線程都需要分配一定的內(nèi)存。所以線程并不能無(wú)限個(gè)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容