Java線程池簡介

ThreadPoolExecutor解析

Java里線程池的基本接口是 Executor:

public interface Executor {
        void execute(Runnable command);
}

實(shí)現(xiàn)線程池的類是ThreadPoolExecutor,最主要的構(gòu)造方法如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

參數(shù)解析:

  • corePoolSize 核心線程數(shù)
  • maximumPoolSize 最大線程數(shù)
  • keepAliveTime 線程空閑以后存活時(shí)間。通常在線程數(shù)大于核心線程數(shù)時(shí)才生效,直到存活線程數(shù)等于核心線程數(shù)
  • unit 時(shí)間單位
  • workQueue 存儲任務(wù)的阻塞隊(duì)列
  • threadFactory 用來創(chuàng)建線程的線程工廠
  • handler 拒絕任務(wù)時(shí)的策略,通常有以下幾種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù) 

成員變量:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是原子的整數(shù)值,它用來記錄線程池的狀態(tài) 和 當(dāng)前線程池中線程 數(shù)量,初始值為RUNNING狀態(tài),線程數(shù)為0

private static final int COUNT_BITS = Integer.SIZE - 3;

Integer.SIZE表示int類型數(shù)據(jù)字節(jié)數(shù)(32),COUNT_BITS表示線程數(shù)量占據(jù)的位數(shù)(29)

    //線程池最大容量(線程數(shù)) 00011111 11111111 11111111 11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//線程池狀態(tài)
    // runState is stored in the high-order bits
    //接受新任務(wù)并且處理阻塞隊(duì)列里的任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //拒絕新任務(wù)但是處理阻塞隊(duì)列里的任務(wù)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //拒絕新任務(wù)并且拋棄阻塞隊(duì)列里的任務(wù)同時(shí)會中斷正在處理的任務(wù)
    private static final int STOP       =  1 << COUNT_BITS;
    //所有任務(wù)都執(zhí)行完(包含阻塞隊(duì)列里面任務(wù))當(dāng)前線程池活動線程為0,將要調(diào)用terminated方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    //終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)
    private static final int TERMINATED =  3 << COUNT_BITS;

    private static int ctlOf(int rs, int wc) { return rs | wc; }

可以看到,表示狀態(tài)的數(shù)據(jù)全部左移29位,存儲在int數(shù)值的前三位,然后通過ctlOf方法,將狀態(tài)和線程數(shù)兩個(gè)值位或操作結(jié)合起來,這樣就得到了ctl值,也就是說上面的ctl后29位用來存儲線程數(shù),前3位存儲線程池狀態(tài)。

執(zhí)行過程:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        //獲取當(dāng)前線程池的狀態(tài)
        int c = ctl.get();

        //判斷線程數(shù)是否小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //尚未達(dá)到核心線程數(shù),直接添加線程執(zhí)行任務(wù)
            if (addWorker(command, true))
                return;
            //如果添加線程失敗,重新獲取線程池狀態(tài)
            c = ctl.get();
        }
        //已經(jīng)達(dá)到核心線程數(shù),或者添加線程失敗,繼續(xù)執(zhí)行:
        //判斷線程池是否處于Running狀態(tài),如果是,添加任務(wù)到隊(duì)列中

        if (isRunning(c) && workQueue.offer(command)) {
            //再次檢查線程池狀態(tài)
            int recheck = ctl.get();
            
            if (! isRunning(recheck) && remove(command))
            //如果不是處于Running狀態(tài),就移除任務(wù),移除成功以后執(zhí)行拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                //如果是處于非Running狀態(tài),但是任務(wù)移除失敗,或者處于Running狀態(tài),但是線程數(shù)為0,新建線程
                addWorker(null, false);
        }
        //不是Running狀態(tài),或者任務(wù)添加失敗了(隊(duì)列已滿),進(jìn)入addWorker,執(zhí)行command
        else if (!addWorker(command, false))
        //添加線程失敗則拒絕任務(wù)
            reject(command);
    }

    private static int workerCountOf(int c)  { return c & CAPACITY; }

addWorker方法接受兩個(gè)參數(shù),command為待執(zhí)行的任務(wù)對象,coretrue表示添加核心線程,false表示不需要使用核心線程。

