ThreadPoolExecutor 源碼分析

前言

本文重點(diǎn)分析了ThreadPoolExecutor兩個(gè)方法execute() 和 submit() 的執(zhí)行原理,并說明Future如何實(shí)現(xiàn)阻塞返回。

繼承關(guān)系圖

關(guān)鍵方法介紹

構(gòu)造方法

    /**
     * @param corePoolSize   核心線程數(shù)
     * @param maximumPoolSize  最大線程數(shù)
     * @param keepAliveTime 臨時(shí)線程保留時(shí)間
     * @param unit  臨時(shí)線程保留時(shí)間單位
     * @param workQueue 阻塞隊(duì)列
     * @param threadFactory  線程工程
     * @param handler  拒絕策略
     */
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

為了方便區(qū)分,本文會(huì)將超過核心線程數(shù)創(chuàng)建的線程叫臨時(shí)線程,本質(zhì)上這兩類線程沒有任何區(qū)別,到期回收哪個(gè)線程完全是跟當(dāng)時(shí)線程池哪個(gè)線程先被空閑有關(guān),跟創(chuàng)建時(shí)間的先后無關(guān)

execute(Runnable command)

默認(rèn)參數(shù)

先介紹主要方法實(shí)現(xiàn)前,先說明一些靜態(tài)變量的含義和值。

ctl 官方給出的注釋是The main pool control state,這個(gè)值包含了兩部分,workerCount和runState。

int COUNT_BITS = Integer.SIZE - 3 = 29; 一共32位,高3位表示線程池的運(yùn)行狀態(tài),低29位表示線程池中的線程數(shù)量。是一種高低位的實(shí)現(xiàn)。

用一個(gè)變量去存儲(chǔ)兩個(gè)值,可避免在做相關(guān)決策時(shí),出現(xiàn)不一致的情況,不必為了維護(hù)兩者的一致,而占用鎖資源。通過閱讀線程池源代碼也可以發(fā)現(xiàn),經(jīng)常出現(xiàn)要同時(shí)判斷線程池運(yùn)行狀態(tài)和線程數(shù)量的情況。

int CAPACITY = (1 << COUNT_BITS) - 1 = 536870912;也就是從的線程容量是536870912個(gè)。

RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 都是用高3位表示不同的含義。低29位都是0

具體值參考下表:

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 RUNNING | 0 = -536870912 , 1110 0000 + 24位0 
    private static final int COUNT_BITS = Integer.SIZE - 3;   //29   高3位表示狀態(tài)  低29表示線程數(shù)量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  //536870912  0001 1111 + 24位1

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;  // -536870912  1110 0000 + 24位0
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0            0000 0000 + 24位0 
    private static final int STOP       =  1 << COUNT_BITS;  // 536870912   0010 0000 + 24位0
    private static final int TIDYING    =  2 << COUNT_BITS;  // 1073741824  0100 0000 + 24位0
    private static final int TERMINATED =  3 << COUNT_BITS; // 1610612736   0110 0000 + 24位0

    // Packing and unpacking ctl
    // 如果c是默認(rèn)值-536870912, 
    // runStateOf = (-536870912 & ~29) = -536870912, 
    // workerCountOf = (-536870912 & 29) = 0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }  
源碼分析
 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //c = -536870912
        int c = ctl.get();
        // workerCountOf(c) = 0
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

首先,這個(gè)execute有三個(gè)主要的if判斷:

      //判斷當(dāng)前線程池中的線程數(shù)量有沒有到核心線程數(shù),沒有就創(chuàng)建新的worker來處理任務(wù)。
      if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
      //執(zhí)行到此處,說明此時(shí)線程池的線程數(shù)已經(jīng)超過了coolPoolSize。先判斷線程池狀態(tài),且嘗試將任務(wù)添加到阻塞隊(duì)列里。
      if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 最后意味著此時(shí)阻塞隊(duì)列已滿,嘗試創(chuàng)建新的worker來處理,不能創(chuàng)建則執(zhí)行拒絕策略。
        else if (!addWorker(command, false))
            reject(command);
addWorker()

