談?wù)凾hreadPoolExecutor的實(shí)現(xiàn)

概述

? 線程作為系統(tǒng)稀缺資源,如果在應(yīng)用中進(jìn)行頻繁的創(chuàng)建和銷毀,會(huì)為我們的應(yīng)用帶來災(zāi)難性的體驗(yàn),增大系統(tǒng)負(fù)荷,降低效率。池化技術(shù)為該問題的解決提供了一種有效的思路,通過建立一個(gè)線程池,每次線程的時(shí)候從池中取出一個(gè)空閑的線程,這樣就省去了線程創(chuàng)建和銷毀。java的線程池實(shí)現(xiàn)是在jdk1.5開始引入的,本文將對(duì)其中最常用的ThreadPoolExecutor的實(shí)現(xiàn)進(jìn)行詳細(xì)的介紹,系統(tǒng)可以通過本文了解到如何去實(shí)現(xiàn)一個(gè)線程池,并向Doug Lea大神致敬。

使用

? 我們先看下面的線程池使用的例子,在該例子中我聲明一個(gè)核心線程數(shù)是2,最大線程數(shù)是5,非核心線程線程存活時(shí)間1s,阻塞隊(duì)列大小為1,拒絕策略為AbortPolicy,我們會(huì)輸出程序執(zhí)行過程中的線程池達(dá)到的最大線程數(shù)以及在所有任務(wù)執(zhí)行結(jié)束后線程池中線程的數(shù)量。代碼如下:

/**
 * Created by yuanqiongqiong on 2019/4/10.
 */
public class ThreadPoolExecutorTest {

    private static Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);

    //聲明一個(gè)線程池
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String []args) {
        for (int i = 0; i< 7;i++) {
            String runnableName = "test" + i;
            PersonRunnable personRunnable = new PersonRunnable(runnableName);
            try {
                threadPoolExecutor.execute(personRunnable);
            } catch (Exception e) {
                LOGGER.error("執(zhí)行{}任務(wù)異常", runnableName, e);
            }
        }
        try {
            Thread.sleep(500);
            LOGGER.info("線程池當(dāng)前線程數(shù)目 = {}", threadPoolExecutor.getPoolSize());
            Thread.sleep(2000);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        LOGGER.info("線程池中達(dá)到的最大線程數(shù)目 = {}", threadPoolExecutor.getLargestPoolSize());
        LOGGER.info("線程池當(dāng)前線程數(shù)目 = {}", threadPoolExecutor.getPoolSize());
        LOGGER.info("線程池已經(jīng)完成的任務(wù)數(shù)量 = {}", threadPoolExecutor.getCompletedTaskCount());
    }
    static class PersonRunnable implements Runnable {
        private String name;

        public PersonRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            LOGGER.info("我是" + name + "我在線程" + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                LOGGER.error("任務(wù){(diào)}執(zhí)行異常", Thread.currentThread().getName(), e);
            }
        }
    }
}

? 輸出結(jié)果如下:

20:22:26.571 [pool-1-thread-3] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test3我在線程pool-1-thread-3
20:22:26.571 [pool-1-thread-2] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test1我在線程pool-1-thread-2
20:22:26.571 [pool-1-thread-4] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test4我在線程pool-1-thread-4
20:22:26.571 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test5我在線程pool-1-thread-5
20:22:26.571 [pool-1-thread-1] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test0我在線程pool-1-thread-1
20:22:26.576 [main] ERROR com.meituan.campaign.ThreadPoolExecutorTest - 執(zhí)行test6任務(wù)異常
java.util.concurrent.RejectedExecutionException: Task com.meituan.campaign.ThreadPoolExecutorTest$PersonRunnable@46f7f36a rejected from java.util.concurrent.ThreadPoolExecutor@421faab1[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.meituan.campaign.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:30)
20:22:26.681 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test2我在線程pool-1-thread-5
20:22:27.082 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池當(dāng)前線程數(shù)目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池中達(dá)到的最大線程數(shù)目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池當(dāng)前線程數(shù)目 = 2
20:22:29.085 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池已經(jīng)完成的任務(wù)數(shù)量 = 6

? 由于我們代碼設(shè)置了最大線程數(shù)是5個(gè),并且阻塞隊(duì)列大小是1,所以同一時(shí)間最多會(huì)有6個(gè)任務(wù)被執(zhí)行,其中1個(gè)任務(wù)放在阻塞隊(duì)列中。線程池達(dá)到的最大線程數(shù)目是5個(gè),因?yàn)榫€程池設(shè)置了maximumPoolSize=5。非核心線程會(huì)在1s空閑后被回收,因此最終線程池線程數(shù)目還是2個(gè)。

實(shí)現(xiàn)分析

? 拋開ThreadPoolExecutor,我們先想下實(shí)現(xiàn)一個(gè)線程池需要哪些成員變量,個(gè)人感覺以下變量是必不可少的:(1) 一個(gè)存放線程的容器或數(shù)組;(2) 一個(gè)隊(duì)列用來在線程池線程不足是存放排隊(duì)的任務(wù);(3) 一個(gè)狀態(tài)字段表示線程池的狀態(tài),用來表示線程池不同生命周期狀態(tài)。下面,我們看下ThreadPoolExecutor的成員變量:

