Android 多線程之線程池(二)

一 線程池中的一些重要概念

Worker / workers

Worker類(lèi)是ThreadPoolExecutor類(lèi)的一個(gè)內(nèi)部類(lèi),也是線程池管理操作線程的核心所在。每一個(gè)worker都對(duì)應(yīng)著一個(gè)thread,所以在不混淆的情況下,可以把worker理解為工作線程。

ThreadPoolExecutor有一個(gè)名為workers的成員變量,它保存了這個(gè)線程池所有存活的worker對(duì)象。

workQueue

workQueue是線程池內(nèi)部用來(lái)保存待執(zhí)行任務(wù)的隊(duì)列。它是一個(gè)BlockingQueue<Runnable>類(lèi)型的變量,在沒(méi)有任務(wù)的時(shí)候,它的poll()方法會(huì)阻塞。

在一個(gè)允許超時(shí)的worker執(zhí)行完任務(wù)之后,會(huì)調(diào)用workQueue.poll()取出下一個(gè)任務(wù)執(zhí)行。如果沒(méi)有任務(wù),則會(huì)在這里阻塞;當(dāng)阻塞時(shí)間達(dá)到超時(shí)時(shí)間后,這個(gè)工作線程會(huì)退出并銷(xiāo)毀。

一些簡(jiǎn)單的線程池使用方法,可以看看這篇文章Android 多線程之線程池(一)

二 源碼分析

ctl

ThreadPoolExecutor通過(guò)一個(gè)原子整型ctl來(lái)保存線程池的兩個(gè)重要字段,workerCount和runState。workerCount即線程池工作線程的數(shù)量,而runState代表了線程池當(dāng)前的狀態(tài)(如:運(yùn)行中、關(guān)閉、終止)。通過(guò)位運(yùn)算,可以從ctl得到workerCount和runState的值,反之也可以通過(guò)workerCount和runState組合得到ctl。

//ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

execute(runnable)

//ThreadPoolExecutor.java
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

當(dāng)線程池執(zhí)行一個(gè)任務(wù)會(huì)調(diào)這個(gè)方法進(jìn)來(lái),回想下之前講過(guò)線程池的工作流程是怎么樣的Android 多線程之線程池(一),再來(lái)看看源碼會(huì)加深印象。

分析:
1、先判斷任務(wù)是否為空,如果為空直接拋異常。
2、如果當(dāng)前運(yùn)行的線程小于核心線程數(shù),那么就去addWorker(command, true),這個(gè)方式返回一個(gè)Boolen值,意思是創(chuàng)建一個(gè)worker是否成功,第一個(gè)參數(shù)是任務(wù),第二個(gè)參數(shù)是創(chuàng)建的worker的線程是否為核心線程。
3、如果當(dāng)前線程池是運(yùn)作的,并且這個(gè)任務(wù)workQueue.offer(command)添加進(jìn)隊(duì)列,那么繼續(xù)往下走,如果添加進(jìn)隊(duì)列失敗,則去addWorker(command, false)創(chuàng)建一個(gè)普通線程并把任務(wù)對(duì)給他執(zhí)行,如果創(chuàng)建失敗,則拒絕這個(gè)任務(wù)。
4、如果3的條件重新取到ctl,如果這個(gè)線程池不是運(yùn)作的,并且這個(gè)任務(wù)已經(jīng)在任務(wù)隊(duì)列移除了,那么直接拒絕;如果上述條件不成立則判斷workerCountOf(recheck) == 0 線程池沒(méi)有工作線程的時(shí)候,則去創(chuàng)建普通線程的worker。


線程池工作原理.png

addWorker(Runnable firstTask, boolean core)

//ThreadPoolExecutor.java
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

先去判斷線程池的狀態(tài)if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))return false;
直接返回false有以下3種情況:
1、線程池狀態(tài)為STOP、TIDYING、TERMINATED
2、線程池狀態(tài)不是running狀態(tài),并且firstTask不為空
3、線程池狀態(tài)不是running狀態(tài),并且工作隊(duì)列為空
當(dāng)返回false時(shí)則進(jìn)入拒絕策略

之后去判斷

int wc = workerCountOf(c);
if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;

addWorker的core的值,如果是核心線程,當(dāng)前的workerCountOf大于corePoolSize則返回false;如果是非核心線程,當(dāng)前的workerCountOf大于maximumPoolSize則返回false,當(dāng)返回false時(shí)則進(jìn)入拒絕策略

如果上述條件滿(mǎn)足,compareAndIncrementWorkerCount(c)表示workerCountOf+1

接下來(lái)就是去創(chuàng)建一個(gè)worker了,相當(dāng)于創(chuàng)建一個(gè)工作線程,之后把work添加到隊(duì)列里面workers.add(w);如果workers.size() > largestPoolSize 當(dāng)前的工作線程(包含未啟動(dòng))大于最大工作線程,那么最大工作線程設(shè)置為隊(duì)列的工作線程

  if (workerAdded) {
       t.start();
       workerStarted = true;
  }

執(zhí)行work對(duì)象里面的Thread

ThreadPoolExecutor.Worker(Runnable firstTask)、runWorker(Worker)

//ThreadPoolExecutor.java
Worker(Runnable firstTask) {
   setState(-1); // inhibit interrupts until runWorker
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this);
}

public void run() {
   runWorker(this);
}

剛才有提及到worker可以理解為工作線程,原因是worker.thread是工作線程,而worker.firstTask為該線程處理的任務(wù)

當(dāng)Worker的線程開(kāi)始運(yùn)行之后,會(huì)調(diào)用其run()方法:runWorker(this)

// 省略一大部分
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } catch (Exception x) {
            } 
        }
    }

    // 省略一大部分
    private Runnable getTask() {
        for (;;) {
            if (/*無(wú)法獲取任務(wù)*/) {
                return null;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
            } catch (InterruptedException retry) {
            }
        }
    }

為了便于觀看,只留了核心的幾行,如果傳入的task不為null,則執(zhí)行相應(yīng)的run方法;當(dāng)傳入的task為null,則會(huì)從getTask()方法中獲取runnable對(duì)象

try {
      Runnable r = timed ?
          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
      if (r != null)
          return r;
    } catch (InterruptedException retry) { }

這里面通過(guò)timed變量判斷是使用poll還是task
poll:如果有值,則立馬返回。否則等待一段時(shí)間,該時(shí)間內(nèi)阻塞,時(shí)間結(jié)束后,隊(duì)列還是沒(méi)有元素則返回null。
task:如果有值,則立馬返回。否則一直阻塞。

什么時(shí)候返回null,不讓worker繼續(xù)存活了呢?
1、線程池被shutdown,并且任務(wù)隊(duì)列空了;
2、線程池超容量;
3、超時(shí);

用一張圖總結(jié)

線程池原理.jpg

End

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容