ThreadPoolExecutor 源碼分析

Integer.SIZE = 32
COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29

為了方便做如下假設(shè)
Integer.SIZE = 10 則
COUNT_BITS = 10 - 3 = 7

CAPACITY = (1 << COUNT_BITS) - 1
CAPACITY: 0001111111

RUNNING: 1111111111 -1<< 7 => 1110000000
SHUTDOWN: 0000000000 0 << 7 => 0000000000
STOP: 0000000001 1 << 7 => 0010000000
TIDYING: 0000000010 2 << 7 => 0100000000
TERMINATED: 0000000011 3 << 7 => 0110000000

runStateOf 線程池的運(yùn)行狀態(tài),獲取高三位
c & ~capacity
1110000000 running
& 1110000000
=
1110000000

workerCountOf 工作線程的容量,獲取后7位
c & CAPACITY
1110000000
& 0001111111
=
0001111111

通過(guò)上面的內(nèi)容可以看到高三位作為狀態(tài)控制位,后面的作為capacity容量大小

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
|: 兩個(gè)位只要有一個(gè)為1,那么結(jié)果就是1,否則就為0

ctl
1110000000
| 0000000000
= 1110000000 保留高3位的線程池的運(yùn)行狀態(tài)

The main pool control state, ctl, is an atomic integer packing two conceptual fields
 workerCount, indicating the effective number of threads
 runState,    indicating whether running, shutting down etc

ctl有兩個(gè)作用, workerCount 有效的線程數(shù), runState 運(yùn)行狀態(tài)running,shutting,STOP,TIDYING,TERMINATED

狀態(tài)解釋說(shuō)明
The runState provides the main lifecycle control, taking on values:

RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks,
and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero,
the thread transitioning to state TIDYING
will run the terminated() hook method
TERMINATED: terminated() has completed

ThreadPoolExecutor 核心參數(shù)

ThreadFactory線程工廠類,創(chuàng)建工作線程
RejectedExecutionHandler 拒絕策略類, 默認(rèn)為AbortPolicy,其他還有CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy
keepAliveTime 線程的空閑時(shí)間,超過(guò)空閑時(shí)間,線程會(huì)被回收
allowCoreThreadTimeOut 允許核心線程超時(shí),默認(rèn)core線程不會(huì)被回收,當(dāng)為true時(shí),核心線程的Idle時(shí)間超過(guò)keepAliveTime也會(huì)被回收
corePoolSize 核心線程數(shù)
maximumPoolSize 最大線程數(shù)

線程池的工作原理

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize,
 in which case, the task will be rejected.

如果正在運(yùn)行的線程數(shù)少于corePoolSize,Executor添加新線程,而不是排隊(duì)。
如果大于等于corePoolSize大小的線程數(shù)正在運(yùn)行,Executor對(duì)請(qǐng)求進(jìn)行排隊(duì),而不是添加新線程。
如果無(wú)法將請(qǐng)求放入隊(duì)列中,則將創(chuàng)建一個(gè)新線程,除非創(chuàng)建的線程數(shù)超過(guò)了maximumPoolSize大小,在這種情況下,該任務(wù)將被拒絕。

ThreadPoolExecutor.execute(Runnable command)執(zhí)行流程分析

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //如果當(dāng)前worker數(shù)量小于corePoolSize,嘗試直接創(chuàng)建一個(gè)worker,創(chuàng)建成功后直接返回
            if (addWorker(command, true))
                return;
            //如果創(chuàng)建失敗,重新獲取ctl的值
            c = ctl.get();
        }
        //如果當(dāng)前worker大于等于corePoolSize ,ThreadPoolExecutor狀態(tài)是運(yùn)行中,將task添加到queue中
        if (isRunning(c) && workQueue.offer(command)) {
            //重新檢查ctl,
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //當(dāng)線程池不處于運(yùn)行中狀態(tài),而且task被添加到隊(duì)列中了,則使用拒絕策略來(lái)處理添加的task
                reject(command);
            else if (workerCountOf(recheck) == 0)
                 //當(dāng)當(dāng)前的worker數(shù)量為0,創(chuàng)建一個(gè)新的worker,firstTask為null,
                addWorker(null, false);
        }
       //當(dāng)當(dāng)前worker數(shù)量大于等于corePoolSize,并且queue隊(duì)列已經(jīng)滿了,則直接創(chuàng)建新的worker
        else if (!addWorker(command, false))
             //如果線程池不是running,或者當(dāng)前的worker的數(shù)量超過(guò)了maximumPoolSize
            reject(command);
    }

ThreadPoolExecutor.addWorker(Runnable command)執(zhí)行流程分析

//core: 如果為true,以corePoolSize作為邊界,否則以maximumPoolSize作為邊界
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //線程池的狀態(tài)不是running,返回false
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                 //worker count超過(guò)了邊界,返回false
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //創(chuàng)建一個(gè)worker,在worker中會(huì)使用ThreadFactory創(chuàng)建一個(gè)線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       //將worker添加到workers Set集合中
                        workers.add(w);
                        int s = workers.size();
                        //記錄線程池的最大數(shù)量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //調(diào)用Thread.start方法執(zhí)行執(zhí)行任務(wù)
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker

//Worker繼承AQS,所以Worker是線程安全的,
//Worker實(shí)現(xiàn)Runnable接口,作為參數(shù)傳遞給getThreadFactory().newThread()方法,當(dāng)Thread.start()時(shí),會(huì)執(zhí)行Worker的run方法
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果firstTask不為null,或者從隊(duì)列中取到了task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                   //鉤子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                       //task就是我們調(diào)用execute(Runnable command)提交的任務(wù)
                        task.run();
                    } finally {
                        //鉤子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //處理worker退出邏輯,主要是將worker從workers移除,中斷worker相關(guān)的thread,worker count -1
            processWorkerExit(w, completedAbruptly);
        }
    }

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut為true或者wc大于corePoolSize核心線程數(shù)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果worker count大于maximumPoolSize, 并且worker queue為空, 返回null task,
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果timed為true,poll超時(shí)獲取,如果超時(shí),返回null
               //如果timed為false, (allowCoreThreadTimeOut為false, worker count小于corePoolSize),阻塞獲取
               //從而保證小于corePoolSize的worker不會(huì)清理掉
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

核心方法是: addWorker,runWorker,getTask.

通過(guò)上面的分析可以回答一下幾個(gè)問(wèn)題

  1. 如果worker count大于corePoolSize,則通過(guò)workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)超時(shí)獲取task方法,如果返回的是null,則執(zhí)行processWorkerExit方法做worker的清理工作
    2、如果worker count <= corePoolSize并且allowCoreThreadTimeOut為false,通過(guò)workQueue.take()方法阻塞獲取task,從而不讓processWorkerExit方法執(zhí)行達(dá)到worker的?;畹哪康?/li>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容