//表示線程狀態(tài)和線程數(shù),高三位代表線程狀態(tài),低29位代表線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//值為29
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池最大線程數(shù),大概為5億,可以肯定不會(huì)達(dá)到這么多線程
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//線程池處于運(yùn)行狀態(tài)可以接收新任務(wù)并執(zhí)行任務(wù)隊(duì)列中的任務(wù)
private static final int RUNNING    = -1 << COUNT_BITS;
//該狀態(tài)下線程池不再接收新任務(wù),但是會(huì)把任務(wù)隊(duì)列中的任務(wù)執(zhí)行完成,調(diào)用shutDown()會(huì)進(jìn)入該狀態(tài)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//該狀態(tài)下線程池不接受新任務(wù)并拋棄任務(wù)隊(duì)列中的任務(wù)中斷所有正在執(zhí)行的線程,調(diào)用shutDownNoW()會(huì)進(jìn)入該狀態(tài)
private static final int STOP       =  1 << COUNT_BITS;
//已經(jīng)沒有任務(wù)可以執(zhí)行,會(huì)從SHUTDOWN和STOP狀態(tài)變換為該狀態(tài)
private static final int TIDYING    =  2 << COUNT_BITS;
//在執(zhí)行完terminated()操作后會(huì)進(jìn)入該狀態(tài)
private static final int TERMINATED =  3 << COUNT_BITS;
//任務(wù)阻塞隊(duì)列,存放排隊(duì)任務(wù)
private final BlockingQueue<Runnable> workQueue;
//存放線程的hashset
private final HashSet<Worker> workers = new HashSet<Worker>();
//線程工廠,生成新線程
private volatile ThreadFactory threadFactory;
//拒絕策略
private volatile RejectedExecutionHandler handler;
//線程池核心線程數(shù)
private volatile int corePoolSize;
//線程池最大線程數(shù)
private volatile int maximumPoolSize;

? 上述代碼的注釋給出了線程池各個(gè)狀態(tài)的含義,我們看下各個(gè)狀態(tài)之間的狀態(tài)轉(zhuǎn)換關(guān)系,具體如下:

(1) RUNNING -> SHUTDOWN:調(diào)用了shutdown()函數(shù);

(2) (RUNNING or SHUTDOWN) -> STOP:調(diào)用了shutdownNow();

(3)SHUTDOWN -> TIDYING:當(dāng)線程池線程為空并者任務(wù)隊(duì)列為空;

(4)STOP -> TIDYING:當(dāng)線程池線程為空;

(5)TIDYING -> TERMINATED:當(dāng)調(diào)用了terminated()方法;

? 如上示例,我們把一個(gè)任務(wù)放入線程池的execute()函數(shù)中,線程池會(huì)為我們選擇一個(gè)線程來執(zhí)行我們提交的任務(wù)。在這個(gè)選擇線程的過程中,如果線程池中線程數(shù)量小于corePoolSize,那么將創(chuàng)建新線程執(zhí)行任務(wù);當(dāng)線程池?cái)?shù)量大于等于corePoolSize并且小于maximumPoolSize,線程池會(huì)把任務(wù)放到阻塞隊(duì)列workQueue中直到workQueue滿了去創(chuàng)建新線程;當(dāng)線程池線程數(shù)量等于maximumPoolSize并且workQueue滿時(shí)會(huì)執(zhí)行拒絕策略。下面我們通過execute()函數(shù)的邏輯來理解上述過程:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果線程數(shù)小于核心線程數(shù),那么創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)command
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線程數(shù)大于等于核心線程數(shù),線程數(shù)處于RUNNING狀態(tài)(可以將任務(wù)加入阻塞隊(duì)列)并且加入阻塞隊(duì)列成功(即阻塞隊(duì)列未滿),那么任務(wù)就被加入阻塞隊(duì)列等待空閑線程。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次檢查線程池狀態(tài),如果不是RUNNING狀態(tài),從阻塞隊(duì)列中移除任務(wù),執(zhí)行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //線程處RUNNING狀態(tài)并且線程數(shù)是0,則創(chuàng)建個(gè)空閑新線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果線程數(shù)大于核心線程并且阻塞隊(duì)列已滿,則以maximumPoolSize為線程數(shù)最大值進(jìn)行處理
        else if (!addWorker(command, false))
            //線程池中線程達(dá)到最大線程數(shù)并且阻塞隊(duì)列已經(jīng)滿執(zhí)行拒絕策略
            reject(command);
    }

