Android 線(xiàn)程&線(xiàn)程池

一、引言

我們都知道線(xiàn)程和線(xiàn)程池是Android開(kāi)發(fā)中很重要的一個(gè)部分。本文會(huì)從Java線(xiàn)程談起,由淺及深總結(jié)在Android中線(xiàn)程和線(xiàn)程池的使用。
java:
線(xiàn)程相關(guān):Thread、FutureTask;(Runalbe、Callable)
線(xiàn)程池:ThreadPoolExecutor;
以下四個(gè)是在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)的
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
SingleThreadExecutor
Android:
Android除開(kāi)java基本的線(xiàn)程和線(xiàn)程池,額外提供如下輔助工具。
AsyncTask、HandlerThread、IntentService(他們?nèi)齻€(gè)本質(zhì)是Thread/Handler/ThreadPoolExecutor來(lái)實(shí)現(xiàn))、Handler

1.1、簡(jiǎn)單介紹

Thread是和Runnable結(jié)合實(shí)現(xiàn)線(xiàn)程、Runnable是個(gè)接口,當(dāng)mThread.start()時(shí),執(zhí)行Runable中run()方法所寫(xiě)代碼塊。
FutureTask支持Runable和Callable兩種接口。FutureTask產(chǎn)生是因?yàn)镽unable.run方法只是一個(gè)單獨(dú)的方法。而Callable接口的call方法可以返回結(jié)果和拋出異常。FutureTask等于是Runable的繼承和擴(kuò)展。

ThreadPoolExecutor線(xiàn)程池的基本實(shí)現(xiàn),統(tǒng)一管理多個(gè)線(xiàn)程、線(xiàn)程池中每個(gè)線(xiàn)程執(zhí)行完畢不會(huì)立即銷(xiāo)毀,等待下一個(gè)任務(wù)以達(dá)到不會(huì)頻繁創(chuàng)建銷(xiāo)毀的目的,以避免資源浪費(fèi),GC等
FixedThreadPool只有核心線(xiàn)程,構(gòu)造參數(shù)設(shè)定核心線(xiàn)程數(shù),沒(méi)有超時(shí)機(jī)制且排隊(duì)任務(wù)隊(duì)列無(wú)限制,因?yàn)槿际呛诵木€(xiàn)程,所以響應(yīng)較快,且不用擔(dān)心線(xiàn)程會(huì)被回收。
CachedThreadPool只有非核心線(xiàn)程,數(shù)量無(wú)限,當(dāng)有新任務(wù)來(lái)時(shí),若沒(méi)有空閑線(xiàn)程直接創(chuàng)建新線(xiàn)程執(zhí)行任務(wù)??臻e60s直接銷(xiāo)毀
ScheduledThreadPool含構(gòu)固定數(shù)量核心線(xiàn)程,和無(wú)限量非核心線(xiàn)程。非核心線(xiàn)程執(zhí)行完畢時(shí)立馬回收。
SingleThreadExecutor內(nèi)部只有一個(gè)核心線(xiàn)程,使所有任務(wù)順序執(zhí)行。

Handler其實(shí)就是android的消息機(jī)制,也是一種簡(jiǎn)稱(chēng),它本身不是線(xiàn)程。Handler、MesageQueue、Looper三者結(jié)合來(lái)達(dá)到延時(shí)or指定線(xiàn)程執(zhí)行任務(wù)的目的。
AsyncTask封裝了線(xiàn)程池和Handler,最主要特色是方便我們?cè)谧泳€(xiàn)程中更新UI提供便利,以及在線(xiàn)程執(zhí)行的各個(gè)階段添加自己的處理。
HandlerThread繼承自Thread,封裝了Handler,所以它是Thread和Handler的結(jié)合。這樣就可以很方便的調(diào)用HandlerThread中的Handler在子線(xiàn)程中執(zhí)行任務(wù)。
IntentService繼承自Service,它擁有一個(gè)HandlerThread對(duì)象,所以它是Service、Thread和Handler的結(jié)合。簡(jiǎn)單來(lái)說(shuō)它是一個(gè)擁有HandlerThread特性的Service。

