Java線程池實現(xiàn)原理詳解

原理概述

java線程池.png

其實java線程池的實現(xiàn)原理很簡單,說白了就是一個線程集合workerSet和一個阻塞隊列workQueue。當用戶向線程池提交一個任務(也就是線程)時,線程池會先將任務放入workQueue中。workerSet中的線程會不斷的從workQueue中獲取線程然后執(zhí)行。當workQueue中沒有任務的時候,worker就會阻塞,直到隊列中有任務了就取出來繼續(xù)執(zhí)行。

線程池的幾個主要參數(shù)的作用

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize: 規(guī)定線程池有幾個線程(worker)在運行。
  2. maximumPoolSize: 當workQueue滿了,不能添加任務的時候,這個參數(shù)才會生效。規(guī)定線程池最多只能有多少個線程(worker)在執(zhí)行。
  3. keepAliveTime: 超出corePoolSize大小的那些線程的生存時間,這些線程如果長時間沒有執(zhí)行任務并且超過了keepAliveTime設(shè)定的時間,就會消亡。
  4. unit: 生存時間對于的單位
  5. workQueue: 存放任務的隊列
  6. threadFactory: 創(chuàng)建線程的工廠
  7. handler: 當workQueue已經(jīng)滿了,并且線程池線程數(shù)已經(jīng)達到maximumPoolSize,將執(zhí)行拒絕策略。

任務提交后的流程分析

用戶通過submit提交一個任務。線程池會執(zhí)行如下流程:

  1. 判斷當前運行的worker數(shù)量是否超過corePoolSize,如果不超過corePoolSize。就創(chuàng)建一個worker直接執(zhí)行該任務。—— 線程池最開始是沒有worker在運行的
  2. 如果正在運行的worker數(shù)量超過或者等于corePoolSize,那么就將該任務加入到workQueue隊列中去。
  3. 如果workQueue隊列滿了,也就是offer方法返回false的話,就檢查當前運行的worker數(shù)量是否小于maximumPoolSize,如果小于就創(chuàng)建一個worker直接執(zhí)行該任務。
  4. 如果當前運行的worker數(shù)量是否大于等于maximumPoolSize,那么就執(zhí)行RejectedExecutionHandler來拒絕這個任務的提交。

源碼解析

我們先來看一下ThreadPoolExecutor中的幾個關(guān)鍵屬性。

//這個屬性是用來存放 當前運行的worker數(shù)量以及線程池狀態(tài)的
//int是32位的,這里把int的高3位拿來充當線程池狀態(tài)的標志位,后29位拿來充當當前運行worker的數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set來存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//歷史達到的worker數(shù)最大值
private int largestPoolSize;
//當隊列滿了并且worker的數(shù)量達到maxSize的時候,執(zhí)行具體的拒絕策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存時間
private volatile long keepAliveTime;
//常駐worker的數(shù)量
private volatile int corePoolSize;
//最大worker的數(shù)量,一般當workQueue滿了才會用到這個參數(shù)
private volatile int maximumPoolSize;

1. 提交任務相關(guān)源碼

