Java8線程池——底層為L(zhǎng)inkedBlockingQueue的ThreadPoolExecutor

線程池使用的一個(gè)例子

這里約定,細(xì)節(jié)在代碼中注釋說(shuō)明,代碼外正文的描述文字展示程序的大體流程。

int nThreads = 10;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
Runnable task = new Runnable(){
    public void run(){
        //do something
    }
};
exec.execute(task);

這里的工廠方法newFixedThreadPool()初始化的是ThreadPoolExecutor實(shí)例,在線程池實(shí)現(xiàn)的工廠方法newSingleThreadExecutor()與newFixedThreadPool()一樣構(gòu)建底層為L(zhǎng)inkedBlockingQueue的ThreadPoolExecutor。而newCachedThreadPool()初始化的是底層為SynchronousQueue的ThreadPoolExecutor實(shí)例。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor的execute()

任務(wù)到來(lái)執(zhí)行execute()方法。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 執(zhí)行過(guò)程分為三步:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 1. 如果少于corePoolSize條線程在運(yùn)行,那么試圖啟動(dòng)一條新的線程,并且把當(dāng)前的命令作   
     *  為這條線程將要執(zhí)行的第一個(gè)任務(wù)。addWorker()方法原子性檢查runState和workerCount
     *  的狀態(tài),以防止不應(yīng)該增加線程時(shí)添加了線程。
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 2. 如果一個(gè)任務(wù)成功入隊(duì),我們需要進(jìn)行雙重的校驗(yàn),校驗(yàn)我們是否應(yīng)該增加一個(gè)線程(因
     * 為從上次校驗(yàn)之后存在的線程可能已經(jīng)死掉了)或者檢查線程池是否關(guān)閉了。所以需要再次
     * 檢驗(yàn),如果需要進(jìn)行回滾出隊(duì)?;蛘撸绻麤](méi)有線程,就開(kāi)始一條新的線程。
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     *
     * 3. 如果任務(wù)入隊(duì)不成功,我們?cè)噲D增加新的進(jìn)程。仍舊失敗的話,直接拒絕執(zhí)行任務(wù)。
     */
    int c = ctl.get();
    //①
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //②
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //③
    else if (!addWorker(command, false))
        reject(command);
}

上面的execute()方法的源代碼中注釋的標(biāo)號(hào),對(duì)應(yīng)著上面的英文注釋,中文翻譯在對(duì)應(yīng)英文下面。先大體看流程,execute()方法執(zhí)行新的任務(wù),當(dāng)一個(gè)任務(wù)到來(lái)后,做必要的校驗(yàn),根據(jù)檢驗(yàn)的不同結(jié)果,執(zhí)行不同的邏輯實(shí)現(xiàn)。但是任務(wù)的執(zhí)行都是通過(guò)addWorker()方法完成的,下面流程走的addWorker()方法中。

ThreadPoolExecutor的addWorker()

addWorker()方法代碼片段

private boolean addWorker(Runnable firstTask, boolean core) {
    ....省略一些校驗(yàn)邏輯.....
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//①
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;//②
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//③
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

代碼片段中有三點(diǎn)重要之處,分別標(biāo)注了序號(hào)。

  1. 到來(lái)的任務(wù),被封裝在一個(gè)Worker的對(duì)象中。Worker對(duì)象字段thread賦值給變量t;
  2. 對(duì)線程池狀態(tài)的參數(shù)設(shè)置和進(jìn)一步的最終校驗(yàn)需要加鎖,保證并發(fā)時(shí)的線程安全,和數(shù)據(jù)的一致性。
  3. Thread變量t執(zhí)行start()方法。

梳理上面的代碼片段,到來(lái)的任務(wù)會(huì)被先封裝Worker對(duì)象,然后Worker對(duì)象返回一個(gè)Thread對(duì)象,這個(gè)Thread對(duì)象最終執(zhí)行。下面走到Worker類的源代碼。

ThreadPoolExecutor的Worker類

//Worker的構(gòu)造函數(shù)
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

構(gòu)造Worker對(duì)象時(shí),Runnable任務(wù)作為當(dāng)前線程的第一個(gè)任務(wù)賦值到firstTask,如果確實(shí)是第一個(gè)任務(wù)firstTask不為null,否則為null,然后去隊(duì)列中取已經(jīng)入隊(duì)的任務(wù)。字段thread指向通過(guò)ThreadPoolExecutor中定義的線程工廠,生產(chǎn)的封裝了Worker對(duì)象自己的線程。字段thread指向的對(duì)象執(zhí)行start()方法(上面addWorker()方法代碼中的③)時(shí),底層調(diào)用的就是Worker對(duì)象的run()方法。

Worker類的run()方法

run()方法調(diào)用的是runWorker()方法。

public void run() {
    runWorker(this);
}

Worker類的runWorker()方法

在runWorker()方法最重要的語(yǔ)句就是那一條while語(yǔ)句(下面代碼中標(biāo)識(shí)①的語(yǔ)句)。涉及如下三點(diǎn):

  • 如果任務(wù)task不為null,執(zhí)行當(dāng)前的task,此task是當(dāng)前線程的第一個(gè)任務(wù);
  • 如果task為null,getTask()方法從任務(wù)隊(duì)列中獲取排隊(duì)的任務(wù)進(jìn)行執(zhí)行;
  • while循環(huán)因?yàn)闂l件中g(shù)etTask()方法會(huì)進(jìn)行自旋,所以當(dāng)任務(wù)隊(duì)列中沒(méi)有任務(wù)時(shí),當(dāng)前線程一直處于阻塞等待狀態(tài),利用阻塞隊(duì)列也可以限時(shí)等待,這也是線程池中線程可以復(fù)用的原因。因?yàn)榫€程會(huì)一直等待隊(duì)列中有任務(wù)。當(dāng)然可以通過(guò)ThreadPoolExecutor的keepAliveTime字段對(duì)線程生存時(shí)間進(jìn)行設(shè)定。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {//①
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
           }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

自此,完成了任務(wù)到來(lái),提交給線程池,線程池創(chuàng)建線程或者復(fù)用等待中的線程執(zhí)行任務(wù)。復(fù)用線程可以減少創(chuàng)建和銷毀線程產(chǎn)生的開(kāi)銷,在高并發(fā)系統(tǒng)中線程池使用可以取得很好的性能效果。更多細(xì)節(jié)可以參考Java線程池ThreadPoolExecutor的實(shí)現(xiàn)和參數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容