Android 多線程探索(三)— 線程池

構(gòu)建服務(wù)器應(yīng)用程序的有效方法 — 線程池

為什么使用線程池?

每次通過(guò) new Thread 創(chuàng)建線程并不是一種好的方式,每次 new Thread 新建和銷毀對(duì)象性能較差,線程缺乏統(tǒng)一管理,可能無(wú)限制新建線程,相互之間競(jìng)爭(zhēng)、占用過(guò)多資源導(dǎo)致死鎖,并且缺乏定時(shí)執(zhí)行、定期執(zhí)行、線程中斷等功能。

Java 提供了 4 種線程池,能夠有效地管理、調(diào)度線程,避免過(guò)多的資源損耗。它的優(yōu)點(diǎn)如下:

1. 重用存在的線程,減少對(duì)象創(chuàng)建、銷毀的開(kāi)銷;

2. 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時(shí)避免過(guò)多資源競(jìng)爭(zhēng),避免堵塞;

3. 提供定時(shí)執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。

線程池原理簡(jiǎn)單地解釋就是:會(huì)創(chuàng)建多個(gè)線程并且進(jìn)行管理,提交給線程的任務(wù)會(huì)被線程池指派給其中的線程進(jìn)行執(zhí)行,通過(guò)線程池的統(tǒng)一調(diào)度、管理使得多線程的使用更簡(jiǎn)單、高效。

線程池

線程池都實(shí)現(xiàn)了 ExecutorService 接口,該接口定義了線程池需要實(shí)現(xiàn)的接口,如 submit、execute、shutdown 等。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {

    void execute(Runnable command);
}
兩種常用的線程池實(shí)現(xiàn):
  • ThreadPoolExecutor,也就是我們用最多的線程池實(shí)現(xiàn);

  • ScheduledThreadPoolExecutor,則用于周期性地執(zhí)行任務(wù)。

我們一般都不會(huì)通過(guò) new 的形式來(lái)創(chuàng)建線程池,因?yàn)閯?chuàng)建參數(shù)過(guò)程相對(duì)復(fù)雜,所以,JDK 提供了一個(gè) Executors 工廠類來(lái)簡(jiǎn)化這個(gè)過(guò)程。下面分別介紹 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的使用。

1. 啟動(dòng)指定數(shù)量的線程 — ThreadPoolExecutor

ThreadPoolExecutor 是線程池的實(shí)現(xiàn)之一,它的功能是啟動(dòng)指定數(shù)量的線程以及將任務(wù)添加到一個(gè)隊(duì)列中,并且將任務(wù)分發(fā)給空閑的線程。

ExecutorService 的生命周期包括 3 種狀態(tài):運(yùn)行、關(guān)閉、終止。創(chuàng)建后便進(jìn)入運(yùn)行狀態(tài),當(dāng)調(diào)用 shutdown() 方法時(shí)便進(jìn)入關(guān)閉狀態(tài),此時(shí) ExecutorService 不再接受新的任務(wù),但它還在執(zhí)行已經(jīng)提交了的任務(wù)。當(dāng)所有已經(jīng)提交了的任務(wù)執(zhí)行完后,就變成終止?fàn)顟B(tài)。

