Java 線程池實(shí)現(xiàn)原理

概述

現(xiàn)在機(jī)器基本都是多核的,開啟多線程可以有效地增加系統(tǒng)的吞吐量和性能,如下是開啟一個(gè)線程最簡(jiǎn)單的方式

new Thread(new Runnable() {
    @Override
    public void run() {
        // do something
    }
}).start();

這個(gè)線程使用完后,就會(huì)被系統(tǒng)所回收。線程雖是輕量級(jí)的,但其創(chuàng)建、關(guān)閉依然需要花費(fèi)時(shí)間、資源。當(dāng)任務(wù)粒度不大的時(shí)候,大量創(chuàng)建線程會(huì)得不償失。而且當(dāng)線程數(shù)超過核心數(shù),創(chuàng)建后的線程還會(huì)處于等待狀態(tài)。

因此線程的數(shù)量最好是能控制的,且能夠復(fù)用,線程池即能滿足需求。下面看看創(chuàng)建一個(gè)線程池,并提交一個(gè)任務(wù)去執(zhí)行的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        // do something
    }
});

實(shí)現(xiàn)原理

上面創(chuàng)建了一個(gè)只擁有一個(gè)線程的線程池,并提交一個(gè)任務(wù)去執(zhí)行。線程池的創(chuàng)建可以使用線程池工廠 Executors 里面的 new... 系列方法,含義如下

Executors.newSingleThreadExecutor(); // 創(chuàng)建一個(gè)只有單個(gè)線程的線程池。任務(wù)提交后,若這個(gè)線程空閑,則執(zhí)行,否則加入隊(duì)列,待線程空閑后執(zhí)行
Executors.newFixedThreadPool(numbersOfThread); // 創(chuàng)建一個(gè)有numbersOfThread個(gè)線程的線程池。任務(wù)提交后,若有空閑線程,則執(zhí)行,否則加入隊(duì)列,待有線程空閑后執(zhí)行
Executors.newCachedThreadPool(); // 創(chuàng)建一個(gè)有無限個(gè)線程的線程池。任務(wù)提交后,若有空閑線程,則執(zhí)行,否則創(chuàng)建新的線程去執(zhí)行
Executors.newSingleThreadScheduledExecutor(); // 在newSingleThreadExecutor之上擴(kuò)展了在給定時(shí)間執(zhí)行某任務(wù)的功能
...

查看線程工廠 Executors new... 方法的實(shí)現(xiàn),我們可以看到最終都是調(diào)用了 ThreadPoolExecutor 的構(gòu)造方法

public ThreadPoolExecutor(int corePoolSize, // 核心線程數(shù)量
                          int maximumPoolSize, // 最大線程數(shù)量
                          long keepAliveTime, // 當(dāng)線程數(shù)量超過核心線程數(shù)量時(shí),其余線程?;顣r(shí)間
                          TimeUnit unit, // 時(shí)間單位
                          BlockingQueue<Runnable> workQueue, // 任務(wù)隊(duì)列
                          ThreadFactory threadFactory, // 線程工廠
                          RejectedExecutionHandler handler // 任務(wù)提交失敗時(shí)的執(zhí)行策略
)

這里先說下任務(wù)提交后,線程池的執(zhí)行策略

(1) 當(dāng)線程池的實(shí)際線程數(shù)量小于corePoolSize時(shí),則優(yōu)先創(chuàng)建核心線程;
(2) 若大于等于corePooSize,則將新的任務(wù)加入等待隊(duì)列;
(3) 若加入隊(duì)列失敗,并且線程數(shù)小于maximumPoolSize,則創(chuàng)建新的線程執(zhí)行任務(wù);
(4) 否則執(zhí)行拒絕策略。

下面我們看看具體的源碼

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // workerCountOf:當(dāng)前線程總數(shù)
            if (addWorker(command, true)) // 創(chuàng)建一個(gè)核心線程并執(zhí)行當(dāng)前提交任務(wù)
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 把任務(wù)加到任務(wù)隊(duì)列里面
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 創(chuàng)建一個(gè)非核心線程并執(zhí)行任務(wù)
            reject(command);
    }

接下來我們看看 Worker 工作線程的執(zhí)行過程

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable // 實(shí)現(xiàn)了Runnable接口
    {

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); // 用一開始設(shè)置的線程工廠創(chuàng)建了線程,addWorker之后會(huì)調(diào)用這個(gè)線程的start方法啟動(dòng)線程
        }

        public void run() {
            runWorker(this); // 執(zhí)行工作線程
        }

        final void runWorker(Worker w) {
            try {
                while (task != null || (task = getTask()) != null) { // 如果當(dāng)前任務(wù)未執(zhí)行,則先執(zhí)行當(dāng)前任務(wù);否則去任務(wù)隊(duì)列里面拿任務(wù)執(zhí)行,拿不到任務(wù)時(shí),這個(gè)線程就退出了
                    ...
                    try {
                        beforeExecute(wt, task); //任務(wù)執(zhí)行前的回調(diào)
                        Throwable thrown = null;
                        try {
                            task.run(); // 任務(wù)執(zhí)行
                        } finally {
                            afterExecute(task, thrown); // 任務(wù)執(zhí)行完的回掉
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

接下來看看 getTask() 獲取任務(wù)的過程

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 要等待 or 阻塞

            try {
                Runnable r = timed ?
                    // 從隊(duì)列里面拿任務(wù),如果隊(duì)列為空,最長(zhǎng)等待時(shí)間為 keepAliveTime
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
                    // 從隊(duì)列里面拿任務(wù),如果隊(duì)列為空,則阻塞,直到隊(duì)列有新任務(wù)添加為止
                    workQueue.take(); 
                    // 所以線程池的核心線程為什么不會(huì)銷毀、
                    // 非核心線程為什么能存活 keepAliveTime 時(shí)間,
                    // 超時(shí)后會(huì)被回收,是不是豁然開朗了?
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

可以看到,線程池實(shí)現(xiàn)的相關(guān)特性,主要是通過 getTask() 時(shí)從任務(wù)隊(duì)列里面拿任務(wù)的等待阻塞來實(shí)現(xiàn)的

至于線程工廠,這里不再贅述。

下面最后再看看拒絕策略的種類

(1) AbortPolicy:直接拋異,阻止系統(tǒng)正常工作
(2) CallerRunsPolicy:只要線程池未關(guān)閉,直接在調(diào)用者線程執(zhí)行當(dāng)前任務(wù)
(3) DiscardOledesPolicy:丟棄最老的請(qǐng)求,嘗試重新提交任務(wù)
(4) DiscardPolicy:默默丟棄當(dāng)前提交的任務(wù)
或者可以自己實(shí)現(xiàn)RejectedExecutionHandler接口

至此,線程池的大體實(shí)現(xiàn)基本就清晰了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容