二、線(xiàn)程Thread

2.1、 線(xiàn)程的6個(gè)狀態(tài)

Thread.java里邊有這個(gè)樣一個(gè)枚舉

public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }

1、新建狀態(tài)(New) new一個(gè)Thred的時(shí)候
2、可運(yùn)行狀態(tài)(Runnable) 當(dāng)thread調(diào)用start(),線(xiàn)程位于可運(yùn)行池中,等待cpu使用權(quán)。因?yàn)榇藭r(shí)cpu可能被優(yōu)先級(jí)較高的線(xiàn)程占用。
3、阻塞狀態(tài)(BLOCKED) synchronize代碼塊鎖定導(dǎo)致的線(xiàn)程阻塞。暫時(shí)停止運(yùn)行。直到線(xiàn)程再次變成可運(yùn)行狀態(tài),等待cpu使用權(quán)。阻塞分為兩種
<1>、等待阻塞,運(yùn)行的線(xiàn)程synchronize代碼塊中執(zhí)行Objec.waite方法,jvm把線(xiàn)程放入等待池。
<2>、同步阻塞,運(yùn)行的線(xiàn)程synchronize在獲取對(duì)象的同步鎖時(shí),若此同步鎖被別的線(xiàn)程占用,則jvm會(huì)把線(xiàn)程放入等待池
4、等待狀態(tài)(WAITING),三種情況進(jìn)入等待狀態(tài),直至解鎖