ThreadPoolExecutor 構(gòu)造函數(shù)如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
參數(shù)名 作用
corePoolSize 線程池中所保存的核心線程數(shù)。
maximumPoolSize 線程池允許創(chuàng)建的最大線程數(shù)。
keepAliveTime 當(dāng)前線程池線程總數(shù)大于核心線程數(shù)時(shí),終止多余的空閑線程的時(shí)間。
unit keepAliveTime 參數(shù)的時(shí)間單位,可選值有毫秒、秒、分等。
workQueue 任務(wù)隊(duì)列,如果當(dāng)前線程池達(dá)到核心線程數(shù) corePoolSize,且當(dāng)前所有線程都處于活動(dòng)狀態(tài)時(shí),則將新加入的任務(wù)放到此隊(duì)列中。
threadFactory 線程工廠,讓用戶可以定制線程的創(chuàng)建過(guò)程,通常不需設(shè)置。
Handler 拒絕策略,當(dāng)線程池與 workQueue 隊(duì)列都滿了的情況下,對(duì)新加任務(wù)采取的處理策略。
  • 線程池啟動(dòng)后默認(rèn)是空的,只有任務(wù)來(lái)臨才會(huì)創(chuàng)建線程以處理請(qǐng)求。prestartAllCoreThreads() 方法可以讓線程池啟動(dòng)后立即啟動(dòng)所有核心線程以等待任務(wù)。

  • 任務(wù)數(shù)量小于 corePoolSize,則立即創(chuàng)建新線程來(lái)處理任務(wù);

  • 任務(wù)數(shù)量大于 corePoolSize,但小于 maximumPoolSize,則將任務(wù)放進(jìn) workQueue,當(dāng)阻塞隊(duì)列滿時(shí)才創(chuàng)建新線程;

  • 如果 corePoolSize 與 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。

workQueue 有下列幾個(gè)常用實(shí)現(xiàn):

  1. ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界隊(duì)列,此隊(duì)列按 FIFO 原則對(duì)任務(wù)進(jìn)行排序。如果隊(duì)列滿了還有任務(wù)進(jìn)來(lái),則調(diào)用拒絕策略。

  2. LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的無(wú)界隊(duì)列,此隊(duì)列按 FIFO 原則對(duì)任務(wù)進(jìn)行排序。因?yàn)樗菬o(wú)界的,所以不會(huì)滿,采用此隊(duì)列后線程池將忽略拒絕策略(handler)參數(shù),同時(shí)忽略最大線程數(shù) maximumPoolSize 等參數(shù)。

  3. SynchronousQueue:直接將任務(wù)提交給線程而不是將它加入到隊(duì)列,實(shí)際上此隊(duì)列是空的。如果新任務(wù)來(lái)了線程池沒(méi)有任何可用線程處理的話,則調(diào)用拒絕策略。其實(shí)要是把 maximumPoolSize 設(shè)置成無(wú)界(integer.MAX_VALUE),加上 SynchronousQueue 隊(duì)列,就等同于 Executors.newCachedThreadPool()。

  4. PriorityBlockingQueue:具有優(yōu)先級(jí)的隊(duì)列的有界隊(duì)列,可以自定義優(yōu)先級(jí),默認(rèn)是按自然排序,可能很多場(chǎng)合并不適合。

當(dāng)線程池與 workQueue 隊(duì)列都滿了的情況下,對(duì)新加任務(wù)采取的處理策略有如下四個(gè)默認(rèn)實(shí)現(xiàn):

  1. AbortPolicy:拒絕任務(wù),拋出 RejectedExecutionException 異常。線程池的默認(rèn)策略。

  2. CallerRunsPolicy:用于被拒絕任務(wù)的處理程序,它直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。

  3. DiscardOldestPolicy:如果線程池尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)。這樣的結(jié)果是最后加入的任務(wù)反而有可能被執(zhí)行,先前加入的都被拋棄了。

  4. DiscardPolicy:加不進(jìn)的任務(wù)都被拋棄了,同時(shí)沒(méi)有異常拋出。


1.1 newFixedThreadPool(int size)

對(duì)與 Android 平臺(tái)來(lái)說(shuō),由于資源有限,最常使用的就是通過(guò) Executors.newFixedThreadPool(int size) 函數(shù)來(lái)啟動(dòng)固定數(shù)量的線程池:

public class ExecutorDemo {

    // 任務(wù)數(shù)量
    private static final int MAX = 10;


