java多線程&深入理解線程池

[toc]



摘要

本文主要回顧java的JDK中的多線程的常見用法和深入理解線程池;

java中的線程

  • 創(chuàng)建線程的3種方式
  1. 通過實(shí)現(xiàn) Runnable 接口來創(chuàng)建線程
  2. 通過繼承Thread來創(chuàng)建線程
  3. 通過 Callable 和 Future 創(chuàng)建線程
  • 開啟線程的方法
    • public void start() 使該線程開始執(zhí)行;Java 虛擬機(jī)調(diào)用該線程的 run 方法。
    • public void run() 如果該線程是使用獨(dú)立的 Runnable 運(yùn)行對(duì)象構(gòu)造的,則調(diào)用該 Runnable 對(duì)象的 run 方法;否則,該方法不執(zhí)行任何操作并返回。
  • 線程的狀態(tài)
public enum State {
   NEW,
   RUNNABLE,
   BLOCKED,
   WAITING,
   TIMED_WAITING,
   TERMINATED;
}

java中的線程池

線程池技術(shù)

線程池(Thread Pool)是一種基于池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務(wù)器中,如MySQL等,追根溯源-編程語言&性能優(yōu)化 這篇文檔也指出優(yōu)化性能的一種手段就是池化技術(shù),今天就稍微展開一下,池話技術(shù);

  • 池化的表現(xiàn)形式

池化技術(shù)在計(jì)算機(jī)領(lǐng)域中的表現(xiàn)為:統(tǒng)一管理IT資源,包括服務(wù)器、存儲(chǔ)、和網(wǎng)絡(luò)資源等等。通過共享資源,使用戶在低投入中獲益。除去線程池,還有其他比較典型的幾種使用策略包括:
- 內(nèi)存池(Memory Pooling):預(yù)先申請(qǐng)內(nèi)存,提升申請(qǐng)內(nèi)存速度,減少內(nèi)存碎片。
- 連接池(Connection Pooling):預(yù)先申請(qǐng)數(shù)據(jù)庫、redis連接,提升申請(qǐng)連接的速度,降低系統(tǒng)的開銷。
- 實(shí)例池(Object Pooling):循環(huán)使用對(duì)象,減少資源在初始化和釋放時(shí)的昂貴損耗。

  • 線程池解決的問題

解決的核心問題就是資源管理問題。在并發(fā)環(huán)境下,系統(tǒng)不能夠確定在任意時(shí)刻中,有多少任務(wù)需要執(zhí)行,有多少資源需要投入。


這種不確定性將帶來以下若干問題

  • 頻繁申請(qǐng)/銷毀資源和調(diào)度資源,將帶來額外的消耗,可能會(huì)非常巨大。
  • 對(duì)資源無限申請(qǐng)缺少抑制手段,易引發(fā)系統(tǒng)資源耗盡的風(fēng)險(xiǎn)。
  • 系統(tǒng)無法合理管理內(nèi)部的資源分布,會(huì)降低系統(tǒng)的穩(wěn)定性。

反之就是解決的問題

  • 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
  • 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無需等待線程創(chuàng)建即可立即執(zhí)行。
  • 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。

線程池的實(shí)現(xiàn)原理

簡述

Java中的線程池核心實(shí)現(xiàn)類是ThreadPoolExecutor,JDK 1.8的源碼重的ThreadPoolExecutor的UML類圖,了解下ThreadPoolExecutor的繼承關(guān)系。


在這里插入圖片描述
  • Executor

void execute(Runnable command);
只有一個(gè)execute方法,只需提供Runnable對(duì)象

  • ExecutorService

ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  • AbstractExecutorService

抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;

  • ThreadPoolExecutor

ThreadPoolExecutor繼承了類AbstractExecutorService,execute()、submit()、shutdown()、shutdownNow()

ThreadPoolExecutor是如何運(yùn)行的?

ThreadPoolExecutor是如何運(yùn)行,如何同時(shí)維護(hù)線程和執(zhí)行任務(wù)的呢?


