java線程、線程池的使用

一、如何創(chuàng)建線程

1、將類聲明為Thread的子類,重寫Thread類的run方法,使用子類的實例調用start()方法啟動線程。
class MyThread extends Thread {
  public void run(){
      //code
  }
}

MyThread m = new MyThread();
m.start();
2、讓類實現(xiàn)Runnable接口,該類實現(xiàn)run方法,然后在Thread構造方法中傳入Runnable接口的實現(xiàn)類,使用Thread對象調start()方法啟動線程。
class MyThread implements Runnable {
  public void run(){
      //code
  }
}

MyThread m = new MyThread();
//new Thread(m).start();
Thread t = new Thread(m);
t.start();

實現(xiàn)Runnable接口,避免了繼承Thread類的單繼承局限性。

3、匿名內部類的方式
public class Test {
  public static void main(String[] args){
    new Thread(new Runnable(){
      public void run(){
        //code
      }
    }).start();
  }
}
4、讓類實現(xiàn)Callable接口,該類實現(xiàn)call()方法,使用FutureTask類來包裝Callable實現(xiàn)類的對象,然后再Thread構造方法中傳入FutureTask對象,然后再使用Thread對象調start()方法啟動線程。
public class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "data";
    }
}

public class Test{
    public static void main(String[] args){
        Callable<String> myThread = new MyThread();
        FutureTask<String> ft = new FutureTask<>(myThread);
        Thread t = new Thread(ft);
        t.start();
        String s = ft.get();
        System.out.println("callable線程返回值:" + s);
    }
}
打印結果:callable線程返回值:data

二、線程池

jdk中線程池通過線程池工廠類創(chuàng)建,再通過線程去執(zhí)行任務方法。

  • Executors:線程池創(chuàng)建工廠類
  • ExecutorService:線程池類
  • Future:用來記錄線程任務執(zhí)行完畢后產(chǎn)生的結果
1、線程池的使用

使用Executors中的newFixedThreadPool()靜態(tài)方法得到線程池對象,然后調用submit()方法提交線程執(zhí)行任務。

submit(Runable task):下面的例子是提交一個Runnable的任務,返回的Future,該Future的get()方法在成功完成時返回null。

public class MyThread implements Runnable{ 
    //run方法 
}

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(5);
        es.submit(new MyThread());
    }
}

submit(Runable task,T result):下面的例子是提交一個Runnable任務,返回的Future,該Future的get()方法在成功完成時將會返回給定的結果。 可以將傳入的參數(shù)在run里處理并返回結果。

public class Data {
    int num = 1;
    String str = "threadData";
    //省略
}

public class MyThread implements Runnable{ 
    Data data;
    public void run(){
        Data.setStr("data")
    }
}

public class ThreadPoolTest {
    public static void main(String[] args)  throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        Future<Data> runableResult = es.submit(new MyThread(data), data);
        System.out.println(runableResult.get());
    }
}
打印結果:Data{num=1, str='data'}

submit(Callable<T> task):下面的例子是提交一個Callable的任務,返回一個表示該任務的Future,然后通過Future中的get()方法可以獲取到返回值。

public class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "aaa";
    }
}

public class ThreadPoolTest {
    public static void main(String[] args)  throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        Future future = es.submit(new MyThread());
        System.out.println(future.get());
    }
}

打印的結果為:aaa
2、java中Executors創(chuàng)建的四種線程池
  • newCachedThreadPool()

創(chuàng)建一個可緩存的線程池。如果有新任務提交時,有空閑線程則直接處理任務,如果沒有就創(chuàng)建新的線程處理任務,線程池不對線程池大小做限制。

newCachedThreadPool(ThreadFactory threadFactory):創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構造的線程可用時將重用它們,并在需要時使用提供的 ThreadFactory創(chuàng)建新線程。

注:也可以使用ThreadPoolExecutor創(chuàng)建個性化設置的線程池。這里講的四種線程池看源碼,里面都是使用ThreadPoolExecutor構建的。

ExecutorService es = Executors.newCachedThreadPool();
  • newFixedThreadPool(int nThreads)

創(chuàng)建一個固定大小的線程池。當線程達到線程池的最大值,再提交任務則進入隊列中,等有線程空閑時,再從隊列中取出任務繼續(xù)執(zhí)行。

