Executor框架學(xué)習(xí)

image.png

引言

在不斷的學(xué)習(xí)中提升自己,收獲成就感。
積累知識(shí)是給未來的自己最好的禮物。

1.Executor簡(jiǎn)介

在Executor接口介紹中定義了它的身份:一個(gè)用于執(zhí)行被提交的任務(wù)的對(duì)象。這個(gè)接口提供了一種降低任務(wù)耦合的技術(shù),包括了任務(wù)怎么運(yùn)行,調(diào)度等。接口內(nèi)部就聲明了一個(gè)方法void execute(Runnable command);該方法只接受實(shí)現(xiàn)了Runnable接口的任務(wù)對(duì)象。
線程創(chuàng)建可以通過Executors類中的靜態(tài)方法創(chuàng)建,只需要在接口中傳入需要配置的參數(shù)即可。Executors它的內(nèi)部提供了一些線程池的方法,算是一個(gè)線程池工廠吧。

2. ThreadPoolExecutor

普通線程池執(zhí)行器,執(zhí)行每個(gè)被提交的任務(wù),一般通過Executors中的工廠方法進(jìn)行配置。它有效解決了兩種問題:(1)提升了線程執(zhí)行大批量異步任務(wù)的表現(xiàn),因?yàn)樗档土司€程管理的耗費(fèi),不需要手動(dòng)去創(chuàng)建線程和管理線程的生命周期。(2)還提供了一種在任務(wù)執(zhí)行期間的資源(包括線程和消耗)跳轉(zhuǎn)和管理手段。它還能提供一些統(tǒng)計(jì)能力,如已經(jīng)完成的線程數(shù)目。

2.1重要的屬性

(1)AtomicInteger
原子性的整型封裝對(duì)象,可以用它安全的獲取到當(dāng)前線程數(shù)。它里面保持的整型由兩部分組成,高三位是運(yùn)行狀態(tài),低29位是線程數(shù)量
(2)CAPACITY
容量總值是1左移29位減1,就是29個(gè)1,約500百萬個(gè)線程數(shù)。
(3)runState
運(yùn)行狀態(tài)有5種,占據(jù)了整型的高三位,RUNNING(-1<<29),SHUTDOWN(0<<29),STOP(1<<29),TIDYING(2<<29),TERMINATED(3<<29)。運(yùn)行狀態(tài)是RUNNING->SHUTDOWN(執(zhí)行方法shutdown()||finalize())-> STOP(執(zhí)行方法shutdownNow())-> TIDYING(池和隊(duì)列都空了)-> TERMINATED(執(zhí)行方法terminated(),鉤子方法完成)。awaitTermination()方法的會(huì)在線程狀態(tài)到達(dá)TERMINATED時(shí)產(chǎn)生返回值。
(3)workerCount
當(dāng)前運(yùn)行的線程計(jì)數(shù)值,通過CAS對(duì)線程統(tǒng)計(jì)值進(jìn)行加減操作
(4)workQueue
存儲(chǔ)待執(zhí)行的任務(wù)

2.2構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,     //線程池中線程需要保持的個(gè)數(shù),甚至是當(dāng)他們處于空閑狀態(tài)。

                              intmaximumPoolSize,  //最大允許的線程個(gè)數(shù)

                              longkeepAliveTime,    //當(dāng)線程池中線程數(shù)超過了corePoolSize,閑置線程會(huì)等待結(jié)束的時(shí)間。

                              TimeUnit unit,    // keepAliveTime的時(shí)間單位

                             BlockingQueue workQueue, //用來存放由execute方法提交的任務(wù)

                              ThreadFactorythreadFactory,   //用來創(chuàng)建新的線程

                              RejectedExecutionHandlerhandler)   //當(dāng)隊(duì)列滿的時(shí)候,任務(wù)提交阻塞的拒絕處理器

2.3 任務(wù)執(zhí)行方法

提交一個(gè)實(shí)現(xiàn)了Runnable接口的任務(wù),然后由線程池內(nèi)部去執(zhí)行,至于是新建線程還是選擇哪個(gè)已有的線程去執(zhí)行讓線程池自行決定。