運(yùn)行圖

線程池在內(nèi)部實(shí)際上構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。線程池的運(yùn)行主要分成兩部分:任務(wù)管理、線程管理。任務(wù)管理部分充當(dāng)生產(chǎn)者的角色,當(dāng)任務(wù)提交后,線程池會(huì)判斷該任務(wù)后續(xù)的流轉(zhuǎn),策略如下:
(1)直接申請(qǐng)線程執(zhí)行該任務(wù);
(2)緩沖到隊(duì)列中等待線程執(zhí)行;
(3)拒絕該任務(wù)。
線程管理部分是消費(fèi)者,它們被統(tǒng)一維護(hù)在線程池內(nèi),根據(jù)任務(wù)請(qǐng)求進(jìn)行線程的分配,當(dāng)線程執(zhí)行完任務(wù)后則會(huì)繼續(xù)獲取新的任務(wù)去執(zhí)行,最終當(dāng)線程獲取不到任務(wù)的時(shí)候,線程就會(huì)被回收。

線程池運(yùn)行的狀態(tài)和線程數(shù)量

  • 線程池運(yùn)行的狀態(tài)和線程數(shù)量

狀態(tài)和線程數(shù)量是伴隨著線程池的運(yùn)行,由內(nèi)部來維護(hù)。線程池內(nèi)部使用一個(gè)變量維護(hù)兩個(gè)值:運(yùn)行狀態(tài)(runState)和線程數(shù)量 (workerCount)。在具體實(shí)現(xiàn)中,線程池將運(yùn)行狀態(tài)(runState)、線程數(shù)量 (workerCount)兩個(gè)關(guān)鍵參數(shù)的維護(hù)放在了一起
英文描述:The main pool control state, ctl, is an atomic integer packing two conceptual fields workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc

--- 線程池運(yùn)行的狀態(tài)和數(shù)量---
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

---狀態(tài)值---
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
狀態(tài)枚舉
  • 狀態(tài)切換


    狀態(tài)轉(zhuǎn)換

任務(wù)執(zhí)行機(jī)制

任務(wù)調(diào)度是線程池的主要入口,當(dāng)用戶提交了一個(gè)任務(wù),接下來這個(gè)任務(wù)將如何執(zhí)行都是由這個(gè)階段決定的。任務(wù)的調(diào)度都是由execute方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)、運(yùn)行線程數(shù)、運(yùn)行策略,決定接下來執(zhí)行的流程,是直接申請(qǐng)線程執(zhí)行,或是緩沖到隊(duì)列中執(zhí)行,亦或是直接拒絕該任務(wù)。其執(zhí)行過程如下:

  • 首先檢測線程池運(yùn)行狀態(tài),如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務(wù)。
  • 如果workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)。
  • 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中。
  • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)。
  • 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
  • JDK源碼如下:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

隊(duì)列緩存

任務(wù)緩沖模塊是線程池能夠管理任務(wù)的核心部分。通過源碼可以看到緩存使用的隊(duì)列是BlockingQueue;

private final BlockingQueue<Runnable> workQueue;

  • 各種隊(duì)列的特點(diǎn)


    隊(duì)列的特點(diǎn)
  • 拒絕策略


    拒絕策略

Worker線程管理

Worker線程

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    ...
    ...
}

Worker這個(gè)工作線程,實(shí)現(xiàn)了Runnable接口,并持有一個(gè)線程thread,一個(gè)初始化的任務(wù)firstTask。thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,可以用來執(zhí)行任務(wù);firstTask用它來保存?zhèn)魅氲牡谝粋€(gè)任務(wù),這個(gè)任務(wù)可以有也可以為null。如果這個(gè)值是非空的,那么線程就會(huì)在啟動(dòng)初期立即執(zhí)行這個(gè)任務(wù),也就對(duì)應(yīng)核心線程創(chuàng)建時(shí)的情況;如果這個(gè)值是null,那么就需要?jiǎng)?chuàng)建一個(gè)線程去執(zhí)行任務(wù)列表(workQueue)中的任務(wù),也就是非核心線程的創(chuàng)建。