{@link Object#wait() Object.wait} with no timeout</li>
{@link #join() Thread.join} with no timeout</li>
{@link LockSupport#park() LockSupport.park}</li>

5、有具體時(shí)間的等待狀態(tài)(TIMED_WAITING),五種情況會(huì)發(fā)生,直至解鎖

{@link #sleep Thread.sleep}</li>
{@link Object#wait(long) Object.wait} with timeout</li>
{@link #join(long) Thread.join} with timeout</li>
{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
{@link LockSupport#parkUntil LockSupport.parkUntil}</li>

6、結(jié)束狀態(tài)(TERMINATED),線(xiàn)程執(zhí)行完畢

2.2、線(xiàn)程調(diào)度

1、優(yōu)先級(jí),整數(shù)1~10

    public final static int MIN_PRIORITY = 1;
    public final static int NORM_PRIORITY = 5;
    public final static int MAX_PRIORITY = 10;

Thread.java默認(rèn)三個(gè)優(yōu)先級(jí),默認(rèn)Normal,提供setPriority()和getPriority()方法
2、線(xiàn)程睡眠,Thread.sleep()后阻塞,時(shí)間結(jié)束后轉(zhuǎn)為就緒
3、線(xiàn)程等待,Object.wait()導(dǎo)致當(dāng)前線(xiàn)程阻塞,當(dāng)別的線(xiàn)程調(diào)用調(diào)用Object.notify()/Object.notifyAll才喚醒
4、線(xiàn)程讓步,Thread.yield()暫停當(dāng)前線(xiàn)程,把機(jī)會(huì)讓給同等級(jí)or更高等級(jí)的線(xiàn)程。yield并非導(dǎo)致線(xiàn)程等待/阻塞/睡眠,只是轉(zhuǎn)為可運(yùn)行狀態(tài),只是讓出優(yōu)先處理別的。
5、線(xiàn)程加入, Thread.join(),等待線(xiàn)程終止。比如在UI線(xiàn)程添加線(xiàn)程A,A.start()后調(diào)用A.join(),主線(xiàn)程會(huì)等待A執(zhí)行結(jié)束才會(huì)繼續(xù)執(zhí)行A.join()之后的代碼。
6、線(xiàn)程喚醒,Object.notify(),如果多個(gè)線(xiàn)程在同一個(gè)object上等待,調(diào)用nofity()喚醒線(xiàn)程是他們隨機(jī)中的一個(gè)。

2.3、常用方法

Thread.sleep(): 強(qiáng)迫一個(gè)線(xiàn)程睡眠N毫秒。 
Thread.isAlive(): 判斷一個(gè)線(xiàn)程是否存活。 
Thread.join(): 等待線(xiàn)程終止。 
Thread.activeCount(): 程序中活躍的線(xiàn)程數(shù)。 
Thread.enumerate(): 枚舉程序中的線(xiàn)程。 
Thread.currentThread(): 得到當(dāng)前線(xiàn)程。 
Thread.isDaemon(): 一個(gè)線(xiàn)程是否為守護(hù)線(xiàn)程。 
Thread.setDaemon(): 設(shè)置一個(gè)線(xiàn)程為守護(hù)線(xiàn)程。(用戶(hù)線(xiàn)程和守護(hù)線(xiàn)程的區(qū)別在于,是否等待主線(xiàn)程依賴(lài)于主線(xiàn)程結(jié)束而結(jié)束) 
Thread.setName(): 為線(xiàn)程設(shè)置一個(gè)名稱(chēng)。 
Thread.setPriority(): 設(shè)置一個(gè)線(xiàn)程的優(yōu)先級(jí)。
Object.wait(): 強(qiáng)迫一個(gè)線(xiàn)程等待。 和synchronized結(jié)合使用
Object.notify(): 通知一個(gè)線(xiàn)程繼續(xù)運(yùn)行。 和synchronized結(jié)合使用

Thread的具體代碼實(shí)現(xiàn),最后都直接調(diào)用到了native方法,都是JDK平臺(tái)底層的具體實(shí)現(xiàn)了。我們看看Thread.start代碼

public synchronized void start() {
        if (threadStatus != 0 || started)
            throw new IllegalThreadStateException();
        group.add(this);
        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {}
        }
    }

private native static void nativeCreate(Thread t, long stackSize, boolean daemon);

其它很多方法就不貼代碼了,都類(lèi)似,最后都調(diào)用到了jvm具體的實(shí)現(xiàn),筆者這里就不深究了。

2.4、例子

來(lái)個(gè)waite notify的demo,經(jīng)典面試題,三個(gè)線(xiàn)程,一個(gè)線(xiàn)程只打印A,一個(gè)線(xiàn)程只打印B,一個(gè)線(xiàn)程只打印C,現(xiàn)在按照順序輸出ABC10次

public class MyThreadPrinter implements Runnable {   
      
    private String name;   
    private Object prev;   
    private Object self;   
  
    private MyThreadPrinter(String name, Object prev, Object self) {   
        this.name = name;   
        this.prev = prev;   
        this.self = self;   
    }   
  
    @Override  
    public void run() {   
        int count = 10;   
        while (count > 0) {   
            synchronized (prev) {   
                synchronized (self) {   
                    System.out.print(name);   
                    count--;  
                    
                    self.notify();   
                }   
                try {   
                    prev.wait();   
                } catch (InterruptedException e) {   
                    e.printStackTrace();   
                }   
            }   
  
        }   
    }   
  
    public static void main(String[] args) throws Exception {   
        Object a = new Object();   
        Object b = new Object();   
        Object c = new Object();   
        MyThreadPrinter pa = new MyThreadPrinter("A", c, a);   
        MyThreadPrinter pb = new MyThreadPrinter("B", a, b);   
        MyThreadPrinter pc = new MyThreadPrinter("C", b, c);   
           
        new Thread(pa).start();
        Thread.sleep(100);  //確保按順序A、B、C執(zhí)行
        new Thread(pb).start();
        Thread.sleep(100);  
        new Thread(pc).start();   
        Thread.sleep(100);  
        }   
}  

簡(jiǎn)單解釋下
1、synchronized是jvm內(nèi)置的鎖機(jī)制。本例是Object對(duì)象鎖。
2、這段代碼理解核心:假定A線(xiàn)程執(zhí)行到prev.waite時(shí),這個(gè)時(shí)候阻塞的是線(xiàn)程A;
3、因?yàn)橐婚_(kāi)始a、b、c都沒(méi)被持有,為了順序執(zhí)行所以需要sleep下,以免造成死鎖,或者錯(cuò)誤循環(huán)。

這里科普下synchronized和volatile這兩個(gè)關(guān)鍵字
synchronized提供了互斥性的語(yǔ)義和可見(jiàn)性,可以通過(guò)使用它來(lái)保證并發(fā)的安全??勺饔迷趯?duì)象,方法和代碼塊上。需要注意的是它的作用域。一類(lèi)是:作用在static的方法或者synchronized(當(dāng)前類(lèi).class)上時(shí),對(duì)所有對(duì)象有效,不管new了多少個(gè)對(duì)象,synchronized包含的內(nèi)容只能被一個(gè)線(xiàn)程持有。其它情況是:只對(duì)當(dāng)前對(duì)象有效。
volatile可以看做是一種synchronized的輕量級(jí)鎖,他能夠保證并發(fā)時(shí),被它修飾的共享變量的可見(jiàn)性。簡(jiǎn)單理解就是無(wú)論何時(shí)或者多少個(gè)線(xiàn)程讀到的變量都是最新值

三、FutureTask

很多文章里邊說(shuō)FutureTask是線(xiàn)程,其實(shí)這個(gè)說(shuō)法是錯(cuò)誤的。線(xiàn)程最后統(tǒng)一的執(zhí)行的是Runable的run方法。FutureTask實(shí)現(xiàn)了Runable的run方法,并讓run執(zhí)行過(guò)程更富有可控制性。這就是FutureTask的作用。它只是基于Runnable上的繼承和擴(kuò)展。

3.1、FutureTask的組成

public class FutureTask<V> implements RunnableFuture<V>{
  public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    ...
}

RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

public interface Runnable {
    public abstract void run();
}

public interface Callable<V> {
    V call() throws Exception;
}

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

1、Runnable和Callable,F(xiàn)utureTask的最大特色就是它同時(shí)實(shí)現(xiàn)這兩個(gè)接口。Runable我們很熟悉就是提供統(tǒng)一的run方法。Callable也很簡(jiǎn)單,提供統(tǒng)一的call方法,方便返回執(zhí)行后的結(jié)果。
2、Future,F(xiàn)utureTask還額外實(shí)現(xiàn)這個(gè)接口。Future提供了取消,判斷是否取消,get結(jié)果等方法。

核心run方法

public void run() {
        ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
      ...
    }

private Object outcome;
protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

1、FutureTask執(zhí)行run方法就是間接調(diào)用構(gòu)造函數(shù)所帶入的Callable參數(shù)的call方法;
2、并把執(zhí)行結(jié)果用outcome保存起來(lái);

3.2、FutureTask的6個(gè)狀態(tài)

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

1、可以看到狀態(tài)用volatile關(guān)鍵字修飾,這避免了多個(gè)線(xiàn)程訪(fǎng)問(wèn)的問(wèn)題;
2、6個(gè)狀態(tài)和單詞意思差不多,這里不詳述;
2、get獲取結(jié)果的時(shí)候,也會(huì)根據(jù)狀態(tài)的不同來(lái)返回,只有normal時(shí)正常返回剛才保存的outcome;

3.3、等待隊(duì)列

3.3.1、隊(duì)列阻塞

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

等待隊(duì)列是FutureTask最后一個(gè)組成部分。當(dāng)我們調(diào)用FutureTask.get的時(shí)候,如果state還未執(zhí)行完畢,會(huì)進(jìn)入一個(gè)等待處理狀態(tài),或者阻塞。直至重新喚醒處理。核心方法:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

狀態(tài)雖多,但處理不復(fù)雜。
首先它是一個(gè)無(wú)限循環(huán)直至處理掉或者線(xiàn)程阻塞
1、創(chuàng)建WaitNode,然后用LockSupport來(lái)把當(dāng)前線(xiàn)程鎖住
2、COMPLETING,線(xiàn)程讓步。>COMPLETING,直接返回狀態(tài)。
3、線(xiàn)程中斷,移除隊(duì)列返回InterruptedException
4、!queued,放入隊(duì)列
5、阻塞 LockSupport.park/ LockSupport.parkNanos

3.3.2、隊(duì)列喚醒

在run執(zhí)行完畢后,set結(jié)果的時(shí)候,會(huì)調(diào)用finishCompletion();方法。就是在這里調(diào)用喚醒的。

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

1、一個(gè)for (;;)喚醒隊(duì)列里所有的線(xiàn)程;
2、完成后調(diào)用可擴(kuò)展方法done;

3.4、例子

FutureTask f = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                for (int i = 10;i>0;i--) {
                    Thread.sleep(1000);
                    Log.d("yink","i = " + i + "  time = " + System.currentTimeMillis());
                }
                return "result";
            }
        });
        new Thread(f).start();
        try {
            Log.d("yink","f.get() time =" + System.currentTimeMillis());
            String result = (String) f.get();
            Log.d("yink","f.get() result = " + result + "  time = " + System.currentTimeMillis());
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

輸出如下,省略幾個(gè)倒計(jì)時(shí)打?。?/p>

D/yink: f.get() time =1486663241
D/yink: i = 10  time = 1486664242
D/yink: i = 9  time = 1486665246
...
D/yink: i = 1  time = 1486673257
D/yink: f.get() result = result  time = 1486673258

1、例子很簡(jiǎn)單延時(shí)輸出打印
2、當(dāng)調(diào)用get時(shí),調(diào)用get的主線(xiàn)程阻塞,直至run運(yùn)行結(jié)束
3、由于代碼很簡(jiǎn)單,主線(xiàn)程沒(méi)有別的操作,所以這里沒(méi)報(bào)錯(cuò)。實(shí)際這樣寫(xiě)很容易ANR。比如點(diǎn)擊5秒無(wú)響應(yīng),廣播超時(shí)等等。故意寫(xiě)這么個(gè)例子就是希望讀者理解這里是哪個(gè)線(xiàn)程阻塞??紤]阻塞會(huì)不會(huì)帶來(lái)別的問(wèn)題。

四、線(xiàn)程池

擼完線(xiàn)程,我們來(lái)擼線(xiàn)程池。線(xiàn)程池的作用很明顯了,當(dāng)我們頻繁的創(chuàng)建銷(xiāo)毀線(xiàn)程,開(kāi)銷(xiāo)是很大的。線(xiàn)程池可以有效的避免重復(fù)創(chuàng)建,合理利用cpu資源。新任務(wù)也能最快響應(yīng)而省去創(chuàng)建線(xiàn)程的時(shí)間。統(tǒng)一管理合理分配資源。下圖是線(xiàn)程池的類(lèi)結(jié)構(gòu),線(xiàn)程池的核心就是ThreadPoolExecutor了。


線(xiàn)程池類(lèi)圖

圖中ScheduledThreadPoolExecutor是在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)的,我們可以先不看。先理解ThreadPoolExecutor。

4.1、ThreadPoolExecutor

4.1.1、構(gòu)造關(guān)系

public class ThreadPoolExecutor extends AbstractExecutorService { ... }

public abstract class AbstractExecutorService implements ExecutorService { ... }

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

public interface Executor {
    void execute(Runnable command);
}

1、ThreadPoolExecutor集成抽象類(lèi)AbstractExecutorService,抽象類(lèi)實(shí)現(xiàn)ExecutorService接口,ExecutorService繼承自Executor接口;
2、接口Executor定義了最基本的execute執(zhí)行任務(wù)的方法
3、接口ExecutorService額外定義了shutdown等一系列操作任務(wù)的方法
4、抽象類(lèi)AbstractExecutorService提供了newTaskFor、submit、doInvokeAny、invokeAny、cancelAll,實(shí)現(xiàn)了部分邏輯和方法。
5、ThreadPoolExecutor則是線(xiàn)程池的具體實(shí)現(xiàn)。
ThreadPoolExecutor的具體實(shí)現(xiàn)后邊詳細(xì)描述。關(guān)于AbstractExecutorService這里詳解最復(fù)雜的doInvokeAny方法。doInvokeAny為線(xiàn)程池提供了一個(gè)執(zhí)行一個(gè)Callable集合的方法,執(zhí)行集合內(nèi)任務(wù)時(shí),只要有一個(gè)任務(wù)執(zhí)行完畢且有返回結(jié)果。就結(jié)束所有任務(wù)。

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        // 集合大小
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 創(chuàng)建和集合大小一樣的Future(任務(wù)結(jié)果)的集合LIst
        ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
        
        // 它的作用就是額外提供一個(gè)BlockingQueue<Future<V>>隊(duì)列,來(lái)記錄任務(wù)執(zhí)行完畢后的Future
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 迭代器,方便遍歷取出下一個(gè)元素
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 取出一個(gè)任務(wù),并用ExecutorCompletionService調(diào)用submit開(kāi)始執(zhí)行任務(wù)。
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
            // 無(wú)限循環(huán)
            for (;;) {
                // ExecutorCompletionService取出隊(duì)列第一個(gè)數(shù)據(jù)
                Future<T> f = ecs.poll();
                if (f == null) {
                    // 取出的執(zhí)行結(jié)果為空,且任務(wù)結(jié)合還有任務(wù)就繼續(xù)用sbmit提交執(zhí)行任務(wù)
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                       // 取出的結(jié)果不為空,說(shuō)明有執(zhí)行結(jié)果了。返回取出的結(jié)果。
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 最后取消執(zhí)行所有任務(wù)。
            cancelAll(futures);
        }
    }

1、代碼中的ExecutorCompletionService封裝了一下FutureTask任務(wù),提供了一個(gè)保存Future的隊(duì)列。只要執(zhí)行完任務(wù)就把結(jié)果添加到隊(duì)列里。下面是ExecutorCompletionService的部分代碼

public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }
private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

