首先何為線程(進程/線程)?
進程操作系統(tǒng)動態(tài)執(zhí)行基本單元,系統(tǒng)為每個進程分配內(nèi)存,包括一般情況下,包括文本區(qū)域(text region/指令)、數(shù)據(jù)區(qū)域(data region/變量)和堆棧(stack region/對象)。
我們的程序雖然可以做到多進程,但是,多進程需要切換上下文,什么是上下文?
當程序執(zhí)行到多進程的指令,那么會把當前的運行環(huán)境-堆棧等,復制一份,到另一塊內(nèi)存區(qū)域,而又因為cpu是輪尋機制,關(guān)于CPU運行原理,百度上有篇文章寫的很好,我這邊引用一下。
頻繁切換上下文(大概一次20微秒),屬于沒必要的消耗。另外就是進程間通信需要通過管道機制,比較復雜。
所以通常需要性能的場景通常合理使用線程來提高效率,線程的定義是,一個線程有且至少有一個線程,線程利用進程的資源,且本身不擁有系統(tǒng)資源,所以對線程的調(diào)度的開銷就會小很多(相對進程)。
因為這篇文章我定義到Java的分類下面,所以還是要通過Java來描述
其實我認為要真的好好深入學習線程進程,cpu調(diào)度這塊,還是要通過C來學
日后有時間,我會用C語言來模擬實現(xiàn)一遍
既然了解了什么是線程,看下Java怎么實現(xiàn)多線程:
Thread,Runnable,Future
至于網(wǎng)上有些說4種的,其實就是用ExecutorService來管理了一下。
那么從頭聊一聊。
Thread 其實是Runnable的實現(xiàn)類,類聲明如下
public class Thread implements Runnable
看下最核心的一個方法
首先判定現(xiàn)成的狀態(tài),0狀態(tài)表示該線程是新創(chuàng)建的,一切不是新建狀態(tài)的線程,都視為非法
第二部添加到線程組,線程組默認初始長度為4,如果滿了就闊為2倍。
之后可以看到,調(diào)用了一個本地方法start0,如果成功,則更改started標簽量
最后一個判定,啟動失敗,從線程組中移除當前線程。
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
Thread中出現(xiàn)的Runnable,作為一個接口,只有一個方法,就是run。
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
之后來看Future,
Future其實提供了和Runnable能力并列的接口,簡單解釋一下為什么這么說,Runnable接口提供了run,也就是可以放在線程中執(zhí)行的能力,Future其實是賦予了線程執(zhí)行后可以返回的能力,run的聲明是void,所以沒有返回值。
兩者結(jié)合,簡單易懂的一個類RunnableFuture的接口就出來了。
那么相當于Thread的實現(xiàn)類,FutureTask就出現(xiàn)了,它就是集大成者。
這么說可能有點跳躍,先看下下面的實現(xiàn),一看,誒,怎么沒有Future?
Future本身是一個接口,跟Runnable是相同的級別,但區(qū)別通俗來講在于他沒有run的能力,這個能力來自于Runnable。
追溯一下FutureTask,發(fā)現(xiàn)它繼承了RunnableFuture,誒,這個單詞起的就有意思了,包含了Runnable,和Future。
點進去看下
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
Future就在這,關(guān)于Future里面有什么,大家可以點進去看看,里面最關(guān)鍵的就是get() throws InterruptedException, ExecutionException;這個方法,就這這個方法,讓我們通過調(diào)用api,拿到線程里面的值。
如果想使用這個東西,開啟線程,這個時候不能用new Thread(future)這種方式了,因為Thread沒有這種能力,只實現(xiàn)了一個Runnable接口,
這個時候,一個新的類出現(xiàn)了,源碼如下
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
看下Callable的聲明,是有返回值的,并且可以拋出異常的。
這個返回值就很關(guān)鍵了,通過這個返回值,你可以把任何你想通過線程拿到的結(jié)果拿回來。
而拿結(jié)果的方法就是FutureTask的get()方法,之前我們看源碼時又看到,這個get方法來自于Future接口的V get()方法
簡單看下如何使用一個有返回值的多線程操作
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> callable = new Task();
FutureTask task = new FutureTask(callable);
Thread oneThread = new Thread(task);
oneThread.start();
System.out.println(">>> 工作結(jié)果 " + task.get().toString());
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(">>> 線程開始工作");
Thread.sleep(1000);
System.out.println(">>> 結(jié)束工作開始返回");
return 10;
}
}
可以看到FutureTask依然調(diào)用的是Thread,走的是本地方法start0。
Runbale就沒什么好說的了,實現(xiàn)一個接口,放到Thread里面去執(zhí)行,基本沒什么東西,能力與Thread差不多,區(qū)別是實現(xiàn)Runnable接口的類必須依托于Thread類才能啟動,
//使用這個構(gòu)造方法
public Thread(Runnable target) {
this(null, target, "Thread-" + nextThreadNum(), 0);
}
然后用Thread的start方法,需要注意的是,千萬不要調(diào)run方法, 要用start。
最后看下ExecutorService這個類,ExecutorService級別很高,他的爸爸直接就是Executor。
他的兒子,是AbstractExecutorService,這里實現(xiàn)了submit,doInvokeAny等方法。
而我們調(diào)用Executors.newFixedThreadPool(poolSize);返回的是ThreadPoolExecutor
注:一般不建議使用Executors.newFixedThreadPool(poolSize);,什么東西全是默認,建議如下方式:
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
ExecutorService service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());
具體參數(shù)含義就自行百度吧,很多人講的。
那ThreadPoolExecutor這個類,又是AbstractExecutorService的兒子,所以這個關(guān)系就很明顯了
ThreadPoolExecutor -> AbstractExecutorService -> ExecutorService -> Executor
看到一堆submit方法,然而沒什么用,真正關(guān)鍵的方法,是execute方法,在ThreadPoolExecutor中實現(xiàn)。
這個類有點意思,一上來就給我一個下馬威
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));
這個ctl到底是什么?
查了很久,在cnblogs里找到一個大神的描述 -- Okevin
這個變量使用來干嘛的呢?它的作用有點類似我們在《7.ReadWriteLock接口及其實現(xiàn)ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態(tài),而AQS則只提供了state一個int型變量,此時將state高16位表示為讀狀態(tài),低16位表示為寫狀態(tài)。這里的clt同樣也是,它表示了兩個概念:
workerCount:當前有效的線程數(shù)
runState:當前線程池的五種狀態(tài),Running、Shutdown、Stop、Tidying、Terminate。
int型變量一共有32位,線程池的五種狀態(tài)runState至少需要3位來表示,故workCount只能有29位,所以代碼中規(guī)定線程池的有效線程數(shù)最多為2^29-1。
看到這先來聊一下線程提交任務的規(guī)則,--《java并發(fā)編程藝術(shù)》
- 首先會判斷核心線程池里是否有線程可執(zhí)行,有空閑線程則創(chuàng)建一個線程來執(zhí)行任務。
- 當核心線程池里已經(jīng)沒有線程可執(zhí)行的時候,此時將任務丟到任務隊列中去。
- 如果任務隊列(有界)也已經(jīng)滿了的話,但運行的線程數(shù)小于最大線程池的數(shù)量的時候,此時將會新建一個線程用于執(zhí)行任務,但如果運行的線程數(shù)已經(jīng)達到最大線程池的數(shù)量的時候,此時將無法創(chuàng)建線程執(zhí)行任務。
所以實際上對于線程池不僅是單純地將任務丟到線程池,線程池中有線程就執(zhí)行任務,沒線程就等待。
最后附上大神對execute的注解
/**
* corePoolSize:核心線程池的線程數(shù)量
*
* maximumPoolSize:最大的線程池線程數(shù)量
*
* keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。
*
* unit:線程活動保持時間的單位。
*
* workQueue:指定任務隊列所使用的阻塞隊列
*/
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//由它可以獲取到當前有效的線程數(shù)和線程池的狀態(tài)
/*1.獲取當前正在運行線程數(shù)是否小于核心線程池,是則新創(chuàng)建一個線程執(zhí)行任務,否則將任務放到任務隊列中*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize){
if (addWorker(command, tre)) //在addWorker中創(chuàng)建工作線程執(zhí)行任務
return ;
c = ctl.get();
}
/*2.當前核心線程池中全部線程都在運行workerCountOf(c) >= corePoolSize,所以此時將線程放到任務隊列中*/
//線程池是否處于運行狀態(tài),且是否任務插入任務隊列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))//線程池是否處于運行狀態(tài),如果不是則使剛剛的任務出隊
reject(command);//拋出RejectedExceptionException異常
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*3.插入隊列不成功,且當前線程數(shù)數(shù)量小于最大線程池數(shù)量,此時則創(chuàng)建新線程執(zhí)行任務,創(chuàng)建失敗拋出異常*/
else if (!addWorker(command, false)){
reject(command); //拋出RejectedExceptionException異常
}
}
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查線程池是否處于運行狀態(tài),核心線程池中是否還有空閑線程,都滿足條件過后則會調(diào)用compareAndIncrementWorkerCount先將正在運行的線程數(shù)+1,數(shù)量自增成功則跳出循環(huán),自增失敗則繼續(xù)從頭繼續(xù)循環(huán)*/
...
if (compareAndIncrementWorkerCount(c))
break retry;
...
/*正在運行的線程數(shù)自增成功后則將線程封裝成工作線程Worker*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;//全局鎖
w = new Woker(firstTask);//將線程封裝為Worker工作線程
final Thread t = w.thread;
if (t != null) {
//獲取全局鎖
mainLock.lock();
/*當持有了全局鎖的時候,還需要再次檢查線程池的運行狀態(tài)等*/
try {
int c = clt.get();
int rs = runStateOf(c); //線程池運行狀態(tài)
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){ //線程池處于運行狀態(tài),或者線程池關(guān)閉且任務線程為空
if (t.isAlive()) //線程處于活躍狀態(tài),即線程已經(jīng)開始執(zhí)行或者還未死亡,正確的應線程在這里應該是還未開始執(zhí)行的
throw new IllegalThreadStateException();
//private final HashSet<Worker> wokers = new HashSet<Worker>();
//包含線程池中所有的工作線程,只有在獲取了全局的時候才能訪問它。將新構(gòu)造的工作線程加入到工作線程集合中
workers.add(w);
int s = worker.size(); //工作線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; //新構(gòu)造的工作線程加入成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//在被構(gòu)造為Worker工作線程,且被加入到工作線程集合中后,執(zhí)行線程任務
//注意這里的start實際上執(zhí)行Worker中run方法,所以接下來分析Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) //未能成功創(chuàng)建執(zhí)行工作線程
//在啟動工作線程失敗后,將工作線程從集合中移除
addWorkerFailed(w);
}
return workerStarted;
}
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現(xiàn)了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
//設(shè)置AQS的同步狀態(tài)為-1,禁止中斷,直到調(diào)用runWorker
setState(-1);
this.firstTask = firstTask;
//通過線程工廠來創(chuàng)建一個線程,將自身作為Runnable傳遞傳遞
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); //運行工作線程
}
}
Okevin博客