教你如何多人反復共用一桶洗澡水:線程池

今天我們聊聊在JAVA世界中如何實現(xiàn)一桶洗澡水可供成百上千的人反復使用的故事

意義:線程池可以反復使用已經(jīng)創(chuàng)建的線程,減小了線程反復創(chuàng)建和銷毀的開銷,避免過多線程占用太多內(nèi)存

構(gòu)造參數(shù)

1、corePoolSize
核心線程數(shù),線程池初始化完成后,不會馬上創(chuàng)建線程,而是等待任務(wù)的到來才會去創(chuàng)建線程,只要核心線程還沒有滿,每次有任務(wù)提交進來即時有其他線程處于空閑狀態(tài),線程池依然會創(chuàng)建新線程直到達到核心線程數(shù)量,之后就算沒有任務(wù),線程池中的線程數(shù)量也會維持核心線程數(shù)
2、maxPooolSize
最大線程數(shù),當任務(wù)存儲隊列的任務(wù)達到了設(shè)置的最大值時,線程池就會去創(chuàng)建新的線程去處理,線程池的數(shù)量最多只能是maxPooolSize的數(shù)量,而在等待了超設(shè)置的存活時間后依舊沒有新的任務(wù)處理,非核心線程就會被銷毀,maxPooolSize可以是非常大的數(shù)值,可以設(shè)置為Integer.MAX_VALUE
3、keepAliveTime
保持存活時間,當非核心線程空閑時間超過了keepAliveTime,就會被銷毀
4、workQueue
任務(wù)存儲隊列,當線程池沒有空閑的核心線程時,將緩存提交的任務(wù)到隊列中排隊等待核心線程的處理,當隊列超過設(shè)置的值時,線程池將會創(chuàng)建新的線程去處理隊列的任務(wù),如果線程池中的線程數(shù)量達了maxPooolSize,隊列也達到了設(shè)置值,將會執(zhí)行拒絕策略
小知識:當隊列滿的時候才會去創(chuàng)建多余corePoolSize的線程去處理任務(wù),如果這是個無限隊列(如linkBlockingQueue),無論隊列中有多少任務(wù),線程池都不會去創(chuàng)建多于corePoolSize的線程去處理任務(wù),所以任務(wù)存儲隊列對線程池的影響是非常大的
5、threadFactor
當線程池需要新的線程時,會使用threadFactor來生成新的線程,線程工廠默認使用的是DefaultThreadFactory,通過線程工廠可以設(shè)置線程的線程組、線程名、優(yōu)先級、是否是守護線程等,一般情況下使用默認的即可
6、Handler
線程池無法接受新提交的任務(wù)時就會使用拒絕策略
7、unit
線程存活時間單位

workQueue線程池的工作隊列

隊列的類型主要有以下幾個:

1、SynchronousQuene

不緩存任務(wù)的阻塞隊列,SynchronousQuene內(nèi)部沒有容量,當任務(wù)被提交進來后就會要求線程馬上處理,不會去緩存新進來的任務(wù),如果沒有足夠的核心線程處理,則會創(chuàng)建新線程去處理,當?shù)竭_maxPooolSize時就執(zhí)行拒絕策略,SynchronousQuene適用于任務(wù)數(shù)不多的場景,如果使用了SynchronousQuene應(yīng)當把maxPooolSize設(shè)置得很大,這樣才會避免隊列執(zhí)行拒絕策略

2、LinkedBlockingQuene

無界阻塞隊列,這個隊列是基于鏈表的隊列,按照FIFO排序,使用這個隊列時由于隊列是無界的,所以無論有多少任務(wù)都不會排滿隊列容量,因此線程池不管核心線程是否空閑,maxPooolSize有多大都不會創(chuàng)建任何非核心線程,此隊列適用于任務(wù)數(shù)量非常大的場景,但是風險就在于如果線程的處理速度跟不上任務(wù)的生產(chǎn)速度,任務(wù)緩存數(shù)量過多造成OOM,所以使用此隊列盡量將核心線程數(shù)設(shè)置的大些

3、ArrayBlockingQueue

有界阻塞隊列,這個隊列是基于數(shù)組的隊列,按照FIFO排序,這個隊列的容量是可以設(shè)置的,當任務(wù)數(shù)排滿隊列容量時線程池就會創(chuàng)建新的線程去處理任務(wù),當隊列排滿,線程池的線程數(shù)量也到達maxPooolSize后則會執(zhí)行拒絕策略

4、PriorityBlockingQueue

具有優(yōu)先級的無界阻塞隊列,隊列可以按照優(yōu)先級進行內(nèi)部元素排序,隊列的任務(wù)需要實現(xiàn)Comparable 接口,才能通過使用compareTo()方法進行排序。

5、DelayQueue

是一個具有延遲時效的無界隊列,只有在延遲期滿后才能從隊列中取出任務(wù)

拒絕策略

當線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize時,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。這是線程池默認的拒絕策略
ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù),例如如果是在main方法中提交的,就由main方法去執(zhí)行這個任務(wù)

JDK已被設(shè)定好的線程池類型:

在我們平時開發(fā)中我們總喜歡盲目使用類似newFixedThreadPool(),newCachedThreadPool(),無腦使用它們真的好嗎,我分析分析幾種比較常見的線程池類型:
1、newFixedThreadPool()
不多BB直接看源碼,看看這個方法的構(gòu)造方法的

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(
    nThreads//核心線程數(shù),
    nThreads//最大線程數(shù),
    0L//存活時間,
    TimeUnit.MILLISECONDS//存活時間單位,                        
    new LinkedBlockingQueue<Runnable>()//無界隊列
);
    }


  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }


 public ThreadPoolExecutor(int corePoolSize//核心線程數(shù),
                              int maximumPoolSize//最大線程數(shù),
                              long keepAliveTime//存活時間,
                              TimeUnit unit//存活時間單位,
                              BlockingQueue<Runnable> workQueue//隊列) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

我發(fā)現(xiàn)這個newFixedThreadPool()是一個使用的阻塞隊列是無界的,也就是說newFixedThreadPool()可以接受無限多的任務(wù),所以永遠不會執(zhí)行拒絕策略,但是無論接受了多少任務(wù),線程池都不會創(chuàng)建非核心線程,所以這個線程的缺點就是在并發(fā)量高,任務(wù)處理速度慢的情況下可能隊列數(shù)太多造成OOM。核心線程池等于最大線程池,當前的線程數(shù)能夠比較穩(wěn)定保證一個數(shù)。能夠避免頻繁回收線程和創(chuàng)建線程。故適用于處理cpu密集型的任務(wù),確保cpu在長期被工作線程使用的情況下,盡可能少的分配線程,即適用長期的任務(wù)。
2、newCachedThreadPool()
不多BB直接看源碼,依舊是看構(gòu)造函數(shù)

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

我們看到newCachedThreadPool()使用的是SynchronousQueue隊列,這是一個不會緩存任務(wù)的隊列,并且設(shè)置了核心線程數(shù)為0,最大線程數(shù)可以看做是無限大,所以永遠不會執(zhí)行拒絕策略,線程的存活時間是60秒,我們可以知道這個類型的線程池每接受一個線程就會馬上創(chuàng)建新的線程去處理任務(wù),并且會清理掉線程池中空閑超過60秒的線程,這個線程池由于沒有排隊阻塞所以處理任務(wù)的速度會較其他的線程池快,但是在并發(fā)量很大的時候無節(jié)制的創(chuàng)建和銷毀線程會造成系統(tǒng)很大的資源開銷。這個線程池是適用于并發(fā)不固定的短期小任務(wù)
3、newSingleThreadExecutor()
不多BB直接看源碼,依舊是看構(gòu)造函數(shù)

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

我們看到newSingleThreadExecutor使用的是無界隊列,所以永遠不會執(zhí)行拒絕策略,但是核心線程數(shù)和最大線程數(shù)只是1而已,所以newSingleThreadExecutor無論接收多少任務(wù)都會只會用一個線程去處理任務(wù),任務(wù)只會一個一個的被處理,相當于串行處理,適用串行化任務(wù)

4、newScheduledThreadPool()
這個的源碼比較麻煩,我通過手寫一個事例說明:

public class TestTask implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}
  public static void main(String[] args) {
        //核心線程數(shù)設(shè)置為3
        ScheduledExecutorService scp = Executors.newScheduledThreadPool(3);
        //延遲5秒
         scp.schedule(new TestTask(),5, TimeUnit.SECONDS);
        //延遲5秒,每兩秒執(zhí)行一次
        scp.scheduleAtFixedRate(new TestTask(),5,2,TimeUnit.SECONDS);
    }

這四個比較典型的線程池是已經(jīng)被JDK設(shè)置好了的,如果沒有學習過線程池基礎(chǔ)知識的程序員可能只會無腦使用它們,但是通過分析它不一定適合我們的業(yè)務(wù),實際的使用還是要看我們的業(yè)務(wù)具體設(shè)置線程池的參數(shù)值,所以學會自定義線程池是很有必要的
自定義線程池:

     ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(3, 
                        5, 
                        30, TimeUnit.SECONDS, 
                        new ArrayBlockingQueue<Runnable>(20));

線程池的基本使用

不多BB直接上代碼

    private static ExecutorService fx = Executors.newFixedThreadPool(3);
    class PooTaskReturn implements Callable<String>{
        @Override
        public String call() throws Exception {
            return "有返回值的線程池方法";
        }
    }
    class PoolTask implements Runnable{
        @Override
        public void run() {
            System.out.println("無返回值的線程池方法");
        }
    }
    @Test
    public  void test(){
        for (int i=0;i<1000;i++) {
            fx.execute(new PoolTask());
            fx.execute(()->{
                System.out.println("使用匿名內(nèi)部類");
            });
        }
        List<Future<String>> fls=new ArrayList<>();
        for (int i=0;i<1000;i++) {
            Future<String> submit = fx.submit(new PooTaskReturn());
            Future<String> lambda=fx.submit(()->{
                return "使用匿名內(nèi)部類";
            });
           fls.add(submit);
           fls.add(lambda);
        }

        fls.forEach(l-> { try {
                System.out.println(l.get());
            }catch (Exception e) {
                System.out.println(e);
            }
                });
        //關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,
        // 然后等待所有的任務(wù),包括隊列中的任務(wù)執(zhí)行完成
        fx.shutdown();
        //關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,
        // 然后把嘗試關(guān)閉正在執(zhí)行的任務(wù)
        //將隊列中的任務(wù)直接丟棄
        fx.shutdownNow();
        //所有的任務(wù)已經(jīng)完成,返回true
        fx.isTerminated();
        //收到關(guān)閉指令后返回true,即執(zhí)行了shutdownNow或者shutdown
        fx.isShutdown();
    }

