一、引言
我們都知道線(xiàn)程和線(xiàn)程池是Android開(kāi)發(fā)中很重要的一個(gè)部分。本文會(huì)從Java線(xiàn)程談起,由淺及深總結(jié)在Android中線(xiàn)程和線(xiàn)程池的使用。
java:
線(xiàn)程相關(guān):Thread、FutureTask;(Runalbe、Callable)
線(xiàn)程池:ThreadPoolExecutor;
以下四個(gè)是在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)的
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
SingleThreadExecutor
Android:
Android除開(kāi)java基本的線(xiàn)程和線(xiàn)程池,額外提供如下輔助工具。
AsyncTask、HandlerThread、IntentService(他們?nèi)齻€(gè)本質(zhì)是Thread/Handler/ThreadPoolExecutor來(lái)實(shí)現(xiàn))、Handler
1.1、簡(jiǎn)單介紹
Thread是和Runnable結(jié)合實(shí)現(xiàn)線(xiàn)程、Runnable是個(gè)接口,當(dāng)mThread.start()時(shí),執(zhí)行Runable中run()方法所寫(xiě)代碼塊。
FutureTask支持Runable和Callable兩種接口。FutureTask產(chǎn)生是因?yàn)镽unable.run方法只是一個(gè)單獨(dú)的方法。而Callable接口的call方法可以返回結(jié)果和拋出異常。FutureTask等于是Runable的繼承和擴(kuò)展。
ThreadPoolExecutor線(xiàn)程池的基本實(shí)現(xiàn),統(tǒng)一管理多個(gè)線(xiàn)程、線(xiàn)程池中每個(gè)線(xiàn)程執(zhí)行完畢不會(huì)立即銷(xiāo)毀,等待下一個(gè)任務(wù)以達(dá)到不會(huì)頻繁創(chuàng)建銷(xiāo)毀的目的,以避免資源浪費(fèi),GC等
FixedThreadPool只有核心線(xiàn)程,構(gòu)造參數(shù)設(shè)定核心線(xiàn)程數(shù),沒(méi)有超時(shí)機(jī)制且排隊(duì)任務(wù)隊(duì)列無(wú)限制,因?yàn)槿际呛诵木€(xiàn)程,所以響應(yīng)較快,且不用擔(dān)心線(xiàn)程會(huì)被回收。
CachedThreadPool只有非核心線(xiàn)程,數(shù)量無(wú)限,當(dāng)有新任務(wù)來(lái)時(shí),若沒(méi)有空閑線(xiàn)程直接創(chuàng)建新線(xiàn)程執(zhí)行任務(wù)??臻e60s直接銷(xiāo)毀
ScheduledThreadPool含構(gòu)固定數(shù)量核心線(xiàn)程,和無(wú)限量非核心線(xiàn)程。非核心線(xiàn)程執(zhí)行完畢時(shí)立馬回收。
SingleThreadExecutor內(nèi)部只有一個(gè)核心線(xiàn)程,使所有任務(wù)順序執(zhí)行。
Handler其實(shí)就是android的消息機(jī)制,也是一種簡(jiǎn)稱(chēng),它本身不是線(xiàn)程。Handler、MesageQueue、Looper三者結(jié)合來(lái)達(dá)到延時(shí)or指定線(xiàn)程執(zhí)行任務(wù)的目的。
AsyncTask封裝了線(xiàn)程池和Handler,最主要特色是方便我們?cè)谧泳€(xiàn)程中更新UI提供便利,以及在線(xiàn)程執(zhí)行的各個(gè)階段添加自己的處理。
HandlerThread繼承自Thread,封裝了Handler,所以它是Thread和Handler的結(jié)合。這樣就可以很方便的調(diào)用HandlerThread中的Handler在子線(xiàn)程中執(zhí)行任務(wù)。
IntentService繼承自Service,它擁有一個(gè)HandlerThread對(duì)象,所以它是Service、Thread和Handler的結(jié)合。簡(jiǎn)單來(lái)說(shuō)它是一個(gè)擁有HandlerThread特性的Service。
二、線(xiàn)程Thread
2.1、 線(xiàn)程的6個(gè)狀態(tài)
Thread.java里邊有這個(gè)樣一個(gè)枚舉
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
1、新建狀態(tài)(New) new一個(gè)Thred的時(shí)候
2、可運(yùn)行狀態(tài)(Runnable) 當(dāng)thread調(diào)用start(),線(xiàn)程位于可運(yùn)行池中,等待cpu使用權(quán)。因?yàn)榇藭r(shí)cpu可能被優(yōu)先級(jí)較高的線(xiàn)程占用。
3、阻塞狀態(tài)(BLOCKED) synchronize代碼塊鎖定導(dǎo)致的線(xiàn)程阻塞。暫時(shí)停止運(yùn)行。直到線(xiàn)程再次變成可運(yùn)行狀態(tài),等待cpu使用權(quán)。阻塞分為兩種
<1>、等待阻塞,運(yùn)行的線(xiàn)程synchronize代碼塊中執(zhí)行Objec.waite方法,jvm把線(xiàn)程放入等待池。
<2>、同步阻塞,運(yùn)行的線(xiàn)程synchronize在獲取對(duì)象的同步鎖時(shí),若此同步鎖被別的線(xiàn)程占用,則jvm會(huì)把線(xiàn)程放入等待池
4、等待狀態(tài)(WAITING),三種情況進(jìn)入等待狀態(tài),直至解鎖
{@link Object#wait() Object.wait} with no timeout</li>
{@link #join() Thread.join} with no timeout</li>
{@link LockSupport#park() LockSupport.park}</li>
5、有具體時(shí)間的等待狀態(tài)(TIMED_WAITING),五種情況會(huì)發(fā)生,直至解鎖
{@link #sleep Thread.sleep}</li>
{@link Object#wait(long) Object.wait} with timeout</li>
{@link #join(long) Thread.join} with timeout</li>
{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
6、結(jié)束狀態(tài)(TERMINATED),線(xiàn)程執(zhí)行完畢
2.2、線(xiàn)程調(diào)度
1、優(yōu)先級(jí),整數(shù)1~10
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
Thread.java默認(rèn)三個(gè)優(yōu)先級(jí),默認(rèn)Normal,提供setPriority()和getPriority()方法
2、線(xiàn)程睡眠,Thread.sleep()后阻塞,時(shí)間結(jié)束后轉(zhuǎn)為就緒
3、線(xiàn)程等待,Object.wait()導(dǎo)致當(dāng)前線(xiàn)程阻塞,當(dāng)別的線(xiàn)程調(diào)用調(diào)用Object.notify()/Object.notifyAll才喚醒
4、線(xiàn)程讓步,Thread.yield()暫停當(dāng)前線(xiàn)程,把機(jī)會(huì)讓給同等級(jí)or更高等級(jí)的線(xiàn)程。yield并非導(dǎo)致線(xiàn)程等待/阻塞/睡眠,只是轉(zhuǎn)為可運(yùn)行狀態(tài),只是讓出優(yōu)先處理別的。
5、線(xiàn)程加入, Thread.join(),等待線(xiàn)程終止。比如在UI線(xiàn)程添加線(xiàn)程A,A.start()后調(diào)用A.join(),主線(xiàn)程會(huì)等待A執(zhí)行結(jié)束才會(huì)繼續(xù)執(zhí)行A.join()之后的代碼。
6、線(xiàn)程喚醒,Object.notify(),如果多個(gè)線(xiàn)程在同一個(gè)object上等待,調(diào)用nofity()喚醒線(xiàn)程是他們隨機(jī)中的一個(gè)。
2.3、常用方法
Thread.sleep(): 強(qiáng)迫一個(gè)線(xiàn)程睡眠N毫秒。
Thread.isAlive(): 判斷一個(gè)線(xiàn)程是否存活。
Thread.join(): 等待線(xiàn)程終止。
Thread.activeCount(): 程序中活躍的線(xiàn)程數(shù)。
Thread.enumerate(): 枚舉程序中的線(xiàn)程。
Thread.currentThread(): 得到當(dāng)前線(xiàn)程。
Thread.isDaemon(): 一個(gè)線(xiàn)程是否為守護(hù)線(xiàn)程。
Thread.setDaemon(): 設(shè)置一個(gè)線(xiàn)程為守護(hù)線(xiàn)程。(用戶(hù)線(xiàn)程和守護(hù)線(xiàn)程的區(qū)別在于,是否等待主線(xiàn)程依賴(lài)于主線(xiàn)程結(jié)束而結(jié)束)
Thread.setName(): 為線(xiàn)程設(shè)置一個(gè)名稱(chēng)。
Thread.setPriority(): 設(shè)置一個(gè)線(xiàn)程的優(yōu)先級(jí)。
Object.wait(): 強(qiáng)迫一個(gè)線(xiàn)程等待。 和synchronized結(jié)合使用
Object.notify(): 通知一個(gè)線(xiàn)程繼續(xù)運(yùn)行。 和synchronized結(jié)合使用
Thread的具體代碼實(shí)現(xiàn),最后都直接調(diào)用到了native方法,都是JDK平臺(tái)底層的具體實(shí)現(xiàn)了。我們看看Thread.start代碼
public synchronized void start() {
if (threadStatus != 0 || started)
throw new IllegalThreadStateException();
group.add(this);
started = false;
try {
nativeCreate(this, stackSize, daemon);
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {}
}
}
private native static void nativeCreate(Thread t, long stackSize, boolean daemon);
其它很多方法就不貼代碼了,都類(lèi)似,最后都調(diào)用到了jvm具體的實(shí)現(xiàn),筆者這里就不深究了。
2.4、例子
來(lái)個(gè)waite notify的demo,經(jīng)典面試題,三個(gè)線(xiàn)程,一個(gè)線(xiàn)程只打印A,一個(gè)線(xiàn)程只打印B,一個(gè)線(xiàn)程只打印C,現(xiàn)在按照順序輸出ABC10次
public class MyThreadPrinter implements Runnable {
private String name;
private Object prev;
private Object self;
private MyThreadPrinter(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {
synchronized (prev) {
synchronized (self) {
System.out.print(name);
count--;
self.notify();
}
try {
prev.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
MyThreadPrinter pa = new MyThreadPrinter("A", c, a);
MyThreadPrinter pb = new MyThreadPrinter("B", a, b);
MyThreadPrinter pc = new MyThreadPrinter("C", b, c);
new Thread(pa).start();
Thread.sleep(100); //確保按順序A、B、C執(zhí)行
new Thread(pb).start();
Thread.sleep(100);
new Thread(pc).start();
Thread.sleep(100);
}
}
簡(jiǎn)單解釋下
1、synchronized是jvm內(nèi)置的鎖機(jī)制。本例是Object對(duì)象鎖。
2、這段代碼理解核心:假定A線(xiàn)程執(zhí)行到prev.waite時(shí),這個(gè)時(shí)候阻塞的是線(xiàn)程A;
3、因?yàn)橐婚_(kāi)始a、b、c都沒(méi)被持有,為了順序執(zhí)行所以需要sleep下,以免造成死鎖,或者錯(cuò)誤循環(huán)。
這里科普下synchronized和volatile這兩個(gè)關(guān)鍵字
synchronized提供了互斥性的語(yǔ)義和可見(jiàn)性,可以通過(guò)使用它來(lái)保證并發(fā)的安全??勺饔迷趯?duì)象,方法和代碼塊上。需要注意的是它的作用域。一類(lèi)是:作用在static的方法或者synchronized(當(dāng)前類(lèi).class)上時(shí),對(duì)所有對(duì)象有效,不管new了多少個(gè)對(duì)象,synchronized包含的內(nèi)容只能被一個(gè)線(xiàn)程持有。其它情況是:只對(duì)當(dāng)前對(duì)象有效。
volatile可以看做是一種synchronized的輕量級(jí)鎖,他能夠保證并發(fā)時(shí),被它修飾的共享變量的可見(jiàn)性。簡(jiǎn)單理解就是無(wú)論何時(shí)或者多少個(gè)線(xiàn)程讀到的變量都是最新值
三、FutureTask
很多文章里邊說(shuō)FutureTask是線(xiàn)程,其實(shí)這個(gè)說(shuō)法是錯(cuò)誤的。線(xiàn)程最后統(tǒng)一的執(zhí)行的是Runable的run方法。FutureTask實(shí)現(xiàn)了Runable的run方法,并讓run執(zhí)行過(guò)程更富有可控制性。這就是FutureTask的作用。它只是基于Runnable上的繼承和擴(kuò)展。
3.1、FutureTask的組成
public class FutureTask<V> implements RunnableFuture<V>{
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
...
}
RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public interface Runnable {
public abstract void run();
}
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1、Runnable和Callable,F(xiàn)utureTask的最大特色就是它同時(shí)實(shí)現(xiàn)這兩個(gè)接口。Runable我們很熟悉就是提供統(tǒng)一的run方法。Callable也很簡(jiǎn)單,提供統(tǒng)一的call方法,方便返回執(zhí)行后的結(jié)果。
2、Future,F(xiàn)utureTask還額外實(shí)現(xiàn)這個(gè)接口。Future提供了取消,判斷是否取消,get結(jié)果等方法。
核心run方法
public void run() {
...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
...
}
private Object outcome;
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
1、FutureTask執(zhí)行run方法就是間接調(diào)用構(gòu)造函數(shù)所帶入的Callable參數(shù)的call方法;
2、并把執(zhí)行結(jié)果用outcome保存起來(lái);
3.2、FutureTask的6個(gè)狀態(tài)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
1、可以看到狀態(tài)用volatile關(guān)鍵字修飾,這避免了多個(gè)線(xiàn)程訪(fǎng)問(wèn)的問(wèn)題;
2、6個(gè)狀態(tài)和單詞意思差不多,這里不詳述;
2、get獲取結(jié)果的時(shí)候,也會(huì)根據(jù)狀態(tài)的不同來(lái)返回,只有normal時(shí)正常返回剛才保存的outcome;
3.3、等待隊(duì)列
3.3.1、隊(duì)列阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
等待隊(duì)列是FutureTask最后一個(gè)組成部分。當(dāng)我們調(diào)用FutureTask.get的時(shí)候,如果state還未執(zhí)行完畢,會(huì)進(jìn)入一個(gè)等待處理狀態(tài),或者阻塞。直至重新喚醒處理。核心方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
狀態(tài)雖多,但處理不復(fù)雜。
首先它是一個(gè)無(wú)限循環(huán)直至處理掉或者線(xiàn)程阻塞
1、創(chuàng)建WaitNode,然后用LockSupport來(lái)把當(dāng)前線(xiàn)程鎖住
2、COMPLETING,線(xiàn)程讓步。>COMPLETING,直接返回狀態(tài)。
3、線(xiàn)程中斷,移除隊(duì)列返回InterruptedException
4、!queued,放入隊(duì)列
5、阻塞 LockSupport.park/ LockSupport.parkNanos
3.3.2、隊(duì)列喚醒
在run執(zhí)行完畢后,set結(jié)果的時(shí)候,會(huì)調(diào)用finishCompletion();方法。就是在這里調(diào)用喚醒的。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
1、一個(gè)for (;;)喚醒隊(duì)列里所有的線(xiàn)程;
2、完成后調(diào)用可擴(kuò)展方法done;
3.4、例子
FutureTask f = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
for (int i = 10;i>0;i--) {
Thread.sleep(1000);
Log.d("yink","i = " + i + " time = " + System.currentTimeMillis());
}
return "result";
}
});
new Thread(f).start();
try {
Log.d("yink","f.get() time =" + System.currentTimeMillis());
String result = (String) f.get();
Log.d("yink","f.get() result = " + result + " time = " + System.currentTimeMillis());
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
輸出如下,省略幾個(gè)倒計(jì)時(shí)打?。?/p>
D/yink: f.get() time =1486663241
D/yink: i = 10 time = 1486664242
D/yink: i = 9 time = 1486665246
...
D/yink: i = 1 time = 1486673257
D/yink: f.get() result = result time = 1486673258
1、例子很簡(jiǎn)單延時(shí)輸出打印
2、當(dāng)調(diào)用get時(shí),調(diào)用get的主線(xiàn)程阻塞,直至run運(yùn)行結(jié)束
3、由于代碼很簡(jiǎn)單,主線(xiàn)程沒(méi)有別的操作,所以這里沒(méi)報(bào)錯(cuò)。實(shí)際這樣寫(xiě)很容易ANR。比如點(diǎn)擊5秒無(wú)響應(yīng),廣播超時(shí)等等。故意寫(xiě)這么個(gè)例子就是希望讀者理解這里是哪個(gè)線(xiàn)程阻塞??紤]阻塞會(huì)不會(huì)帶來(lái)別的問(wèn)題。
四、線(xiàn)程池
擼完線(xiàn)程,我們來(lái)擼線(xiàn)程池。線(xiàn)程池的作用很明顯了,當(dāng)我們頻繁的創(chuàng)建銷(xiāo)毀線(xiàn)程,開(kāi)銷(xiāo)是很大的。線(xiàn)程池可以有效的避免重復(fù)創(chuàng)建,合理利用cpu資源。新任務(wù)也能最快響應(yīng)而省去創(chuàng)建線(xiàn)程的時(shí)間。統(tǒng)一管理合理分配資源。下圖是線(xiàn)程池的類(lèi)結(jié)構(gòu),線(xiàn)程池的核心就是ThreadPoolExecutor了。

圖中ScheduledThreadPoolExecutor是在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)的,我們可以先不看。先理解ThreadPoolExecutor。
4.1、ThreadPoolExecutor
4.1.1、構(gòu)造關(guān)系
public class ThreadPoolExecutor extends AbstractExecutorService { ... }
public abstract class AbstractExecutorService implements ExecutorService { ... }
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
1、ThreadPoolExecutor集成抽象類(lèi)AbstractExecutorService,抽象類(lèi)實(shí)現(xiàn)ExecutorService接口,ExecutorService繼承自Executor接口;
2、接口Executor定義了最基本的execute執(zhí)行任務(wù)的方法
3、接口ExecutorService額外定義了shutdown等一系列操作任務(wù)的方法
4、抽象類(lèi)AbstractExecutorService提供了newTaskFor、submit、doInvokeAny、invokeAny、cancelAll,實(shí)現(xiàn)了部分邏輯和方法。
5、ThreadPoolExecutor則是線(xiàn)程池的具體實(shí)現(xiàn)。
ThreadPoolExecutor的具體實(shí)現(xiàn)后邊詳細(xì)描述。關(guān)于AbstractExecutorService這里詳解最復(fù)雜的doInvokeAny方法。doInvokeAny為線(xiàn)程池提供了一個(gè)執(zhí)行一個(gè)Callable集合的方法,執(zhí)行集合內(nèi)任務(wù)時(shí),只要有一個(gè)任務(wù)執(zhí)行完畢且有返回結(jié)果。就結(jié)束所有任務(wù)。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// 集合大小
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 創(chuàng)建和集合大小一樣的Future(任務(wù)結(jié)果)的集合LIst
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
// 它的作用就是額外提供一個(gè)BlockingQueue<Future<V>>隊(duì)列,來(lái)記錄任務(wù)執(zhí)行完畢后的Future
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 迭代器,方便遍歷取出下一個(gè)元素
Iterator<? extends Callable<T>> it = tasks.iterator();
// 取出一個(gè)任務(wù),并用ExecutorCompletionService調(diào)用submit開(kāi)始執(zhí)行任務(wù)。
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
// 無(wú)限循環(huán)
for (;;) {
// ExecutorCompletionService取出隊(duì)列第一個(gè)數(shù)據(jù)
Future<T> f = ecs.poll();
if (f == null) {
// 取出的執(zhí)行結(jié)果為空,且任務(wù)結(jié)合還有任務(wù)就繼續(xù)用sbmit提交執(zhí)行任務(wù)
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
// 取出的結(jié)果不為空,說(shuō)明有執(zhí)行結(jié)果了。返回取出的結(jié)果。
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 最后取消執(zhí)行所有任務(wù)。
cancelAll(futures);
}
}
1、代碼中的ExecutorCompletionService封裝了一下FutureTask任務(wù),提供了一個(gè)保存Future的隊(duì)列。只要執(zhí)行完任務(wù)就把結(jié)果添加到隊(duì)列里。下面是ExecutorCompletionService的部分代碼
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); }
}
上面doInvokeAny在new ExecutorCompletionService<T>(this);的時(shí)候,帶入線(xiàn)程池本身的Executor。所以doInvokeAny就達(dá)到了為線(xiàn)程池添加這樣一個(gè)方法的目的。
2、關(guān)鍵理解點(diǎn)就是當(dāng)submit提交第一個(gè)任務(wù)后,只有任務(wù)執(zhí)行完畢才有可能返回結(jié)果。
3、假設(shè)任務(wù)執(zhí)行時(shí)間較長(zhǎng),poll方法刪除隊(duì)列第一個(gè)元素。因?yàn)槿蝿?wù)沒(méi)有執(zhí)行完畢,所以隊(duì)列沒(méi)有元素,poll出來(lái)的是null,所以代碼循環(huán)就會(huì)立即提交執(zhí)行下一個(gè)任務(wù)。直至所有的任務(wù)都提交執(zhí)行。
4、當(dāng)有任務(wù)執(zhí)行完畢了,這時(shí)任務(wù)可能有執(zhí)行結(jié)果,也可能沒(méi)有執(zhí)行結(jié)果。沒(méi)有結(jié)果的時(shí)候poll刪除的Future結(jié)果本來(lái)就是null,所以不影響。有結(jié)果的時(shí)候poll出來(lái)的結(jié)果就就return返回。
5、finally最后取消執(zhí)行所有任務(wù)
4.1.2、ThreadPoolExecutor構(gòu)造函數(shù)
從構(gòu)造函數(shù)開(kāi)始認(rèn)識(shí)ThreadPoolExecutor,代碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1、corePoolSize 核心線(xiàn)程數(shù),核心線(xiàn)程數(shù)還沒(méi)到corePoolSize時(shí),即使有空閑線(xiàn)程,新任務(wù)也會(huì)創(chuàng)建新線(xiàn)程;
2、maximumPoolSize最大線(xiàn)程數(shù);
3、keepAliveTime非核心線(xiàn)程存活時(shí)間,即非核心線(xiàn)程執(zhí)行完畢后不會(huì)立即銷(xiāo)毀,直至?xí)r間到達(dá);
4、TimeUnit枚舉時(shí)間單位;
5、BlockingQueue隊(duì)列;
6、ThreadFactory線(xiàn)程工廠(chǎng),用于線(xiàn)程的創(chuàng)建;
7、RejectedExecutionHandler這個(gè)接口用來(lái)處理這種情況:添加任務(wù)失敗時(shí),就靠handler來(lái)處理。四種處理策略,也就是四種接口實(shí)現(xiàn),
AbortPolicy 默認(rèn)拋出異常
CallerRunsPolicy用調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行任務(wù)
DiscardOldestPolicy丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)
DiscardPolicy直接丟棄任務(wù)
4.1.3、線(xiàn)程池的狀態(tài)
對(duì)線(xiàn)程池大概認(rèn)識(shí)后,我們來(lái)看它在運(yùn)行時(shí)的幾個(gè)狀態(tài)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大數(shù)量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 獲取線(xiàn)程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取線(xiàn)程池?cái)?shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 組裝狀態(tài)和數(shù)量,返回ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
1、RUNNING運(yùn)行狀態(tài),創(chuàng)建時(shí)的狀態(tài);
2、SHUTDOWN停工狀態(tài),不接收新任務(wù),已接收的任務(wù)會(huì)繼續(xù)執(zhí)行;
3、STOP停止?fàn)顟B(tài),不接收新任務(wù),接收的和正在執(zhí)行的也會(huì)中斷;
4、TIDYING清空狀態(tài),所有任務(wù)都停止了,工作線(xiàn)程也結(jié)束了;
5、TERMINATED終止?fàn)顟B(tài),線(xiàn)程池已銷(xiāo)毀;
AtomicInteger是個(gè)int型變量,它的高三位用來(lái)表示狀態(tài),剩下的29位用來(lái)表示數(shù)量
4.1.4、提交任務(wù)
提交任務(wù)有兩個(gè)方法,一個(gè)是Executor.execute,一個(gè)是AbstractExecutorService.submit
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result){ ... }
public Future<?> submit(Runnable task) { ... }
submit的方法都類(lèi)似、都是封裝成FutureTask以提交給execute方法。所以我們接下來(lái)看execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//檢查線(xiàn)程池是否是運(yùn)行狀態(tài),并將任務(wù)添加到等待隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果線(xiàn)程池不是運(yùn)行狀態(tài),則將剛添加的任務(wù)從隊(duì)列移除并執(zhí)行拒絕任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作線(xiàn)程為0,先添加一個(gè)worker線(xiàn)程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 線(xiàn)程池不是運(yùn)行狀態(tài)會(huì)添加失敗,就會(huì)執(zhí)行reject,走拒絕任務(wù)的處理任務(wù)
else if (!addWorker(command, false))
reject(command);
}
添加的邏輯不復(fù)雜,最后邏輯走向兩個(gè)分支,一個(gè)是addWorker添加線(xiàn)程,一個(gè)reject拒絕任務(wù)處理邏輯先看reject,默認(rèn)是AbortPolicy拒絕任務(wù)的策略,這個(gè)策略處理結(jié)果是直接拋出rejectedExecution
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
4.1.5、內(nèi)部類(lèi)Worker
接著上面addWorker分析之前,我們先認(rèn)識(shí)Worker。它其實(shí)就是線(xiàn)程池管理內(nèi)部任務(wù)的最小單位,線(xiàn)程池就是維護(hù)的一組Worker。上代碼
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
1、AbstractQueuedSynchronizer簡(jiǎn)稱(chēng)AQS是一個(gè)抽象同步框架,可以用來(lái)實(shí)現(xiàn)一個(gè)依賴(lài)狀態(tài)的同步器。可以簡(jiǎn)單理解為控制線(xiàn)程獲取一個(gè)統(tǒng)一volatile類(lèi)型的state變量的Synchronized工具。即同步器
2、構(gòu)造函數(shù)創(chuàng)建一個(gè)Thread,和獲得Runable這兩個(gè)核心組件
3、剩下幾個(gè)常規(guī)鎖操作方法。由AQS來(lái)實(shí)現(xiàn)。
于是簡(jiǎn)單總結(jié)Worker就是一個(gè)同步器+Thread+Runable的結(jié)合
4.1.6、addWorker
接著分析線(xiàn)程池的execute方法中addWorker的代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼大概三部分
1、先判斷狀態(tài),如果大于等于SHUTDOWN不執(zhí)行,返回false
2、然后判斷線(xiàn)程數(shù)量是否超過(guò)核心線(xiàn)程數(shù),或者最大數(shù)。沒(méi)有超過(guò)跳出循環(huán)走到下半部分代碼創(chuàng)建新的worker。
3、在mainLock保護(hù)下,創(chuàng)建worker后加入HasSet容器。并啟動(dòng)t.start();
根據(jù)上面的worker類(lèi)的代碼可知,t.start調(diào)用的是Worker自身的run方法。所以實(shí)際調(diào)用到了線(xiàn)程池的runWorker方法
4.1.7、runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
1、task.run();之前會(huì)檢查中斷or停止。在run前后可以添加自定義的處理beforExecute(),afterExcute
2、task = getTask()會(huì)取出BlockingQueue隊(duì)列(workQueue)中的任務(wù)來(lái)執(zhí)行。
4.1.7、ThreadPoolExecutor小結(jié)
這里先對(duì)ThreadPoolExecutor做一個(gè)小小的總結(jié)。整理一下思緒
1、ThreadPoolExecutor主要就是構(gòu)造函數(shù)那幾個(gè)參數(shù)來(lái)組成它的功能。記住worker管理線(xiàn)程來(lái)執(zhí)行workerQueue中的task任務(wù)。然后有核心線(xiàn)程數(shù)、最大線(xiàn)程數(shù)。添加失敗用RejectedExecutionHandler處理失敗邏輯,非核心線(xiàn)程活躍時(shí)間keepAliveTime
2、提交任務(wù)都是最后走到execute來(lái)提交,沒(méi)有達(dá)到核心線(xiàn)程數(shù)量,直接走addWorker來(lái)創(chuàng)建worker來(lái)工作。woker保存在HashSet集合里。添加任務(wù)時(shí),如果添加到workQueue隊(duì)列失敗會(huì)觸發(fā)創(chuàng)建非核心線(xiàn)程。如果線(xiàn)程池是running狀態(tài)但工作線(xiàn)程為0,也會(huì)直接觸發(fā)先創(chuàng)建一個(gè)非核心線(xiàn)程來(lái)執(zhí)行
3、addWorker添加線(xiàn)程,保存在HashSet<Worker> workers里,并立馬執(zhí)行
4、runWorker通過(guò)重復(fù)取出隊(duì)列里的task = getTask(),來(lái)達(dá)到一直執(zhí)行知道執(zhí)行完畢的目的
5、getTask沒(méi)有任務(wù)的時(shí)候會(huì)阻塞并掛起,不會(huì)消耗cpu資源。這樣worker就等于一直在等任務(wù)隊(duì)列workerQueu隊(duì)列有新的任務(wù)進(jìn)來(lái)。進(jìn)來(lái)就執(zhí)行
6、至于FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor只是線(xiàn)程池創(chuàng)建時(shí)指定了不同的參數(shù),通過(guò)java.util.concurrent.Executors的靜態(tài)方法創(chuàng)建,就不詳述了。
7、流程圖里的ScheduledThreadPoolExecutor是JDK1.5開(kāi)始提供的來(lái)支持周期性的任務(wù)調(diào)度。在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)。多了一個(gè)堆結(jié)構(gòu)隊(duì)列來(lái)管理。有興趣的讀者可以自行分析。
五、寫(xiě)在組后
好了,線(xiàn)程和線(xiàn)程池的知識(shí)點(diǎn)講到這里也差不多了。線(xiàn)程線(xiàn)程池的基本原理我想你也應(yīng)該很清楚了。希望本文能對(duì)你有所幫助。
Read the fucking source code!