說說 Java 線程池

一、引言

池的概念大家并不陌生,數(shù)據(jù)庫連接池、線程池等...大體來說,有三個優(yōu)點:

  1. 降低資源消耗。
  2. 提高響應(yīng)速度。
  3. 便于統(tǒng)一管理。

以上是 “池化” 技術(shù)的相同特點,至于他們之間的不同點這里不講,兩者都是為了提高性能和效率,拋開實際做連連看找不同,沒有意義。

同樣,類比于線程池來說:

  • 降低資源消耗:
    重復(fù)利用線程池中已經(jīng)創(chuàng)建的線程,相比之下省去了線程創(chuàng)建和銷毀的性能消耗。
  • 提高響應(yīng)速度:
    當(dāng)有任務(wù)創(chuàng)建時,不必等待線程創(chuàng)建,可以立即執(zhí)行。
  • 便于統(tǒng)一管理:
    使用線程池,可以對線程統(tǒng)一管理,對線程的執(zhí)行狀態(tài)做統(tǒng)一監(jiān)控。

二、線程池的使用

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);
1、關(guān)鍵參數(shù)
  • corePoolSize 核心線程數(shù)
    當(dāng)向線程池中提交一個任務(wù)時,如果線程池中的線程數(shù)量小于核心線程數(shù),即使存在空閑線程,也會新建一個線程來執(zhí)行當(dāng)前任務(wù),直到線程數(shù)量大于或等于核心線程數(shù)。
  • maximunPoolSize 最大線程數(shù)
    當(dāng)任務(wù)隊列滿了,線程池中的線程數(shù)量小于最大線程數(shù)時,創(chuàng)建新線程執(zhí)行任務(wù)。對于無界隊列,忽略該參數(shù)。
  • keepAliveTime 線程存活時間
    大于核心線程數(shù)的那一部分線程的存活時間,如果這部分線程空閑超過這段時間,則進行銷毀。
  • workqueue 任務(wù)隊列
    線程池中的線程數(shù)大于核心線程數(shù)時,將任務(wù)放入此隊列等待執(zhí)行。
  • threadFactory 線程工廠
    用于創(chuàng)建線程,工廠使用 new Threa() 的方式創(chuàng)建線程,并為每個線程做統(tǒng)一規(guī)則的命名:pool-m-thread-n(m為線程池的編號,n為線程池內(nèi)的線程編號)。
  • handler 飽和策略
    當(dāng)線程池和隊列都滿了,則根據(jù)此策略處理任務(wù)。
2、任務(wù)隊列類型
名稱 描述
ArrayBlockingQueue 基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
LinkedBlockingQueue 基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按 FIFO (先進先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool( ) 使用了這個隊列。
SynchronousQueue 不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQueue,靜態(tài)工廠方法 Executors.newCachedThreadPool( ) 使用了這個隊列。
PriorityBlockingQueue 具有優(yōu)先級的無限阻塞隊列。
3、飽和策略類型
策略名稱 特性
AbortPolicy 默認的飽和策略,直接拋出 RejectedExecutionException 異常
DiscardPolicy 不處理,直接丟棄任務(wù)
CallerRunsPolicy 使用調(diào)用者的線程執(zhí)行任務(wù)
DiscardOldestPolicy 丟棄隊列里最近的一個任務(wù),執(zhí)行當(dāng)前任務(wù)

同時,還可以自行實現(xiàn) RejectedExecutionHandler 接口來自定義飽和策略,比如記錄日志、持久化等等。

  • void execute(Runnable command)
    ? 示例:
    ThreadFactory namedThreadFactory =
    new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    ExecutorService executor =
    new ThreadPoolExecutor(
    10,   
    1000,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10),
    namedThreadFactory,
    new ThreadPoolExecutor.AbortPolicy());
    executor.execute(
    () -> {
    System.out.println(1111);
    });
    

注意使用 execute 方法提交任務(wù)時,沒有返回值。

  • Future<?> submit(Runnable task)
    ? 示例:

    Future<Integer> future = executor.submit(() -> {
          return 1 + 1;
        });
    Integer result = future.get();
    

    還可以使用 submit 方法提交任務(wù),該方法返回一個 Future 對象,通過 Future#get( ) 方法可以獲得任務(wù)的返回值,該方法會一直阻塞知道任務(wù)執(zhí)行完畢。還可以使用 Future#get(long timeout, TimeUnit unit) 方法,該方法會阻塞一段時間后立即返回,而這時任務(wù)可能沒有執(zhí)行完畢。

5、關(guān)閉線程池

ThreadPoolExecutor 提供了 shutdown( ) 和 shutdownNow( ) 兩個方法關(guān)閉線程池。原理是首先遍歷線程池的工作線程,依次調(diào)用 interrupt( ) 方法中斷線程,這樣看來如果無法響應(yīng)中斷的任務(wù)就不能終止。

