1. 四種線程池創(chuàng)建方式,底層都是依賴ThreadPoolExecutor這個方法
ExecutorService executorService=Executors.newFixedThreadPool(10);
ExecutorService executorService1=Executors.newCachedThreadPool();
ExecutorService executorService2=Executors.newScheduledThreadPool(10);
ExecutorService executorService3=Executors.newSingleThreadExecutor();
2. ThreadPoolExecutor的重要參數(shù)
public ThreadPoolExecutor(int corePoolSize,//核心線程數(shù)
int maximumPoolSize,//最大線程數(shù)
long keepAliveTime,//線程執(zhí)行和存活時間
TimeUnit unit,//線程執(zhí)行和存活時間單位
BlockingQueue<Runnable> workQueue,//任務(wù)隊列
RejectedExecutionHandler handler//隊列滿了,執(zhí)行的拒絕策略) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
3. ThreadPoolExecutor的執(zhí)行策略
在使用有界隊列時,若有新的任務(wù)需要執(zhí)行,如果線程池實際線程數(shù)小于corePoolSize,則優(yōu)先創(chuàng)建線程,若大于corePoolSize,則會將任務(wù)加入隊列,若隊列已滿,則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的線程,若線程數(shù)大于maximumPoolSize,則執(zhí)行拒絕策略?;蚱渌远x方式(拒絕方式一般選擇是記入日志,任務(wù)id,關(guān)鍵key,非高峰期重新批處理)。
若使用無界隊列,則不存在任務(wù)進(jìn)入隊列失敗的情況.這種情況下線程數(shù)達(dá)到corePoolSize就不會繼續(xù)創(chuàng)建新線程,后續(xù)任務(wù)一直進(jìn)入堆積,直到系統(tǒng)資源耗盡.
下面是一個demo
Main代碼
public class UseThreadPoolExecutor1 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
2, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3) //指定一種隊列 (有界隊列)
//new LinkedBlockingQueue<Runnable>()
, new MyRejected()
//, new DiscardOldestPolicy()
);
MyTask mt1 = new MyTask(1, "任務(wù)1");
MyTask mt2 = new MyTask(2, "任務(wù)2");
MyTask mt3 = new MyTask(3, "任務(wù)3");
MyTask mt4 = new MyTask(4, "任務(wù)4");
MyTask mt5 = new MyTask(5, "任務(wù)5");
MyTask mt6 = new MyTask(6, "任務(wù)6");
pool.execute(mt1);
//pool.execute(mt2);
//pool.execute(mt3);
//pool.execute(mt4);
//pool.execute(mt5);
//pool.execute(mt6);
pool.shutdown();
}
}
Task代碼
public class MyTask implements Runnable {
private int taskId;
private String taskName;
public MyTask(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
try {
System.out.println("run taskId =" + this.taskId+" Thread"+Thread.currentThread().getId());
Thread.sleep(5*1000);
//System.out.println("end taskId =" + this.taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString(){
return Integer.toString(this.taskId);
}
}
MyRejected代碼
public class MyRejected implements RejectedExecutionHandler{
public MyRejected(){
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定義處理..");
System.out.println("當(dāng)前被拒絕任務(wù)為:" + r.toString());
}
}
1.只保留1個 pool.execute方法 log信息如下
run taskId =1 Thread11
2.保留2個 pool.execute方法 log信息如下
run taskId =1 Thread11
run taskId =2 Thread11
//只有一個線程操作,中間有間隔時間等待task1完成.
3.保留5個 pool.execute方法 log信息如下
//第一時間1和5同時執(zhí)行
run taskId =1 Thread11
run taskId =5 Thread12
//過一會2,3,4順序執(zhí)行
run taskId =2 Thread11
run taskId =3 Thread12
run taskId =4 Thread11
//執(zhí)行順序解讀
1.task1,2,3,4,5陸續(xù)添加到線程池中之行
2.線程中有一個核心線程,task1立即執(zhí)行
3.task1執(zhí)行過程中,2,3,4被放入隊列中,但是由于隊列容量只有3(ArrayBlockingQueue<Runnable>(3)),task5進(jìn)入不了隊列
4.線程池判斷核心線程數(shù)<最大線程數(shù),又新開了一個線程,立刻執(zhí)行線程5
5.task1,5執(zhí)行完成之后,ThreadPool存在2條線程依次執(zhí)行任務(wù)2,3,4
4.保留6個 pool.execute方法 log信息如下
自定義處理..
run taskId =5 Thread12
當(dāng)前被拒絕任務(wù)為:6
run taskId =1 Thread11
run taskId =2 Thread11
run taskId =3 Thread12
run taskId =4 Thread11
//當(dāng)執(zhí)行線程池和隊列都滿了以后直接執(zhí)行拒絕策略
5.保留6個 pool.execute方法 將構(gòu)造方法中的隊列替換成‘new LinkedBlockingQueue<Runnable>() ’ log信息如下
run taskId =1 Thread11
run taskId =2 Thread11
run taskId =3 Thread11
run taskId =4 Thread11
run taskId =5 Thread11
run taskId =6 Thread11
//當(dāng)隊列換成了無界隊列后,只初始化了一個線程,只有一個線程執(zhí)行任務(wù),maxsize這個參數(shù)失效