關(guān)于線程池需要了解的事情

Java線程池的優(yōu)點(diǎn):

1、降低資源消耗 通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀的消耗。
2、提高響應(yīng)速度 當(dāng)任務(wù)到達(dá)的時(shí)候,不需要等到線程創(chuàng)建就能立即執(zhí)行。
3、提高線程的可管理性 線程時(shí)稀缺資源,使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。

ThreadPoolExecutor 工作原理:

1、如果當(dāng)前運(yùn)行的線程少于 corePoolSize,則直接創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。
2、如果運(yùn)行的線程等于或多于 corePoolSize,則將任務(wù)加入 BlockingQueue
3、如果無(wú)法加入到 BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)。
4、如果新創(chuàng)建的線程將使當(dāng)前運(yùn)行的線程超出 maximumPoolSize,任務(wù)將被拒絕,調(diào)用飽和策略處理(RejectedExceptionHandler.rejectedException()方法),默認(rèn)是AbortPolicy,會(huì)拋出Exception。

當(dāng)前運(yùn)行的線程的意思是:只要不是 TERMINATED 都算活著。

ThreadPoolExecutor 的創(chuàng)建:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize(線程池的基本大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其它空閑的基本線程能夠執(zhí)行新任務(wù),也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不在創(chuàng)建。
workQueue(任務(wù)隊(duì)列): 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。

  • ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列。FIFO
  • LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列。FIFO

maximumPoolSize(線程池最大數(shù)量):線程池允許的最大線程數(shù)。注意,如果使用了無(wú)界的任務(wù)隊(duì)列,這個(gè)參數(shù)就沒(méi)什么效果。
ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠
RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和狀態(tài),必須采用一種策略處理提交的新任務(wù)。默認(rèn)策略是 AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常。

  • AbortPolicy:直接拋出異常
  • CallerRunsPolicy:用調(diào)用者所在的線程運(yùn)行任務(wù)。
  • DiscardPolicy:不處理,丟棄掉
  • DiscardOldestPolicy:丟棄隊(duì)列最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。

keepAliveTime(線程活動(dòng)保持時(shí)間):線程池的工作線程空閑后,保持存活的事件。
TimeUnit:保持時(shí)間單位
。
示例代碼演示

先定義一個(gè)任務(wù),供以后使用:

class MyRun(val name: String) : Runnable {
    override fun run() {
        println("$name is running, ${Thread.currentThread().name}")
        TimeUnit.SECONDS.sleep(3)
        println("$name is finished")
    }
}

調(diào)用線程池執(zhí)行任務(wù)一:

val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
TimeUnit.SECONDS.sleep(20)
executor.execute(MyRun("T2"))

結(jié)果:

T1 is running, pool-1-thread-1
T1 is finished
T2 is running, pool-1-thread-2
T2 is finished

分析:當(dāng) T1 執(zhí)行完成之后,此時(shí) pool-1-thread-1的線程所處的狀態(tài) WAITING

"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
   java.lang.Thread.State: WAITING (parking)

因?yàn)?corePoolSize 是 2,所以面對(duì)新的任務(wù)還是會(huì)用 新的線程 pool-1-thread-2 來(lái)執(zhí)行,程序結(jié)束后線程池中線程狀態(tài):

"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
   java.lang.Thread.State: WAITING (parking)

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fa1c287e800 nid=0x320b waiting on condition [0x000070001113d000]
   java.lang.Thread.State: WAITING (parking)

調(diào)用線程池執(zhí)行任務(wù)二:

val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
executor.execute(MyRun("T2"))
TimeUnit.SECONDS.sleep(5)
executor.execute(MyRun("T3"))
executor.execute(MyRun("T4"))

執(zhí)行結(jié)果:

T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-3
T3 is finished
T4 is finished

分析: T1、T2執(zhí)行完成后,線程池中線程有:pool-1-thread-1pool-1-thread-2,此時(shí)當(dāng)T3提交給線程池時(shí),會(huì)把任務(wù)存到 BlockingQueue中等待執(zhí)行,再提交T4,此時(shí)有可能 BlockingQueue 是滿的狀態(tài),所以又因?yàn)?maximumPoolSize 是 10,所以會(huì)新創(chuàng)建線程執(zhí)行任務(wù):pool-1-thread-3。

當(dāng)T4剛剛執(zhí)行完成,我們來(lái)看一下線程池中線程的情況:

"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
   java.lang.Thread.State: TIMED_WAITING (parking)

"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f90bb00f000 nid=0x3f03 waiting on condition [0x0000700001555000]
   java.lang.Thread.State: TIMED_WAITING (parking)

