【JDK7源碼閱讀計(jì)劃】線(xiàn)程池

夢(mèng)想在沒(méi)有實(shí)現(xiàn)之前,不必對(duì)他人講。



先從全局看問(wèn)題總是沒(méi)錯(cuò)的,線(xiàn)程池的繼承體系:

Executors 是一個(gè)用來(lái)生產(chǎn)線(xiàn)程池的靜態(tài)工廠,可以通過(guò)該類(lèi)生產(chǎn)ExecutorService、ScheduledExecutorService等對(duì)象。

在 Executors 這個(gè)類(lèi)里面,定義了這么幾種常用的線(xiàn)程池:


 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
 }

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}

 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

這幾種線(xiàn)程池都構(gòu)造了ThreadPoolExecutor類(lèi),只是參數(shù)不同,所以看一下這個(gè)ThreadPoolExecutor類(lèi)。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor參數(shù)描述如下:

  • corePoolSize 線(xiàn)程池核心線(xiàn)程數(shù)。當(dāng)提交一個(gè)任務(wù)時(shí),線(xiàn)程池會(huì)新創(chuàng)建一個(gè)新線(xiàn)程執(zhí)行任務(wù),直到線(xiàn)程數(shù)達(dá)到corePoolSize;之后繼續(xù)提交的任務(wù)會(huì)被保存到阻塞隊(duì)列中。
  • maximumPoolSize 線(xiàn)程池最大線(xiàn)程數(shù)。這個(gè)參數(shù)只有在隊(duì)列有界的情況下才有效。當(dāng)前阻塞隊(duì)列滿(mǎn)了的情況下,繼續(xù)提交任務(wù)時(shí),則會(huì)繼續(xù)創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),直到線(xiàn)程數(shù)達(dá)到maximumPoolSize。之后再提交任務(wù),會(huì)執(zhí)行拒絕策略。
  • keepAliveTime 空閑隊(duì)列存活時(shí)間。大于corePoolSize的空閑線(xiàn)程在該時(shí)間之后會(huì)被銷(xiāo)毀
  • unit keepAliveTime 的單位
  • workQueue 阻塞隊(duì)列,一般有如下幾種阻塞隊(duì)列
  1. ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列
  2. inkedBlockingQuene:基于隊(duì)列的無(wú)界阻塞隊(duì)列
  3. SynchronousQuene:不實(shí)際存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作,反之亦然。如果使用該隊(duì)列,提交的任務(wù)不會(huì)保存,而總是將新任務(wù)提交給線(xiàn)程執(zhí)行,如果沒(méi)有空閑線(xiàn)程,則嘗試創(chuàng)建新的線(xiàn)程,如果線(xiàn)程已達(dá)最大值,則執(zhí)行拒絕策略。
  4. priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列
  • threadFactory 線(xiàn)程工廠
  • handler 拒絕策略,當(dāng)隊(duì)列已滿(mǎn),且沒(méi)有空閑線(xiàn)程時(shí),會(huì)執(zhí)行一種拒絕策略,JDK一共有四種拒絕策略
  1. AbortPolicy:直接拋出異常
  2. CallerRunsPolicy :在調(diào)用者線(xiàn)程中運(yùn)行任務(wù)
  3. DiscardOldestPolicy: 丟棄最早的一個(gè)請(qǐng)求,再次提交該任務(wù)
  4. DiscardPolicy: 直接丟棄,不做任何處理