上面doInvokeAny在new ExecutorCompletionService<T>(this);的時(shí)候,帶入線(xiàn)程池本身的Executor。所以doInvokeAny就達(dá)到了為線(xiàn)程池添加這樣一個(gè)方法的目的。
2、關(guān)鍵理解點(diǎn)就是當(dāng)submit提交第一個(gè)任務(wù)后,只有任務(wù)執(zhí)行完畢才有可能返回結(jié)果。
3、假設(shè)任務(wù)執(zhí)行時(shí)間較長(zhǎng),poll方法刪除隊(duì)列第一個(gè)元素。因?yàn)槿蝿?wù)沒(méi)有執(zhí)行完畢,所以隊(duì)列沒(méi)有元素,poll出來(lái)的是null,所以代碼循環(huán)就會(huì)立即提交執(zhí)行下一個(gè)任務(wù)。直至所有的任務(wù)都提交執(zhí)行。
4、當(dāng)有任務(wù)執(zhí)行完畢了,這時(shí)任務(wù)可能有執(zhí)行結(jié)果,也可能沒(méi)有執(zhí)行結(jié)果。沒(méi)有結(jié)果的時(shí)候poll刪除的Future結(jié)果本來(lái)就是null,所以不影響。有結(jié)果的時(shí)候poll出來(lái)的結(jié)果就就return返回。
5、finally最后取消執(zhí)行所有任務(wù)