很長(zhǎng)的一個(gè)方法,注釋就不貼了,兩個(gè)參數(shù)分別是當(dāng)前要執(zhí)行的任務(wù)和core(表示要?jiǎng)?chuàng)建的是核心線程還是臨時(shí)線程)。
這里的worker是真正負(fù)責(zé)處理任務(wù)的對(duì)象,worker內(nèi)部封裝了所屬線程和待執(zhí)行的任務(wù).

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // ...
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
       
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

接下來主要看addWorker方法的實(shí)現(xiàn)。

    private boolean addWorker(Runnable firstTask, boolean core) {
      //第一部分
      retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //第二部分
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

第一部分:
rs表示是線程池的狀態(tài),先校驗(yàn)線程池狀態(tài)和隊(duì)列數(shù)量。前文已經(jīng)提過,RUNNING的值是負(fù)數(shù),SHOTDOWN是0,其他值都是正數(shù)。

之后是for循環(huán),判斷容量和是否超過了預(yù)設(shè)的線程數(shù)量。
如果成功增加了workerCount的值就跳出循環(huán),開始執(zhí)行任務(wù)。
如果失敗,說明有并發(fā)情況,就重新獲取ctl,判斷rs狀態(tài)是否變了,從而決定是重新執(zhí)行一遍大或小循環(huán)。

for循環(huán)結(jié)束后,說明當(dāng)前可以增加worker對(duì)象。此時(shí)就真正創(chuàng)建對(duì)象開始執(zhí)行任務(wù)。

第二部分:
在創(chuàng)建worker對(duì)象時(shí),構(gòu)造方法中也創(chuàng)建了一個(gè)Thread。并通過lock來保證原子性,校驗(yàn)狀態(tài)之后將worker對(duì)象add到HashSet中。
private final HashSet<Worker> workers = new HashSet<Worker>();

添加后,釋放鎖并start線程。

如果在addWorker過程中失敗,且第一階段順利完成,就從hashSet中移除,并減少workerCount。

/**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

如果添加任務(wù)順利,則在t.start();執(zhí)行完成后,主要任務(wù)就完成了并返回true。此時(shí)線程會(huì)執(zhí)行worker對(duì)象內(nèi)的run方法。

worker內(nèi) run()
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

runWorker真正執(zhí)行,這個(gè)this只得是worker對(duì)象,task和線程都已經(jīng)封裝到worker內(nèi)了。


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果線程池已經(jīng)是STOP或TIDYING或TERMINATED,需要將線程也主動(dòng)中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這里說明一些最核心的邏輯
執(zhí)行過程:

  1. 在while中判斷當(dāng)前的task和隊(duì)列中的task,如果當(dāng)前task != null,說明是線程是伴隨著任務(wù)一起創(chuàng)建的,直接調(diào)用task.run來執(zhí)行。
  2. 第一圈執(zhí)行完成后,task=null,第二次執(zhí)行while時(shí),需要從getTask中取task來執(zhí)行。
  3. 當(dāng)getTask() 返回null時(shí),while結(jié)束,設(shè)置completedAbruptly = false;表明任務(wù)時(shí)正常結(jié)束。最后調(diào)用processWorkerExit來退出線程。

這里提供了兩個(gè)方法:beforeExecute 和 afterExecute,task.run()的切面,我們可以定義worker的子類,來實(shí)現(xiàn)擴(kuò)展,比如加入一些監(jiān)控等。

getTask() 返回null就代表著線程可以正常結(jié)束,那么什么情況下會(huì)返回null?

getTask()
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask() 的主要任務(wù)是從阻塞隊(duì)列中獲取task。通過判斷當(dāng)前的wc 是否超過了核心線程數(shù),來決定poll還是take來取任務(wù)。
如果超過了,說明此時(shí)已經(jīng)創(chuàng)建過了臨時(shí)線程,臨時(shí)線程的有效期就是等待從隊(duì)列返回的時(shí)間,超過這個(gè)時(shí)間沒有取到,則設(shè)置timeOut表示已經(jīng)超時(shí),在下一次for循環(huán)的if判斷中,返回null,讓這個(gè)臨時(shí)線程自動(dòng)結(jié)束。
如果沒超過,說明此時(shí)還處在核心線程的階段,可以take長(zhǎng)期等待。

至此,run方法的執(zhí)行過程就此完成。

任務(wù)是如何添加到隊(duì)列中的,還得回到execute方法。

      if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

如果已經(jīng)達(dá)到核心線程數(shù),就不能在繼續(xù)addWorker,而是要offer到workQueue中,并再次檢查線程池狀態(tài)。

如果offer失敗,說明阻塞隊(duì)列已滿,此時(shí)需要繼續(xù)創(chuàng)建新的worker來完成任務(wù)。

        else if (!addWorker(command, false))
            reject(command);

這里的false代表 創(chuàng)建時(shí)和最大線程數(shù)進(jìn)行比較,如果超過了最大線程數(shù),則調(diào)用reject來執(zhí)行拒絕策略。

reject()
/**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

4種默認(rèn)的拒絕策略

AbortPolicy : 直接拋出異常(默認(rèn)策略)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
DiscardPolicy : 什么也不處理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy : 把當(dāng)前最早在隊(duì)列的任務(wù)丟棄,并將再次執(zhí)行此任務(wù)(可能會(huì)直接執(zhí)行,也可能被加到隊(duì)列中)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
CallerRunsPolicy : 由當(dāng)前線程來直接執(zhí)行run,不再交給線程池。
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

submit 源碼分析

submit()

    Future<?> future = Executors.newCachedThreadPool().submit(new Thread());
    ...
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit方法可用于帶返回值的任務(wù)執(zhí)行??梢苑祷谾uture來獲取線程的執(zhí)行結(jié)果,具體的實(shí)現(xiàn)定義在AbstractExecutorService中。

首先創(chuàng)建了一個(gè)FutureTask對(duì)象,傳入了要執(zhí)行的任務(wù)。把封裝后的FutureTask交給execute來執(zhí)行。

  /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable 要執(zhí)行的任務(wù)
     * @param 返回的默認(rèn)值
     * @param <T> the type of the given value
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

FutureTask

繼承關(guān)系圖和構(gòu)造方法
    /**  
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // FutureTask 可能的狀態(tài)列表
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 執(zhí)行的任務(wù) */
    private Callable<V> callable;
    /** get() 的返回值,即最終的執(zhí)行結(jié)果 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    // 單項(xiàng)列表的node
    private volatile WaitNode waiters;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask 既然將任務(wù)封裝到了callable屬性中,且它自身還是一個(gè)Runnable,那么真正執(zhí)行一定在run方法中。而get() 是一個(gè)阻塞方法,當(dāng)執(zhí)行完成后,可以獲取返回值,否則就等待。

那重點(diǎn)看下run() 和 get() 的實(shí)現(xiàn)。

get()
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

 private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

state 有多種狀態(tài),用來標(biāo)記當(dāng)前任務(wù)的執(zhí)行情況,如果已經(jīng)是完成狀態(tài),通過report方法直接返回outcome即可。
如果還未到達(dá)完成態(tài),就說明當(dāng)前任務(wù)還在執(zhí)行,此時(shí)需要await等待,也就是awaitDone。

awaitDone()
/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

awaitDone的兩個(gè)參數(shù)分別用于表示是否有等待時(shí)間,以及等待時(shí)間的納秒數(shù)。
如果有等待時(shí)間,deadline就是截止時(shí)間。
下面則是主要邏輯:
一般來說,這里的for循環(huán)會(huì)執(zhí)行3圈,(不考慮已經(jīng)執(zhí)行完成和中斷的情況)。

  1. 第一圈:因?yàn)閃aitNode q 最初被賦值為null,在run執(zhí)行完之前,state是NEW,所以for循環(huán)會(huì)執(zhí)行q=null的邏輯,先創(chuàng)建一個(gè)WaitNode對(duì)象。
  2. 第二圈:因?yàn)閝此時(shí)有值,但queued是false,此時(shí)for循環(huán)執(zhí)行! queued的邏輯,如果設(shè)置成功,則queued = true。
  3. 第三圈:LockSupport.park(this); (如果有deadline,就判斷是否超時(shí)了)此時(shí)線程進(jìn)入阻塞狀態(tài)等待喚醒。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
    //背景:
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> k = FutureTask.class;
    waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

重點(diǎn)就是這一句。
這一句是做了兩個(gè)事情:

  1. 構(gòu)建waiters的Node單向鏈表
  2. 如果添加隊(duì)列成功就返回true。
這里為什么要構(gòu)建單向鏈表?

一般來說,一個(gè)task通過一個(gè)get()方法等待獲取就OK了,是一個(gè)單任務(wù)。但如果,同一個(gè)FutureTask的get() 方法被多個(gè)線程調(diào)用時(shí),多個(gè)線程(可能)會(huì)同時(shí)處于阻塞狀態(tài),這時(shí)就需要一個(gè)存儲(chǔ)介質(zhì)來存儲(chǔ)這些等待線程,這里是通過單鏈表來實(shí)現(xiàn)。
構(gòu)建單向鏈表的過程如下:

  1. 第一次調(diào)用get():
    當(dāng)前waiters = null;q.next = waiters(null); waiters = q; 即waiters的頭節(jié)點(diǎn)是q,q.next是null。
  2. 第二次調(diào)用get(); 如果當(dāng)前的任務(wù)命名為p;
    當(dāng)前waiters = q; p.next = waiter(q); waiters = p; 即構(gòu)建了一個(gè) p -> q的鏈表結(jié)構(gòu),waiters是頭節(jié)點(diǎn)p。
  3. 第三次調(diào)用get(); 如果當(dāng)前的任務(wù)命名為r;
    最后的效果是 r -> p -> q; 可以看出來是頭插法。
run()
 public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
}

run() 比較簡(jiǎn)單,如果當(dāng)前FutureTask是NEW的狀態(tài),就調(diào)用callable.call(),將執(zhí)行完成的result通過set方法設(shè)置到outcome中。
且無論成功失敗,都將runner線程置為null,并判斷執(zhí)行過程中是否被其他線程中斷,如果因?yàn)橹袛喽。瑒t此線程一直交出時(shí)間片,直到狀態(tài)從INTERRUPTING變成INTERRUPTED。

如果成功執(zhí)行且沒有被中斷過,則通過set方法進(jìn)行返回值的設(shè)置。

set()
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

先判斷此時(shí)狀態(tài)是NEW,則改成COMPLETING,設(shè)置outcome后,狀態(tài)改成NORMAL(完成態(tài)),調(diào)用finishCompletion來喚醒等待中的線程。

finishCompletion()
/**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        // done是一個(gè)空方法,給子類重寫用。
        done();

        callable = null;        // to reduce footprint
    }

這個(gè)方法比較簡(jiǎn)單,可以看到就是在遍歷waiters單鏈表,依次喚醒內(nèi)部的阻塞線程。(阻塞的發(fā)起點(diǎn)是get方法)。

總結(jié)

execute()

實(shí)現(xiàn)思想:
  1. task因?yàn)榻挥删€程池來執(zhí)行,線程池的線程直接調(diào)用task中的run,而不是執(zhí)行task.start()。
  2. 如果當(dāng)前線程池中的線程數(shù) < corePoolSize ,就創(chuàng)建新的線程添加到線程池中(HashSet存儲(chǔ))。
  3. 如果當(dāng)前的線程數(shù) > corePoolSize 就先存放到阻塞隊(duì)列里
  4. 如果阻塞隊(duì)列已滿,且 < maximumPoolSize,就創(chuàng)建新的線程添加到線程池中(HashSet存儲(chǔ)),當(dāng)keepAliveTime的時(shí)間沒有處理任務(wù),則銷毀(也就是讓run方法結(jié)束)。
  5. 如果已經(jīng)超過maximumPoolSize,則根據(jù)拒絕策略執(zhí)行。
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

submit()

實(shí)現(xiàn)思想:

任務(wù)執(zhí)行的思想還是execute,阻塞等待返回值的思想是通過Future完成。實(shí)現(xiàn)類是FutureTask。

  1. get()返回值時(shí)如果還未完成,將當(dāng)前線程封裝成WaiterNode,進(jìn)行LockSupport.park,并將所有park的線程按照頭插法構(gòu)建一個(gè)單向鏈表。
  2. run() 執(zhí)行完成后,將內(nèi)部的outcome屬性設(shè)置成當(dāng)前FutureTask的返回值,并unpark單鏈表中的所有阻塞線程,這些線程的get()會(huì)直接返回outcome的值。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容