android 多線程 — 線程池基礎(chǔ)

線程池這個(gè)大家必會(huì)的把,就算咱是做 android 的,有現(xiàn)成的異步組件但是也保不齊什么時(shí)候得自己寫個(gè)異步處理,再者面試時(shí)你要是不會(huì)線程池,面試官會(huì)把你虐出翔來


看妹子大家就有學(xué)習(xí)動(dòng)力了

java 的4種自帶線程池

線程池這玩必須復(fù)雜,Java 必須提供標(biāo)準(zhǔn) API ,要不指不定能扭曲成啥樣了,這不就有了Executors 這個(gè)類,通過 Executors 提供4種標(biāo)準(zhǔn)線程池:

  • FixedThreadPool - 定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待
  • SingleThreadExecutor - 單線程線程池,只用一個(gè)工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行
  • CachedThreadPool - 可緩存線程池,線程數(shù)量沒有限制,如果線程池?cái)?shù)量超過任務(wù),可靈活回收空閑線程,若無可回收,則新建線程
  • ScheduledThreadPool - 定時(shí)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行
1. FixedThreadPool

它是一種線程數(shù)量固定的線程池,它只有核心線程(不會(huì)被被回收,除非線程關(guān)閉),并且這些核心線程沒有超時(shí)機(jī)制,任務(wù)隊(duì)列也是沒有大小限制(默認(rèn)是 128),采用的是 LinkedBlockingQueue 阻塞隊(duì)列

        val fixedThreadPool = Executors.newFixedThreadPool(3)
        fixedThreadPool.execute(object : Runnable {
            override fun run() {
                Thread.sleep(2000)
                Log.d("AA", "fixedThreadPool...run")
            }
        })
FixedThreadPool
2. SingleThreadExecutor

單線程線程池,關(guān)鍵得看怎么設(shè)計(jì)其中的阻塞隊(duì)列,在 android 中的應(yīng)用范圍日挺廣,android 這種移動(dòng)客戶端是沒有什么并發(fā)要求的,到時(shí)這種按順序執(zhí)行的阻塞任務(wù)到時(shí)挺多,比如大多數(shù) GUI 操作都是單線程的,數(shù)據(jù)庫,日志,文件操作,應(yīng)用批量安裝,應(yīng)用批量刪除等不適合并發(fā)但可能 IO 阻塞性u以及影響 UI 線程響應(yīng)的操作

        val fixedThreadPool = Executors.newSingleThreadExecutor()
        fixedThreadPool.execute(object : Runnable {
            override fun run() {
                Thread.sleep(2000)
                Log.d("AA", "fixedThreadPool...run")
            }
        })
SingleThreadExecutor
3. CachedThreadPool

它是一種線程數(shù)量不定的線程池,它只有非核心線程,并且其最大線程數(shù)為Integer.MAX_VALUE(因?yàn)?Integer.MAX_VALUE 的值很大,所以默認(rèn)為線程數(shù)是為任意大),線程中的空閑線程都有超時(shí)機(jī)制,時(shí)長是60秒,超過60秒閑置線程就會(huì)被回收,采用的是 SynchronousQueue 這個(gè)阻塞隊(duì)列

        val fixedThreadPool = Executors.newCachedThreadPool()
        fixedThreadPool.execute(object : Runnable {
            override fun run() {
                Thread.sleep(2000)
                Log.d("AA", "fixedThreadPool...run")
            }
        })
CachedThreadPool
4. ScheduledThreadPool

它的核心線程數(shù)量是固定的而非核心線程數(shù)是沒有限制的,并且當(dāng)非核心線程限制時(shí)會(huì)被立即回收,這類線程池主要用于執(zhí)行定時(shí)任務(wù)和具有固定周期的重復(fù)任務(wù)。

        val fixedThreadPool = Executors.newScheduledThreadPool(3)
        fixedThreadPool.schedule(object : Runnable {
            override fun run() {
                Thread.sleep(2000)
                Log.d("AA", "fixedThreadPool...run")
            }
        },3,TimeUnit.SECONDS)

