[toc]
- Posted by 微博@Yangsc_o
- 原創(chuàng)文章,版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0
摘要
本文主要回顧java的JDK中的多線程的常見用法和深入理解線程池;
java中的線程
- 創(chuàng)建線程的3種方式
- 通過實(shí)現(xiàn) Runnable 接口來創(chuàng)建線程
- 通過繼承Thread來創(chuàng)建線程
- 通過 Callable 和 Future 創(chuàng)建線程
- 開啟線程的方法
- public void start() 使該線程開始執(zhí)行;Java 虛擬機(jī)調(diào)用該線程的 run 方法。
- public void run() 如果該線程是使用獨(dú)立的 Runnable 運(yùn)行對(duì)象構(gòu)造的,則調(diào)用該 Runnable 對(duì)象的 run 方法;否則,該方法不執(zhí)行任何操作并返回。
- 線程的狀態(tài)
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
java中的線程池
線程池技術(shù)
線程池(Thread Pool)是一種基于池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務(wù)器中,如MySQL等,追根溯源-編程語言&性能優(yōu)化 這篇文檔也指出優(yōu)化性能的一種手段就是池化技術(shù),今天就稍微展開一下,池話技術(shù);
- 池化的表現(xiàn)形式
池化技術(shù)在計(jì)算機(jī)領(lǐng)域中的表現(xiàn)為:統(tǒng)一管理IT資源,包括服務(wù)器、存儲(chǔ)、和網(wǎng)絡(luò)資源等等。通過共享資源,使用戶在低投入中獲益。除去線程池,還有其他比較典型的幾種使用策略包括:
- 內(nèi)存池(Memory Pooling):預(yù)先申請(qǐng)內(nèi)存,提升申請(qǐng)內(nèi)存速度,減少內(nèi)存碎片。
- 連接池(Connection Pooling):預(yù)先申請(qǐng)數(shù)據(jù)庫、redis連接,提升申請(qǐng)連接的速度,降低系統(tǒng)的開銷。
- 實(shí)例池(Object Pooling):循環(huán)使用對(duì)象,減少資源在初始化和釋放時(shí)的昂貴損耗。
- 線程池解決的問題
解決的核心問題就是資源管理問題。在并發(fā)環(huán)境下,系統(tǒng)不能夠確定在任意時(shí)刻中,有多少任務(wù)需要執(zhí)行,有多少資源需要投入。
這種不確定性將帶來以下若干問題
- 頻繁申請(qǐng)/銷毀資源和調(diào)度資源,將帶來額外的消耗,可能會(huì)非常巨大。
- 對(duì)資源無限申請(qǐng)缺少抑制手段,易引發(fā)系統(tǒng)資源耗盡的風(fēng)險(xiǎn)。
- 系統(tǒng)無法合理管理內(nèi)部的資源分布,會(huì)降低系統(tǒng)的穩(wěn)定性。
反之就是解決的問題
- 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無需等待線程創(chuàng)建即可立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
線程池的實(shí)現(xiàn)原理
簡述
Java中的線程池核心實(shí)現(xiàn)類是ThreadPoolExecutor,JDK 1.8的源碼重的ThreadPoolExecutor的UML類圖,了解下ThreadPoolExecutor的繼承關(guān)系。
- Executor
void execute(Runnable command);
只有一個(gè)execute方法,只需提供Runnable對(duì)象
- ExecutorService
ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
- AbstractExecutorService
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
- ThreadPoolExecutor
ThreadPoolExecutor繼承了類AbstractExecutorService,execute()、submit()、shutdown()、shutdownNow()
ThreadPoolExecutor是如何運(yùn)行的?
ThreadPoolExecutor是如何運(yùn)行,如何同時(shí)維護(hù)線程和執(zhí)行任務(wù)的呢?
線程池在內(nèi)部實(shí)際上構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。線程池的運(yùn)行主要分成兩部分:任務(wù)管理、線程管理。任務(wù)管理部分充當(dāng)生產(chǎn)者的角色,當(dāng)任務(wù)提交后,線程池會(huì)判斷該任務(wù)后續(xù)的流轉(zhuǎn),策略如下:
(1)直接申請(qǐng)線程執(zhí)行該任務(wù);
(2)緩沖到隊(duì)列中等待線程執(zhí)行;
(3)拒絕該任務(wù)。
線程管理部分是消費(fèi)者,它們被統(tǒng)一維護(hù)在線程池內(nèi),根據(jù)任務(wù)請(qǐng)求進(jìn)行線程的分配,當(dāng)線程執(zhí)行完任務(wù)后則會(huì)繼續(xù)獲取新的任務(wù)去執(zhí)行,最終當(dāng)線程獲取不到任務(wù)的時(shí)候,線程就會(huì)被回收。
線程池運(yùn)行的狀態(tài)和線程數(shù)量
- 線程池運(yùn)行的狀態(tài)和線程數(shù)量
狀態(tài)和線程數(shù)量是伴隨著線程池的運(yùn)行,由內(nèi)部來維護(hù)。線程池內(nèi)部使用一個(gè)變量維護(hù)兩個(gè)值:運(yùn)行狀態(tài)(runState)和線程數(shù)量 (workerCount)。在具體實(shí)現(xiàn)中,線程池將運(yùn)行狀態(tài)(runState)、線程數(shù)量 (workerCount)兩個(gè)關(guān)鍵參數(shù)的維護(hù)放在了一起
英文描述:The main pool control state, ctl, is an atomic integer packing two conceptual fields workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc
--- 線程池運(yùn)行的狀態(tài)和數(shù)量---
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
---狀態(tài)值---
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-
狀態(tài)切換
狀態(tài)轉(zhuǎn)換
任務(wù)執(zhí)行機(jī)制
任務(wù)調(diào)度是線程池的主要入口,當(dāng)用戶提交了一個(gè)任務(wù),接下來這個(gè)任務(wù)將如何執(zhí)行都是由這個(gè)階段決定的。任務(wù)的調(diào)度都是由execute方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)、運(yùn)行線程數(shù)、運(yùn)行策略,決定接下來執(zhí)行的流程,是直接申請(qǐng)線程執(zhí)行,或是緩沖到隊(duì)列中執(zhí)行,亦或是直接拒絕該任務(wù)。其執(zhí)行過程如下:
- 首先檢測線程池運(yùn)行狀態(tài),如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務(wù)。
- 如果workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)。
- 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)。
- 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
- JDK源碼如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
隊(duì)列緩存
任務(wù)緩沖模塊是線程池能夠管理任務(wù)的核心部分。通過源碼可以看到緩存使用的隊(duì)列是BlockingQueue;
private final BlockingQueue<Runnable> workQueue;
-
各種隊(duì)列的特點(diǎn)
隊(duì)列的特點(diǎn) -
拒絕策略
拒絕策略
Worker線程管理
Worker線程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
...
...
}
Worker這個(gè)工作線程,實(shí)現(xiàn)了Runnable接口,并持有一個(gè)線程thread,一個(gè)初始化的任務(wù)firstTask。thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,可以用來執(zhí)行任務(wù);firstTask用它來保存?zhèn)魅氲牡谝粋€(gè)任務(wù),這個(gè)任務(wù)可以有也可以為null。如果這個(gè)值是非空的,那么線程就會(huì)在啟動(dòng)初期立即執(zhí)行這個(gè)任務(wù),也就對(duì)應(yīng)核心線程創(chuàng)建時(shí)的情況;如果這個(gè)值是null,那么就需要?jiǎng)?chuàng)建一個(gè)線程去執(zhí)行任務(wù)列表(workQueue)中的任務(wù),也就是非核心線程的創(chuàng)建。
?#### 線程的生命周期
線程池需要管理線程的生命周期,需要在線程長時(shí)間不運(yùn)行的時(shí)候進(jìn)行回收。線程池使用一張Hash表去持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作來控制線程的生命周期。這個(gè)時(shí)候重要的就是如何判斷線程是否在運(yùn)行。
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker是通過繼承AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖這個(gè)功能。關(guān)于AQS,改天專門寫一篇博客;
- Worker線程增加
通過閱讀源碼可以看到線程池是通過addWorker添加一個(gè)任務(wù),添加有三種策略。
- addWorker方法有兩個(gè)參數(shù):firstTask、core。
- firstTask參數(shù)用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),該參數(shù)可以為空;
- core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
...
}
添加的策略
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
添加時(shí)的檢查
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*/
- Worker線程執(zhí)行任務(wù)
在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的執(zhí)行過程如下:
- while循環(huán)不斷地通過getTask()方法獲取任務(wù)。
- getTask()方法從阻塞隊(duì)列中取任務(wù)。
- 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài)。
- 執(zhí)行任務(wù)。
- 如果getTask結(jié)果為null則跳出循環(huán),執(zhí)行processWorkerExit()方法,銷毀線程。
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- Worker線程回收
線程池中線程的銷毀依賴JVM自動(dòng)的回收,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護(hù)一定數(shù)量的線程引用,防止這部分線程被JVM回收,當(dāng)線程池決定哪些線程需要回收時(shí),只需要將其引用消除即可。Worker被創(chuàng)建出來后,就會(huì)不斷地進(jìn)行輪詢,然后獲取任務(wù)去執(zhí)行,核心線程可以無限等待獲取任務(wù),非核心線程要限時(shí)獲取任務(wù)。當(dāng)Worker無法獲取到任務(wù),也就是獲取的任務(wù)為空時(shí),循環(huán)會(huì)結(jié)束,Worker會(huì)主動(dòng)消除自身在線程池內(nèi)的引用,從源碼中可以看到,線程回收的工作是在processWorkerExit方法完成的。
final void runWorker(Worker w) {
try {
} finally {
processWorkerExit(w, completedAbruptly);
}
}

建線程池
- 創(chuàng)建線程池的四個(gè)方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- newSingleThreadExecutor
newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
- newFixedThreadPool
創(chuàng)建一個(gè)定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
- newCachedThreadPool
創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,線程的最大數(shù)量是Integer.MAX_VALUE;
- newScheduledThreadPool
創(chuàng)建一個(gè)定長線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。線程的最大數(shù)量是Integer.MAX_VALUE;
- 上面的四個(gè)最后都會(huì)調(diào)用ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- int corePoolSize: 核心線程池大小
- int maximumPoolSize: 最大核心線程池大小
- long keepAliveTime: 超時(shí)了沒有人調(diào)用就會(huì)釋放
- TimeUnit unit: 超時(shí)單位
- BlockingQueue < Runnable > workQueue: 阻塞隊(duì)列
- ThreadFactory threadFactory: 線程工廠
- RejectedExecutionHandler handler: 拒絕策略
就是BlockingQueue的四種拒絕策略
參考
深入理解Java線程池:ThreadPoolExecutor
Java 多線程編程
Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐
從ReentrantLock的實(shí)現(xiàn)看AQS的原理及應(yīng)用