使用 Executors 創(chuàng)建線程池
1.newFixedThreadPool()
由于使用了LinkedBlockingQueue所以maximumPoolSize沒(méi)用,當(dāng)corePoolSize滿了之后就加入到LinkedBlockingQueue隊(duì)列中。
每當(dāng)某個(gè)線程執(zhí)行完成之后就從LinkedBlockingQueue隊(duì)列中取一個(gè)。
所以這個(gè)是創(chuàng)建固定大小的線程池。
源碼分析:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.newSingleThreadPool()
創(chuàng)建線程數(shù)為1的線程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 沒(méi)用,corePoolSize為1表示線程數(shù)大小為1,滿了就放入隊(duì)列中,執(zhí)行完了就從隊(duì)列取一個(gè)。
源碼分析
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
3.newCachedThreadPool()
創(chuàng)建可緩沖的線程池。沒(méi)有大小限制。由于corePoolSize為0所以任務(wù)會(huì)放入SynchronousQueue隊(duì)列中,SynchronousQueue只能存放大小為1,所以會(huì)立刻新起線程,由于maxumumPoolSize為Integer.MAX_VALUE所以可以認(rèn)為大小為2147483647。受內(nèi)存大小限制。
源碼分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用 ThreadPoolExecutor 創(chuàng)建線程池
源碼分析 ,ThreadPoolExecutor 的構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構(gòu)造函數(shù)參數(shù)
1、corePoolSize 核心線程數(shù)大小,當(dāng)線程數(shù) < corePoolSize ,會(huì)創(chuàng)建線程執(zhí)行 runnable
2、maximumPoolSize 最大線程數(shù), 當(dāng)線程數(shù) >= corePoolSize的時(shí)候,會(huì)把 runnable 放入 workQueue中
3、keepAliveTime 保持存活時(shí)間,當(dāng)線程數(shù)大于corePoolSize的空閑線程能保持的最大時(shí)間。
4、unit 時(shí)間單位
5、workQueue 保存任務(wù)的阻塞隊(duì)列
6、threadFactory 創(chuàng)建線程的工廠
7、handler 拒絕策略
任務(wù)執(zhí)行順序
1、當(dāng)線程數(shù)小于 corePoolSize時(shí),創(chuàng)建線程執(zhí)行任務(wù)。
2、當(dāng)線程數(shù)大于等于 corePoolSize并且 workQueue 沒(méi)有滿時(shí),放入workQueue中
3、線程數(shù)大于等于 corePoolSize并且當(dāng) workQueue 滿時(shí),新任務(wù)新建線程運(yùn)行,線程總數(shù)要小于 maximumPoolSize
4、當(dāng)線程總數(shù)等于 maximumPoolSize 并且 workQueue 滿了的時(shí)候執(zhí)行 handler 的 rejectedExecution。也就是拒絕策略。
JDK7提供了7個(gè)阻塞隊(duì)列。(也屬于并發(fā)容器)
1、 ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
2、LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
3、PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
4、DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。
5、SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
6、LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
7、LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
什么是阻塞隊(duì)列?
阻塞隊(duì)列是一個(gè)在隊(duì)列基礎(chǔ)上又支持了兩個(gè)附加操作的隊(duì)列。
2個(gè)附加操作:
支持阻塞的插入方法:隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。
支持阻塞的移除方法:隊(duì)列空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/p>
阻塞隊(duì)列的應(yīng)用場(chǎng)景
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里取元素的線程。簡(jiǎn)而言之,阻塞隊(duì)列是生產(chǎn)者用來(lái)存放元素、消費(fèi)者獲取元素的容器。
幾個(gè)方法
在阻塞隊(duì)列不可用的時(shí)候,上述2個(gè)附加操作提供了四種處理方法