execute執(zhí)行流程如下:

  1. 如果commad為空,直接拋出異常

  2. 獲取線程池狀態(tài),如果線程數(shù)小于核心線程數(shù),直接調(diào)用addWorker方法,創(chuàng)建核心線程來執(zhí)行任務(wù)

  3. 核心線程創(chuàng)建成功,則直接返回。如果核心線程創(chuàng)建失敗,說明當(dāng)前線程池核心線程已經(jīng)滿了或者線程池被關(guān)閉了等異常情況,任務(wù)沒有辦法執(zhí)行,那么重新獲取線程池狀態(tài),執(zhí)行下一步。

  1. 再次判斷線程池是否處于Running狀態(tài)。如果是,添加command到任務(wù)隊(duì)列中,執(zhí)行下面的步驟5;如果不是Running狀態(tài)或者添加command失敗,跳到步驟9

  2. 任務(wù)添加成功以后,等待線程來執(zhí)行它,此時(shí)會再次檢查狀態(tài),進(jìn)入下面6,7,8三個(gè)步驟

  3. 如果此時(shí)線程池不是Running狀態(tài)了,把剛剛添加的任務(wù)移除掉,執(zhí)行拒絕策略,execute流程結(jié)束,任務(wù)執(zhí)行失敗。

  4. 如果線程池仍然是Running狀態(tài),或者線程池不是Running狀態(tài),但是移除任務(wù)失?。⊿hutdown時(shí),不接受新任務(wù),但是該任務(wù)還沒有執(zhí)行完,移除不掉),此時(shí)仍然有任務(wù)需要執(zhí)行,但是池內(nèi)的線程數(shù)為0,則調(diào)用addWorker方法添加一個(gè)null對象,創(chuàng)建一個(gè)非核心線程來執(zhí)行任務(wù),execute流程結(jié)束,任務(wù)會被執(zhí)行,但是線程池可能會在任務(wù)執(zhí)行完畢之后結(jié)束。

  5. 如果線程池仍然是Running狀態(tài),而且線程池內(nèi)的線程數(shù)也不是0,那么execute流程到此結(jié)束,任務(wù)添加到workQueue中,線程池狀態(tài)正常。

  6. 線程池處于非Running狀態(tài),或者任務(wù)添加失?。ū热珀?duì)列已滿),調(diào)用addWorker方法,通過非核心線程來處理command

線程數(shù)=worker數(shù),worker執(zhí)行完當(dāng)前任務(wù)以后會從隊(duì)列里取出任務(wù)來執(zhí)行,沒有任務(wù)的時(shí)候只保留核心線程數(shù)的worker運(yùn)行。

addWorker的執(zhí)行步驟:

private boolean addWorker(Runnable firstTask, boolean core) {

    //該循環(huán)用來更新ctl的值(自增),addWorker可能會被多線程同時(shí)調(diào)用,所以更新ctl可能會失敗,如果失敗則重新讀取,繼續(xù)更新ctl值,更新成功則跳出循環(huán),開始創(chuàng)建線程
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

//檢查線程池狀態(tài),分為幾種情況:
//1.當(dāng)前狀態(tài)大于Shutdown,直接返回false,說明線程池已經(jīng)被關(guān)閉了
//2.當(dāng)前狀態(tài)等于Shutdown,說明線程池正在關(guān)閉中,此時(shí)來判斷傳入的commad是否為null,并且! workQueue.isEmpty(),如果三項(xiàng)有一項(xiàng)不滿足,直接返回false。這種情況對應(yīng)的是execute中的addWorker(null, false);這一句指令,線程在關(guān)閉過程中,但是還有任務(wù)在等待執(zhí)行的情況。

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
//到這里說明線程池狀態(tài)更新成功了,開始創(chuàng)建線程執(zhí)行任務(wù)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //創(chuàng)建一個(gè)持有當(dāng)前任務(wù)的worker
            w = new Worker(firstTask);
            //Worker里的線程是通過制定的 ThreadFactory 來創(chuàng)建的
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //此處對workers對象進(jìn)行操作,并發(fā)訪問的時(shí)候必須加鎖,保證不會重復(fù)添加線程
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //線程添加成功,開始執(zhí)行任務(wù)
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
            
        } finally {
            //最終會走到這里,判斷任務(wù)有沒有成功執(zhí)行,未成功的話說明線程沒有成功運(yùn)行起來,回滾ctl的狀態(tài)。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

該方法主要分為兩個(gè)步驟,第一步是對ctl狀態(tài)更新,第二步是狀態(tài)更新完成以后,創(chuàng)建線程并執(zhí)行任務(wù)。

第一步主要為兩個(gè)循環(huán),外層的循環(huán)用來獲取線程池狀態(tài),并判斷是否滿足創(chuàng)建線程的條件,如果滿足,進(jìn)入內(nèi)部的循環(huán),嘗試對ctl進(jìn)行自增操作(線程數(shù)+1)

考慮到并發(fā)問題,自增可能會失敗,失敗以后,再一次自增之前,線程池的狀態(tài)可能發(fā)生變化,所以需要再次獲取線程池狀態(tài),如果沒有變化再次嘗試自增,如果已經(jīng)變了,回到外部的循環(huán),重新判斷是否滿足自增條件。

滿足自增條件有幾下幾種情況:

  1. 線程池狀態(tài) > shutdown: 說明線程池已經(jīng)被關(guān)閉了,直接返回false,不再創(chuàng)建線程

  2. 線程池 = shutdown,此時(shí)再判斷 firstTask == null && !workQueue.isEmpty(),其實(shí)對應(yīng)的是execute方法中的addWorker(null, false);語句,說明線程池正在關(guān)閉,但是還有未執(zhí)行的任務(wù),此時(shí)也需要創(chuàng)建線程。傳入的command不是null,或者任務(wù)隊(duì)列為空,也會直接返回false,不再創(chuàng)建線程。

確定需要創(chuàng)建線程并且自增操作成功,線程成狀態(tài)更新完成以后,開始真正的線程創(chuàng)建和任務(wù)執(zhí)行工作

需要注意的是workers變量,它存儲的對象為Worker,實(shí)際上是對任務(wù)和線程的包裝,workers是一個(gè)HashSet,是線程不安全的,對它的操作需要加鎖。

第二步線程創(chuàng)建和執(zhí)行的步驟如下:

  1. 新建包含當(dāng)前任務(wù)的Worker對象,獲取到該對象包含的線程(在Worker的構(gòu)造函數(shù)中,通過指定的線程工廠創(chuàng)建,創(chuàng)建對象時(shí)已經(jīng)生成了)。

  2. 如果線程t不為空,說明創(chuàng)建線程成功,開始獲取鎖,檢查線程池和線程狀態(tài),更新workers變量最后執(zhí)行t.start開始執(zhí)行任務(wù)

關(guān)鍵的t.start方法是怎樣執(zhí)行任務(wù)的,還要看接下來的Worker對象:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
         /** Thread this worker is running in.  Null if factory fails. */
         final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
    }