    public static void fixedThreadPool(int size) throws ExecutionException, InterruptedException {
         // 創(chuàng)建固定數(shù)量的線程池
        ExecutorService executorService = Executors.newFixedThreadPool(size);

        for (int i = 1; i <= MAX; i++) {
            // 提交任務(wù)
            Future<Integer> task = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("執(zhí)行線程:" + Thread.currentThread().getName());
                    return fibc(40);
                }
            });
            System.out.println("第 " + i + " 次計(jì)算,結(jié)果:" + task.get());
        }
    }

    private static int fibc(int n) {
        if (n == 0) {
            return 0;
        }

        if (n == 1) {
            return 1;
        }

        return fibc(n - 1) + fibc(n - 2);
    }
}

newFixedThreadPool(int nThreads) 的實(shí)現(xiàn):

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

在該函數(shù)中,會(huì)調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù),設(shè)置它的 corePoolSize 和 maximumPoolSize 值都是 nThreads,并且設(shè)置 keepAliveTime 參數(shù)為 0 毫秒,最后設(shè)置無(wú)界任務(wù)隊(duì)列。這樣該線程池就含有固定個(gè)數(shù)的線程,并且能容納無(wú)限個(gè)任務(wù)。

輸出結(jié)果如下:

執(zhí)行線程:pool-3-thread-1
第 1 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 2 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 3 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 4 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 5 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 6 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 7 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 8 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 9 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 10 次計(jì)算,結(jié)果:102334155

1.2 newCachedThreadPool()

線程越多,并發(fā)量越大,然而占用的內(nèi)存也就越大,指定過(guò)大的線程數(shù)量并不可取。因此,我們可能需要一種場(chǎng)景,如果來(lái)了一個(gè)新的任務(wù),并且沒(méi)有空閑線程可用,此時(shí)必須馬上創(chuàng)建一個(gè)線程來(lái)立即執(zhí)行任務(wù)。這時(shí)就可以通過(guò) Executors 的 newCachedThreadPool 函數(shù)來(lái)實(shí)現(xiàn)。

    // 創(chuàng)建線程池
    public static void cachedThreadPool() throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 1; i <= MAX; i++) {
            // 提交任務(wù)
            Future<Integer> task = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("執(zhí)行線程:" + Thread.currentThread().getName());
                    return fibc(40);
                }
            });
            System.out.println("第 " + i + " 次計(jì)算,結(jié)果:" + task.get());
        }
    }

newCachedThreadPool() 函數(shù)實(shí)現(xiàn):

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到,newCachedThreadPool 函數(shù)不需傳入線程的數(shù)量。它會(huì)調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù),設(shè)置它的
maximumPoolSize 值為無(wú)界值(Integer.MAX_VALUE),并且設(shè)置 keepAliveTime 參數(shù)為 60 秒,最后設(shè)置 SynchronousQueue
任務(wù)隊(duì)列。這樣就可以適應(yīng)任意數(shù)量的并發(fā)任務(wù)。線程池為每個(gè)任務(wù)都創(chuàng)建了 1 個(gè)線程,當(dāng)然這是在沒(méi)有線程空閑的情況下才會(huì)創(chuàng)建新的線程。若一個(gè)線程中的任務(wù)已經(jīng)做完了,那么這個(gè)線程可以為未被執(zhí)行的任務(wù)提供執(zhí)行。

輸出結(jié)果如下:

執(zhí)行線程:pool-3-thread-1
第 1 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 2 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 3 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 4 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 5 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 6 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 7 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 8 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 9 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 10 次計(jì)算,結(jié)果:102334155

2. 定時(shí)執(zhí)行一些任務(wù) — ScheduledThreadPoolExecutor

當(dāng)我們需要定時(shí)執(zhí)行一些任務(wù),可以通過(guò) ScheduledThreadPoolExecutor 來(lái)實(shí)現(xiàn)。通過(guò) Executors 的 newScheduledThreadPool 函數(shù)可以很方便地創(chuàng)建定時(shí)執(zhí)行任務(wù)的線程池。

下面是一個(gè)例子:

public class ScheduledThreadPoolDemo {