// 2000s 后執(zhí)行 runnable
scheduledThreadPool.schedule(runnable,2000, TimeUnit.MILLISECONDS);
// 延遲 10ms 后,每隔 1000ms 執(zhí)行一次 runnable
scheduledThreadPool.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS);
ScheduledThreadPool

線程池狀態(tài)

  • RUNNING - 線程池初始化后就是 RUNNING 狀態(tài),此時(shí)線程池中的任務(wù)為0
  • SHUTDOWN - 調(diào)了 shutdown 結(jié)束線程池,線程池就是 SHUTDOWN 狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢
  • STOP - 調(diào)了 shutdownNow 結(jié)束線程池,線程池就是 STOP 狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù)
  • TIDYING - 所有的任務(wù)已終止,ctl 記錄的”任務(wù)數(shù)量”為 0,線程池會(huì)變?yōu)?TIDYING 狀態(tài),接著會(huì)執(zhí)行 terminated 函數(shù)
  • TERMINATED - 看上面自己理解下

關(guān)閉線程池

就是這2個(gè) shutdown、shutdownNow API

  • shutdown - 只標(biāo)記狀態(tài) SHUTDOWN,正在執(zhí)行的任務(wù)會(huì)繼續(xù)執(zhí)行下去,沒有被執(zhí)行的則中斷 回。
  • shutdownNow - 將線程池的狀態(tài)設(shè)置為 STOP,正在執(zhí)行的任務(wù)則被停止,沒被執(zhí)行任務(wù)的則返,注意這樣線程池此時(shí)若是 sheep 的話會(huì)拋異常

線程池配置大小

一般需要根據(jù)任務(wù)的類型來配置線程池大?。?/p>


ThreadPoolExecutor

Android 中的線程池的概念來源于Java 中的 Executor,Executor 是一個(gè)接口,真正的線程池的實(shí)現(xiàn)為ThreadPoolExecutor,ThreadPoolExecutor 提供了一系列參數(shù)來配置線程池,通過不同的參數(shù)可以創(chuàng)建不同的線程池

實(shí)現(xiàn)關(guān)系如下:ThreadPoolExecute -> AbstractExecutorService -> ExecutorService -> Executor

通過 ThreadPoolExecutor 我們可以自己實(shí)現(xiàn)自己定制的線程池,這只是參數(shù)配置的不同,不過我是沒碰到需要作到這一步的,想必也是后臺(tái)開發(fā)才需要的吧

線程池的構(gòu)造方法:

public ThreadPoolExecutor (int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable workQueue>,
                           ThreadFactory threadFactory )
  • corePoolSize - 線程池核心線程數(shù),等于 CPU 核心數(shù) + 1,CPU 核心數(shù)可通過這個(gè) API 來獲取Runtime.getRuntime().availableProcessors(),如果將 ThreadPoolExecutor 的 allowCoreThreadTimeOut 屬性設(shè)置為 true,那么核心線程就會(huì)存在超時(shí)策略,這個(gè)時(shí)間間隔有 keepAliveTime 所決定,當(dāng)?shù)却龝r(shí)間超過 keepAliveTime 所指定的時(shí)長后,核心線程就會(huì)被停止
  • maximumPoolSize - 線程池所能容納的最大線程數(shù),當(dāng)活動(dòng)線程數(shù)達(dá)到這個(gè)數(shù)值后,后續(xù)的新任務(wù)將會(huì)被阻塞
  • keepAliveTime - 非核心線程的超時(shí)時(shí)長,超過這個(gè)時(shí)長,非核心線程就會(huì)被回收,核心線程無超時(shí)機(jī)制,非核心線程在閑置時(shí)的超時(shí)時(shí)間為 1 秒
  • TimeUnit - 超時(shí)時(shí)長單位,TimeUnit 下:MILLISECONDS | SECONDS |MINUTES
  • BlockingQueue<Runnable workQueue> - 線程池中的任務(wù)隊(duì)列,通過線程池的 execute 方法體檢的 Runnable 對象會(huì)存儲(chǔ)在這個(gè)參數(shù)中
  • ThreadFactory - 線程工廠,為線程池提供創(chuàng)建新線程的功能,ThreadFactory 是一個(gè)接口,它只有一個(gè)方法:Thread newThread(Runnable r)
  • RejectedExecutionHandler - 飽和策略,這一塊我目前也不是很清楚,先把資料當(dāng)在這里
    • AbortPolicy - 默認(rèn)飽和策略,直接拋出異常
    • CallerRunsPolicy - 只用調(diào)用者所在線程來運(yùn)行任務(wù)
    • DiscardPolicy - 不處理,丟棄掉,也不會(huì)日志什么的
    • DiscardOldestPolicy - 丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
    • 自定義Police - 需要實(shí)現(xiàn)RejectedExecutionHandler