上邊的源碼只列出了幾個(gè)關(guān)鍵部分,可以看到Worker繼承了Runnable,并重寫了run方法,而構(gòu)造函數(shù)中創(chuàng)建線程時(shí)傳入的對象就是Worker本身,所以t.start方法首先執(zhí)行的是Workerrun方法,run方法里只有一句runWorker(this),實(shí)際上最終執(zhí)行任務(wù)是通過runWorker來執(zhí)行的。

下面是runWorker的源碼:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


     private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

簡單來說,runWorker的作用如下:

  1. 如果是新創(chuàng)建的worker,第一次啟動會執(zhí)行當(dāng)前worker內(nèi)的任務(wù),執(zhí)行完之后會依次從workQueue中取出任務(wù)執(zhí)行。

  2. 如果workQueue為空,那么等待keepAliveTime時(shí)間,workQueue仍然為空,結(jié)束循環(huán),線程也就結(jié)束了。

Executors類

Executors是Java線程池的工具類,它內(nèi)部實(shí)現(xiàn)了4中常用的線程池:

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可緩存線程池:
沒有核心線程,可以創(chuàng)建無上限的普通線程,如果某個(gè)線程超過60s沒有任務(wù),結(jié)束該線程

任務(wù)隊(duì)列為SynchronousQueue,它內(nèi)部不存儲數(shù)據(jù),前一個(gè)數(shù)據(jù)被取走之后,后一個(gè)才能存進(jìn)來。也就是說,execute方法執(zhí)行的時(shí)候,如果command添加到隊(duì)列失敗,說明SynchronousQueue中的任務(wù)沒有被取走,直接新開一個(gè)線程運(yùn)行任務(wù),如果添加成功,任務(wù)會在隊(duì)列中等待空閑進(jìn)程來取走它。

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定長度線程池:

核心線程和最大線程數(shù)都是指定數(shù)值,不設(shè)置超時(shí)(設(shè)置也沒有意義,核心線程不會退出,除非設(shè)置了allowCoreThreadTimeOut),任務(wù)大于線程數(shù)時(shí)放在LinkedBlockingQueue中排隊(duì)。

newScheduledThreadPool


   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

可指定核心線程數(shù),最大線程數(shù)無上限,有默認(rèn)的超時(shí)時(shí)間,可以執(zhí)行周期性的任務(wù),它執(zhí)行任務(wù)主要調(diào)用的是schedule方法:

  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

創(chuàng)建一個(gè)單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序執(zhí)行.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容