【原理】:JAVA線程池源碼分析

總結(jié):線程池ThreadPoolExecutor是通過控制Worker對象的數(shù)量來維護工作的工人集合,并且通過任務隊列workerQueue來存儲提交到線程池的任務。通過配置相關(guān)的容量,以及拒絕策略來更方便使用以及處理容量飽滿的情況。Worker使用了同步器來解決任務執(zhí)行前執(zhí)行時執(zhí)行后的同步問題。
值得注意的是submit()execute()的區(qū)別主要是submit()方法會將任務用FutureTask進行包裝,包裝之后在用execute()執(zhí)行

java線程池內(nèi)容較多,簡單的架構(gòu)如下。

線程池結(jié)構(gòu)

頂層接口定義了execute方法

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService定義了對像城池的一些操作,submit()shutdown()在這里定義了

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

除了這兩個接口意外還定義了ScheduledExecutorService接口,并額外定義了定時執(zhí)行的功能

public interface ScheduledExecutorService extends ExecutorService {
    ScheduledFuture<?> schedule(Runnable var1, long var2, TimeUnit var4);

    <V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4);

    ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);

    ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6);
}

再往下就是具體的實現(xiàn)了ThreadPoolExecutor作為一個具體的實現(xiàn)。來看他的構(gòu)造方法的參數(shù)。

  1. corePoolSize:核心數(shù)
  2. maximumPoolSize:最大核心數(shù)
  3. keepAliveTime:當線程空閑時間達到keepAliveTime,該線程會退出,直到線程數(shù)量等于corePoolSize。(超出核心數(shù)的線程最大存活時間)
  4. unit:時間單位
  5. workQueue:任務隊列
  6. threadFactory:線程工廠,可以設(shè)置線程的名字
  7. handler:拒絕策略,圖示**Policy的就是handler對應的策略ThreadPoolExecutor內(nèi)部實現(xiàn)了這些策略,可選配。
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • 再來看看線程池的源碼入口execute()或者submit()
  1. 值得注意的是submit()調(diào)用了execute()方法,具有返回值,并且對task進行了包裝。
  2. execute()方法就是線程池執(zhí)行的核心了。結(jié)合配置的線程和核心數(shù),拒絕策略,有三種場景
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }


    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        //獲取線程池狀態(tài),從而判斷場景
        //場景一:worker小于核心線程數(shù)
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        //場景二:線程在運行,且任務隊列大于核心線程數(shù)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        //場景三:非核心工作線程邏輯
        else if (!addWorker(command, false))
            //執(zhí)行拒絕策略
            reject(command);
    }

通過對源碼的分析發(fā)現(xiàn)無論如何workQueueaddWorker(cammand,boolean)是關(guān)鍵`。

  1. workQueue是一個阻塞隊列,用于存放所有被提交到線程池的任務(線程)。
  2. addWorker():線程池里面都是工人的概念,增加一個工人來執(zhí)行線程任務。工人可復用。
    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

addWorker()的工作主要還是初始化工人并且讓工人開始工作,執(zhí)行任務。并且維護一個HashSet來維護工人workers集合

    private final HashSet<Worker> workers = new HashSet<Worker>();


 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //初始化了一個工人,Worker持有一個thread
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        //增加一個工人
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執(zhí)行任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker工人是ThreadPoolExecutor的一個內(nèi)部類,實現(xiàn)了Runnable說明他本身是一個任務,繼承了AbstractQueuedSynchronizer說明有使用AQS保證一些執(zhí)行動作是同步

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        
        //持有線程,該線程有配置的線程工廠產(chǎn)生
        final Thread thread;
        //持有需要執(zhí)行的任務本身
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable var2) {
            this.setState(-1);
            this.firstTask = var2;
            this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
        }
        //實現(xiàn)了run方法
        public void run() {
            ThreadPoolExecutor.this.runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return this.getState() != 0;
        }

        protected boolean tryAcquire(int var1) {
            if (this.compareAndSetState(0, 1)) {
                this.setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }

        protected boolean tryRelease(int var1) {
            this.setExclusiveOwnerThread((Thread)null);
            this.setState(0);
            return true;
        }

        public void lock() {
            this.acquire(1);
        }

        public boolean tryLock() {
            return this.tryAcquire(1);
        }

        public void unlock() {
            this.release(1);
        }

        public boolean isLocked() {
            return this.isHeldExclusively();
        }

        void interruptIfStarted() {
            Thread var1;
            if (this.getState() >= 0 && (var1 = this.thread) != null && !var1.isInterrupted()) {
                try {
                    var1.interrupt();
                } catch (SecurityException var3) {
                }
            }

        }
    }

通過源碼分析我們知道Worker的run方法調(diào)用了runWorker()方法,其中的同步操作就是task.run(),執(zhí)行具體任務的run方法,而task的來源除了worker持有的firtTask,就是getTask()方法從workQueue中獲取得到了。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //回收工人
            processWorkerExit(w, completedAbruptly);
        }
    }


    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


最后還會通過processWorkerExit然當前工人執(zhí)行回收策略,回收工人。循環(huán)使用了workers

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容