結(jié)合之前的代碼可以看到,當(dāng)corePoolSize 等于maximumPoolSize 時(shí),構(gòu)造的就是newFixedThreadPool,這兩個(gè)都為1 時(shí),構(gòu)造的是newSingleThreadExecutor。newCachedThreadPool線(xiàn)程池在沒(méi)有任務(wù)執(zhí)行時(shí),數(shù)量為0,其數(shù)量會(huì)動(dòng)態(tài)變化,最大值為Integer.MAX_VALUE`

ScheduledThreadPoolExecutor 繼承了ThreadPoolExecutor,構(gòu)造方法:

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

ScheduledThreadPoolExecutor增加了一些定時(shí)任務(wù)的功能,這里使用到了DelayedWorkQueue,這個(gè)隊(duì)列也很有意思,模擬了二叉查找樹(shù)的性質(zhì),用來(lái)存放有序的計(jì)劃任務(wù)。

主要方法如下:

//在指定的時(shí)間后,對(duì)任務(wù)進(jìn)行一次調(diào)度
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度,以開(kāi)始時(shí)間計(jì)算,周期性調(diào)度
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度,以結(jié)束時(shí)間計(jì)算,經(jīng)過(guò)延遲后,才進(jìn)行下一次
 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

那么在線(xiàn)程池中的線(xiàn)程是如何調(diào)度的,線(xiàn)程池的原理是什么呢?

先看一下線(xiàn)程池的狀態(tài)表示:

    //這個(gè)原子類(lèi)非常強(qiáng)大,其中的高3為表示線(xiàn)程池狀態(tài),后29位表示線(xiàn)程數(shù)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 
    
    //高3位為111,表示線(xiàn)程池能接受新任務(wù),并且可以運(yùn)行隊(duì)列中的任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //高3位000,表示線(xiàn)程池不再接受新任務(wù),但可以處理隊(duì)列中的任務(wù)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //高3為001,表示線(xiàn)程池不再接受新任務(wù),不再執(zhí)行隊(duì)列中的任務(wù),而且要中斷正在處理的任務(wù)
    private static final int STOP       =  1 << COUNT_BITS;
    //高3位010,表示線(xiàn)程池位為空,準(zhǔn)備關(guān)閉
    private static final int TIDYING    =  2 << COUNT_BITS;
    //高3位011,表示線(xiàn)程池已關(guān)閉
    private static final int TERMINATED =  3 << COUNT_BITS;

    //獲取高3位
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取低29位
    private static int workerCountOf(int c)  { return c & CAPACITY; }
   //將高3位,低29位保存在一個(gè)int里
    private static int ctlOf(int rs, int wc) { return rs | wc; }

接下來(lái)分析線(xiàn)程池的調(diào)度代碼,當(dāng)我們用線(xiàn)程池執(zhí)行一個(gè)任務(wù)的時(shí)候,會(huì)執(zhí)行以下方法。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取ctl值,上面的分析知道,這個(gè)值包含了高3位的線(xiàn)程池狀態(tài)和低29位的線(xiàn)程池?cái)?shù)量
        int c = ctl.get();
        //拿到線(xiàn)程數(shù)量和核心線(xiàn)程數(shù)比較
        if (workerCountOf(c) < corePoolSize) {
           // 如果當(dāng)前線(xiàn)程數(shù)量< 核心線(xiàn)程數(shù),則執(zhí)行addWorker 方法,這個(gè)方法會(huì)新建線(xiàn)程并執(zhí)行任務(wù)
            if (addWorker(command, true))
                return;
            //如果執(zhí)行失敗,再拿一次ctl的值
            c = ctl.get();
        }
       // 當(dāng)線(xiàn)程數(shù)大于核心線(xiàn)程,或上邊任務(wù)添加失敗時(shí)
       // 在線(xiàn)程池可用的時(shí)候,會(huì)將任務(wù)添加到阻塞隊(duì)列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次確認(rèn)線(xiàn)程池狀態(tài),若線(xiàn)程池停止了,將任務(wù)刪除,并執(zhí)行拒絕策略
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果線(xiàn)程數(shù)量為0,則放入一個(gè)空任務(wù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果隊(duì)列無(wú)法放入,則再新建線(xiàn)程執(zhí)行任務(wù),如果失敗,執(zhí)行 拒接策略
        // 這里就是從core 到 max 的擴(kuò)展 
        else if (!addWorker(command, false))
            reject(command);
    }

下面看一下addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取線(xiàn)程池狀態(tài)
            int rs = runStateOf(c);

            // 如果線(xiàn)程池不在運(yùn)行狀態(tài),則不再處理提交的任務(wù),直接返回 , 但可以繼續(xù)執(zhí)行隊(duì)列中已有的任務(wù)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
           
           //這里的死循環(huán)是為了CAS 線(xiàn)程數(shù)量,直到成功之后跳出外層循環(huán)
            for (;;) {
               // 獲取線(xiàn)程數(shù)
                int wc = workerCountOf(c);
               //判斷線(xiàn)程數(shù)是否已達(dá)最大值,超過(guò)容量直接返回
                if (wc >= CAPACITY ||
                    //判斷是核心線(xiàn)程還是最大線(xiàn)程
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 //增加線(xiàn)程數(shù),跳出外層循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
               // 檢查線(xiàn)程池狀態(tài),如果與開(kāi)始不同,則從外層循環(huán)重新開(kāi)始
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            // 用傳進(jìn)來(lái)的任務(wù)構(gòu)造一個(gè)worker ,該類(lèi)繼承了AQS,實(shí)現(xiàn)了Runnable   
            w = new Worker(firstTask);
            // 獲取worker中創(chuàng)建的線(xiàn)程
            final Thread t = w.thread;
            if (t != null) {
               //加鎖 ,HashSet線(xiàn)程不安全
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);
                   // 檢測(cè)線(xiàn)程池狀態(tài)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //確認(rèn)創(chuàng)建的線(xiàn)程還沒(méi)開(kāi)始運(yùn)行
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將線(xiàn)程加入集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加成功之后,啟動(dòng)worker線(xiàn)程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回值標(biāo)識(shí)線(xiàn)程是否啟動(dòng)
        return workerStarted;
    }

看一下線(xiàn)程是怎么啟動(dòng)的:

// worker類(lèi)
 Worker(Runnable firstTask) {
            //在運(yùn)行之前不允許中斷
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

線(xiàn)程啟動(dòng)執(zhí)行的是runWorker方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //由于在worker構(gòu)造方法中抑制了中斷,這里解除抑制
        w.unlock(); // allow interrupts
        //默認(rèn)為true,說(shuō)明發(fā)生了異常
        boolean completedAbruptly = true;
        try {
            //先執(zhí)行傳進(jìn)來(lái)的任務(wù),之后從隊(duì)列獲取任務(wù)執(zhí)行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //在任務(wù)執(zhí)行之前,可以做一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                       //任務(wù)的真正的執(zhí)行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任務(wù)執(zhí)行完,可以做些事情,注意:這里可以拿到任務(wù)運(yùn)行時(shí)的異常
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
           // 如果一切正常,置為false , 清理時(shí)會(huì)做判斷
            completedAbruptly = false;
        } finally {
           //清理工作,同時(shí) 任務(wù)如果有異常,會(huì)通過(guò)這個(gè)方法擦屁股
            processWorkerExit(w, completedAbruptly);
        }
    }
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?
            // 兩種情況:
            // 1.RUNING狀態(tài)
            // 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                // 執(zhí)行到這里說(shuō)明線(xiàn)程已超核心線(xiàn)程數(shù)并且超時(shí),這時(shí)返回null回收線(xiàn)程
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                //如果核心線(xiàn)程允許超時(shí),或者線(xiàn)程數(shù)已達(dá)到核心線(xiàn)程數(shù),則執(zhí)行poll
                //poll方法在規(guī)定時(shí)間內(nèi)沒(méi)返回會(huì)返回null,在下一輪循環(huán)的時(shí)候,會(huì)返回null,線(xiàn)程會(huì)被銷(xiāo)毀
                // 否則,執(zhí)行take方法,該方法會(huì)阻塞直到隊(duì)列中有任務(wù),所以當(dāng)線(xiàn)程數(shù)在核心線(xiàn)程數(shù)以下的線(xiàn)程不會(huì)被銷(xiāo)毀
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

最后看一下runWorker中的清理工作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果非正常結(jié)束,將線(xiàn)程數(shù)減一
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //從線(xiàn)程池中移出異常和超時(shí)的線(xiàn)程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 嘗試關(guān)閉線(xiàn)程池 
        tryTerminate();

        int c = ctl.get();
        //線(xiàn)程池狀態(tài)在RUNNING或SHUTDOWN時(shí)
        if (runStateLessThan(c, STOP)) {
            // 線(xiàn)程正常結(jié)束
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果線(xiàn)程為0 但是隊(duì)列中還有任務(wù)要執(zhí)行
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //線(xiàn)程數(shù)量滿(mǎn)足條件,直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //新建空的任務(wù),假如隊(duì)列中有任務(wù)的話(huà),這里保證能執(zhí)行
            //如果線(xiàn)程是因?yàn)楫惓M顺龅模@里進(jìn)行補(bǔ)充
            addWorker(null, false);
        }
    }
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 線(xiàn)程池正在運(yùn)行時(shí)
            // 線(xiàn)程池是SHUTDOWN狀態(tài),但是隊(duì)列還有任務(wù)時(shí)
            // 線(xiàn)程池已經(jīng)準(zhǔn)備停止時(shí) 直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            //下面的代碼說(shuō)明線(xiàn)程池真的需要關(guān)閉了
            //如果線(xiàn)程數(shù)量不為0,說(shuō)明需要將線(xiàn)程中斷,這里只中斷一個(gè)線(xiàn)程就可以(為啥呢?)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            //執(zhí)行關(guān)閉操作
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 使用 CAS 設(shè)置狀態(tài)位
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

到這里,線(xiàn)程池的基本原理基本能明白一二吧...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容