Java再回顧(1)—線程池回顧與思考

概述

本文是針對java線程池的回顧與思考,主要是圍繞java線程池的思路梳理與知識總結。長文預警(寫這篇真的累到爆炸)!??!

概念

什么是線程?

線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務。

相信大家對線程的概念都不會陌生。
實際上,在日常的開發(fā)過程中,也會遇到大量的使用多線程的場景,如異步計算、android新開線程進行耗時操作等。

如何實現(xiàn)線程?

通常,實現(xiàn)線程的主要方式有以下兩種(當然還有實現(xiàn)callable等等...):
1.1繼承Thread類(java.lang.Thread),實現(xiàn)run方法:

public class ThreadDemo1 extends Thread {
    public  void run(){
        //do something
    }
}

1.2實現(xiàn)Runnable接口(java.lang.Runnable),實現(xiàn)run方法:

public class ThreadDemo2 implements Runnable {
    public  void run(){
        //do something
    }
}

1.3兩種啟動方式的對比
-Thread占據(jù)了父類的名額,沒有Runnable方便。因為java是單繼承。
-Thread類本身也是實現(xiàn)了Runnable
如下圖所示:

image.png

-Runnable啟動需要Thread類的支持
如下圖所示:
image.png

-Runnable更容易實現(xiàn)多線程中的資源共享
結論:更建議實現(xiàn)Runnable接口來創(chuàng)建線程

線程的啟動?
通常都是new 、start二連...
最原始的做法是有一個任務就new一個線程..
然而,線程的創(chuàng)建是具有一定開銷的,頻繁地new線程可能會引起一系列的問題:占用過多資源導致死機、線程間沒有互動無法完成協(xié)作、對線程缺乏管理.....
所以,java為我們引入了線程池——ThreadPoolExecutor,來統(tǒng)一的管理線程,方便線程的復用。

正文

ThreadPoolExecuto的體系

可能一百度線程池,會看到什么java四種線程池(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool),實際它們都是基于ThreadPoolExecutor,只不過參數(shù)不同。
我們先看一下ThreadPoolExecuto的體系:


ThreadPoolExecutor體系.png

先看一下Executor接口,如圖所示:


image.png

它只包含了一個execute方法,實際任務的提交、線程池的關閉等由ExecutorService決定。而繼承了ExecutorService的抽象類AbstractExecutorService則實現(xiàn)了一部分接口....這里就不贅述了,直接進入整體。

ThreadPoolExecuto的構造函數(shù)
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

這好像多少有些嚇人...
但是別怕,咱們慢慢來。
首先,細心的同學可能已經(jīng)發(fā)現(xiàn),前三個構造器實際都是調(diào)用了最后一個構造器,只不過參數(shù)有所差異。
所以,我們先結合系統(tǒng)的javaDoc從共有的參數(shù)來講起:

corePoolSize

     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set

核心線程:在創(chuàng)建完線程池后,核心線程先不創(chuàng)建,接到任務后創(chuàng)建核心線程。核心線程即使空閑也依舊會保留在線程池中,除非設置了allowCoreThreadTimeOut。當allowCoreThreadTimeOut設置之后,那么核心線程超時后就會銷毀。

maximumPoolSize

     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool

線程池最大數(shù)量:這個很好理解,根據(jù)字面意思就能明白。線程池最大數(shù)量=核心線程數(shù)量+非核心線程數(shù)。

keepAliveTime

     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.

非核心線程的超時時長:就是字面意思,如果非核心線程執(zhí)行完任務空閑了,等待任務到來的時長超出這個則會回收

unit

* @param unit the time unit for the {@code keepAliveTime} argument

這里的unit是指keepAliveTime的計量單位,使用TimeUnit。
TimeUnit是一個枚舉類型,包括微毫秒、微秒一直到天。

workQueue

 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.

workQueue:任務阻塞隊列,默認情況下,任務添加進來會先交給核心線程執(zhí)行,如果沒有核心線程空閑,則加入到任務隊列中等待執(zhí)行,任務隊列可以設置一個最大值,當達到最大值后則創(chuàng)建非核心線程執(zhí)行任務。
常見的workQueue有五種:
1.SynchronousQueue:同步阻塞隊列,不存儲元素。一接收到任務,就提交給線程執(zhí)行,如果無空閑線程,則會創(chuàng)建新的線程。SynchronousQueue的插入操作是阻塞的,每個插入操作必須要等到另一個線程調(diào)用移除操作。

image.png