BlockingQueue

在線程池中 BlockingQueue 阻塞隊(duì)列這個(gè)點(diǎn)相當(dāng)重要了,甚至可以拿出來單獨(dú)使用,隊(duì)列概念也不難理解,利用數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù),調(diào)節(jié)任務(wù)生產(chǎn)者和任務(wù)消費(fèi)者之間的執(zhí)行


這就是隊(duì)列
隊(duì)列大家也可以為棧,有2種處理任務(wù)的順序:
  • FIFO 先進(jìn)先出 - 先插入的隊(duì)列的元素也最先出隊(duì)列,類似于排隊(duì)的功能。從某種程度上來說這種隊(duì)列也體現(xiàn)了一種公平性
  • LIFO 后進(jìn)先出 - 后插入隊(duì)列的元素最先出隊(duì)列,這種隊(duì)列優(yōu)先處理最近發(fā)生的事件

阻塞隊(duì)列應(yīng)對的是個(gè)經(jīng)典的生產(chǎn)者消費(fèi)者場景,充當(dāng)生產(chǎn)者和消費(fèi)者之間的二道販子(SynchronousQueue 這種隊(duì)列除外),生產(chǎn)者和消費(fèi)者不直接聯(lián)系,生產(chǎn)者把任務(wù)交給阻塞隊(duì)列,阻塞隊(duì)列再去轉(zhuǎn)手給下家消費(fèi)者。一般阻塞隊(duì)列都有大小的,當(dāng)隊(duì)列滿了時(shí),生產(chǎn)者再往隊(duì)列里塞任務(wù),那么就會(huì)阻塞生產(chǎn)者;當(dāng)隊(duì)列為空時(shí),消費(fèi)者沒有任務(wù)就阻塞,核心線程是阻塞,非核心線程會(huì)回收,具體根據(jù)阻塞隊(duì)列來

隊(duì)列滿阻塞添加任務(wù)的生產(chǎn)者
隊(duì)列為空,阻塞取出任務(wù)的消費(fèi)者線程
簡單來看下隊(duì)列的 API
  • 添加任務(wù):
    • offer(anObject) - 添加任務(wù),添加成功返回 true,若是隊(duì)列滿了該方法不阻塞當(dāng)前執(zhí)行方法的線程
    • offer(E o, long timeout, TimeUnit unit) - 指定時(shí)間內(nèi)還不能往隊(duì)列中添加任務(wù),返回 false
    • put(anObject) - 隊(duì)列滿了該方法會(huì)阻塞當(dāng)前執(zhí)行方法的線程,直到隊(duì)列有空間為止
  • 獲取任務(wù):
    • poll(time) - 獲取首位的對象,若不能立即取出,等待指定規(guī)時(shí)間,若還是不行則返回 null
    • poll(long timeout, TimeUnit unit) -
    • take() - 獲取首位的對象,弱此時(shí)隊(duì)列為空,則會(huì)進(jìn)入阻塞,直到由新的任務(wù)添加進(jìn)來
    • drainTo() - 一次性取出所有可用的數(shù)據(jù),可以效率,因?yàn)椴恍枰啻畏峙渔i或釋放鎖