public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       int c = ctl.get();
//當(dāng)運(yùn)行的線程數(shù)量少于corePoolSize,嘗試新建一個(gè)線程(worker)去執(zhí)行任務(wù)。
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
                return;
    //如果添加線程失敗,重新獲取運(yùn)行狀態(tài)和線程計(jì)數(shù)值
          c = ctl.get();
       }
     //如果線程池的狀態(tài)是isRunning,且阻塞隊(duì)列中還能添加
       if (isRunning(c) && workQueue.offer(command)) {
    //再次獲取計(jì)數(shù)和運(yùn)行狀態(tài)
           int recheck = ctl.get();
   //如果狀態(tài)不是isRunning,且能夠從阻塞隊(duì)列中移除
           if (! isRunning(recheck) && remove(command))
   //那么拒絕這個(gè)新任務(wù)
                reject(command);
   //如果沒有任務(wù)在運(yùn)行,就新增加一個(gè)worker,這個(gè)worker中不能再放入task了,因?yàn)樯厦嬉呀?jīng)放入到queue中了。
           else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
       }
   //如果增加worker失敗,那么拒絕新任務(wù)。
       else if (!addWorker(command, false))
           reject(command);
}

2.4addWorker

增加worker,傳入是否在core中增加的布爾值和任務(wù)。

private boolean addWorker(RunnablefirstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);
           // Check if queue empty only if necessary.
     //這個(gè)邏輯寫的也太混亂了,理解出來后就是:rs>SHUTDOWN,
     //或rs>=SHUTDOWN且firsttask不是空,或rs>=SHUTDOWN且workQueue是空的。
     //因?yàn)镽UNNING是一個(gè)負(fù)數(shù),所以運(yùn)行狀態(tài)的值是小于SHUTDOWN狀態(tài)的。
           if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
           for (;;) {
               //獲取線程個(gè)數(shù)統(tǒng)計(jì)值
                int wc = workerCountOf(c);
               //當(dāng)core是true時(shí)(即想在添加常駐線程),線程計(jì)數(shù)不能超過corePoolSize,否則不能超過maximumPoolSize。
                if (wc >= CAPACITY ||
                    wc >= (core ?corePoolSize : maximumPoolSize))
                    return false;
                //線程統(tǒng)計(jì)值+1,意味著worker還未新增就先增加一個(gè)計(jì)數(shù)值。
                if(compareAndIncrementWorkerCount(c))
                    break retry;
                //如果統(tǒng)計(jì)值增加失敗,重新獲取原子integer。
                c = ctl.get();  // Re-read ctl
                //運(yùn)行狀態(tài)改變了的話,回到最初的循環(huán)點(diǎn)。
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due toworkerCount change; retry inner loop
           }
       }
       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
            //獲取線程池的主鎖,給它鎖上,添加線程是一個(gè)原子操作,只能被當(dāng)前線程操作。
                final ReentrantLock mainLock =this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holdinglock.
                    // Back out onThreadFactory failure or if
                    // shut down before lockacquired.
                    int rs =runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN&& firstTask == null)) {
              //如果線程還沒使用就處于運(yùn)行狀態(tài),直接報(bào)錯(cuò),不知道什么情況下會(huì)這樣
                        if (t.isAlive()) //precheck that t is startable
                            throw newIllegalThreadStateException();
                  //如果前面都正常,就在工作者隊(duì)列中增加一個(gè)新的工作者。
                        workers.add(w);
                        int s = workers.size();
                   //當(dāng)前池中的線程最大個(gè)數(shù)
                        if (s >largestPoolSize)
                            largestPoolSize =s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
           }
       } finally {
           //如果worker起不來就移除這個(gè)worker,并減去計(jì)數(shù)值。
           if (! workerStarted)
                addWorkerFailed(w);
       }
       //就是說明了增加成功標(biāo)志是,成功啟動(dòng)。
       return workerStarted;
}

3. ForkJoinPool

