Java的線程既是工作單元,也是執(zhí)行機制。JDK5開始,把工作單元和執(zhí)行機制分離開,工作單元包括Runnable和Callable,而執(zhí)行機制由Executor框架提供。
Executor框架簡介
Executor框架的結(jié)構(gòu)和成員
-
Executor框架的結(jié)構(gòu):主要由下面3部分組成。
- 任務(wù):被執(zhí)行的任務(wù)需要實現(xiàn)Runnable接口或Callable接口。
- 任務(wù)的執(zhí)行:包括任務(wù)執(zhí)行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關(guān)鍵類實現(xiàn)了ExecutorService接口(
ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 - 異步計算的結(jié)果:包括接口Future和實現(xiàn)Future接口的FutureTask類。
-
主要的類和接口
-
Executor是一個接口,是Executor框架的基礎(chǔ),它將任務(wù)的提交和任務(wù)的執(zhí)行分離開來。 -
ThreadPoolExecutor是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務(wù)。 -
ScheduledThreadPoolExecutor是一個實現(xiàn)類,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。 -
Future接口和實現(xiàn)Future接口的FutureTask類,代表異步計算的結(jié)果。 -
Runnable和Callable接口的實現(xiàn)類,具體執(zhí)行任務(wù)的類。
-
-
Executor框架的成員
介紹Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。-
ThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。有3中類型。-
FixedThreadPool:創(chuàng)建使用固定線程數(shù)的。適用于需要限制當(dāng)前線程數(shù)量的應(yīng)用場景。
-
-
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
* `SingleThreadExecutor`:創(chuàng)建單個線程。
public static ExecutorService newSingleThreadExecutor()
* `CachedThreadPool`:大小無界的線程池,適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負載較輕的服務(wù)器。
corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,工作隊列為SynchronousQueue。會出現(xiàn)極端情況,即用戶提交任務(wù)速度高于線程處理任務(wù)速度,會不斷的創(chuàng)建新線程,直到耗盡CPU和內(nèi)存。每個線程在結(jié)束任務(wù)60s之后,會被終止。
-
ScheduledThreadPoolExecutor通過工廠類Executor來創(chuàng)建,有2種類型。-
ScheduledThreadPoolExecutor:包含若干個線程的ScheduledThreadPoolExecutor。 -
SingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor。
-
Future接口
使用submit提交的時候,會返回一個實現(xiàn)Future接口的對象。Runnable接口和Callable接口
都可以被ThreadPoolExecutor執(zhí)行,區(qū)別是Runnable接口不會返回結(jié)果,而Callable接口可以返回結(jié)果。
可以把一個Runnable對象包裝成一個Callable對象。
public static Callable<Object> callable(Runnable task)
//當(dāng)任務(wù)執(zhí)行結(jié)束,F(xiàn)uture.get()得到null
public static <T> Callable<T> callable(Runnable task, T result))
//當(dāng)任務(wù)執(zhí)行結(jié)束,F(xiàn)uture.get()得到result對象。
ScheduledThreadPoolExecutor 詳解
主要用來在給定的延遲之后運行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecutor的功能和Timer類似,但ScheduledThreadPoolExecutor功能更強大靈活,對應(yīng)過個后臺線程,Timer對應(yīng)的是單個后臺線程。
運行機制
- 調(diào)用
ScheduledThreadPoolExecutor的scheduleAtFixedRate()或者scheduleWithFixedDelay()方法是,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現(xiàn)了RunnableScheduledFutur接口的ScheduledFutureTask。DelayQueue是無界的,所以maximunPoolSize無效。DelayQueue只能獲取到時間已經(jīng)到期的元素。 - 線程池中的線程從
DelayQueue種獲取ScheduledFutureTask,然后執(zhí)行任務(wù)。
ScheduleFutureTask主要包含3個成員變量:
- long型成員變量time,表示這個任務(wù)將要執(zhí)行的具體時間。
- long型成員變量
sequenceNumber,表示這個任務(wù)在線程池中的序號。 - long型成員變量
period,表示任務(wù)執(zhí)行的間隔周期。
FutureTask詳解
Future接口和實現(xiàn)Future接口的FutureTask類,代表異步計算的結(jié)果。
FutureTask的使用
可以把FutureTask交給Executor執(zhí)行;也可以通過ExecutorService.submit(...)方法返回一個FutureTask,然后執(zhí)行FutureTask.get()方法或者FutureTask.cancel(...)方法。除此以外,還可以單獨使用FutureTask。
package com.future;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class TutureTaskTest {
private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<Object, Future<String>>();
private String executionTask(final String taskName) throws ExecutionException, InterruptedException{
while(true){
Future<String> future = taskCache.get(taskName);
if(future == null){//沒有該任務(wù)
Callable<String> task = new Callable<String>(){
@Override
public String call() throws Exception {
return taskName;
}
};
//創(chuàng)建任務(wù)
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask);//如果這個key不存在就put返回null,否則不put并返回以前的值。
if(future == null){//put成功返回null
future = futureTask;
futureTask.run();//此次新添加了這個任務(wù),執(zhí)行這個任務(wù)
}
}
try{
return future.get();//等待該線程執(zhí)行完任務(wù)之后返回
}catch(CancellationException e){
taskCache.remove(taskName, future);
}
}
}
public static void main(String[] args) {
}
}