JAVA里的阻塞隊(duì)列
JDK 7 提供了7個(gè)阻塞隊(duì)列,如下
1、ArrayBlockingQueue 數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序,但是默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列,即如果隊(duì)列滿了,那么被阻塞在外面的線程對(duì)隊(duì)列訪問(wèn)的順序是不能保證線程公平(即先阻塞,先插入)的。
2、LinkedBlockingQueue一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
此隊(duì)列按照先出先進(jìn)的原則對(duì)元素進(jìn)行排序
3、PriorityBlockingQueue支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列
4、DelayQueue支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列,即可以指定多久才能從隊(duì)列中獲取當(dāng)前元素
5、SynchronousQueue不存儲(chǔ)元素的阻塞隊(duì)列,每一個(gè)put必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。并且他支持公平訪問(wèn)隊(duì)列。
6、LinkedTransferQueue由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列。相對(duì)于其他阻塞隊(duì)列,多了tryTransfer和transfer方法
transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(take或者待時(shí)間限制的poll方法),transfer可以把生產(chǎn)者傳入的元素立刻傳給消費(fèi)者。如果沒(méi)有消費(fèi)者等待接收元素,則將元素放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。
tryTransfer方法
用來(lái)試探生產(chǎn)者傳入的元素能否直接傳給消費(fèi)者。,如果沒(méi)有消費(fèi)者在等待,則返回false。和上述方法的區(qū)別是該方法無(wú)論消費(fèi)者是否接收,方法立即返回。而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
7、LinkedBlockingDeque鏈表結(jié)構(gòu)的雙向阻塞隊(duì)列,優(yōu)勢(shì)在于多線程入隊(duì)時(shí),減少一半的競(jìng)爭(zhēng)。
四個(gè)拒絕策略
ThreadPoolExecutor默認(rèn)有四個(gè)拒絕策略:
1、ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException
2、ThreadPoolExecutor.CallerRunsPolicy() 直接調(diào)用run方法并且阻塞執(zhí)行
3、ThreadPoolExecutor.DiscardPolicy() 直接丟棄后來(lái)的任務(wù)
4、ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊(duì)列中隊(duì)首的任務(wù)
當(dāng)然可以自己繼承RejectedExecutionHandler來(lái)寫(xiě)拒絕策略.
TestThreadPoolExecutor 示例
TestThreadPoolExecutor.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:39
**/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 構(gòu)造一個(gè)線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("創(chuàng)建任務(wù)并提交到線程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待所有線程執(zhí)行完畢當(dāng)前任務(wù)。
threadPool.shutdown();
boolean loop = true;
do {
//等待所有線程執(zhí)行完畢當(dāng)前任務(wù)結(jié)束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("所有線程執(zhí)行完畢");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗時(shí):" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
ThreadPoolTask.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:40
**/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("開(kāi)始執(zhí)行任務(wù):" + attachData + "任務(wù),使用的線程池,線程名稱(chēng):" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
遇到j(luò)ava.util.concurrent.RejectedExecutionException
第一
你的線程池 ThreadPoolExecutor 顯示的 shutdown() 之后,再向線程池提交任務(wù)的時(shí)候。 如果你配置的拒絕策略是 AbortPolicy 的話,這個(gè)異常就會(huì)拋出來(lái)。
第二
當(dāng)你設(shè)置的任務(wù)緩存隊(duì)列過(guò)小的時(shí)候,或者說(shuō), 你的線程池里面所有的線程都在干活(線程數(shù)== maxPoolSize),并且你的任務(wù)緩存隊(duì)列也已經(jīng)充滿了等待的隊(duì)列, 這個(gè)時(shí)候,你再向它提交任務(wù),則會(huì)拋出這個(gè)異常。
響應(yīng)
可以看到線程 pool-1-thread-1 到5 循環(huán)使用
創(chuàng)建任務(wù)并提交到線程池中:task=1
開(kāi)始執(zhí)行任務(wù):task=1任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-1
創(chuàng)建任務(wù)并提交到線程池中:task=2
開(kāi)始執(zhí)行任務(wù):task=2任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-2
創(chuàng)建任務(wù)并提交到線程池中:task=3
開(kāi)始執(zhí)行任務(wù):task=3任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-3
創(chuàng)建任務(wù)并提交到線程池中:task=4
開(kāi)始執(zhí)行任務(wù):task=4任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-4
創(chuàng)建任務(wù)并提交到線程池中:task=5
開(kāi)始執(zhí)行任務(wù):task=5任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-5
創(chuàng)建任務(wù)并提交到線程池中:task=6
開(kāi)始執(zhí)行任務(wù):task=6任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-1
創(chuàng)建任務(wù)并提交到線程池中:task=7
開(kāi)始執(zhí)行任務(wù):task=7任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-2
創(chuàng)建任務(wù)并提交到線程池中:task=8
開(kāi)始執(zhí)行任務(wù):task=8任務(wù),使用的線程池,線程名稱(chēng):pool-1-thread-3
創(chuàng)建任務(wù)并提交到線程池中:task=9