Java 線程池的使用

一. 簡介

在實際開發(fā)中,我們有時會需要多線程并發(fā)執(zhí)行一些任務,如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務就結束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。而線程池的作用就是可以對線程進行復用,來提高效率。在 Java 5 之后,并發(fā)編程引入了一堆新的啟動、調度和管理線程的API。Executor 框架便是 Java 5 中引入的,其內部使用了線程池機制,它在 java.util.cocurrent 包下,通過該框架來控制線程的啟動、執(zhí)行和關閉,可以簡化并發(fā)編程的操作。

Executor 框架的類的繼承結構:


image.png

二. ThreadPoolExecutor 介紹

java.uitl.concurrent.ThreadPoolExecutor 類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。這個類有四個構造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

實際上,前三個構造方法最后都是調用的第四個方法,下面說一下第四個構造方法各個參數(shù)的含義:

  • corePoolSize:核心池的大小,這個參數(shù)跟后面講述的線程池的實現(xiàn)原理有非常大的關系。在創(chuàng)建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創(chuàng)建線程的意思,即在沒有任務到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務,當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;

  • maximumPoolSize:線程池最大線程數(shù),這個參數(shù)也是一個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;

  • keepAliveTime:表示線程沒有任務執(zhí)行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;

  • unit:參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:

  • workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務,這個參數(shù)的選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:

ArrayBlockingQueue;     // 基于數(shù)組的先進先出隊列,此隊列創(chuàng)建時必須指定大??;
LinkedBlockingQueue;    // 基于鏈表的先進先出隊列,如果創(chuàng)建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
SynchronousQueue;       // 這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執(zhí)行新來的任務。

?ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創(chuàng)建線程;

  • handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy;       // 丟棄任務并拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy;     // 也是丟棄任務,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy;     // 丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy;        // 由調用線程處理該任務 

在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute() 方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現(xiàn),這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執(zhí)行。

submit() 方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn),在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執(zhí)行的結果,去看submit()方法的實現(xiàn),會發(fā)現(xiàn)它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執(zhí)行結果。

shutdown() 和shutdownNow() 是用來關閉線程池的。

還有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,可以自行查閱API。

三. 線程池的使用

我們在使用線程池的時候,要先獲得一個ThreadPoolExecutor 的對象,我們可以直接通過上面提到的構造方法直接new 一個,不過Java 推薦使用Executors 的工廠方法創(chuàng)建線程池,Executors 提供了以下幾種創(chuàng)建線程池的方法:

newCachedThreadPool()

  • 緩存型池子,先查看池中有沒有以前建立的線程,如果有,就 reuse 如果沒有,就建一個新的線程加入池中

  • 緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務 因此在一些面向連接的 daemon 型 SERVER 中用得不多。但對于生存期短的異步任務,它是 Executor 的首選。

  • 能 reuse 的線程,必須是 timeout IDLE 內的池中線程,缺省 timeout 是 60s,超過這個 IDLE 時長,線程實例將被終止及移出池。

注意:放入 CachedThreadPool 的線程不必擔心其結束,超過 TIMEOUT 不活動,其會自動被終止。

newFixedThreadPool(int)

  • newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能隨時建新的線程。

  • 其獨特之處:任意時間點,最多只能有固定數(shù)目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子。

  • 和 cacheThreadPool 不同,F(xiàn)ixedThreadPool 沒有 IDLE 機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機制之類的),所以 FixedThreadPool 多數(shù)針對一些很穩(wěn)定很固定的正規(guī)并發(fā)線程,多用于服務器。

  • 從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數(shù)不同:

    • fixed 池線程數(shù)固定,并且是0秒IDLE(無IDLE)。

    • cache 池線程數(shù)支持 0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60 秒 IDLE 。

newScheduledThreadPool(int)

  • 調度型線程池

  • 這個池子里的線程可以按 schedule 依次 delay 執(zhí)行,或周期執(zhí)行

SingleThreadExecutor()

  • 單例線程,任意時間池中只能有一個線程

  • 用的是和 cache 池和 fixed 池相同的底層池,但線程數(shù)目是 1, 0 秒 IDLE(無 IDLE)

一般來說,CachedTheadPool 在程序執(zhí)行過程中通常會創(chuàng)建與所需數(shù)量相同的線程,然后在它回收舊線程時停止創(chuàng)建新線程,因此它是合理的 Executor 的首選,只有當這種方式會引發(fā)問題時(比如需要大量長時間面向連接的線程時),才需要考慮用 FixedThreadPool。
(該段話摘自《Thinking in Java》第四版)

使用示例:

1)調用execute 方法

execute 方法執(zhí)行一個無返回值的任務,接收一個Runnable 參數(shù)

public class TestCachedThreadPool{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
//      ExecutorService executorService = Executors.newFixedThreadPool(5);  
//      ExecutorService executorService = Executors.newSingleThreadExecutor();  
        for (int i = 0; i < 5; i++){   
            executorService.execute(new TestRunnable());   
            System.out.println("************* a" + i + " *************");   
        }   
        executorService.shutdown();   
    }   
}   

class TestRunnable implements Runnable{   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "線程被調用了。");   
    }   
} 

執(zhí)行結果如下


image.png

2)調用submit 方法

submit 方法有兩種重載,即 submit(Callable<T> callable), submit(Runnable run, T result) ,用于執(zhí)行一個有返回值的任務

public class CallableDemo{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<>();   

        //創(chuàng)建10個任務并執(zhí)行   
        for (int i = 0; i < 10; i++){   
            //使用ExecutorService執(zhí)行Callable類型的任務,并將結果保存在future變量中   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            //將任務執(zhí)行結果存儲到List中   
            resultList.add(future);   
        }   

        //遍歷任務的結果   
        for (Future<String> fs : resultList){   
                        try{   
                    while(!fs.isDone());                            //Future返回如果沒有完成,則一直循環(huán)等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各個線程(任務)執(zhí)行的結果   
                }catch(InterruptedException e){   
                    e.printStackTrace();   
                }catch(ExecutionException e){   
                    e.printStackTrace();   
                }finally{   
                    //啟動一次順序關閉,執(zhí)行以前提交的任務,但不接受新任務  
                    executorService.shutdown();   
                }   
        }   
    }   
}   

class TaskWithResult implements Callable<String>{   
    private int id;   

    public TaskWithResult(int id){   
        this.id = id;   
    }   

    /**  
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執(zhí)行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用?。。?   " + Thread.currentThread().getName());   
        //該返回結果將被Future的get方法得到  
        return "call()方法被自動調用,任務返回的結果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}  

執(zhí)行結果如下:


image.png

在本例中使用到了Future 類,關于Future 的使用,將在新的篇章展開。

合理設置線程池大小

如果不使用Executors 提供的工廠方法,而是自己創(chuàng)建線程池,要注意合理的設置線程池的大小

一般需要根據(jù)任務的類型來配置線程池大小:

  • 如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設置為2*NCPU

當然,這只是一個參考值,具體的設置還需要根據(jù)實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統(tǒng)負載、資源利用率來進行適當調整。

四. 線程池的實現(xiàn)原理

可以參考這篇文章 https://blog.csdn.net/u011531613/article/details/61921473

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容