下面是execute方法的源碼

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //workerCountOf(c)會獲取當前正在運行的worker數(shù)量
        if (workerCountOf(c) < corePoolSize) {
            //如果workerCount小于corePoolSize,就創(chuàng)建一個worker然后直接執(zhí)行該任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //isRunning(c)是判斷線程池是否在運行中,如果線程池被關(guān)閉了就不會再接受任務
        //后面將任務加入到隊列中
        if (isRunning(c) && workQueue.offer(command)) {
            //如果添加到隊列成功了,會再檢查一次線程池的狀態(tài)
            int recheck = ctl.get();
            //如果線程池關(guān)閉了,就將剛才添加的任務從隊列中移除
            if (! isRunning(recheck) && remove(command))
                //執(zhí)行拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果加入隊列失敗,就嘗試直接創(chuàng)建worker來執(zhí)行任務
        else if (!addWorker(command, false))
            //如果創(chuàng)建worker失敗,就執(zhí)行拒絕策略
            reject(command);
}

添加worker的方法addWorker源碼

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //使用自旋+cas失敗重試來保證線程競爭問題
        for (;;) {
            //先獲取線程池的狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果線程池是關(guān)閉的,或者workQueue隊列非空,就直接返回false,不做任何處理
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //根據(jù)入?yún)ore 來判斷可以創(chuàng)建的worker數(shù)量是否達到上限,如果達到上限了就拒絕創(chuàng)建worker
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //沒有的話就嘗試修改ctl添加workerCount的值。這里用了cas操作,如果失敗了下一個循環(huán)會繼續(xù)重試,直到設(shè)置成功
                if (compareAndIncrementWorkerCount(c))
                    //如果設(shè)置成功了就跳出外層的那個for循環(huán)
                    break retry;
                //重讀一次ctl,判斷如果線程池的狀態(tài)改變了,會再重新循環(huán)一次
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            //創(chuàng)建一個worker,將提交上來的任務直接交給worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加鎖,防止競爭
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    //還是判斷線程池的狀態(tài)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果worker的線程已經(jīng)啟動了,會拋出異常
                        if (t.isAlive()) 
                              throw new IllegalThreadStateException();
                        //添加新建的worker到線程池中
                        workers.add(w);
                        int s = workers.size();
                        //更新歷史worker數(shù)量的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //設(shè)置新增標志位
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果worker是新增的,就啟動該線程
                if (workerAdded) {
                    t.start();
                     //成功啟動了線程,設(shè)置對應的標志位
                    workerStarted = true;
                }
            }
        } finally {
            //如果啟動失敗了,會觸發(fā)執(zhí)行相應的方法
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

2. Worker的結(jié)構(gòu)

Worker是ThreadPoolExecutor內(nèi)部定義的一個內(nèi)部類。我們先看一下Worker的繼承關(guān)系

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

它實現(xiàn)了Runnable接口,所以可以拿來當線程用。同時它還繼承了AbstractQueuedSynchronizer同步器類,主要用來實現(xiàn)一個不可重入的鎖。

一些屬性還有構(gòu)造方法:

//運行的線程,前面addWorker方法中就是直接通過啟動這個線程來啟動這個worker
final Thread thread;
//當一個worker剛創(chuàng)建的時候,就先嘗試執(zhí)行這個任務
Runnable firstTask;
//記錄完成任務的數(shù)量
volatile long completedTasks;
Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //創(chuàng)建一個Thread,將自己設(shè)置給他,后面這個thread啟動的時候,也就是執(zhí)行worker的run方法
            this.thread = getThreadFactory().newThread(this);
}

worker的run方法

public void run() {
            //這里調(diào)用了ThreadPoolExecutor的runWorker方法
            runWorker(this);
}

ThreadPoolExecutor的runWorker方法

final void runWorker(Worker w) {
        //獲取當前線程
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //執(zhí)行unlock方法,允許其他線程來中斷自己
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果前面的firstTask有值,就直接執(zhí)行這個任務
            //如果沒有具體的任務,就執(zhí)行g(shù)etTask()方法從隊列中獲取任務
            //這里會不斷執(zhí)行循環(huán)體,除非線程中斷或者getTask()返回null才會跳出這個循環(huán)
            while (task != null || (task = getTask()) != null) {
                //執(zhí)行任務前先鎖住,這里主要的作用就是給shutdown方法判斷worker是否在執(zhí)行中的
                //shutdown方法里面會嘗試給這個線程加鎖,如果這個線程在執(zhí)行,就不會中斷它
                w.lock();
               //判斷線程池狀態(tài),如果線程池被強制關(guān)閉了,就馬上退出
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執(zhí)行任務前調(diào)用。預留的方法,可擴展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正的執(zhí)行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       //執(zhí)行任務后調(diào)用。預留的方法,可擴展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //記錄完成的任務數(shù)量
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}

下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個方法我們可以看出先吃池是怎么讓超過corePoolSize的那部分worker銷毀的。

private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果線程池已經(jīng)關(guān)閉了,就直接返回null,
            //如果這里返回null,調(diào)用的那個worker就會跳出while循環(huán),然后執(zhí)行完銷毀線程
            //SHUTDOWN狀態(tài)表示執(zhí)行了shutdown()方法
            //STOP表示執(zhí)行了shutdownNow()方法
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //獲取當前正在運行中的worker數(shù)量
            int wc = workerCountOf(c);

            // 如果設(shè)置了核心worker也會超時或者當前正在運行的worker數(shù)量超過了corePoolSize,就要根據(jù)時間判斷是否要銷毀線程了
            //其實就是從隊列獲取任務的時候要不要設(shè)置超時間時間,如果超過這個時間隊列還沒有任務進來,就會返回null
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //如果上一次循環(huán)從隊列獲取到的未null,這時候timedOut就會為true了
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //通過cas來設(shè)置WorkerCount,如果多個線程競爭,只有一個可以設(shè)置成功
                //最后如果沒設(shè)置成功,就進入下一次循環(huán),說不定下一次worker的數(shù)量就沒有超過corePoolSize了,也就不用銷毀worker了
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果要設(shè)置超時時間,就設(shè)置一下咯
                //過了這個keepAliveTime時間還沒有任務進隊列就會返回null,那worker就會銷毀
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //如果r為null,就設(shè)置timedOut為true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
}