4.1.2、ThreadPoolExecutor構(gòu)造函數(shù)

從構(gòu)造函數(shù)開(kāi)始認(rèn)識(shí)ThreadPoolExecutor,代碼如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1、corePoolSize 核心線(xiàn)程數(shù),核心線(xiàn)程數(shù)還沒(méi)到corePoolSize時(shí),即使有空閑線(xiàn)程,新任務(wù)也會(huì)創(chuàng)建新線(xiàn)程;
2、maximumPoolSize最大線(xiàn)程數(shù);
3、keepAliveTime非核心線(xiàn)程存活時(shí)間,即非核心線(xiàn)程執(zhí)行完畢后不會(huì)立即銷(xiāo)毀,直至?xí)r間到達(dá);
4、TimeUnit枚舉時(shí)間單位;
5、BlockingQueue隊(duì)列;
6、ThreadFactory線(xiàn)程工廠(chǎng),用于線(xiàn)程的創(chuàng)建;
7、RejectedExecutionHandler這個(gè)接口用來(lái)處理這種情況:添加任務(wù)失敗時(shí),就靠handler來(lái)處理。四種處理策略,也就是四種接口實(shí)現(xiàn),
AbortPolicy 默認(rèn)拋出異常
CallerRunsPolicy用調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行任務(wù)
DiscardOldestPolicy丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)
DiscardPolicy直接丟棄任務(wù)

