
引言
在不斷的學(xué)習(xí)中提升自己,收獲成就感。
積累知識(shí)是給未來的自己最好的禮物。
1.Executor簡(jiǎn)介
在Executor接口介紹中定義了它的身份:一個(gè)用于執(zhí)行被提交的任務(wù)的對(duì)象。這個(gè)接口提供了一種降低任務(wù)耦合的技術(shù),包括了任務(wù)怎么運(yùn)行,調(diào)度等。接口內(nèi)部就聲明了一個(gè)方法void execute(Runnable command);該方法只接受實(shí)現(xiàn)了Runnable接口的任務(wù)對(duì)象。
線程創(chuàng)建可以通過Executors類中的靜態(tài)方法創(chuàng)建,只需要在接口中傳入需要配置的參數(shù)即可。Executors它的內(nèi)部提供了一些線程池的方法,算是一個(gè)線程池工廠吧。
2. ThreadPoolExecutor
普通線程池執(zhí)行器,執(zhí)行每個(gè)被提交的任務(wù),一般通過Executors中的工廠方法進(jìn)行配置。它有效解決了兩種問題:(1)提升了線程執(zhí)行大批量異步任務(wù)的表現(xiàn),因?yàn)樗档土司€程管理的耗費(fèi),不需要手動(dòng)去創(chuàng)建線程和管理線程的生命周期。(2)還提供了一種在任務(wù)執(zhí)行期間的資源(包括線程和消耗)跳轉(zhuǎn)和管理手段。它還能提供一些統(tǒng)計(jì)能力,如已經(jīng)完成的線程數(shù)目。
2.1重要的屬性
(1)AtomicInteger
原子性的整型封裝對(duì)象,可以用它安全的獲取到當(dāng)前線程數(shù)。它里面保持的整型由兩部分組成,高三位是運(yùn)行狀態(tài),低29位是線程數(shù)量
(2)CAPACITY
容量總值是1左移29位減1,就是29個(gè)1,約500百萬個(gè)線程數(shù)。
(3)runState
運(yùn)行狀態(tài)有5種,占據(jù)了整型的高三位,RUNNING(-1<<29),SHUTDOWN(0<<29),STOP(1<<29),TIDYING(2<<29),TERMINATED(3<<29)。運(yùn)行狀態(tài)是RUNNING->SHUTDOWN(執(zhí)行方法shutdown()||finalize())-> STOP(執(zhí)行方法shutdownNow())-> TIDYING(池和隊(duì)列都空了)-> TERMINATED(執(zhí)行方法terminated(),鉤子方法完成)。awaitTermination()方法的會(huì)在線程狀態(tài)到達(dá)TERMINATED時(shí)產(chǎn)生返回值。
(3)workerCount
當(dāng)前運(yùn)行的線程計(jì)數(shù)值,通過CAS對(duì)線程統(tǒng)計(jì)值進(jìn)行加減操作
(4)workQueue
存儲(chǔ)待執(zhí)行的任務(wù)
2.2構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize, //線程池中線程需要保持的個(gè)數(shù),甚至是當(dāng)他們處于空閑狀態(tài)。
intmaximumPoolSize, //最大允許的線程個(gè)數(shù)
longkeepAliveTime, //當(dāng)線程池中線程數(shù)超過了corePoolSize,閑置線程會(huì)等待結(jié)束的時(shí)間。
TimeUnit unit, // keepAliveTime的時(shí)間單位
BlockingQueue workQueue, //用來存放由execute方法提交的任務(wù)
ThreadFactorythreadFactory, //用來創(chuàng)建新的線程
RejectedExecutionHandlerhandler) //當(dāng)隊(duì)列滿的時(shí)候,任務(wù)提交阻塞的拒絕處理器
2.3 任務(wù)執(zhí)行方法
提交一個(gè)實(shí)現(xiàn)了Runnable接口的任務(wù),然后由線程池內(nèi)部去執(zhí)行,至于是新建線程還是選擇哪個(gè)已有的線程去執(zhí)行讓線程池自行決定。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//當(dāng)運(yùn)行的線程數(shù)量少于corePoolSize,嘗試新建一個(gè)線程(worker)去執(zhí)行任務(wù)。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//如果添加線程失敗,重新獲取運(yùn)行狀態(tài)和線程計(jì)數(shù)值
c = ctl.get();
}
//如果線程池的狀態(tài)是isRunning,且阻塞隊(duì)列中還能添加
if (isRunning(c) && workQueue.offer(command)) {
//再次獲取計(jì)數(shù)和運(yùn)行狀態(tài)
int recheck = ctl.get();
//如果狀態(tài)不是isRunning,且能夠從阻塞隊(duì)列中移除
if (! isRunning(recheck) && remove(command))
//那么拒絕這個(gè)新任務(wù)
reject(command);
//如果沒有任務(wù)在運(yùn)行,就新增加一個(gè)worker,這個(gè)worker中不能再放入task了,因?yàn)樯厦嬉呀?jīng)放入到queue中了。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果增加worker失敗,那么拒絕新任務(wù)。
else if (!addWorker(command, false))
reject(command);
}
2.4addWorker
增加worker,傳入是否在core中增加的布爾值和任務(wù)。
private boolean addWorker(RunnablefirstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//這個(gè)邏輯寫的也太混亂了,理解出來后就是:rs>SHUTDOWN,
//或rs>=SHUTDOWN且firsttask不是空,或rs>=SHUTDOWN且workQueue是空的。
//因?yàn)镽UNNING是一個(gè)負(fù)數(shù),所以運(yùn)行狀態(tài)的值是小于SHUTDOWN狀態(tài)的。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程個(gè)數(shù)統(tǒng)計(jì)值
int wc = workerCountOf(c);
//當(dāng)core是true時(shí)(即想在添加常駐線程),線程計(jì)數(shù)不能超過corePoolSize,否則不能超過maximumPoolSize。
if (wc >= CAPACITY ||
wc >= (core ?corePoolSize : maximumPoolSize))
return false;
//線程統(tǒng)計(jì)值+1,意味著worker還未新增就先增加一個(gè)計(jì)數(shù)值。
if(compareAndIncrementWorkerCount(c))
break retry;
//如果統(tǒng)計(jì)值增加失敗,重新獲取原子integer。
c = ctl.get(); // Re-read ctl
//運(yùn)行狀態(tài)改變了的話,回到最初的循環(huán)點(diǎn)。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due toworkerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//獲取線程池的主鎖,給它鎖上,添加線程是一個(gè)原子操作,只能被當(dāng)前線程操作。
final ReentrantLock mainLock =this.mainLock;
mainLock.lock();
try {
// Recheck while holdinglock.
// Back out onThreadFactory failure or if
// shut down before lockacquired.
int rs =runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN&& firstTask == null)) {
//如果線程還沒使用就處于運(yùn)行狀態(tài),直接報(bào)錯(cuò),不知道什么情況下會(huì)這樣
if (t.isAlive()) //precheck that t is startable
throw newIllegalThreadStateException();
//如果前面都正常,就在工作者隊(duì)列中增加一個(gè)新的工作者。
workers.add(w);
int s = workers.size();
//當(dāng)前池中的線程最大個(gè)數(shù)
if (s >largestPoolSize)
largestPoolSize =s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果worker起不來就移除這個(gè)worker,并減去計(jì)數(shù)值。
if (! workerStarted)
addWorkerFailed(w);
}
//就是說明了增加成功標(biāo)志是,成功啟動(dòng)。
return workerStarted;
}
3. ForkJoinPool
它是將任務(wù)拆分成更小的任務(wù),然后通過并行運(yùn)算多個(gè)小任務(wù)再合并小任務(wù)的結(jié)果的形式加速總體任務(wù)的計(jì)算效率。從這里可以發(fā)現(xiàn)兩個(gè)特征:小任務(wù)的結(jié)果,拆分合并,說明了兩個(gè)問題,它執(zhí)行的任務(wù)是可以拆分的,是有結(jié)果的,而不是void。感覺這個(gè)東西和現(xiàn)在的大數(shù)據(jù)基本思想是一致的:分布式任務(wù)執(zhí)行。
3.1重要的屬性
(1)WorkQueue工作者隊(duì)列,內(nèi)部用數(shù)組存放各個(gè)待執(zhí)行的Task。
(2)ForkJoinWorkerThread工作線程,繼承自Thread,因此它才是真正用來執(zhí)行任務(wù)的最小單位。
(3)ForkJoinTask工作任務(wù),繼承自Future,F(xiàn)uture是有返回值的異步實(shí)現(xiàn)方式,跟Runnable不同。它是用來定義實(shí)際任務(wù)的。
3.2構(gòu)造函數(shù)
public ForkJoinPool(int parallelism, //并行數(shù),默認(rèn)為可以獲取到的處理器個(gè)數(shù)
ForkJoinWorkerThreadFactory factory, //用于創(chuàng)建新線程的工廠
UncaughtExceptionHandler handler, //處理任務(wù)執(zhí)行過程中不可回收的異常
boolean asyncMode) { //true對(duì)應(yīng)的FIFO模式,false對(duì)應(yīng)的是LIFO模式
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" +nextPoolId() + "-worker-");
checkPermission();
}
3.3 任務(wù)執(zhí)行方法
外部通過調(diào)用execute方法傳入Task執(zhí)行,實(shí)際上內(nèi)部都是使用externalPush去執(zhí)行了Task中需要執(zhí)行的任務(wù)。
public voidexecute(Runnable task) { //傳入的是Runnable的實(shí)現(xiàn)類
if (task == null)
throw new NullPointerException();
ForkJoinTask job;
if (task instanceofForkJoinTask) // avoid re-wrap
job = (ForkJoinTask) task;
else
job = newForkJoinTask.RunnableExecuteAction(task); //在這里需要進(jìn)行包裝一下,包裝成ForkJoinTask類型。
externalPush(job);
}
final void externalPush(ForkJoinTasktask) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null&& (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) !=null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0,1)) {
ForkJoinTask[] a; int am,n, s;
if ((a = q.array) != null&&
(am = a.length - 1) > (n = (s = q.top)- q.base)) {
int j = ((am & s) <
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s +1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1,0);
}
externalSubmit(task);
}
3.4 舉例
public class Test {
public static void main(String[] args) throws InterruptedException,ExecutionException {
int[] arr = new int[100];
Random random = new Random();
int total = 0;
for (int i = 0, len = arr.length; i < len; i++) {
int temp = random.nextInt(20);
total += (arr[i] = temp);
}
System.out.println("初始化數(shù)組總和:" + total);
SumTask task = new SumTask(arr, 0, arr.length);
//設(shè)置并行數(shù)目
ExecutorService pool = Executors.newWorkStealingPool(2);
//提交分解的SumTask 任務(wù),以及result類型
Future future = pool.submit(task);
System.out.println(future.get());
pool.shutdown();
}
static class SumTask implements Callable {
private static final int THRESHOLD = 20;
private int array[];
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
public Integer call() throws Exception {
int sum = 0;
if (end - start < THRESHOLD) {
for (int i = start; i < end;i++) {
sum = sum + array[i];
}
return sum;
} else { //當(dāng)end -start > threshold , 將大任務(wù)拆分成小任務(wù)
int middle = (start + end) / 2;
SumTask left = newSumTask(array, start, middle);
SumTask right = newSumTask(array, middle, end);
int x = left.call();
int y = right.call();
return x + y;
}
}
}
}
4. ScheduledThreadPoolExecutor
在ThreadPoolExecutor前面加上了Scheduled的修飾(實(shí)際上它就是繼承了ThreadPoolExecutor),從字面上理解,它是ThreadPoolExecutor升級(jí)版,能按照規(guī)劃的方式執(zhí)行給定的任務(wù)。如可以給任務(wù)設(shè)置延時(shí),設(shè)置運(yùn)行周期,時(shí)間間隔等。
4.1構(gòu)造函數(shù)
//使用父類的構(gòu)造函數(shù),傳入的queue為DelayedWorkQueue,時(shí)間單位納秒,池的最大值整型最大值。
public ScheduledThreadPoolExecutor(intcorePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(),threadFactory, handler);
}
4.2 ScheduledFutureTask
它是ScheduledThreadPoolExecutor中的私有內(nèi)部類,使用period來記錄多久后重復(fù)執(zhí)行;用time來記錄他執(zhí)行前需要等待的時(shí)間,也就是延時(shí)時(shí)間。
//內(nèi)部的run方法
public void run() {
boolean periodic =isPeriodic(); //如果傳入的周期時(shí)間是0,就返回false
if(!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run(); //如果不是周期性的,調(diào)用父類的run方法,直接執(zhí)行任務(wù)
else if(ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); //如果是周期型的,設(shè)置下次運(yùn)行的時(shí)間
reExecutePeriodic(outerTask); //放入隊(duì)列中,等時(shí)間到了就執(zhí)行。
}
}
4.3 任務(wù)執(zhí)行方法
execute(submit)方法在這里都是調(diào)用的schedule方法,然后ScheduledThreadPoolExecutor又新增了兩個(gè)重要的方法:scheduleAtFixedRate,scheduleWithFixedDelay,這個(gè)兩個(gè)方法支持了線程執(zhí)行的周期和延時(shí)控制,方法內(nèi)對(duì)command進(jìn)行了包裝,然后有調(diào)用了delayedExecute方法
//傳入包裝好的Task
private voiddelayedExecute(RunnableScheduledFuture task) {
if (isShutdown())
reject(task); //如果線程池的狀態(tài)是已經(jīng)shutdown的,那么拒絕該任務(wù)
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); //調(diào)用addwork方法,增加worker,與ThreadPoolExecutor相同。
}
}
原創(chuàng)文章轉(zhuǎn)載請(qǐng)標(biāo)明出處
更多文章請(qǐng)查看
http://www.canfeng.xyz