兩者區(qū)別是:

  • shutdownNow( ) 首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
  • shutdown( ) 首先將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

如果調(diào)用了其中一種方法,isShutdown 方法就會返回 true。當(dāng)所有的任務(wù)都已關(guān)閉后, 才表示線程池關(guān)閉成功,這時調(diào)用 isTerminaed 方法會返回 true。實際應(yīng)用中可以根據(jù)任務(wù)是否一定要執(zhí)行完畢的特性,決定使用哪種方法關(guān)閉線程池。

6、合理的配置線程池

通常我們可以根據(jù) CPU 核心數(shù)量來設(shè)計線程池數(shù)量。

可以通過 Runtime.getRuntime().availableProcessors() 方法獲得當(dāng)前設(shè)備的物理核心數(shù)量。值得注意的是,如果應(yīng)用運行在一些 docker 或虛擬機容器上時,該方法取得的是當(dāng)前物理機的 CPU 核心數(shù)。

  • IO 密集型 2nCPU

  • 計算密集型 nCPU+1

    • 其中 n 為 CPU 核心數(shù)量。
    • 為什么加 1:即使當(dāng)計算密集型的線程偶爾由于缺失故障或者其他原因而暫停時,這個額外的線程也能確保 CPU 的時鐘周期不會被浪費。

三、線程池的運行過程

image.png

當(dāng)提交一個新任務(wù)時,線程池的處理步驟:

  1. 判斷當(dāng)前線程池內(nèi)的線程數(shù)量是否小于核心線程數(shù),如果小于則新建線程執(zhí)行任務(wù)。否則,進入下個階段。
  2. 判斷隊列是否已滿,如果沒滿,則將任務(wù)加入等待隊列。否則,進入下個階段。
  3. 在上面基礎(chǔ)上判斷是否大于最大線程數(shù),如果是根據(jù)響應(yīng)的策略處理。否則,新建線程執(zhí)行當(dāng)前任務(wù)。

線程池的源碼比較簡單易懂,感興趣的小伙伴可以自行查看 java.util.concurrent.ThreadPoolExecutor,在線程池中每個任務(wù)都被包裝為一個一個的 Worker ,下面簡單看下 Worker 的 run( ) 方法:

try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

可以看到不斷的循環(huán)取出 Task 并執(zhí)行,而在任務(wù)的執(zhí)行前后,有 beforeExecute 和 afterExecute 方法,我們可以實現(xiàn)兩個方法實現(xiàn)一些監(jiān)控邏輯。除此之外還可以集合線程池的一些屬性或者重寫 terminated() 方法在線程池關(guān)閉時進行監(jiān)控。

四、常見的幾種線程池實現(xiàn)

Executors 中提供了集中常見的線程池,分別應(yīng)用在不同的場景。

  • FixThreadPool 固定數(shù)量的線程池,適用于對線程管理,高負載的系統(tǒng)
  • SingleThreadPool 只有一個線程的線程池,適用于保證任務(wù)順序執(zhí)行
  • CacheThreadPool 創(chuàng)建一個不限制線程數(shù)量的線程池,適用于執(zhí)行短期異步任務(wù)的小程序,低負載系統(tǒng)
  • ScheduledThreadPool 定時任務(wù)使用的線程池,適用于定時任務(wù)

上面幾種線程池的特性主要依賴于 ThreadPoolExecutor 的幾個參數(shù)來實現(xiàn),不同的核心線程數(shù)量,以及不同類型的阻塞隊列,同時我們還可以自行實現(xiàn)自己的線程池滿足業(yè)務(wù)需求。

值得注意的是,并不推薦使用 Executors 創(chuàng)建線程池,詳見下:

  • Executors.newFixedThreadPool(int nThread)
public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());
 }

? 繼續(xù)來看 LinkedBlockingQueue :

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

可以看到使用 LinkedBlockingQueue 創(chuàng)建的是 Integer.MAX_VALUE 大小的隊列,會堆積大量的請求,從而造成 OOM

  • Executors.newSingleThreadExexutor( )
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

同樣,使用的 LinkedBlockingQueue ,一樣的情況

  • Executors.newCachedThreadPool( )
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

代碼課件線程池使用的最大線程數(shù)是 Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,導(dǎo)致 OOM

  • Executors.newScheduleThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

和上面是一樣的問題,最大線程數(shù)是 Integer.MAX_VALUE

所以原則上來說禁止使用 Executors 創(chuàng)建線程池, 而使用 ThreadPoolExecutor 的構(gòu)造函數(shù)來創(chuàng)建線程池。

五、結(jié)語

線程池在開發(fā)中還是比較常見的,結(jié)合不同的業(yè)務(wù)場景,結(jié)合最佳實踐配置正確的參數(shù),可以幫助我們的應(yīng)用性能得到提升。

歡迎訪問個人博客 獲取更多知識分享。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容