Java并發(fā)編程之線程池必用知識點

個人博客地址 http://dandanlove.com/

再使用線程池之前,我們應(yīng)該了解為什么需要使用線程池。進行執(zhí)行任務(wù)(task)的時候我們一般情況是new Thread進行執(zhí)行,如果進行大量的并發(fā)任務(wù)的時候呢?

大多數(shù)并發(fā)應(yīng)用程序是圍繞執(zhí)行任務(wù)(task)進行管理的。所謂的任務(wù)就是抽象、離散的工作單元(unit of work)。

當(dāng)我們進行大量并發(fā)操作的時候,每任務(wù)每線程(thread-per-task)方法存在一些實際的缺陷,尤其是在需要創(chuàng)建大量的線程時會更加突出。

  • 線程生命周期的消耗(創(chuàng)建和銷毀)
  • 資源消耗量(內(nèi)存占用和管理,競爭CPU資源)
  • 穩(wěn)定性(線程數(shù)量超過JVM限制是,可能會出現(xiàn)OutofMemoryError)

在這種情況下為線程管理出現(xiàn)了線程池(ThreadPool),它是管理一個工作者線程的同構(gòu)池(homogeneous pool)。線程池是與工作隊列(work queue)緊密綁定的。

Executor是一個簡單的借口,它是基于生產(chǎn)者-消費者模式,用于異步任務(wù)執(zhí)行,而且支持很多不同類型的任務(wù)執(zhí)行策略。

Executor框架使用的是Runnable作為其任務(wù)的基本表達形式。Runnable只是相當(dāng)有限的抽象,但是Run不能反悔一個值或者拋出受檢查的異常。Callable恰好可以將Runnable做封裝返回其執(zhí)行結(jié)果。

一個Executor執(zhí)行的任務(wù)的生命周期有4個階段:創(chuàng)建、提交、開始和完成。Future可以描述任務(wù)的生命周期,并提供了相關(guān)的方法來獲得、取消以及驗證任務(wù)是否已經(jīng)完成還是被取消。

public interface Executor {
    void execute(Runnable var1);
}
@FunctionalInterface
public interface Runnable {
    void run();
}
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
public interface Future<V> {
    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

/**
    * Creates a new {@code ThreadPoolExecutor} with the given initial
    * parameters and default rejected execution handler.
    *
    * @param corePoolSize the number of threads to keep in the pool, even
    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    * @param maximumPoolSize the maximum number of threads to allow in the
    *        pool
    * @param keepAliveTime when the number of threads is greater than
    *        the core, this is the maximum time that excess idle threads
    *        will wait for new tasks before terminating.
    * @param unit the time unit for the {@code keepAliveTime} argument
    * @param workQueue the queue to use for holding tasks before they are
    *        executed.  This queue will hold only the {@code Runnable}
    *        tasks submitted by the {@code execute} method.
    * @param threadFactory the factory to use when the executor
    *        creates a new thread
    * @throws IllegalArgumentException if one of the following holds:<br>
    *         {@code corePoolSize < 0}<br>
    *         {@code keepAliveTime < 0}<br>
    *         {@code maximumPoolSize <= 0}<br>
    *         {@code maximumPoolSize < corePoolSize}
    * @throws NullPointerException if {@code workQueue}
    *         or {@code threadFactory} is null
    */
public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}
  • corePoolSize:線程池的核心線程數(shù),一般情況下不管有沒有任務(wù)都會一直在線程池中一直存活,只有在 ThreadPoolExecutor 中的方法allowCoreThreadTimeOut(boolean value) 設(shè)置為 true 時,閑置的核心線程會存在超時機制,如果在指定時間沒有新任務(wù)來時,核心線程也會被終止,而這個時間間隔由第3個屬性 keepAliveTime 指定。
  • maximumPoolSize:線程池所能容納的最大線程數(shù),當(dāng)活動的線程數(shù)達到這個值后,后續(xù)的新任務(wù)將會被阻塞。
  • keepAliveTime:控制線程閑置時的超時時長,超過則終止該線程。一般情況下用于非核心線程,只有在 ThreadPoolExecutor 中的方法 allowCoreThreadTimeOut(boolean value) 設(shè)置為 true時,也作用于核心線程。
  • unit:用于指定 keepAliveTime 參數(shù)的時間單位,TimeUnit 是個 enum 枚舉類型,常用的有:TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
  • workQueue:線程池的任務(wù)隊列,通過線程池的 execute(Runnable command) 方法會將任務(wù) Runnable 存儲在隊列中(阻塞隊列)。
    1)ArrayBlockingQueue:
    是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    2)LinkedBlockingQueue:
    一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    3)SynchronousQueue:
    一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。
    4)PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。
  • threadFactory:線程工廠,它是一個接口,用來為線程池創(chuàng)建新線程的。

newCachedThreadPool

newCachedThreadPool是一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時將重用它們。對于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通??商岣叱绦蛐阅堋U{(diào)用 execute() 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構(gòu)造方法創(chuàng)建具有類似屬性但細節(jié)不同(例如超時參數(shù))的線程池。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newSingleThreadExecutor

newSingleThreadExecutor 創(chuàng)建是一個單線程池,也就是該線程池只有一個線程在工作,所有的任務(wù)是串行執(zhí)行的,如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它,此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool

newFixedThreadPool 創(chuàng)建固定大小的線程池,每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小,線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

newScheduledThreadPool

newScheduledThreadPool 創(chuàng)建一個大小無限的線程池,此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容