newFixedThreadPool(int nThreads,ThreadFactory threadFactory),threadFactory是創(chuàng)建新線程時使用的工廠。

  • newScheduledThreadPool(int corePoolSize)

創(chuàng)建一個固定大小的線程池,支持定時及周期性任務執(zhí)行。corePoolSize是池中所保存的線程數(shù),即使線程是空閑的也包括在內。

newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory),threadFactory是創(chuàng)建新線程時使用的工廠。

下面代碼中的schedule(Runnable<V> command,long delay,TimeUnit unit)方法的參數(shù):第一個參數(shù)是要執(zhí)行的任務,第二個是從現(xiàn)在開始延遲執(zhí)行時間,unit是延遲參數(shù)的時間單位。

ThreadPoolRunnable tpr = new ThreadPoolRunnable();
ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
es.schedule(tpr,3,TimeUnit.SECONDS);

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法可以創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期。

  • newSingleThreadExecutor()
    創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序執(zhí)行。這里的單線程執(zhí)行指的是線程池內部,從線程池外看,提交任務到線程池時并沒有阻塞,仍然是異步的。
3、為什么使用ThreadFactory及用法

使用ThreadFactory有幾點好處:

  • 我們可以自己設置一個線程名,而不用使用默認的pool-thread-n這種名字。
  • 可以給線程設置成守護線程。
  • 可以設置優(yōu)先級。
  • 可以處理為捕捉的異常。
    ThreadFactory使用方法:
MyRunnable:
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println("當前線程為:" + Thread.currentThread().getName());
    }
}
MyThreadFactory:
public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("threadFactory線程");
        return thread;
    }
}
public class Test {
    public static void main(String[] args) {
        MyThreadFactory tf = new MyThreadFactory();
        MyRunnable runnable = new MyRunnable();
        ExecutorService es = Executors.newFixedThreadPool(tf);
        es.submit(runnable);
    }
}

三、ThreadPoolExecutor用法

1、Executors創(chuàng)建的四種線程池可能會出現(xiàn)的問題

上面說了Executors創(chuàng)建的幾種線程池,但是不建議使用Executors去創(chuàng)建線程池,這是為什么呢?

我們來看一下Executors創(chuàng)建的線程池源碼:

  • Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

源碼中corePoolSize被設置為0,maximumPoolSize被設置為Integer.MAX_VALUE,keepAliveTime被設置為60秒,因為最大線程池是Integer.MAX_VALUE,可能會因為創(chuàng)建大量的線程導致OOM。

  • Executors.newScheduledThreadPool(n)
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

其問題和newCachedThreadPool一樣,源碼中maximumPoolSize被設置為Integer.MAX_VALUE,所以可能會因為創(chuàng)建大量的線程導致OOM。

  • Executors.newFixedThreadPool(n)
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

源碼中使用的是LinkedBlockingQueue無界隊列作為線程池的工作隊列,因為沒有設置容量,其默認隊列的容量是Integer.MAX_VALUE,所以也可能會堆積大量的請求,從而導致OOM。

  • Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

newFixedThreadPool一樣,因為使用LinkedBlockingQueue也可能堆積大量的請求,從而導致OOM。

注:如果使用Executors的靜態(tài)方法創(chuàng)建ThreadPoolExecutor對象,可以通過使用Semaphore對任務的執(zhí)行進行限流也可以避免出現(xiàn)OOM異常。

2、ThreadPoolExecutor配置:

有下面7個參數(shù):
int corePoolSize:核心線程池大小
int maximumPoolSize:最大線程池大小
long keepAliveTime:非核心線程最大空閑時間
TimeUnit unit:時間單位
BlockingQueue<Runnable> workQueue:線程排隊策略
RejectedExecutionHandler handler:拒絕策略
ThreadFactory threadFactory:線程工廠

  • corePoolSize和maximumPoolSize

ThreadPoolExecutor將根據(jù)corePoolSizemaximumPoolSize設置的邊界自動調整線程池大小。

corePoolSize和maximumPoolSize:
當新任務在方法 execute(Runnable)中提交任務時,如果運行的線程少于corePoolSize時,哪怕線程池中有空閑狀態(tài)的線程,也會創(chuàng)建一個新的線程來處理任務。如果運行的線程多于corePoolSize而少于maximumPoolSize,則僅當隊列滿時才創(chuàng)建新線程。如果設置corePoolSizemaximumPoolSize相同,則創(chuàng)建了固定大小的線程池。如果將 maximumPoolSize設置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數(shù)量的并發(fā)任務。在大多數(shù)情況下,核心和最大池大小僅基于構造來設置,不過也可以使用 setCorePoolSize(int)setMaximumPoolSize(int)進行動態(tài)更改。