4.1.3、線(xiàn)程池的狀態(tài)

對(duì)線(xiàn)程池大概認(rèn)識(shí)后,我們來(lái)看它在運(yùn)行時(shí)的幾個(gè)狀態(tài)。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大數(shù)量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  
    // 獲取線(xiàn)程池狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 獲取線(xiàn)程池?cái)?shù)量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 組裝狀態(tài)和數(shù)量,返回ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

1、RUNNING運(yùn)行狀態(tài),創(chuàng)建時(shí)的狀態(tài);
2、SHUTDOWN停工狀態(tài),不接收新任務(wù),已接收的任務(wù)會(huì)繼續(xù)執(zhí)行;
3、STOP停止?fàn)顟B(tài),不接收新任務(wù),接收的和正在執(zhí)行的也會(huì)中斷;
4、TIDYING清空狀態(tài),所有任務(wù)都停止了,工作線(xiàn)程也結(jié)束了;
5、TERMINATED終止?fàn)顟B(tài),線(xiàn)程池已銷(xiāo)毀;
AtomicInteger是個(gè)int型變量,它的高三位用來(lái)表示狀態(tài),剩下的29位用來(lái)表示數(shù)量

4.1.4、提交任務(wù)

提交任務(wù)有兩個(gè)方法,一個(gè)是Executor.execute,一個(gè)是AbstractExecutorService.submit

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
public <T> Future<T> submit(Runnable task, T result){ ... }
public Future<?> submit(Runnable task) { ... }