我看看到幾個關(guān)閉的命令,我們把它列出來:
shutdown():關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,然后等待所有的任務(wù),包括隊列中的任務(wù)執(zhí)行完成
shutdownNow():關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,然后把嘗試關(guān)閉正在執(zhí)行的任務(wù),將隊列中的任務(wù)直接丟棄
isTerminated():所有的任務(wù)已經(jīng)完成,返回true
isShutdown():收到關(guān)閉指令后返回true,即執(zhí)行了shutdownNow或者shutdown

CPU 密集型(I/O bound)

CPU密集型也叫計算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對CPU要好很多,此時,系統(tǒng)運作大部分的狀況是CPU Loading 100%,CPU要讀/寫I/O(硬盤/內(nèi)存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。

CPU密集的意思是該任務(wù)需要大量的運算,而沒有阻塞,CPU一直全速運行。CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速(通過多線程),而在單核CPU上,無論你開幾個模擬的多線程該任務(wù)都不可能得到加速,因為CPU總的運算能力就那些。

CPU 使用率較高(例如:計算一些復雜的運算,邏輯處理等情況)非常多的情況下,線程數(shù)一般只需要設(shè)置為CPU核心數(shù)的線程個數(shù)就可以了。 這一類型多出現(xiàn)在開發(fā)中的一些業(yè)務(wù)復雜計算和邏輯處理過程中。

I/O 密集型(I/O bound)

IO密集型指的是系統(tǒng)的CPU性能相對硬盤、內(nèi)存要好很多,此時,系統(tǒng)運作,大部分的狀況是CPU在等I/O (硬盤/內(nèi)存) 的讀/寫操作,此時CPU Loading并不高。I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務(wù)本身需要大量I/O操作,而pipeline做得不是很好,沒有充分利用處理器能力。

CPU 使用率較低,程序中會存在大量的 I/O 操作占用時間,導致線程空余時間很多,所以通常就需要開CPU核心數(shù)兩倍的線程。當線程進行 I/O 操作 CPU 空閑時,啟用其他線程繼續(xù)使用 CPU,以提高 CPU 的使用率。例如:數(shù)據(jù)庫交互,文件上傳下載,網(wǎng)絡(luò)傳輸?shù)取?/p>

線程等待時間所占比例越高,需要越多線程,啟用其他線程繼續(xù)使用CPU,以此提高CPU的利用率;線程 CPU 時間所占比例越高,需要越少的線程,這一類型在開發(fā)中主要出現(xiàn)在一些計算業(yè)務(wù)頻繁的邏輯中。

方法一:

由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如CPU核數(shù)*2

方法二:

IO密集型,即該任務(wù)需要大量的IO,即大量的阻塞。在單線程上運IO密集型的任務(wù)會導致浪費大量的CPU運算能力浪費在等待。所以在IO密集型任務(wù)中使用多線程可以大大的加速程序運行,即使在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間。
IO密集型時,大部分線程都阻塞,故需要多配置線程數(shù):
參考公式:CPU核數(shù) /(1 - 阻系數(shù))
比如8核CPU:8/(1 - 0.9)=80個線程數(shù)
阻塞系數(shù)在0.8~0.9之間

總結(jié):

1.一個計算為主的程序(CPU密集型程序),多線程跑的時候,可以充分利用起所有的 CPU 核心數(shù),比如說 8 個核心的CPU ,開8 個線程的時候,可以同時跑 8 個線程的運算任務(wù),此時是最大效率。但是如果線程遠遠超出 CPU 核心數(shù)量,反而會使得任務(wù)效率下降,因為頻繁的切換線程也是要消耗時間的。因此對于 CPU 密集型的任務(wù)來說,線程數(shù)等于 CPU 數(shù)是最好的了。

2.如果是一個磁盤或網(wǎng)絡(luò)為主的程序(IO密集型程序),一個線程處在 IO 等待的時候,另一個線程還可以在 CPU 里面跑,有時候 CPU 閑著沒事干,所有的線程都在等著 IO,這時候他們就是同時的了,而單線程的話此時還是在一個一個等待的。我們都知道 IO 的速度比起 CPU 來是很慢的。此時線程數(shù)等于CPU核心數(shù)的兩倍是最佳的。

核心線程數(shù)計算公式

IO密集型:核心線程數(shù) = CPU核數(shù) / (1-阻塞系數(shù))

CPU密集型:核心線程數(shù) = CPU核數(shù) + 1

IO密集型:核心線程數(shù) = CPU核數(shù) * 2

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容