它是將任務(wù)拆分成更小的任務(wù),然后通過并行運(yùn)算多個(gè)小任務(wù)再合并小任務(wù)的結(jié)果的形式加速總體任務(wù)的計(jì)算效率。從這里可以發(fā)現(xiàn)兩個(gè)特征:小任務(wù)的結(jié)果,拆分合并,說明了兩個(gè)問題,它執(zhí)行的任務(wù)是可以拆分的,是有結(jié)果的,而不是void。感覺這個(gè)東西和現(xiàn)在的大數(shù)據(jù)基本思想是一致的:分布式任務(wù)執(zhí)行。

3.1重要的屬性

(1)WorkQueue工作者隊(duì)列,內(nèi)部用數(shù)組存放各個(gè)待執(zhí)行的Task。
(2)ForkJoinWorkerThread工作線程,繼承自Thread,因此它才是真正用來執(zhí)行任務(wù)的最小單位。
(3)ForkJoinTask工作任務(wù),繼承自Future,F(xiàn)uture是有返回值的異步實(shí)現(xiàn)方式,跟Runnable不同。它是用來定義實(shí)際任務(wù)的。

3.2構(gòu)造函數(shù)

      public ForkJoinPool(int parallelism, //并行數(shù),默認(rèn)為可以獲取到的處理器個(gè)數(shù)
                       ForkJoinWorkerThreadFactory factory, //用于創(chuàng)建新線程的工廠
                       UncaughtExceptionHandler handler,   //處理任務(wù)執(zhí)行過程中不可回收的異常
                        boolean asyncMode) {   //true對(duì)應(yīng)的FIFO模式,false對(duì)應(yīng)的是LIFO模式
       this(checkParallelism(parallelism),
            checkFactory(factory),
            handler,
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
            "ForkJoinPool-" +nextPoolId() + "-worker-");
       checkPermission();
    }

3.3 任務(wù)執(zhí)行方法

外部通過調(diào)用execute方法傳入Task執(zhí)行,實(shí)際上內(nèi)部都是使用externalPush去執(zhí)行了Task中需要執(zhí)行的任務(wù)。

public voidexecute(Runnable task) {  //傳入的是Runnable的實(shí)現(xiàn)類
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask job;
        if (task instanceofForkJoinTask) // avoid re-wrap
            job = (ForkJoinTask) task;
        else
            job = newForkJoinTask.RunnableExecuteAction(task);   //在這里需要進(jìn)行包裝一下,包裝成ForkJoinTask類型。
        externalPush(job);
    }

  final void externalPush(ForkJoinTasktask) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null&& (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) !=null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0,1)) {
            ForkJoinTask[] a; int am,n, s;
            if ((a = q.array) != null&&
                (am = a.length - 1) > (n = (s = q.top)- q.base)) {
                int j = ((am & s) <
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s +1);
                U.putIntVolatile(q, QLOCK, 0);
               if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1,0);
        }
        externalSubmit(task);
    }

3.4 舉例

public class Test {

   public static void main(String[] args) throws InterruptedException,ExecutionException {
       int[] arr = new int[100];
       Random random = new Random();
       int total = 0;
       for (int i = 0, len = arr.length; i < len; i++) {
           int temp = random.nextInt(20);
           total += (arr[i] = temp);
       }
       System.out.println("初始化數(shù)組總和:" + total);
       SumTask task = new SumTask(arr, 0, arr.length);
       //設(shè)置并行數(shù)目
       ExecutorService pool = Executors.newWorkStealingPool(2);
       //提交分解的SumTask 任務(wù),以及result類型
       Future future = pool.submit(task);
       System.out.println(future.get());
        pool.shutdown();
    }

   static class SumTask implements Callable {
       private static final int THRESHOLD = 20;
       private int array[];
       private int start;
       private int end;
       public SumTask(int[] array, int start, int end) {
           this.array = array;
           this.start = start;
           this.end = end;
       }

       @Override
       public Integer call() throws Exception {
           int sum = 0;
           if (end - start < THRESHOLD) {
                for (int i = start; i < end;i++) {
                    sum = sum + array[i];
                }
                return sum;
           } else { //當(dāng)end -start > threshold , 將大任務(wù)拆分成小任務(wù)
                int middle = (start + end) / 2;
                SumTask left = newSumTask(array, start, middle);
                SumTask right = newSumTask(array, middle, end);
                int x = left.call();
                int y = right.call();
                return x + y;
           }
       }
    }
}