目前有 7 種 BlockingQueue :
  • ArrayBlockingQueue - 數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列,特點(diǎn)是插入和取出數(shù)據(jù)采用一把鎖,而沒有使用分離鎖設(shè)計(jì),插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對象實(shí)例,在創(chuàng)建可以設(shè)置公平鎖還是非公平鎖,默認(rèn)是非公平鎖
  • LinkedBlockingQueue - 鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,默認(rèn)隊(duì)列無限大小 Integer.MAX_VALUE,需要注意若是插入的速度大于獲取的速度,內(nèi)存有可能消耗殆盡的可能,插入和獲取采用2把鎖,分離鎖的設(shè)計(jì)可以大大加快并發(fā)性能
  • LinkedTransferQueue - 鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列
  • LinkedBlockingDeque - 鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
  • DelayQueue - 使用優(yōu)先級隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列,沒有大小限制,元素只有在指定的延遲時(shí)間后才能夠從隊(duì)列中獲取元素,插入數(shù)據(jù)的操作不會(huì)阻塞,而獲取數(shù)據(jù)的操作會(huì)被阻塞,不過 DelayQueue 的使用場景較少,但都相當(dāng)巧妙,常見的例子比如使用一個(gè) DelayQueue 來管理一個(gè)超時(shí)未響應(yīng)的連接隊(duì)列
  • PriorityBlockingQueue - 支持優(yōu)先級排序的無界阻塞隊(duì)列,不阻塞插入,但會(huì)阻塞取出操作,使用的時(shí)候要特別注意,采用的是公平鎖
  • SynchronousQueue - 不存儲(chǔ)元素的阻塞隊(duì)列,有公平模式非公平模式 2 種模式:
    • 公平模式 - 采用公平鎖,按照 FIFO 順序來阻塞多余的插入和取出
    • 非公平模式 - 采用非公平鎖,按照 LIFO 順序來阻塞多余的插入和取出

ArrayBlockingQueue 和 LinkedBlockingQueue 是兩個(gè)最普通也是最常用的阻塞隊(duì)列,一般情況下,在處理多線程間的生產(chǎn)者消費(fèi)者問題,使用這兩個(gè)類足以


小小的試試隊(duì)列

我們 new 3個(gè)生產(chǎn)者,每個(gè)生產(chǎn)者添加 10 個(gè)任務(wù),new 一個(gè)消費(fèi)者,消費(fèi)者線程 3 秒拿不到任務(wù)就結(jié)速線程,顯示完事了

生產(chǎn)者

class Product(var name: String, var blockingQueue: BlockingDeque<String>) : Runnable {

    var tag: Boolean = true

    /**
     * 每隔 100 毫米添加一個(gè)任務(wù)進(jìn)來
     */
    override fun run() {
        while (tag) {
            for (index in 1..10) {
                blockingQueue.offer("$name 添加的第$index 個(gè)任務(wù)")
                Thread.sleep(100)
            }
            tag = false
        }
    }
}

消費(fèi)者

class Consumer(var name: String, var blockingQueue: BlockingDeque<String>) : Runnable {

    var tag: Boolean = true

    /**
     * 3 秒接受不到任務(wù)就結(jié)速線程
     */
    override fun run() {
        while (tag) {
            val work = blockingQueue.poll(3000, TimeUnit.SECONDS)
            Log.d("AA", "$name 處理任務(wù) - $work")
            Thread.sleep(300)
            if (work == null) {
                tag = false
            }
        }
    }
}

執(zhí)行代碼

        btn_left.setOnClickListener {

            val linkedBlockingDeque = LinkedBlockingDeque<String>()
            val productAA = Product("Product_AA", linkedBlockingDeque)
            val productBB = Product("Product_BB", linkedBlockingDeque)
            val productCC = Product("Product_CC", linkedBlockingDeque)
            val consumerXX = Consumer("Consumer_XX", linkedBlockingDeque)

            val threadPool = Executors.newFixedThreadPool(4)
            threadPool.execute( productAA )
            threadPool.execute( productBB )
            threadPool.execute( productCC )
            threadPool.execute( consumerXX )
        }

參考資料:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容