"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
   java.lang.Thread.State: WAITING (parking)

因?yàn)?,此時(shí)線程數(shù)為3,超過(guò)了corePoolSize的個(gè)數(shù),這時(shí)候線程就會(huì)進(jìn)入 TIMED_WAITING狀態(tài),直到線程數(shù)達(dá)到 corePoolSize 的個(gè)數(shù)為止。

我們?cè)俚却欢螘r(shí)間(超過(guò)30秒),這時(shí)候我們來(lái)看一下線程池中線程的情況:

"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
   java.lang.Thread.State: WAITING (parking)

"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
   java.lang.Thread.State: WAITING (parking)

pool-1-thread-2 經(jīng)過(guò)超時(shí)時(shí)間之后,會(huì)銷毀。 此時(shí)線程池只剩下 pool-1-thread-3pool-1-thread-1 兩條線程,維持核心線程數(shù)。

調(diào)用線程池執(zhí)行任務(wù)三:

fun main() {
    val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
    executor.execute(MyRun("T1"))
    executor.execute(MyRun("T2"))
    TimeUnit.SECONDS.sleep(5)
    executor.execute(MyRun("T3"))
    TimeUnit.SECONDS.sleep(1)    //稍微等待一下
    executor.execute(MyRun("T4"))
}

結(jié)果:

T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-1
T3 is finished
T4 is finished

分析: 基本和上一個(gè)示例差不多,只是在T3T4提交之間,增加了一個(gè)延遲,這時(shí)候再提交T4的時(shí)候,BlockingQueue 就不為空了,所以T4提交是會(huì)放到 BlockingQueue 中等待執(zhí)行。

合理配置線程池:

CPU密集型任務(wù):應(yīng)盡可能盡可能小的配置線程(否則老是切換線程上下文的話,反而總時(shí)間會(huì)邊長(zhǎng))。例如配置 N(CPU) + 1
IO密集型任務(wù):IO密集型任務(wù)并不是一直再執(zhí)行任務(wù),則應(yīng)配置盡可能多線程(因?yàn)镮O就會(huì)阻塞,線程數(shù)多的話正好可以利用CPU空閑時(shí)間,提高利用率)。例如:2 * N(CPU)。

Executors 中幾種常見(jiàn)的線程池:

ThreadPoolExecutor 是線程池的核心 實(shí)現(xiàn)類,用來(lái)執(zhí)行提交的任務(wù)。
ScheduledThreadPoolExecutor 是一個(gè)線程池實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行任務(wù)?;蛘叨ㄆ趫?zhí)行任務(wù)。

FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool 適用于為了滿足資源管理的需求,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場(chǎng)景,比較適用于負(fù)載較重的服務(wù)器。 任務(wù)隊(duì)列使用的是 無(wú)界的 LinkedBlockingQueue,線程池大小有界。 創(chuàng)建用于容納固定線程數(shù)量的池子,每個(gè)線程存活時(shí)間是無(wú)限的,當(dāng)池子滿了不再添加線程,如果池子中線程都在繁忙,則將新任務(wù)放入阻塞隊(duì)列中。

CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

是大小無(wú)界限的線程池,適用于執(zhí)行很多短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器。當(dāng)新任務(wù)到來(lái),則插入到SynchronousQueue中,由于SynchronousQueue 是同步隊(duì)列,因此會(huì)在池中尋找可用線程來(lái)執(zhí)行,若有可用線程則執(zhí)行,若沒(méi)有則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行該任務(wù)。若空閑線程大于指定超時(shí)時(shí)間,則線程會(huì)被銷毀。 SynchronousQueue是BlockingQueue的一種,SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存儲(chǔ)任何元素。也就是說(shuō)SynchronousQueue的每一次insert操作,必須等待其他線性的remove操作。而每一個(gè)remove操作也必須等待其他線程的insert操作。

也就是說(shuō)SynchronousQueue的每一次insert操作,必須等待其他線性的remove操作。而每一個(gè)remove操作也必須等待其他線程的insert操作。

SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor 適用于需要保證順序地執(zhí)行各個(gè)任務(wù),并且在任意時(shí)候,不會(huì)有多個(gè)線程是活動(dòng)的。創(chuàng)建了只有一個(gè)線程的線程池,是無(wú)限存活的,繁忙是會(huì)有任務(wù)塞進(jìn)任務(wù)隊(duì)列。

ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

適用于需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù)。同時(shí)為了滿足資源管理的需求而需要限制后臺(tái)線程的數(shù)量的應(yīng)用場(chǎng)景。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容