取材于網(wǎng)絡,忘記哪些帖子惹,挺多的。。。主要是我自己防止自己忘記記錄的
1.線程
是大家比較熟悉的概念,線程和進程都有五個階段:創(chuàng)建、就緒、運行、阻塞、終止。多線程即一個程序有多個順序流在執(zhí)行。
實現(xiàn)的方法 有三種:Thread、Runnable 和 Callable接口與Future、線程池結(jié)合。
2.首先說java.lang.Thread
Thread類,是很方便的一種,使用起來很快速,如果我們只是想啟一個線程,沒特殊要求,可以直接用Thread。使用的方法也極其簡單,繼承,并填充run方法。等使用的時候直接new出來,然后使用start方法即可,使用十分簡單。
public class MyThread extends Thread {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("線程 " + name + i);
try {
sleep((int)Math.random()*100);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
test:
new MyThread("A").start();
new MyThread("B").start();
這就行了,程序運行main方法的時候,java會啟動一個線程,主線程隨著main創(chuàng)建。調(diào)用start方法之后,另外兩個線程也啟動了。Start方法調(diào)用之后,并不是立即執(zhí)行了多線程的代碼,而是先把線程轉(zhuǎn)為Runnable狀態(tài),操作系統(tǒng)決定代碼何時運行。多線程的程序?qū)嶋H上的亂序執(zhí)行的,執(zhí)行結(jié)果是隨機的。
3.然后說一下Runnable
是接口,目標類需要實現(xiàn)接口,然后重寫run方法,使用也是很簡單,new Thread()里面參數(shù)填我們自己創(chuàng)建的runnable實現(xiàn)類就可以了。然后還是調(diào)用start方法。
好處也很明顯,畢竟java類單繼承,但是可以實現(xiàn)多個接口。
public class MyRunnable implements Runnable{
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("線程 " + name + i);
try {
sleep((int) Math.random() * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
test:
new Thread(new MyRunnable("A")).start();
new Thread(new MyRunnable("B")).start();
當類實現(xiàn)了Runnable接口之后,此類就具有多線程的特征,接下來做的就是填充run方法??丛创a可知,Thread類實際上也是實現(xiàn)了Runnable的類。啟動的時候,是先通過Thread類構(gòu)造方法,調(diào)用生成的Thread對象的start方法??梢哉f多線程的都是通過Thread的start方法來運行的。Thread的API是多線程的關鍵。
要注意一點!?。【€程池只能放入實現(xiàn)Runnable和Callable類線程,不能直接放入繼承Thread的類。
4.最后就是Callable
一般來說,使用Runnable接口和繼承Thread實現(xiàn)線程是無法給我們返回結(jié)果的,如果需要結(jié)果,那么可以使用Callable,但是Callable只能在ExecutorService的線程池里跑,同時可以通過返回的Future對象查詢執(zhí)行狀態(tài),F(xiàn)uture取得了執(zhí)行異步任務的結(jié)果。
詳細說一下,先看個例子
public class MyCallable implements Callable {
private String name;
public MyCallable(String name) {
this.name = name;
}
@Override
public Object call() throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("線程 " + name + i);
try {
sleep((int) Math.random() * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return name;
}
}
寫倒是同樣很簡單,但是怎么去使用呢?
最好是通過線程池去啟動:
ExecutorService executor = Executors.newCachedThreadPool();
MyCallable c1 = new MyCallable("A");
MyCallable c2 = new MyCallable("B");
MyCallable c3 = new MyCallable("C");
Future<Integer> result1 = executor.submit(c1);
Future<Integer> result2 = executor.submit(c2);
Future<Integer> result3 = executor.submit(c3);
System.out.println("task1運行結(jié)果:"+result1.get());
System.out.println("task2運行結(jié)果:"+result2.get());
System.out.println("task3運行結(jié)果:"+result3.get());
executor.shutdown();
這里用了緩存型池。后續(xù)補充說明。
看完例子我們稍稍微微的看一下源碼。
Callable接口的源碼很簡單,只有一個call方法,返回的一個泛型,這個就是線程返回的結(jié)果。然后使用ExecutorService接口是負責線程池調(diào)度的。
里面比較重要的是:
submit(Callable<T> task)
submit(Callable<T> task,T result)
Submit(Runnable task)
boolean awaitTermination(long timeout, TimeUnit unit)
Vord shutdown()
前三個很明顯,就是幫助我們開啟線程的,類似start
第四個是阻塞,監(jiān)測ExecutorService是否關閉,如果關閉了返回true,否則false。一般和shutdown組合使用。
最后的shutdown顧名思義,平滑的關閉ExecutorService。調(diào)用時,停止接收新的任務并且等待已經(jīng)提交的任務執(zhí)行完。所有任務完成之后,線程池關閉。
再看Future接口:
get() 是等待線程結(jié)果返回,會阻塞
get(long timeout,TimeUnit unit) 設置超時時間
cancel() 取消任務執(zhí)行,任務已經(jīng)完成或者已經(jīng)取消則失敗,參數(shù)是是否應該試圖停止任務的方式去中斷線程。
還有isCancelled和isDone都是判斷狀態(tài)的。
關于Callable只有這么多嗎???當然不是!?。∮袀€很重要的類叫做FutureTask,它是Future的實現(xiàn)類,并且實現(xiàn)了Runnable接口,所以可以通過Executor來執(zhí)行,也可以傳遞給Thread對象執(zhí)行。
如果要在主線程執(zhí)行比較耗時的工作,同時不想阻塞主線程,可以交給Future對象在后臺完成。Executor框架利用FutureTask完成異步任務,一般來說用它進行耗時的計算。主線程可以在完成自己的任務之后,再去其獲取結(jié)果。FutureTask的使用有兩種情況:
- 1.作為new Thread()的參數(shù)。
- 2.ExecutorService.submit() 扔線程池里。
public static void main(String[] args) {
// 1.直接啟動線程
MyCallable task1 = new MyCallable("A");
MyCallable task2 = new MyCallable("B");
FutureTask<Integer> result1 = new FutureTask<Integer>(task1);
FutureTask<Integer> result2 = new FutureTask<Integer>(task2);
Thread thread1 = new Thread(result1);
Thread thread2 = new Thread(result2);
thread1.start();
thread2.start();
// 2.線程池啟動
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result3 = executor.submit(task1);
Future<Integer> result4 = executor.submit(task2);
executor.shutdown();
}
發(fā)現(xiàn)了嗎?其實用了線程池反而更簡單了。。。
但是有個問題,如果我們開了很多很多線程,這些線程的執(zhí)行時間是未知的,但是我們有需要返回結(jié)果,如果我們只是通過Future和FutureTask去取結(jié)果效率就很低,因為我們需要通過循環(huán)不斷遍歷線程池里面的線程,判斷執(zhí)行狀態(tài)并取得結(jié)果。于是針對這種情況,有一個CompletionService。
其原理在于將線程池執(zhí)行結(jié)果放到一個Blockqueueing里面,這里線程執(zhí)行結(jié)果進入Blockqueueing的順序只與線程的執(zhí)行時間有關。
CompletionService是一個接口,里面有五個方法分別是:
//提交線程任務
Future<V> submit(Callable<V> task);
//提交線程任務
Future<V> submit(Runnable task, V result);
//阻塞等待
Future<V> take() throws InterruptedException;
//非阻塞等待
Future<V> poll();
//帶時間的非阻塞等待
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
這個接口有一個實現(xiàn)類ExecutorCompletionService。
實現(xiàn)類的源碼可知,最終線程要提交到Executor里面去運行,所以構(gòu)造函數(shù)中需要Executor參數(shù)。每當線程執(zhí)行完畢之后會往阻塞隊列添加一個Future。
舉例時間:
只用Future:
//不使用 CompletionService 只用 Future
public static void futureTest() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Integer>> result = new ArrayList<>();
// 假設有10個線程
for (int i = 0; i < 10; i++) {
Future<Integer> submit = executor.submit(new MutiThreadFuture(i));
result.add(submit);
}
executor.shutdown();
// 依次等待返回結(jié)果
for (Future<Integer> future : result) {
System.out.println("返回結(jié)果" + future.get());
}
}
使用了CompletionService:
public static void CompleteTest() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newCachedThreadPool();
// 完成服務
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < 10; i++) {
completionService.submit(new MutiThreadFuture<>(i));
}
// 依次等待返回結(jié)果
for (int i = 0; i < 10; i++) {
System.out.println("結(jié)果" + completionService.take().get());
}
}
得到的結(jié)果:
pool-1-thread-3
pool-1-thread-2
pool-1-thread-6
pool-1-thread-4
pool-1-thread-1
返回結(jié)果0
返回結(jié)果1
返回結(jié)果2
返回結(jié)果3
pool-1-thread-5
返回結(jié)果4
返回結(jié)果5
pool-1-thread-7
返回結(jié)果6
pool-1-thread-9
pool-1-thread-8
返回結(jié)果7
返回結(jié)果8
pool-1-thread-10
返回結(jié)果9
------------------------------------------------------分割線-----------------------------------------------
pool-2-thread-2
結(jié)果1
pool-2-thread-6
pool-2-thread-1
結(jié)果5
pool-2-thread-10
結(jié)果0
結(jié)果9
pool-2-thread-3
pool-2-thread-4
結(jié)果2
結(jié)果3
pool-2-thread-7
pool-2-thread-5
結(jié)果4
結(jié)果6
pool-2-thread-8
pool-2-thread-9
結(jié)果7
結(jié)果8
根據(jù)結(jié)果能看得出來,使用了Completion之后的結(jié)果的輸出和線程放入沒啥關系。
5.簡單說一下線程池
剛才在上面一直在用的線程池是這個:newCachedThreadPool
但實際上還有三種線程池可供選擇:
- newFixedThreadPool 固定容量的線程池,線程達到最大值的時候,線程池的規(guī)模不會再變化。
- newCachedThreadPool 緩存的線程池,線程達到最大值之后,將會回收空的線程。當需求增加,線程數(shù)量也會增加,沒有上限。
- newSingleThreadPoolExecutor 單線程的線程池,串行執(zhí)行。
- newScheduledThreadPool 固定容量的線程池,以延遲或者定時的方式去執(zhí)行,一般來說會選擇消息隊列或xxl-job去做定時,而不是這個。
使用線程池有啥好處呢?首先效率高,重用現(xiàn)有的線程,可以在處理多個請求的時候分擔線程產(chǎn)生和銷毀的開銷。請求達到一定數(shù)量的時候,工作線程已經(jīng)存在,響應性大大提高。線程池的容量是可以控制的,盡可能的利用好資源。
使用線程池的方法也很簡單!就像上面寫的,首先寫一個線程類,三種方式任意。
然后建立一個線程池,四個任意,推薦newCachedThreadPool
(也可以通過newFixedThreadPool 設置線程池大小,有效利用資源)
最后調(diào)用線程池操作,executorService.execute(你的線程)
然后就完成了!?。∈遣皇呛蹺Z??!
那么要分析一下:
1.什么情況用CachedThreadPool
它首先會創(chuàng)造足夠多的線程去執(zhí)行任務,隨著程序的執(zhí)行,有點線程執(zhí)行完,可以循環(huán)使用,這時候不必新建線程。客戶端線程和線程池之間會有一個任務隊列,程序要關閉時,需要注意兩件事!1.入隊的任務現(xiàn)在什么情況。2.正在運行這個任務現(xiàn)在怎么樣了。有兩種方式去關閉線程池,這很重要!1.入隊任務全部執(zhí)行完畢(shutdown())。2.舍棄這些任務直接結(jié)束(shutdownNow())。根據(jù)具體情節(jié),程序員自己要做取舍。 注意:主線程的執(zhí)行和線程池里面的線程分開,很有可能主線的線程結(jié)束了,但是線程池還在運行。
2.什么情況用FixedThreadPool
它固定了容量,這個模最大式線程數(shù)目是一定的。當確定任務資源占用的情況之后,想要控制資源利用,那么可以使用這個線程池。線程執(zhí)行完成就從線程池直接移出,不能保證順序性,全看線程之間的競爭。
3.什么情況用newSingleThreadExecutor
它能保證的是線程的執(zhí)行順序,并且能夠保證線程結(jié)束之后,下一條線程再被開啟。
4.什么情況用newScheduledThreadPool
它功能看起來很強大,的確很強大,因為可以設置時間和線程執(zhí)行的先后間隔,但是大多數(shù)情況下,我會選擇直接使用消息隊列或者定時器。
說了四個線程池,他們有個共同的特性,都傳了不同的參數(shù)去調(diào)用了同一個接口,ThreadPoolExecutor。然后這個ThreadPoolExecutor就厲害了。
找了個帶注釋的源碼部分:
//運行狀態(tài)標志位
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
//線程緩沖隊列,當線程池線程運行超過一定線程時并滿足一定的條件,待運行的線程會放入到這個隊列
private final BlockingQueue<Runnable> workQueue;
//重入鎖,更新核心線程池大小、最大線程池大小時要加鎖
private final ReentrantLock mainLock = new ReentrantLock();
//重入鎖狀態(tài)
private final Condition termination = mainLock.newCondition();
//工作都set集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//線程執(zhí)行完成后在線程池中的緩存時間
private volatile long keepAliveTime;
//核心線程池大小
private volatile int corePoolSize;
//最大線程池大小
private volatile int maximumPoolSize;
//當前線程池在運行線程大小
private volatile int poolSize;
//當緩沖隊列也放不下線程時的拒絕策略
private volatile RejectedExecutionHandler handler;
//線程工廠,用來創(chuàng)建線程
private volatile ThreadFactory threadFactory;
//用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)
private int largestPoolSize;
//用來記錄已經(jīng)執(zhí)行完畢的任務個數(shù)
private long completedTaskCount;
................
}
execute(Runnable command)方法很重要
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
注釋寫了很多,一共分三步:
1.如果運行的線程,小于corePoolSize那么嘗試開新線程。addWorker方法的原子性,檢查runState和workerCount,可以通過返回false防止出現(xiàn)錯誤。
2.如果一個任務成功排隊,我們?nèi)匀灰俅螜z查一下是否應該添加一個線程。或者線程池會關閉。我們重新檢查狀態(tài),如果沒有線程就新開一個,如果停止就有必要回滾隊列。
3.如果沒法排隊了,就要添加新線程了,如果失敗了,我們就會發(fā)現(xiàn)隊列飽和或者是線程池被關閉,所以會拒絕任務。