Thread Pool Notes and Source-Code Analysis

1 ThreadPoolExecutor

This is the most commonly used thread pool. Its constructor parameters are:

  • corePoolSize: core pool size
  • maximumPoolSize: maximum pool size
  • keepAliveTime: maximum idle time for threads in excess of corePoolSize; allowCoreThreadTimeOut(true) applies the same timeout to core threads
  • TimeUnit: the unit of keepAliveTime
  • workQueue: the blocking queue that holds waiting tasks
  • threadFactory: factory for creating new threads
  • RejectedExecutionHandler: invoked when the pool is saturated, i.e. maximumPoolSize threads are busy and the workQueue is full
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>

1. While the pool has fewer than corePoolSize threads, each newly submitted task starts a new thread, even if idle threads exist.
2. Once the pool reaches corePoolSize, newly submitted tasks are placed in the workQueue to await execution.
3. When the workQueue is full and maximumPoolSize > corePoolSize, newly submitted tasks start additional threads, up to maximumPoolSize.
4. When maximumPoolSize threads are busy and the workQueue is full, newly submitted tasks are handed to the RejectedExecutionHandler.
5. A thread in excess of corePoolSize that has been idle for keepAliveTime is shut down.
6. With allowCoreThreadTimeOut(true), core threads are also shut down after being idle for keepAliveTime.
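
The six rules above can be sketched with a hand-built pool. The sizes below (core 2, max 4, 30 s keep-alive, a 2-slot queue) are arbitrary illustration values, not recommendations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolParamsDemo {
    // Build a small pool: 2 core threads, at most 4 threads,
    // excess threads die after 30s idle, and at most 2 tasks may wait.
    static ThreadPoolExecutor build() {
        return new ThreadPoolExecutor(
                2, 4,
                30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy()); // default rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        // No threads exist until the first task arrives (rule 1 creates them lazily).
        System.out.println(pool.getPoolSize());   // 0
        pool.execute(() -> { });
        System.out.println(pool.getPoolSize());   // 1
        pool.shutdown();
    }
}
```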

1.1 newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

Creates a fixed-size thread pool. The maximum number of concurrently running threads is bounded; excess tasks wait in the queue.

corePoolSize and maximumPoolSize are set to the same value, so the pool never grows beyond its initial size.

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

Note that corePoolSize and maximumPoolSize are deliberately identical: the work queue is a LinkedBlockingQueue, which is effectively unbounded by default, so every task that cannot run immediately simply enters the queue and extra threads are never needed. (Strictly speaking, it is the queue whose capacity is Integer.MAX_VALUE, not the pool itself.)
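
The "effectively unbounded" claim is easy to check: a LinkedBlockingQueue constructed without a capacity argument, as in newFixedThreadPool, reports Integer.MAX_VALUE free slots. A minimal check:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class FixedPoolQueueDemo {
    // The no-arg LinkedBlockingQueue used by newFixedThreadPool
    // has a capacity of Integer.MAX_VALUE.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Runnable>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
    }
}
```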

1.2 newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

Creates a single-threaded executor. It uses exactly one worker thread, which guarantees that tasks execute sequentially, in submission order (FIFO, given the LinkedBlockingQueue used here).

Here both corePoolSize and maximumPoolSize are set to 1.

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
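
A quick sketch of the ordering guarantee: with one worker and a FIFO queue, tasks complete in submission order. The class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    // Run n tasks on a single-thread executor and record the order in
    // which they execute. With one worker and a FIFO queue the order
    // must match the submission order.
    static List<Integer> runInOrder(int n) {
        List<Integer> order = new ArrayList<>(); // safe: only one worker touches it
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // happens-before the read below
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```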

1.3 newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Creates a cached thread pool. When the pool has more threads than the workload needs, idle threads are reclaimed; when no idle thread is available, a new one is created.

  • The number of worker threads is effectively unlimited (the actual cap is Integer.MAX_VALUE), so threads can be added freely as load increases.
  • If no tasks are submitted for the keep-alive period (60 seconds here), idle workers terminate automatically. If new tasks arrive afterwards, the pool creates fresh workers.
  • When using a CachedThreadPool, be careful to bound the number of submitted tasks; otherwise a flood of concurrently running threads can bring the system down.

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

maximumPoolSize is set to Integer.MAX_VALUE because a SynchronousQueue has no internal capacity at all: every insertion must wait for a matching removal. With a small maximumPoolSize, submissions would be rejected as soon as all threads were busy.

In general, pools sized at Integer.MAX_VALUE are discouraged, since they make it easy to pile up requests and create a huge number of threads.
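
The zero-capacity point can be verified directly: a SynchronousQueue reports no capacity, and offer() fails unless a consumer is already waiting in take():

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    // A SynchronousQueue holds nothing: remainingCapacity() is 0.
    static int capacity() {
        return new SynchronousQueue<Integer>().remainingCapacity();
    }

    // offer() succeeds only if a thread is already blocked in take();
    // with no consumer it returns false immediately.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<Integer>().offer(1);
    }

    public static void main(String[] args) {
        System.out.println(capacity());             // 0
        System.out.println(offerWithoutConsumer()); // false
    }
}
```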

2 ForkJoinPool

ForkJoinPool works on a divide-and-conquer principle: a large task is split into smaller subtasks, which are computed independently; the results of the subtasks are then merged into the final result.
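
A minimal divide-and-conquer sketch using RecursiveTask (the class name and threshold are illustrative): the range sum is split in half until small enough, then the partial sums are combined:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 16; // compute directly below this size
    private final long[] a;
    private final int lo, hi;

    SumTask(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: sum sequentially
            long s = 0;
            for (int i = lo; i < hi; i++) s += a[i];
            return s;
        }
        int mid = (lo + hi) >>> 1;           // split the range in half
        SumTask left = new SumTask(a, lo, mid);
        left.fork();                         // run the left half asynchronously
        long rightSum = new SumTask(a, mid, hi).compute();
        return rightSum + left.join();       // combine the two partial sums
    }

    static long parallelSum(long[] a) {
        return ForkJoinPool.commonPool().invoke(new SumTask(a, 0, a.length));
    }

    public static void main(String[] args) {
        long[] a = new long[100];
        for (int i = 0; i < 100; i++) a[i] = i + 1;
        System.out.println(parallelSum(a)); // 5050
    }
}
```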

2.1 newWorkStealingPool

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

A work-stealing pool. Every worker thread has its own task queue; when a worker has finished all of its own tasks, it can take ("steal") unexecuted tasks from another worker's queue and run them itself. This is work stealing, and it improves CPU utilization.
Each worker thread follows two simple rules:

  • If its own queue is non-empty, its tasks are not yet done: take one and execute it.
  • If its own queue is empty, pick another worker at random and execute one of that worker's tasks.
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
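
A small usage sketch: submit independent Callables to a work-stealing pool and combine the results via Futures (the workload, squaring 1..n, is arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkStealingDemo {
    // Square the numbers 1..n in parallel and sum the results.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool();
        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final long v = i;
            futures.add(pool.submit(() -> v * v)); // Callable<Long>
        }
        long total = 0;
        try {
            for (Future<Long> f : futures) total += f.get(); // waits for each result
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```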

3 ScheduledThreadPoolExecutor

This pool extends ThreadPoolExecutor.

3.1 newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

The resulting pool runs submitted tasks periodically at a specified interval. In real business scenarios it is a natural fit for jobs such as periodically synchronizing data.
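
A minimal sketch of periodic execution (the 10 ms period is an arbitrary demo value): a latch counts down on each run of a fixed-rate task, proving the schedule actually repeats:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    // Returns true if the periodic task ran at least `runs` times
    // within the timeout - i.e. the fixed-rate schedule repeats.
    static boolean ranAtLeast(int runs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(runs);
        pool.scheduleAtFixedRate(latch::countDown, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        } finally {
            pool.shutdownNow(); // a periodic task never finishes on its own
        }
    }

    public static void main(String[] args) {
        System.out.println(ranAtLeast(3)); // true
    }
}
```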

4 排隊(duì)策略

4.1 Direct handoffs

 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of new submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed.  </li>

A submitted task is handed directly to a thread rather than being held in the queue. If no thread is immediately available to run it, the attempt to enqueue fails and a new thread is constructed; once maximumPoolSize is reached, the task is rejected.
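
The handoff behavior can be sketched with a deliberately tiny pool (a maximum of 1 thread is an illustrative extreme): while the single worker is busy, a second task can neither be handed off nor given a new thread, so the default AbortPolicy rejects it:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectHandoffDemo {
    // Returns true if the second task is rejected while the only
    // permitted thread is still busy with the first one.
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch hold = new CountDownLatch(1);
        pool.execute(() -> {                 // occupies the single thread
            try { hold.await(); } catch (InterruptedException ignored) { }
        });
        boolean rejected = false;
        try {
            pool.execute(() -> { });         // no waiting consumer, no spare thread
        } catch (RejectedExecutionException e) {
            rejected = true;                 // default AbortPolicy throws
        }
        hold.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(secondTaskRejected()); // true
    }
}
```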

4.2 Unbounded queues

 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)  This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each others execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.  </li>

Uses an unbounded queue, such as a LinkedBlockingQueue with no capacity bound. Once the pool has corePoolSize threads, all further tasks wait in the queue and no new threads are created, so the value of maximumPoolSize has no effect. Unbounded queues suit workloads whose tasks are completely independent and cannot affect each other's execution.
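
A sketch of "maximumPoolSize has no effect": with an unbounded queue, only corePoolSize threads are created no matter how many tasks pile up behind a blocked worker (core=1, max=10 are illustrative values):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    // Submit one blocking task plus `extra` quick tasks to a pool with
    // core=1, max=10, and an unbounded queue; report the thread count.
    static int poolSizeAfterBacklog(int extra) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch hold = new CountDownLatch(1);
        pool.execute(() -> {                 // occupies the single core thread
            try { hold.await(); } catch (InterruptedException ignored) { }
        });
        for (int i = 0; i < extra; i++) pool.execute(() -> { }); // all queue up
        int size = pool.getPoolSize();       // still 1: the queue never fills
        hold.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterBacklog(5)); // 1
    }
}
```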

4.3 Bounded queues

 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control.  Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput.  If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.  </li>

A bounded queue, such as an ArrayBlockingQueue, used together with a finite maximumPoolSize helps prevent resource exhaustion, but is harder to tune and control: queue size and pool size must be traded off against each other.
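
The hard bound is what distinguishes this queue family: an ArrayBlockingQueue simply refuses new elements once full, which is what eventually triggers the pool's rejection path. A minimal check:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedQueueDemo {
    // An ArrayBlockingQueue has a fixed capacity: offer() returns
    // false (rather than blocking or growing) once the queue is full.
    static boolean[] offerThree() {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(2);
        return new boolean[] { q.offer(1), q.offer(2), q.offer(3) };
    }

    public static void main(String[] args) {
        boolean[] r = offerThree();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true true false
    }
}
```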

5 Keep-alive mechanism

 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 * This provides a means of reducing resource consumption when the
 * pool is not being actively used. If the pool becomes more active
 * later, new threads will be constructed. This parameter can also be
 * changed dynamically using method {@link #setKeepAliveTime(long,
 * TimeUnit)}.  Using a value of {@code Long.MAX_VALUE} {@link
 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 * terminating prior to shut down. By default, the keep-alive policy
 * applies only when there are more than corePoolSize threads. But
 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 * apply this time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero. </dd>

Threads in excess of corePoolSize that have been idle for longer than keepAliveTime (readable via getKeepAliveTime()) are terminated, which reduces resource consumption while the pool is not actively used. The timeout can be changed dynamically with setKeepAliveTime(). Passing Long.MAX_VALUE effectively prevents idle threads from ever terminating before shutdown. allowCoreThreadTimeOut(true) extends the same policy to core threads, provided keepAliveTime is non-zero.
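
One easy-to-miss consequence of the last sentence: calling allowCoreThreadTimeOut(true) while keepAliveTime is zero throws an IllegalArgumentException. A minimal check of both cases:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {
    // allowCoreThreadTimeOut(true) is rejected when keepAliveTime is zero.
    static boolean rejectsZeroKeepAlive() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // With a non-zero keepAliveTime the flag is accepted.
    static boolean acceptsNonZeroKeepAlive() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);
        boolean on = pool.allowsCoreThreadTimeOut();
        pool.shutdown();
        return on;
    }

    public static void main(String[] args) {
        System.out.println(rejectsZeroKeepAlive());    // true
        System.out.println(acceptsNonZeroKeepAlive()); // true
    }
}
```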

6 Rejection policies

* <dt>Rejected tasks</dt>
 *
 * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated.  In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}.  Four predefined handler
 * policies are provided:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task. This provides a
 * simple feedback control mechanism that will slow down the rate that
 * new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped.  </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.) </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
  • AbortPolicy: the default; throws a RejectedExecutionException.
  • CallerRunsPolicy: the thread that submitted the task runs it itself.
  • DiscardPolicy: silently drops the task.
  • DiscardOldestPolicy: drops the oldest task at the head of the work queue, then retries the newly submitted task.
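
A sketch of CallerRunsPolicy's feedback effect: once the pool (1 thread) and the queue (1 slot) are both full, the next task runs synchronously on the submitting thread itself, naturally slowing the submitter down. The tiny sizes are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // Saturate a 1-thread pool with a 1-slot queue, then submit a task
    // that records which thread actually ran it.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch hold = new CountDownLatch(1);
        pool.execute(() -> {                       // occupies the worker
            try { hold.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { });                   // fills the queue
        final String[] ranOn = new String[1];
        // Saturated: CallerRunsPolicy executes this one on the calling thread.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        hold.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(threadThatRanOverflowTask()
                .equals(Thread.currentThread().getName())); // true
    }
}
```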

7 How the thread pool executes tasks

    public void execute(Runnable command) {
        // Throw a NullPointerException immediately if the Runnable is null
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  • If fewer than corePoolSize threads are running, try to start a new thread with this task as its first task. The call to addWorker atomically rechecks the pool state and worker count (ctl), preventing false alarms that would add threads when it shouldn't.
  • If the task is queued successfully, a double check is still needed: an existing thread may have died since the last check, or the pool may have shut down since entry into this method. If the pool is no longer running, the task is removed from the queue and rejected; if there are no worker threads left, a new one is started.
  • If the task cannot be queued, try to add a new thread. If that fails (the pool has shut down or is saturated), reject the task.

8 addWorker()

What the method does:
It checks whether a new worker can be added given the current pool state and thread limits. If so, the worker count is adjusted and, if possible, a new worker is created and started, with firstTask as its first task. The method returns false if the pool is stopped or shutting down, and also if creating the thread fails, either because the thread factory returns null or because an exception (typically an OutOfMemoryError in Thread.start()) occurs, in which case it rolls back cleanly.

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

The method starts with the state checks, then increments workerCount with a CAS, retrying in a spin loop on failure. Next it creates a new Worker and acquires the ReentrantLock. While holding the lock it rechecks the pool state; if the new thread is somehow already alive it throws an IllegalThreadStateException. It records the worker, updates largestPoolSize if needed, and then starts the thread. If the worker could not be added or started, addWorkerFailed() runs.

9 addWorkerFailed()

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

The method acquires the lock, removes the Worker from workers if it is non-null, decrements workerCount, and finally calls tryTerminate() in case this worker was the obstacle to termination.

10 getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
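
The timed branch above relies on BlockingQueue.poll(timeout) returning null when nothing arrives in time, which is what marks a worker as timedOut and lets it be culled; take() would block indefinitely instead. That contract is easy to see in isolation (the 50 ms timeout is an arbitrary demo value):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {
    // poll(timeout) on an empty queue waits, then gives up and returns null.
    static boolean pollTimesOut() {
        LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
        try {
            return q.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollTimesOut()); // true
    }
}
```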