流轉(zhuǎn)過程

?#### 線程的生命周期
線程池需要管理線程的生命周期,需要在線程長時(shí)間不運(yùn)行的時(shí)候進(jìn)行回收。線程池使用一張Hash表去持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作來控制線程的生命周期。這個(gè)時(shí)候重要的就是如何判斷線程是否在運(yùn)行。

/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker是通過繼承AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖這個(gè)功能。關(guān)于AQS,改天專門寫一篇博客;

  • Worker線程增加

通過閱讀源碼可以看到線程池是通過addWorker添加一個(gè)任務(wù),添加有三種策略。

  • addWorker方法有兩個(gè)參數(shù):firstTask、core。
  • firstTask參數(shù)用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),該參數(shù)可以為空;
  • core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
...
}

添加的策略

/*
 * Proceed in 3 steps:
 *
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task.  The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 *
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 *
 * 3. If we cannot queue task, then we try to add a new
 * thread.  If it fails, we know we are shut down or saturated
 * and so reject the task.
 */

添加時(shí)的檢查

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 */
流程
  • Worker線程執(zhí)行任務(wù)
    在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的執(zhí)行過程如下:
  1. while循環(huán)不斷地通過getTask()方法獲取任務(wù)。
  2. getTask()方法從阻塞隊(duì)列中取任務(wù)。
  3. 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài)。
  4. 執(zhí)行任務(wù)。
  5. 如果getTask結(jié)果為null則跳出循環(huán),執(zhí)行processWorkerExit()方法,銷毀線程。
/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
線程執(zhí)行
  • Worker線程回收
    線程池中線程的銷毀依賴JVM自動(dòng)的回收,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護(hù)一定數(shù)量的線程引用,防止這部分線程被JVM回收,當(dāng)線程池決定哪些線程需要回收時(shí),只需要將其引用消除即可。Worker被創(chuàng)建出來后,就會(huì)不斷地進(jìn)行輪詢,然后獲取任務(wù)去執(zhí)行,核心線程可以無限等待獲取任務(wù),非核心線程要限時(shí)獲取任務(wù)。當(dāng)Worker無法獲取到任務(wù),也就是獲取的任務(wù)為空時(shí),循環(huán)會(huì)結(jié)束,Worker會(huì)主動(dòng)消除自身在線程池內(nèi)的引用,從源碼中可以看到,線程回收的工作是在processWorkerExit方法完成的。
final void runWorker(Worker w) {
    try {

    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
流程圖如下

建線程池

  • 創(chuàng)建線程池的四個(gè)方法
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}
  • newSingleThreadExecutor

newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。

  • newFixedThreadPool

創(chuàng)建一個(gè)定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。

  • newCachedThreadPool

創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,線程的最大數(shù)量是Integer.MAX_VALUE;

  • newScheduledThreadPool

創(chuàng)建一個(gè)定長線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。線程的最大數(shù)量是Integer.MAX_VALUE;


  • 上面的四個(gè)最后都會(huì)調(diào)用ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue < Runnable > workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • int corePoolSize: 核心線程池大小
  • int maximumPoolSize: 最大核心線程池大小
  • long keepAliveTime: 超時(shí)了沒有人調(diào)用就會(huì)釋放
  • TimeUnit unit: 超時(shí)單位
  • BlockingQueue < Runnable > workQueue: 阻塞隊(duì)列
  • ThreadFactory threadFactory: 線程工廠
  • RejectedExecutionHandler handler: 拒絕策略

就是BlockingQueue的四種拒絕策略

參考

深入理解Java線程池:ThreadPoolExecutor
Java 多線程編程
Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐
從ReentrantLock的實(shí)現(xiàn)看AQS的原理及應(yīng)用


你的鼓勵(lì)也是我創(chuàng)作的動(dòng)力

打賞地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容