按需構造
默認情況下,即使核心線程最初只是在新任務到達時才創(chuàng)建和啟動的,也可以使用方法 prestartCoreThread()prestartAllCoreThreads()對其進行動態(tài)重寫。如果構造帶有非空隊列的池,則可能希望預先啟動線程。

創(chuàng)建新線程
使用ThreadFactory創(chuàng)建新線程,如果沒有另外說明,則在同一個 ThreadGroup中一律使用 Executors.defaultThreadFactory()創(chuàng)建線程,這些線程具有相同的 NORM_PRIORITY 優(yōu)先級和非守護進程狀態(tài)。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優(yōu)先級、守護進程狀態(tài),等等。如果從 newThread 返回 null 時 ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序將繼續(xù)運行,但不能執(zhí)行任何任務。

  • keepAliveTime線程存活時間

此參數(shù)是在池中當前有多于corePoolSize的線程起作用,這些多出的線程在空閑時間超過keepAliveTime時終止。目的是為了在池處于非活躍狀態(tài)時減少資源消耗的方法。

可以使用setKeepAliveTime(long keepAliveTime,TimeUnit unit)方法動態(tài)更改此參數(shù)。使用Long.MAX_VALUETimeUnit.NANOSECONDS這兩個參數(shù)時可以使空閑線程永遠不會在關閉之前終止。

只要keepAliveTime值非0,allowCoreThreadTimeOut(boolean)方法可以將此超時策略應用于核心線程(corePool)。

  • TimeUnit時間單位

用于設置keepAliveTime設置的超時時間的單位。keepAliveTime:60L,TimeUnit.SECONDS代表:60秒。

注:TimeUnit.SECONDS.sleep(1); //相當于Thread.sleep(1000);
即可以使用TimeUnit里的sleep()方法來代替Thread.sleep()方法

  • BlockingQueue<Runnable>任務排隊策略

所有的BlockingQueue都可用于傳輸和保持提交的任務。此隊列的使用時與池大小進行交互,下面是排隊時機:

  • 如果運行的線程少于corePoolSize,則 Executor始終首選添加新的線程,而不進行排隊。
  • 如果運行的線程等于或多于corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
  • 如果無法將請求加入隊列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

主要有三種排隊策略:

直接提交隊列:
?直接提交隊列的默認選項是SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。

無界隊列:
?使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。

有界隊列:
?當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

  • RejectedExecutionHandler任務拒絕策略

Executo已經(jīng)關閉,或Executor將有限邊界用于最大線程數(shù)和工作對列容量,并且兩者已飽和時,在execute方法中提交新任務將被拒絕。以上兩種情況下,exeute方法都將調用RejectedExecutionHandlerrejectedExecution()方法。線程池提供了下面四種預定的拒絕策略:

中止策略(默認):ThreadPoolExecutor.AbortPolicy()
?線程數(shù)超過maximumPoolSize時,直接拒絕,拋出運行時RejectedExecutionException的異常。

調用者運行策略:ThreadPoolExecutor.CallerRunsPolicy()
?用調用者的線程來調用execute,除非executor被關閉,否則任務不會被丟棄。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。注意:當最大線程池過小時,此策略下,任務會交上層線程(即調用executor的線程)執(zhí)行,導致上層線程既要處理其他任務,又要處理排隊中的大量任務,如果遇到長連接,上層線程將長時間阻塞,出現(xiàn)故障。

拋棄舊任務策略:ThreadPoolExecutor.DiscardOldestPolicy()
?如果執(zhí)行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復此過程)。注意:如果此時阻塞隊列使用PriorityBlockingQueue優(yōu)先級隊列,將會導致優(yōu)先級最高的任務被拋棄,因此不建議將該種策略配合優(yōu)先級隊列使用。

直接丟棄策略:ThreadPoolExecutor.DiscardPolicy()
?線程數(shù)超過maximumPoolSize時,任務被直接被丟棄。和AbortPolicy一樣,但不拋出異常。


寫在最后:

  • 如果文章中有錯誤或是表達不準確的地方,歡迎大家評論中指正,以便我完善。
  • 文章我也會根據(jù)所學到新的知識不斷更新。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容