? 看到上面的代碼邏輯,我們會(huì)發(fā)現(xiàn)主要的邏輯還是在addWorker里,這個(gè)函數(shù)主要功能就是為任務(wù)分配線程并執(zhí)行,我們在看這塊邏輯之前需要取看一個(gè)重要的Worker類。該類封裝了線程及任務(wù),可以在內(nèi)部執(zhí)行任務(wù),具體定義如下:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        //具體線程
        final Thread thread;
        //線程要執(zhí)行的任務(wù)
        Runnable firstTask;
        //線程完成的任務(wù)數(shù)
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //調(diào)用線程工廠創(chuàng)建新的線程,threadFactory由我們的線程池構(gòu)造函數(shù)傳入,沒有指定則使用默認(rèn)的,這塊會(huì)創(chuàng)建一個(gè)新的線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //可以看出Worker實(shí)現(xiàn)了AQS,其本身也是不可重入鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

? 思考為什么Worker要繼承AQS實(shí)現(xiàn)一個(gè)獨(dú)占鎖?這個(gè)問題我們后面分析。

? 了解了worker的構(gòu)成,我們就可以具體看下addWorker函數(shù)的執(zhí)行邏輯了,具體如下:

//core為true,那么創(chuàng)建線程是以corePoolSize作為線程數(shù)最大值,否則以maximumPoolSize作為線程數(shù)最大值
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
          
            // 線程狀態(tài)是非RUNNING狀態(tài)不再進(jìn)行任務(wù)提交處理,其中SHUTDOWN狀態(tài)下已經(jīng)提交進(jìn)行任務(wù)和阻塞隊(duì)列         中的任務(wù)要繼續(xù)處理
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //線程池中線程大于最大線程數(shù)或者大于要求的閾值,返回失敗
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //符合要求后,CAS增大線程數(shù),跳出自旋,走下面的線程創(chuàng)建邏輯
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;//標(biāo)記線程是否啟動(dòng)
        boolean workerAdded = false;//標(biāo)記線程是否添加成功
        Worker w = null;
        try {
            //創(chuàng)建新的線程并封裝為一個(gè)Work對(duì)象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //對(duì)線程池創(chuàng)建的線程狀態(tài)進(jìn)行檢查
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //如果新線程檢查成功,將新線程加入workers中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //更新全局變量,線程池達(dá)到的最大線程數(shù),該值可以輸出作為線程池參數(shù)設(shè)定的指標(biāo)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //這塊重要了,線程創(chuàng)建成功后,開始執(zhí)行任務(wù)
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

? 我們繼續(xù)跟著上述代碼思路走,看下任務(wù)如何執(zhí)行,t.start()的會(huì)調(diào)用Worker類run()方法,而該方法會(huì)調(diào)用runWorker來從任務(wù)隊(duì)列中獲取任務(wù),執(zhí)行任務(wù),具體看下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果Worker中創(chuàng)建時(shí)存在任務(wù),則執(zhí)行;否則,調(diào)用getTask從阻塞隊(duì)列中獲取任務(wù),當(dāng)阻塞隊(duì)列中沒有任務(wù)并且線程不應(yīng)該被回收時(shí),線程會(huì)一直阻塞等待獲取任務(wù),具體在getTask方法中分析
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 上面的英文注釋很清楚了,這塊為了處理調(diào)用shutdownNow時(shí)需要停止所有的線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //這個(gè)函數(shù)可以自己實(shí)現(xiàn),默認(rèn)為空
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執(zhí)行具體任務(wù)的業(yè)務(wù)邏輯
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       //這個(gè)函數(shù)可以自己實(shí)現(xiàn),默認(rèn)為空
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //執(zhí)行線程銷毀過程
            processWorkerExit(w, completedAbruptly);
        }
    }

? 在getTask方法中, Worker線程會(huì)一直循環(huán)的從阻塞隊(duì)列中獲取任務(wù),直到遇到以下情況會(huì)返回null,進(jìn)而執(zhí)行上面的線程銷毀過程processWorkerExit:

(1) 線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空;

(2) 線程數(shù)狀態(tài)變大于SHUTDOWN (STOP TIDYING TERMINATED);

(3) 線程池線程數(shù)大于最大線程數(shù)或者線程超時(shí)未獲取任務(wù)的情況下,任務(wù)隊(duì)列為空或者工作線程數(shù)大于1;

這塊邏輯具體代碼如下:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 線程池狀態(tài)為STOP或者(狀態(tài)為SHUTDOWN&&任務(wù)隊(duì)列為空),這個(gè)時(shí)候無需在執(zhí)行任務(wù)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            //設(shè)置了允許核心線程超過keepAliveTime空閑后銷毀線程 或者 線程數(shù)大于核心線程數(shù)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               //從阻塞隊(duì)列中獲取任務(wù),如果進(jìn)行超時(shí)控制,則調(diào)用poll方法,否則調(diào)用take一直阻塞到隊(duì)列中有任務(wù)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

? 以上就是線程池中任務(wù)執(zhí)行的大致過程,接下來我們對(duì)線程池結(jié)束及其中實(shí)現(xiàn)的一些細(xì)節(jié)進(jìn)行分析。

原文

袁瓊瓊的技術(shù)博客,歡迎指針
http://yuanqiongqiong.cn/2019/04/10/%E8%B0%88%E8%B0%88ThreadPoolExecutor%E7%9A%84%E5%AE%9E%E7%8E%B0/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容