3. 添加Callable任務的實現(xiàn)源碼

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}

要添加一個有返回值的任務的實現(xiàn)也很簡單。其實就是對任務做了一層封裝,將其封裝成Future,然后提交給線程池執(zhí)行,最后返回這個future。
這里的 newTaskFor(task) 方法會將其封裝成一個FutureTask類。
外部的線程拿到這個future,執(zhí)行g(shù)et()方法的時候,如果任務本身沒有執(zhí)行完,執(zhí)行線程就會被阻塞,直到任務執(zhí)行完。
下面是FutureTask的get方法

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //判斷狀態(tài),如果任務還沒執(zhí)行完,就進入休眠,等待喚醒
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //返回值
        return report(s);
}

FutureTask中通過一個state狀態(tài)來判斷任務是否完成。當run方法執(zhí)行完后,會將state狀態(tài)置為完成,同時喚醒所有正在等待的線程。我們可以看一下FutureTask的run方法

public void run() {
        //判斷線程的狀態(tài)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //執(zhí)行call方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //這個方法里面會設(shè)置返回內(nèi)容,并且喚醒所以等待中的線程
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
}

4. shutdown和shutdownNow方法的實現(xiàn)

shutdown方法會將線程池的狀態(tài)設(shè)置為SHUTDOWN,線程池進入這個狀態(tài)后,就拒絕再接受任務,然后會將剩余的任務全部執(zhí)行完

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //檢查是否可以關(guān)閉線程
            checkShutdownAccess();
            //設(shè)置線程池狀態(tài)
            advanceRunState(SHUTDOWN);
            //嘗試中斷worker
            interruptIdleWorkers();
             //預留方法,留給子類實現(xiàn)
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
}

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷所有的worker
            for (Worker w : workers) {
                Thread t = w.thread;
                //先嘗試調(diào)用w.tryLock(),如果獲取到鎖,就說明worker是空閑的,就可以直接中斷它
                //注意的是,worker自己本身實現(xiàn)了AQS同步框架,然后實現(xiàn)的類似鎖的功能
                //它實現(xiàn)的鎖是不可重入的,所以如果worker在執(zhí)行任務的時候,會先進行加鎖,這里tryLock()就會返回false
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
}

shutdownNow做的比較絕,它先將線程池狀態(tài)設(shè)置為STOP,然后拒絕所有提交的任務。最后中斷左右正在運行中的worker,然后清空任務隊列。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //檢測權(quán)限
            advanceRunState(STOP);
            //中斷所有的worker
            interruptWorkers();
            //清空任務隊列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
}

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷所有worker,然后調(diào)用中斷方法
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}

總結(jié)

java線程池的實現(xiàn)原理還是挺簡單的。但是有一些細節(jié)還是需要去看源碼才能得出答案。本文也沒辦法把所有的源碼都講解一遍,只列了比較重要的一些源碼。有興趣的同學可以自己打開源碼好好看一下,肯定會對實現(xiàn)原理了解的更加深刻。

最后,如果本文有哪里說的不對或者遺漏的地方,也煩請指出,感激不盡。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容