Executor框架

Java的線程既是工作單元,也是執(zhí)行機制。JDK5開始,把工作單元和執(zhí)行機制分離開,工作單元包括RunnableCallable,而執(zhí)行機制由Executor框架提供。

Executor框架簡介

Executor框架的結(jié)構(gòu)和成員

  • Executor框架的結(jié)構(gòu):主要由下面3部分組成。

    • 任務(wù):被執(zhí)行的任務(wù)需要實現(xiàn)Runnable接口或Callable接口。
    • 任務(wù)的執(zhí)行:包括任務(wù)執(zhí)行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關(guān)鍵類實現(xiàn)了ExecutorService接口(ThreadPoolExecutorScheduledThreadPoolExecutor)。
    • 異步計算的結(jié)果:包括接口Future和實現(xiàn)Future接口的FutureTask類。
  • 主要的類和接口

    • Executor是一個接口,是Executor框架的基礎(chǔ),它將任務(wù)的提交和任務(wù)的執(zhí)行分離開來。
    • ThreadPoolExecutor是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務(wù)。
    • ScheduledThreadPoolExecutor是一個實現(xiàn)類,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutorTimer更靈活,功能更強大。
    • Future接口和實現(xiàn)Future接口的FutureTask類,代表異步計算的結(jié)果。
    • RunnableCallable接口的實現(xiàn)類,具體執(zhí)行任務(wù)的類。
  • Executor框架的成員
    介紹Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。

    • ThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。有3中類型。
      • FixedThreadPool:創(chuàng)建使用固定線程數(shù)的。適用于需要限制當(dāng)前線程數(shù)量的應(yīng)用場景。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
* `SingleThreadExecutor`:創(chuàng)建單個線程。
public static ExecutorService newSingleThreadExecutor()
* `CachedThreadPool`:大小無界的線程池,適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負載較輕的服務(wù)器。

corePoolSize為0,maximumPoolSizeInteger.MAX_VALUE,工作隊列為SynchronousQueue。會出現(xiàn)極端情況,即用戶提交任務(wù)速度高于線程處理任務(wù)速度,會不斷的創(chuàng)建新線程,直到耗盡CPU和內(nèi)存。每個線程在結(jié)束任務(wù)60s之后,會被終止。

  • ScheduledThreadPoolExecutor通過工廠類Executor來創(chuàng)建,有2種類型。

    • ScheduledThreadPoolExecutor:包含若干個線程的ScheduledThreadPoolExecutor。
    • SingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor。
  • Future接口
    使用submit提交的時候,會返回一個實現(xiàn)Future接口的對象。

  • Runnable接口和Callable接口
    都可以被ThreadPoolExecutor執(zhí)行,區(qū)別是Runnable接口不會返回結(jié)果,而Callable接口可以返回結(jié)果。
    可以把一個Runnable對象包裝成一個Callable對象。

public static Callable<Object> callable(Runnable task)
//當(dāng)任務(wù)執(zhí)行結(jié)束,F(xiàn)uture.get()得到null
public static <T> Callable<T> callable(Runnable task, T result))
//當(dāng)任務(wù)執(zhí)行結(jié)束,F(xiàn)uture.get()得到result對象。

ScheduledThreadPoolExecutor 詳解

主要用來在給定的延遲之后運行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecutor的功能和Timer類似,但ScheduledThreadPoolExecutor功能更強大靈活,對應(yīng)過個后臺線程,Timer對應(yīng)的是單個后臺線程。

運行機制

  • 調(diào)用ScheduledThreadPoolExecutorscheduleAtFixedRate()或者scheduleWithFixedDelay()方法是,會向ScheduledThreadPoolExecutorDelayQueue添加一個實現(xiàn)了RunnableScheduledFutur接口的ScheduledFutureTaskDelayQueue是無界的,所以maximunPoolSize無效。DelayQueue只能獲取到時間已經(jīng)到期的元素。
  • 線程池中的線程從DelayQueue種獲取ScheduledFutureTask,然后執(zhí)行任務(wù)。

ScheduleFutureTask主要包含3個成員變量:

  • long型成員變量time,表示這個任務(wù)將要執(zhí)行的具體時間。
  • long型成員變量sequenceNumber,表示這個任務(wù)在線程池中的序號。
  • long型成員變量period,表示任務(wù)執(zhí)行的間隔周期。

FutureTask詳解

Future接口和實現(xiàn)Future接口的FutureTask類,代表異步計算的結(jié)果。

FutureTask的使用

可以把FutureTask交給Executor執(zhí)行;也可以通過ExecutorService.submit(...)方法返回一個FutureTask,然后執(zhí)行FutureTask.get()方法或者FutureTask.cancel(...)方法。除此以外,還可以單獨使用FutureTask。

package com.future;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class TutureTaskTest {
    private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<Object, Future<String>>();
    private String executionTask(final String taskName) throws ExecutionException, InterruptedException{
        while(true){
            Future<String> future = taskCache.get(taskName);
            if(future == null){//沒有該任務(wù)
                Callable<String> task = new Callable<String>(){
                    @Override
                    public String call() throws Exception {
                        return taskName;
                    }
                };
                //創(chuàng)建任務(wù)
                FutureTask<String> futureTask = new FutureTask<String>(task);
                future = taskCache.putIfAbsent(taskName, futureTask);//如果這個key不存在就put返回null,否則不put并返回以前的值。
                if(future == null){//put成功返回null
                    future = futureTask;
                    futureTask.run();//此次新添加了這個任務(wù),執(zhí)行這個任務(wù)
                }
            }
            try{
                return future.get();//等待該線程執(zhí)行完任務(wù)之后返回
            }catch(CancellationException e){
                taskCache.remove(taskName, future);
            }
        }
    }
    public static void main(String[] args) {
        
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容