    public static void scheduledThreadPool() {
        
        // 創(chuàng)建定時(shí)執(zhí)行的線程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        
        // 參數(shù) 2 為第一次執(zhí)行任務(wù)延遲的時(shí)間,
        // 意思就是第一次調(diào)度開(kāi)始時(shí)間點(diǎn)=當(dāng)前時(shí)間 + initialDelay 
        // 參數(shù) 3 為執(zhí)行周期
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread: " + Thread.currentThread().getName() + ",定時(shí)計(jì)算1:");
                System.out.println("結(jié)果:" + fibc(30));
            }
        }, 1, 2, TimeUnit.SECONDS);

        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread: " + Thread.currentThread().getName() + ",定時(shí)計(jì)算2:");
                System.out.println("結(jié)果:" + fibc(40));
            }
        }, 1, 2, TimeUnit.SECONDS);
    }


    private static int fibc(int n) {
        if (n == 0) {
            return 0;
        }

        if (n == 1) {
            return 1;
        }

        return fibc(n - 1) + fibc(n - 2);
    }
}

public class Main {

    public static void main(String[] args) {
        // write your code here
        ScheduledThreadPoolDemo.scheduledThreadPool();

}

該線程池有 3 個(gè)線程,我們指定了 2 個(gè)定時(shí)任務(wù),因此,該線程池有兩個(gè)線程來(lái)定時(shí)完成任務(wù)。

scheduleAtFixedRate() 函數(shù)的實(shí)現(xiàn):

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

scheduleAtFixedRate() 函數(shù)就是設(shè)置定時(shí)任務(wù)的方法,參數(shù) 1 是要執(zhí)行的任務(wù),參數(shù) 2 是第一次運(yùn)行任務(wù)時(shí)延遲時(shí)間(第一次調(diào)度開(kāi)始時(shí)間點(diǎn) = 當(dāng)前時(shí)間 + initialDelay ),參數(shù) 3 是定時(shí)任務(wù)的周期(兩次任務(wù)調(diào)度的間隔時(shí)間),參數(shù) 4 是時(shí)間單元,這里設(shè)置為秒。

部分輸出結(jié)果:

Thread: pool-2-thread-1,定時(shí)計(jì)算1:
Thread: pool-2-thread-2,定時(shí)計(jì)算2:
結(jié)果:832040
結(jié)果:102334155

Thread: pool-2-thread-1,定時(shí)計(jì)算1:
Thread: pool-2-thread-3,定時(shí)計(jì)算2:
結(jié)果:832040
結(jié)果:102334155
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 【JAVA 線程】 線程 進(jìn)程:是一個(gè)正在執(zhí)行中的程序。每一個(gè)進(jìn)程執(zhí)行都有一個(gè)執(zhí)行順序。該順序是一個(gè)執(zhí)行路徑,或者...
    Rtia閱讀 2,893評(píng)論 2 20
  • 前段時(shí)間遇到這樣一個(gè)問(wèn)題,有人問(wèn)微信朋友圈的上傳圖片的功能怎么做才能讓用戶的等待時(shí)間較短,比如說(shuō)一下上傳9張圖片,...
    加油碼農(nóng)閱讀 1,285評(píng)論 0 2
  • 原文鏈接:http://blog.csdn.net/u010687392/article/details/4985...
    xpengb閱讀 1,464評(píng)論 0 1
  • 片段選自《如何說(shuō)孩子才會(huì)聽(tīng),怎么聽(tīng)孩子才肯說(shuō)》作者是美國(guó)的阿黛爾.法伯,伊萊恩.瑪茲麗施。 這本書(shū)和作者的《如何說(shuō)...
    九斤yesheng閱讀 328評(píng)論 0 1
  • 2016.9.26 很久都不去故宮了,突然想寫(xiě)故宮,因?yàn)榭吹搅艘粡垐D片,出門旅游,一個(gè)人的所言所形代表的是本國(guó)的形...
    云小霓CC閱讀 172評(píng)論 0 0

友情鏈接更多精彩內(nèi)容