submit的方法都類(lèi)似、都是封裝成FutureTask以提交給execute方法。所以我們接下來(lái)看execute方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //檢查線(xiàn)程池是否是運(yùn)行狀態(tài),并將任務(wù)添加到等待隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果線(xiàn)程池不是運(yùn)行狀態(tài),則將剛添加的任務(wù)從隊(duì)列移除并執(zhí)行拒絕任務(wù)
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作線(xiàn)程為0,先添加一個(gè)worker線(xiàn)程。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 線(xiàn)程池不是運(yùn)行狀態(tài)會(huì)添加失敗,就會(huì)執(zhí)行reject,走拒絕任務(wù)的處理任務(wù)
        else if (!addWorker(command, false))
            reject(command);
    }

添加的邏輯不復(fù)雜,最后邏輯走向兩個(gè)分支,一個(gè)是addWorker添加線(xiàn)程,一個(gè)reject拒絕任務(wù)處理邏輯先看reject,默認(rèn)是AbortPolicy拒絕任務(wù)的策略,這個(gè)策略處理結(jié)果是直接拋出rejectedExecution

final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

4.1.5、內(nèi)部類(lèi)Worker

接著上面addWorker分析之前,我們先認(rèn)識(shí)Worker。它其實(shí)就是線(xiàn)程池管理內(nèi)部任務(wù)的最小單位,線(xiàn)程池就是維護(hù)的一組Worker。上代碼

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

1、AbstractQueuedSynchronizer簡(jiǎn)稱(chēng)AQS是一個(gè)抽象同步框架,可以用來(lái)實(shí)現(xiàn)一個(gè)依賴(lài)狀態(tài)的同步器。可以簡(jiǎn)單理解為控制線(xiàn)程獲取一個(gè)統(tǒng)一volatile類(lèi)型的state變量的Synchronized工具。即同步器
2、構(gòu)造函數(shù)創(chuàng)建一個(gè)Thread,和獲得Runable這兩個(gè)核心組件
3、剩下幾個(gè)常規(guī)鎖操作方法。由AQS來(lái)實(shí)現(xiàn)。
于是簡(jiǎn)單總結(jié)Worker就是一個(gè)同步器+Thread+Runable的結(jié)合

