一、如何創(chuàng)建線程
1、將類聲明為Thread的子類,重寫Thread類的run方法,使用子類的實例調用start()方法啟動線程。
class MyThread extends Thread {
public void run(){
//code
}
}
MyThread m = new MyThread();
m.start();
2、讓類實現(xiàn)Runnable接口,該類實現(xiàn)run方法,然后在Thread構造方法中傳入Runnable接口的實現(xiàn)類,使用Thread對象調start()方法啟動線程。
class MyThread implements Runnable {
public void run(){
//code
}
}
MyThread m = new MyThread();
//new Thread(m).start();
Thread t = new Thread(m);
t.start();
實現(xiàn)Runnable接口,避免了繼承Thread類的單繼承局限性。
3、匿名內部類的方式
public class Test {
public static void main(String[] args){
new Thread(new Runnable(){
public void run(){
//code
}
}).start();
}
}
4、讓類實現(xiàn)Callable接口,該類實現(xiàn)call()方法,使用FutureTask類來包裝Callable實現(xiàn)類的對象,然后再Thread構造方法中傳入FutureTask對象,然后再使用Thread對象調start()方法啟動線程。
public class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "data";
}
}
public class Test{
public static void main(String[] args){
Callable<String> myThread = new MyThread();
FutureTask<String> ft = new FutureTask<>(myThread);
Thread t = new Thread(ft);
t.start();
String s = ft.get();
System.out.println("callable線程返回值:" + s);
}
}
打印結果:callable線程返回值:data
二、線程池
jdk中線程池通過線程池工廠類創(chuàng)建,再通過線程去執(zhí)行任務方法。
- Executors:線程池創(chuàng)建工廠類
- ExecutorService:線程池類
- Future:用來記錄線程任務執(zhí)行完畢后產(chǎn)生的結果
1、線程池的使用
使用Executors中的newFixedThreadPool()靜態(tài)方法得到線程池對象,然后調用submit()方法提交線程執(zhí)行任務。
submit(Runable task):下面的例子是提交一個Runnable的任務,返回的Future,該Future的get()方法在成功完成時返回null。
public class MyThread implements Runnable{
//run方法
}
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
es.submit(new MyThread());
}
}
submit(Runable task,T result):下面的例子是提交一個Runnable任務,返回的Future,該Future的get()方法在成功完成時將會返回給定的結果。 可以將傳入的參數(shù)在run里處理并返回結果。
public class Data {
int num = 1;
String str = "threadData";
//省略
}
public class MyThread implements Runnable{
Data data;
public void run(){
Data.setStr("data")
}
}
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(5);
Future<Data> runableResult = es.submit(new MyThread(data), data);
System.out.println(runableResult.get());
}
}
打印結果:Data{num=1, str='data'}
submit(Callable<T> task):下面的例子是提交一個Callable的任務,返回一個表示該任務的Future,然后通過Future中的get()方法可以獲取到返回值。
public class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "aaa";
}
}
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(5);
Future future = es.submit(new MyThread());
System.out.println(future.get());
}
}
打印的結果為:aaa
2、java中Executors創(chuàng)建的四種線程池
- newCachedThreadPool()
創(chuàng)建一個可緩存的線程池。如果有新任務提交時,有空閑線程則直接處理任務,如果沒有就創(chuàng)建新的線程處理任務,線程池不對線程池大小做限制。
newCachedThreadPool(ThreadFactory threadFactory):創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構造的線程可用時將重用它們,并在需要時使用提供的 ThreadFactory創(chuàng)建新線程。
注:也可以使用ThreadPoolExecutor創(chuàng)建個性化設置的線程池。這里講的四種線程池看源碼,里面都是使用ThreadPoolExecutor構建的。
ExecutorService es = Executors.newCachedThreadPool();
- newFixedThreadPool(int nThreads)
創(chuàng)建一個固定大小的線程池。當線程達到線程池的最大值,再提交任務則進入隊列中,等有線程空閑時,再從隊列中取出任務繼續(xù)執(zhí)行。
newFixedThreadPool(int nThreads,ThreadFactory threadFactory),threadFactory是創(chuàng)建新線程時使用的工廠。
- newScheduledThreadPool(int corePoolSize)
創(chuàng)建一個固定大小的線程池,支持定時及周期性任務執(zhí)行。corePoolSize是池中所保存的線程數(shù),即使線程是空閑的也包括在內。
newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory),threadFactory是創(chuàng)建新線程時使用的工廠。
下面代碼中的schedule(Runnable<V> command,long delay,TimeUnit unit)方法的參數(shù):第一個參數(shù)是要執(zhí)行的任務,第二個是從現(xiàn)在開始延遲執(zhí)行時間,unit是延遲參數(shù)的時間單位。
ThreadPoolRunnable tpr = new ThreadPoolRunnable();
ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
es.schedule(tpr,3,TimeUnit.SECONDS);
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法可以創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期。
-
newSingleThreadExecutor()
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序執(zhí)行。這里的單線程執(zhí)行指的是線程池內部,從線程池外看,提交任務到線程池時并沒有阻塞,仍然是異步的。
3、為什么使用ThreadFactory及用法
使用ThreadFactory有幾點好處:
- 我們可以自己設置一個線程名,而不用使用默認的pool-thread-n這種名字。
- 可以給線程設置成守護線程。
- 可以設置優(yōu)先級。
- 可以處理為捕捉的異常。
ThreadFactory使用方法:
MyRunnable:
public class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("當前線程為:" + Thread.currentThread().getName());
}
}
MyThreadFactory:
public class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("threadFactory線程");
return thread;
}
}
public class Test {
public static void main(String[] args) {
MyThreadFactory tf = new MyThreadFactory();
MyRunnable runnable = new MyRunnable();
ExecutorService es = Executors.newFixedThreadPool(tf);
es.submit(runnable);
}
}
三、ThreadPoolExecutor用法
1、Executors創(chuàng)建的四種線程池可能會出現(xiàn)的問題
上面說了Executors創(chuàng)建的幾種線程池,但是不建議使用Executors去創(chuàng)建線程池,這是為什么呢?
我們來看一下Executors創(chuàng)建的線程池源碼:
- Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
源碼中corePoolSize被設置為0,maximumPoolSize被設置為Integer.MAX_VALUE,keepAliveTime被設置為60秒,因為最大線程池是Integer.MAX_VALUE,可能會因為創(chuàng)建大量的線程導致OOM。
- Executors.newScheduledThreadPool(n)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
其問題和newCachedThreadPool一樣,源碼中maximumPoolSize被設置為Integer.MAX_VALUE,所以可能會因為創(chuàng)建大量的線程導致OOM。
- Executors.newFixedThreadPool(n)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
源碼中使用的是LinkedBlockingQueue無界隊列作為線程池的工作隊列,因為沒有設置容量,其默認隊列的容量是Integer.MAX_VALUE,所以也可能會堆積大量的請求,從而導致OOM。
- Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
和newFixedThreadPool一樣,因為使用LinkedBlockingQueue也可能堆積大量的請求,從而導致OOM。
注:如果使用Executors的靜態(tài)方法創(chuàng)建ThreadPoolExecutor對象,可以通過使用Semaphore對任務的執(zhí)行進行限流也可以避免出現(xiàn)OOM異常。
2、ThreadPoolExecutor配置:
有下面7個參數(shù):
int corePoolSize:核心線程池大小
int maximumPoolSize:最大線程池大小
long keepAliveTime:非核心線程最大空閑時間
TimeUnit unit:時間單位
BlockingQueue<Runnable> workQueue:線程排隊策略
RejectedExecutionHandler handler:拒絕策略
ThreadFactory threadFactory:線程工廠
-
corePoolSize和maximumPoolSize
ThreadPoolExecutor將根據(jù)corePoolSize和maximumPoolSize設置的邊界自動調整線程池大小。
corePoolSize和maximumPoolSize:
當新任務在方法execute(Runnable)中提交任務時,如果運行的線程少于corePoolSize時,哪怕線程池中有空閑狀態(tài)的線程,也會創(chuàng)建一個新的線程來處理任務。如果運行的線程多于corePoolSize而少于maximumPoolSize,則僅當隊列滿時才創(chuàng)建新線程。如果設置corePoolSize和maximumPoolSize相同,則創(chuàng)建了固定大小的線程池。如果將maximumPoolSize設置為基本的無界值(如Integer.MAX_VALUE),則允許池適應任意數(shù)量的并發(fā)任務。在大多數(shù)情況下,核心和最大池大小僅基于構造來設置,不過也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)進行動態(tài)更改。
按需構造:
默認情況下,即使核心線程最初只是在新任務到達時才創(chuàng)建和啟動的,也可以使用方法prestartCoreThread()或prestartAllCoreThreads()對其進行動態(tài)重寫。如果構造帶有非空隊列的池,則可能希望預先啟動線程。
創(chuàng)建新線程:
使用ThreadFactory創(chuàng)建新線程,如果沒有另外說明,則在同一個ThreadGroup中一律使用Executors.defaultThreadFactory()創(chuàng)建線程,這些線程具有相同的 NORM_PRIORITY 優(yōu)先級和非守護進程狀態(tài)。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優(yōu)先級、守護進程狀態(tài),等等。如果從 newThread 返回 null 時 ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序將繼續(xù)運行,但不能執(zhí)行任何任務。
-
keepAliveTime線程存活時間
此參數(shù)是在池中當前有多于corePoolSize的線程起作用,這些多出的線程在空閑時間超過keepAliveTime時終止。目的是為了在池處于非活躍狀態(tài)時減少資源消耗的方法。
可以使用setKeepAliveTime(long keepAliveTime,TimeUnit unit)方法動態(tài)更改此參數(shù)。使用Long.MAX_VALUE,TimeUnit.NANOSECONDS這兩個參數(shù)時可以使空閑線程永遠不會在關閉之前終止。
只要keepAliveTime值非0,allowCoreThreadTimeOut(boolean)方法可以將此超時策略應用于核心線程(corePool)。
-
TimeUnit時間單位
用于設置keepAliveTime設置的超時時間的單位。keepAliveTime:60L,TimeUnit.SECONDS代表:60秒。
注:TimeUnit.SECONDS.sleep(1); //相當于Thread.sleep(1000);
即可以使用TimeUnit里的sleep()方法來代替Thread.sleep()方法
-
BlockingQueue<Runnable>任務排隊策略
所有的BlockingQueue都可用于傳輸和保持提交的任務。此隊列的使用時與池大小進行交互,下面是排隊時機:
- 如果運行的線程少于
corePoolSize,則Executor始終首選添加新的線程,而不進行排隊。 - 如果運行的線程等于或多于
corePoolSize,則Executor始終首選將請求加入隊列,而不添加新的線程。 - 如果無法將請求加入隊列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出
maximumPoolSize,在這種情況下,任務將被拒絕。
主要有三種排隊策略:
直接提交隊列:
?直接提交隊列的默認選項是SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
無界隊列:
?使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
有界隊列:
?當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
-
RejectedExecutionHandler任務拒絕策略
當Executo已經(jīng)關閉,或Executor將有限邊界用于最大線程數(shù)和工作對列容量,并且兩者已飽和時,在execute方法中提交新任務將被拒絕。以上兩種情況下,exeute方法都將調用RejectedExecutionHandler的rejectedExecution()方法。線程池提供了下面四種預定的拒絕策略:
中止策略(默認):ThreadPoolExecutor.AbortPolicy()
?線程數(shù)超過maximumPoolSize時,直接拒絕,拋出運行時RejectedExecutionException的異常。
調用者運行策略:ThreadPoolExecutor.CallerRunsPolicy()
?用調用者的線程來調用execute,除非executor被關閉,否則任務不會被丟棄。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。注意:當最大線程池過小時,此策略下,任務會交上層線程(即調用executor的線程)執(zhí)行,導致上層線程既要處理其他任務,又要處理排隊中的大量任務,如果遇到長連接,上層線程將長時間阻塞,出現(xiàn)故障。
拋棄舊任務策略:ThreadPoolExecutor.DiscardOldestPolicy()
?如果執(zhí)行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復此過程)。注意:如果此時阻塞隊列使用PriorityBlockingQueue優(yōu)先級隊列,將會導致優(yōu)先級最高的任務被拋棄,因此不建議將該種策略配合優(yōu)先級隊列使用。
直接丟棄策略:ThreadPoolExecutor.DiscardPolicy()
?線程數(shù)超過maximumPoolSize時,任務被直接被丟棄。和AbortPolicy一樣,但不拋出異常。
寫在最后:
- 如果文章中有錯誤或是表達不準確的地方,歡迎大家評論中指正,以便我完善。
- 文章我也會根據(jù)所學到新的知識不斷更新。