4. ScheduledThreadPoolExecutor

在ThreadPoolExecutor前面加上了Scheduled的修飾(實(shí)際上它就是繼承了ThreadPoolExecutor),從字面上理解,它是ThreadPoolExecutor升級(jí)版,能按照規(guī)劃的方式執(zhí)行給定的任務(wù)。如可以給任務(wù)設(shè)置延時(shí),設(shè)置運(yùn)行周期,時(shí)間間隔等。

4.1構(gòu)造函數(shù)

    //使用父類的構(gòu)造函數(shù),傳入的queue為DelayedWorkQueue,時(shí)間單位納秒,池的最大值整型最大值。
        public ScheduledThreadPoolExecutor(intcorePoolSize,
                                      ThreadFactory threadFactory,
                                      RejectedExecutionHandler handler) {
       super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(),threadFactory, handler);
}

4.2 ScheduledFutureTask

它是ScheduledThreadPoolExecutor中的私有內(nèi)部類,使用period來記錄多久后重復(fù)執(zhí)行;用time來記錄他執(zhí)行前需要等待的時(shí)間,也就是延時(shí)時(shí)間。

//內(nèi)部的run方法
  public void run() {
            boolean periodic =isPeriodic();   //如果傳入的周期時(shí)間是0,就返回false
            if(!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
               ScheduledFutureTask.super.run();  //如果不是周期性的,調(diào)用父類的run方法,直接執(zhí)行任務(wù)
            else if(ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();  //如果是周期型的,設(shè)置下次運(yùn)行的時(shí)間
               reExecutePeriodic(outerTask);  //放入隊(duì)列中,等時(shí)間到了就執(zhí)行。
            }
        }

4.3 任務(wù)執(zhí)行方法

execute(submit)方法在這里都是調(diào)用的schedule方法,然后ScheduledThreadPoolExecutor又新增了兩個(gè)重要的方法:scheduleAtFixedRate,scheduleWithFixedDelay,這個(gè)兩個(gè)方法支持了線程執(zhí)行的周期和延時(shí)控制,方法內(nèi)對(duì)command進(jìn)行了包裝,然后有調(diào)用了delayedExecute方法

//傳入包裝好的Task
    private voiddelayedExecute(RunnableScheduledFuture task) {
        if (isShutdown())
            reject(task);   //如果線程池的狀態(tài)是已經(jīng)shutdown的,那么拒絕該任務(wù)
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
               !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);       
            else
                ensurePrestart();    //調(diào)用addwork方法,增加worker,與ThreadPoolExecutor相同。
        }
    }

原創(chuàng)文章轉(zhuǎn)載請(qǐng)標(biāo)明出處
更多文章請(qǐng)查看
http://www.canfeng.xyz

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.簡(jiǎn)介 如果并發(fā)的線程數(shù)量很多,并且每個(gè)線程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會(huì)大大降低系統(tǒng)...
    htkeepmoving閱讀 606評(píng)論 0 2
  • 前段時(shí)間遇到這樣一個(gè)問題,有人問微信朋友圈的上傳圖片的功能怎么做才能讓用戶的等待時(shí)間較短,比如說一下上傳9張圖片,...
    加油碼農(nóng)閱讀 1,288評(píng)論 0 2
  • Executor框架的兩級(jí)調(diào)度模型 在HotSpot VM的線程模型中,Java線程(java.lang.Thre...
    先生zeng閱讀 572評(píng)論 0 10
  • 一.線程安全性 線程安全是建立在對(duì)于對(duì)象狀態(tài)訪問操作進(jìn)行管理,特別是對(duì)共享的與可變的狀態(tài)的訪問 解釋下上面的話: ...
    黃大大吃不胖閱讀 972評(píng)論 0 3
  • 1990年一個(gè)多雨的夏天,楓子呱呱墜地,至此的很多年,楓子的媽媽說每逢她的生日都是雨水或陰天。不過,這不影響她陽(yáng)光...
    小暮閱讀 618評(píng)論 0 1

友情鏈接更多精彩內(nèi)容