4.1.6、addWorker

接著分析線(xiàn)程池的execute方法中addWorker的代碼

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代碼大概三部分
1、先判斷狀態(tài),如果大于等于SHUTDOWN不執(zhí)行,返回false
2、然后判斷線(xiàn)程數(shù)量是否超過(guò)核心線(xiàn)程數(shù),或者最大數(shù)。沒(méi)有超過(guò)跳出循環(huán)走到下半部分代碼創(chuàng)建新的worker。
3、在mainLock保護(hù)下,創(chuàng)建worker后加入HasSet容器。并啟動(dòng)t.start();
根據(jù)上面的worker類(lèi)的代碼可知,t.start調(diào)用的是Worker自身的run方法。所以實(shí)際調(diào)用到了線(xiàn)程池的runWorker方法

4.1.7、runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

1、task.run();之前會(huì)檢查中斷or停止。在run前后可以添加自定義的處理beforExecute(),afterExcute
2、task = getTask()會(huì)取出BlockingQueue隊(duì)列(workQueue)中的任務(wù)來(lái)執(zhí)行。

4.1.7、ThreadPoolExecutor小結(jié)

這里先對(duì)ThreadPoolExecutor做一個(gè)小小的總結(jié)。整理一下思緒
1、ThreadPoolExecutor主要就是構(gòu)造函數(shù)那幾個(gè)參數(shù)來(lái)組成它的功能。記住worker管理線(xiàn)程來(lái)執(zhí)行workerQueue中的task任務(wù)。然后有核心線(xiàn)程數(shù)、最大線(xiàn)程數(shù)。添加失敗用RejectedExecutionHandler處理失敗邏輯,非核心線(xiàn)程活躍時(shí)間keepAliveTime
2、提交任務(wù)都是最后走到execute來(lái)提交,沒(méi)有達(dá)到核心線(xiàn)程數(shù)量,直接走addWorker來(lái)創(chuàng)建worker來(lái)工作。woker保存在HashSet集合里。添加任務(wù)時(shí),如果添加到workQueue隊(duì)列失敗會(huì)觸發(fā)創(chuàng)建非核心線(xiàn)程。如果線(xiàn)程池是running狀態(tài)但工作線(xiàn)程為0,也會(huì)直接觸發(fā)先創(chuàng)建一個(gè)非核心線(xiàn)程來(lái)執(zhí)行
3、addWorker添加線(xiàn)程,保存在HashSet<Worker> workers里,并立馬執(zhí)行
4、runWorker通過(guò)重復(fù)取出隊(duì)列里的task = getTask(),來(lái)達(dá)到一直執(zhí)行知道執(zhí)行完畢的目的
5、getTask沒(méi)有任務(wù)的時(shí)候會(huì)阻塞并掛起,不會(huì)消耗cpu資源。這樣worker就等于一直在等任務(wù)隊(duì)列workerQueu隊(duì)列有新的任務(wù)進(jìn)來(lái)。進(jìn)來(lái)就執(zhí)行
6、至于FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor只是線(xiàn)程池創(chuàng)建時(shí)指定了不同的參數(shù),通過(guò)java.util.concurrent.Executors的靜態(tài)方法創(chuàng)建,就不詳述了。
7、流程圖里的ScheduledThreadPoolExecutor是JDK1.5開(kāi)始提供的來(lái)支持周期性的任務(wù)調(diào)度。在ThreadPoolExecutor基礎(chǔ)上實(shí)現(xiàn)。多了一個(gè)堆結(jié)構(gòu)隊(duì)列來(lái)管理。有興趣的讀者可以自行分析。

五、寫(xiě)在組后

好了,線(xiàn)程和線(xiàn)程池的知識(shí)點(diǎn)講到這里也差不多了。線(xiàn)程線(xiàn)程池的基本原理我想你也應(yīng)該很清楚了。希望本文能對(duì)你有所幫助。

Read the fucking source code!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容