如圖所示,前面提到的四大線程池之一的CachedThreadPool,就使用了這個隊列。由于一有新的任務,就要復用線程/新創(chuàng)建線程,所以如果maximumPoolSize設置太小,就會拋出異常。
下面手打一段非常簡單的代碼測試一下:

public class ThreadPoolTest {
    private static ExecutorService pool;

    public static void main(String[] args) throws InterruptedException {
        pool = new ThreadPoolExecutor(0, 2,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTest());
        }
    }
}
     class ThreadTest implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"跑跑跑");
        }
    }

異常拋出:


RejectedExecutionException.png

所以對于設置SynchronousQueue的pool,要注意maximumPoolSize的設置。如同上面的CachedThreadPool設置為maxsize,實際雖然不會拋出策略異常,但是可能會導致oom。

2.LinkedBlockingQueue
這個隊列默認情況下是無界的,也就是說在未設置隊列容量的情況下,隊列容量是最大值,如下圖所示。

image.png

一有任務到來,如果沒有空閑/可創(chuàng)建的核心線程,任務就會被加入到這個隊列之中去, 從而使得maximumPoolSize失去作用。
前面提到的四大線程池之一的FixedThreadPool就采用了這個隊列。


image.png

3.DelayQueue:隊列內(nèi)元素必須實現(xiàn)Delayed接口,這就意味著你傳進去的任務必須先實現(xiàn)Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執(zhí)行任務。

4.PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列

5.ArrayBlockingQueue:用數(shù)組實現(xiàn)的有界阻塞隊列。如果任務來臨,核心線程數(shù)滿了,隊列滿了,最大線程數(shù)也滿了,就會出錯或者執(zhí)行飽和策略。

回歸正題,介紹新的構造器參數(shù)。
ThreadFactory:創(chuàng)建線程的工廠,事實上大部分時候都不用管這個參數(shù),這個參數(shù)是用來給線程配置信息的。

RejectedExecutionHandler:飽和(拒絕)策略
該策略是當線程數(shù)已滿且都在工作中、隊列也都滿了的時候,所要采用的應對策略,它是線程池的一種保護機制。
拒絕策略共有下面四種:

  1. AbortPolicy:默認策略,表示無法處理新任務,并拋出RejectedExecutionException 異常 。
    具體實例就如本文開頭測試的RejectedExecutionException圖所示。
  2. CallerRunsPolicy:由調(diào)用者所在的線程進行處理,這種策略會提供簡單的反饋控制機制。
  3. DiscardPolicy:丟棄任務,但不拋出異常。使用這種策略,可能會使得無法發(fā)現(xiàn)系統(tǒng)的異常狀態(tài)。
  4. DiscardOldestPolicy:將隊列頭部的任務拋棄,然后重新提交新任務。
四大線程池

好了,終于進入喜聞樂見的四大線程池部分了。前面已經(jīng)說過了,四大線程池是(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool)。
Java將創(chuàng)建這四種線程池的方法都放在了Executors這個工廠類之中(當然,能不用Executors是最好哈,阿里巴巴的開發(fā)規(guī)范就有一條明確說明不準使用Executors創(chuàng)建線程池,要使用ThreadPoolExecutor)。
當然了,我們之前也說過,實際上他們都是由ThreadPoolExecutor構造而成的。下面我們看一下Executors相關線程池的創(chuàng)造方法,你就會發(fā)現(xiàn)使用ThreadPoolExecutor會更有助于理解線程池而且很香哦。
下面我們逐一看一下:
1.CachedThreadPool
這是一個緩沖線程池,有任務就讓空閑線程/創(chuàng)建新線程運行,空閑線程的超時時間為60秒。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.FixedThreadPool:
創(chuàng)建一個固定數(shù)目的可重用的線程池。這個采用了 LinkedBlockingQueue也就是上面我們說到的無界阻塞隊列。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.ScheduledThreadPoolExecutor
定時線程池,沒啥好講的,看了前面的內(nèi)容,相信聰明的你一下就明白辣。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

4.SingleThreadExecutor
一個核心線程,先進先出。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

總結

實際上,推薦ThreadPoolExecutor的使用來替代Executors,會更便于理解線程池,也能規(guī)避一些錯誤。要注意的就是線程池最大數(shù)量、飽和策略、阻塞隊列的選擇。

后續(xù)

天,寫一篇關于線程池的文章的想法已經(jīng)有一段時間了,一直懶于提筆,今天終于把債還上了.....


image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容