線程池

計(jì)算機(jī)線程

線程是調(diào)度CPU的最小單元,也叫輕量級(jí)進(jìn)程LWP(Light Weight Process)

線程模型分類:

  • 用戶級(jí)線程:用戶程序?qū)崿F(xiàn),不依賴操作系統(tǒng)核心,應(yīng)用提供創(chuàng)建、同步、調(diào)度和管理線程的函數(shù)來(lái)控制用戶線程。不需要用戶態(tài)與內(nèi)核態(tài)的切換,速度快。內(nèi)核對(duì)用戶線程無(wú)感知,線程阻塞則進(jìn)程阻塞

  • 內(nèi)核級(jí)線程:系統(tǒng)內(nèi)核管理線程,內(nèi)核保存線程的狀態(tài)和上下文信息,線程阻塞不會(huì)引起進(jìn)程阻塞。在多處理器系統(tǒng)上,多線程在多處理器上并行執(zhí)行。線程的創(chuàng)建、調(diào)度和管理由內(nèi)核完成,效率比用戶線程要慢,比進(jìn)程操作快

后面具體研究下,參考文獻(xiàn)
https://www.cnblogs.com/still-smile/p/11656591.html
https://www.cnblogs.com/still-smile/p/11656533.html

1、線程池的意義

它的創(chuàng)建和銷毀是比較重且消耗資源的操作,而Java線程依賴內(nèi)核線程,創(chuàng)建線程需要進(jìn)行操作系統(tǒng)狀態(tài)切換,為了避免資源過(guò)去消耗需要設(shè)法重用線程執(zhí)行多個(gè)任務(wù)。線程池就是一個(gè)線程緩存,負(fù)責(zé)對(duì)線程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)與監(jiān)控。

線程池的優(yōu)勢(shì)

1、重用存在的線程,減少線程創(chuàng)建,消亡的開(kāi)銷,提高性能
2、提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行
3、提高線程的可管理性,線程是稀缺資源,如果無(wú)限的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)與監(jiān)控

2、幾個(gè)重要接口和類的官方文檔

Executor(接口)

Executor是一個(gè)對(duì)象,可以用來(lái)執(zhí)行提交過(guò)來(lái)的Runnable任務(wù),該接口提供了一種方式,將任務(wù)的提交和每一個(gè)任務(wù)具體的執(zhí)行機(jī)制進(jìn)行解耦了,包括線程使用細(xì)節(jié),調(diào)度等等。通常使用Executor而不是顯式創(chuàng)建線程,例如,比起使用這個(gè)創(chuàng)建線程 new Thread(new(RunnableTask())).start() ,你使用如下方式會(huì)更好

 Executor executor = anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());
 ... 
ExecutorService(接口)

ExecutorService本質(zhì)上是一個(gè)Executor,提供方法來(lái)管理終端和方法,該方法可以產(chǎn)生一個(gè)Future用來(lái)追蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行情況

一個(gè)ExecutorService可以關(guān)閉,這將導(dǎo)致它拒絕新的任務(wù)。 提供了兩種不同的方法來(lái)關(guān)閉ExecutorService 。 shutdown()方法將允許先前提交的任務(wù)在終止之前執(zhí)行,而shutdownNow()方法可以防止等待任務(wù)啟動(dòng)并嘗試停止當(dāng)前正在執(zhí)行的任務(wù)。 一旦終止,執(zhí)行者沒(méi)有任務(wù)正在執(zhí)行,沒(méi)有任務(wù)正在等待執(zhí)行,并且不能提交新的任務(wù)。 應(yīng)關(guān)閉未使用的ExecutorService以允許資源的回收。

AbstractExecutorService(抽象類)

提供ExecutorService中的接口執(zhí)行方法的默認(rèn)實(shí)現(xiàn),此類使用包中提供的默認(rèn) FutureTask 類實(shí)現(xiàn)了 submit、invokeAny 和 invokeAll 方法

Executors(類)

一個(gè)工廠并且提供了一些輔助方法針對(duì)于Executor中的接口ExecutorService, ScheduledExecutorService, ThreadFactoryCallable在此包中定義的類。該類支持以下幾種方法:

  • 創(chuàng)建并返回一個(gè)ExecutorService設(shè)置的常用的配置設(shè)置的方法。
  • 創(chuàng)建并返回一個(gè)ScheduledExecutorService的方法, 其中設(shè)置了常用的配置設(shè)置。
  • 創(chuàng)建并返回“包裝”ExecutorService的方法,通過(guò)使實(shí)現(xiàn)特定的方法無(wú)法訪問(wèn)來(lái)禁用重新配置。
  • 創(chuàng)建并返回將新創(chuàng)建的線程設(shè)置為已知狀態(tài)的ThreadFactory的方法。
  • 創(chuàng)建并返回一個(gè)方法Callable出的其他閉包形式,這樣他們就可以在需要的執(zhí)行方法使用Callable 。

作用:作為一個(gè)工廠并更加方便地創(chuàng)建出對(duì)應(yīng)的線程池對(duì)象

3、使用工廠模式創(chuàng)建線程池

  • newFixedThreadPool(int n):表示創(chuàng)建一個(gè)固定大小的線程池,corePoolSize = n,maximumPoolSize = n,即線程池當(dāng)中所一直維護(hù)的線程數(shù)量 是 n,且最大的線程數(shù)量是n
  • newSingleThreadExecutor():表示創(chuàng)建一個(gè)固定大小是1的線程池,corePoolSize = 1,maximumPoolSize = 1,即線程池當(dāng)中所一直維護(hù)的線程數(shù)量 是 1,且最大的線程數(shù)量是1(線程池除了線程外,還有其他的記錄功能,應(yīng)該大小為1的線程池也是有好處)
  • newCachedThreadPool():表示創(chuàng)建一個(gè)可緩存線程的線程池,corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即線程池當(dāng)中所一直維護(hù)的線程數(shù)量 是 0,且最大的線程數(shù)量是Integer.MAX_VALUE。當(dāng)線程超過(guò)特定時(shí)間無(wú)使用則自動(dòng)回收,當(dāng)線程不夠時(shí),則再次創(chuàng)建線程

總結(jié):上面3種創(chuàng)建方式其實(shí)本質(zhì)上都是對(duì)ThreadPoolExecutor進(jìn)行傳參從而構(gòu)造線程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

注意:
①:一般中小項(xiàng)目才會(huì)使用工廠模式進(jìn)行創(chuàng)建。而一般大項(xiàng)目會(huì)精心考量每一個(gè)參數(shù)的大小進(jìn)行創(chuàng)建線程池,而不使用工廠模式,比較工長(zhǎng)模式靈活性非常受限
②:線程池里面的線程都是用戶線程,并非守護(hù)線程,因此線程池不用時(shí),需要進(jìn)行關(guān)閉,否則會(huì)浪費(fèi)資源

例子
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class MyTest12 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //ExecutorService executorService = Executors.newSingleThreadExecutor();
        //ExecutorService executorService = Executors.newCachedThreadPool();
        
        IntStream.range(0, 3).forEach(i -> {
            executorService.submit(() -> {
                IntStream.range(0, 5).forEach(j -> {
                    System.out.println(Thread.currentThread().getName());
                });
            });
        });

        executorService.shutdown();
    }
}

使用newFixedThreadPool(3)輸出,線程池中3個(gè)線程都能用上

pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3

使用newSingleThreadExecutor()輸出,線程池中只有一個(gè)線程用上

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

使用newCachedThreadPool()輸出,當(dāng)線程不夠時(shí),會(huì)創(chuàng)建新的線程使用,線程數(shù)范圍在[1, 3],因?yàn)槔又凶疃嘤?個(gè)任務(wù),有可能運(yùn)行得快只用上1個(gè)或者2個(gè)線程

pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

4、線程池構(gòu)造參數(shù)

  • 1、int corePoolSize:線程池當(dāng)中所一直維護(hù)的線程數(shù)量,如果線程池處于任務(wù)空閑期間,那么該線程也并不會(huì)被回收掉
  • 2、int maximumPoolSize:線程池所維護(hù)的線程數(shù)量的最大數(shù)量
  • 3、long keepAliveTime:超過(guò)了corePoolSize的線程在經(jīng)過(guò)keepAliveTime時(shí)間后一直處于空閑狀態(tài),那么超過(guò)的這部分線程將會(huì)回收掉
  • 4、TimeUnit unit:指的是keepAliveTime的時(shí)間單位
  • 5、BlockingQueue<Runnable> workQueue:向線程池所提交的任務(wù)位于的阻塞隊(duì)里,它的實(shí)現(xiàn)有多種方式
  • 6、ThreadFactory threadFactory:線程工廠,用于創(chuàng)建新的線程并被線程池所管理,默認(rèn)線程工廠所創(chuàng)建的線程都是用戶線程且優(yōu)先級(jí)為正常優(yōu)先級(jí)

該接口最重要的方法

public interface ThreadFactory {
     Thread newThread(Runnable r);
}

該接口有幾個(gè)實(shí)現(xiàn)類,默認(rèn)的實(shí)現(xiàn)類是DefaultThreadFactory,其中poolNumber要設(shè)置成static,threadNumber要設(shè)置成非static,因?yàn)榫€程池中的每個(gè)線程都是從1開(kāi)始記錄的

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            //當(dāng)該線程是守護(hù)線程時(shí),設(shè)置為用戶線程
            if (t.isDaemon())
                t.setDaemon(false);
           //設(shè)置線程的優(yōu)先級(jí)相同
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
  • 7、RejectedExecutionHandler handler:表示當(dāng)線程池中的線程都在忙于執(zhí)行任務(wù)且阻塞隊(duì)里也已經(jīng)滿了的情況下,新到來(lái)的任務(wù)該如何被對(duì)待和處理。
    它有四種實(shí)現(xiàn)策略
    (1)AbortPolicy:直接拋出一個(gè)運(yùn)行期異常
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    public class RejectedExecutionException extends RuntimeException {}

(2)DiscardPolicy:默默地丟棄掉提交的任務(wù),什么都不做且不拋出異常

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

(3)DiscardOldestPolicy:丟棄掉阻塞隊(duì)列存放時(shí)間最久的任務(wù)(隊(duì)頭元素),并且為當(dāng)前所提交的任務(wù)留出一個(gè)隊(duì)列中的空閑空間,以便將其放進(jìn)隊(duì)列中(當(dāng)彈出隊(duì)頭時(shí),可能又被其他任務(wù)入隊(duì)了,這時(shí)會(huì)繼續(xù)該操作,直到成功為止)

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

(4)CallerRunsPolicy:直接由提交任務(wù)的線程來(lái)運(yùn)行這個(gè)提交的任務(wù)(雖然這么做不會(huì)丟失任務(wù),可是本來(lái)主線程把該任務(wù)給線程池執(zhí)行,自己可以做別的調(diào)度事情,用這個(gè)策略后最后還是自己執(zhí)行,最終還是會(huì)阻塞主線程)

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

在線程池中,最好把偏向鎖標(biāo)記關(guān)閉,使用偏向鎖會(huì)影響性能。(線程池是用來(lái)執(zhí)行大量任務(wù)的集合體,大量的任務(wù)都是由不同的線程執(zhí)行)

5、線程池拒絕策略實(shí)例分析

例子:初始線程池最小線程數(shù)是3,最大線程數(shù)是5,線程等待回收時(shí)間是0,使用有限長(zhǎng)度的阻塞隊(duì)列大小是3。一共有9個(gè)任務(wù),正在執(zhí)行的線程有5個(gè),阻塞3個(gè),表示當(dāng)線程池中的線程都在忙于執(zhí)行任務(wù)且阻塞隊(duì)里也已經(jīng)滿了的情況下,

package com.concurrency2;

import java.util.concurrent.*;
import java.util.stream.IntStream;

public class MyTest12 {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(3, 5, 0,
                TimeUnit.SECONDS, new LinkedBlockingQueue(3), 
                new ThreadPoolExecutor.AbortPolicy());
                                     //DiscardPolicy()

        IntStream.range(0, 9).forEach(i -> {
            executorService.submit(() -> {
                IntStream.range(0, 2).forEach(j -> {
                    System.out.println(Thread.currentThread().getName());
                });
            });
        });

        executorService.shutdown();
    }
}

1、使用AbortPolicy()拒絕策略,根據(jù)該策略,當(dāng)出現(xiàn)新到來(lái)的任務(wù)時(shí),自動(dòng)拋出運(yùn)行時(shí)異常

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@4e50df2e rejected from java.util.concurrent.ThreadPoolExecutor@1d81eb93[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.concurrency2.MyTest12.lambda$main$2(MyTest12.java:12)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
    at com.concurrency2.MyTest12.main(MyTest12.java:11)
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-4
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5
pool-1-thread-3
pool-1-thread-3

2、使用DiscardPolicy()拒絕策略,根據(jù)該策略,當(dāng)出現(xiàn)新到來(lái)的任務(wù)時(shí),直接扔掉,因此下面輸出小于18個(gè)

pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-4
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5

3、使用DiscardOldestPolicy拒絕策略,根據(jù)該策略,當(dāng)出現(xiàn)新到來(lái)的任務(wù)時(shí),poll出阻塞隊(duì)列的隊(duì)頭,將自己加進(jìn)去,因?yàn)橐虼讼旅孑敵鲂∮?8

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-4
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5

4、使用CallerRunsPolicy拒絕策略,根據(jù)該策略,當(dāng)出現(xiàn)新到來(lái)的任務(wù)時(shí),直接由提交任務(wù)的線程來(lái)運(yùn)行這個(gè)提交的任務(wù),因此線程數(shù)是18,其中有兩個(gè)是由提交任務(wù)的線程執(zhí)行,下面是main線程

main
main
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-3
pool-1-thread-4
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5

6、關(guān)于線程池的總體執(zhí)行策略:

image.png

表述1
1、如果線程池中正在執(zhí)行的線程數(shù) < corePoolSize,那么線程池就會(huì)優(yōu)先選擇創(chuàng)建新的線程而非將提交的任務(wù)加到阻塞隊(duì)列中
2、如果線程池中正在執(zhí)行的線程數(shù) >= corePoolSize,那么線程池就會(huì)優(yōu)先進(jìn)行阻塞隊(duì)列排隊(duì)而非創(chuàng)建新的線程
3、如果提交的任務(wù)無(wú)法加入到阻塞隊(duì)列當(dāng)中,那么線程池就會(huì)創(chuàng)建新的線程;如果創(chuàng)建的線程數(shù)超過(guò)了maximumPoolSize,那么拒絕策略就會(huì)起作用

表述2
1、當(dāng)任務(wù)提交之后,線程池首先會(huì)檢查當(dāng)前線程數(shù),如果當(dāng)前的線程數(shù)小于核心線程數(shù)(corePoolSize),比如最開(kāi)始創(chuàng)建的時(shí)候線程數(shù)為 0,則新建線程并執(zhí)行任務(wù)。
2、當(dāng)提交的任務(wù)不斷增加,創(chuàng)建的線程數(shù)等于核心線程數(shù)(corePoolSize),新增的任務(wù)會(huì)被添加到 workQueue 任務(wù)隊(duì)列中,等待核心線程執(zhí)行完當(dāng)前任務(wù)后,重新從 workQueue 中獲取任務(wù)執(zhí)行。
3、假設(shè)任務(wù)非常多,達(dá)到了 workQueue 的最大容量,但是當(dāng)前線程數(shù)小于最大線程數(shù)(maximumPoolSize),線程池會(huì)在核心線程數(shù)(corePoolSize)的基礎(chǔ)上繼續(xù)創(chuàng)建線程來(lái)執(zhí)行任務(wù)。
4、假設(shè)任務(wù)繼續(xù)增加,線程池的線程數(shù)達(dá)到最大線程數(shù)(maximumPoolSize),如果任務(wù)繼續(xù)增加,這個(gè)時(shí)候線程池就會(huì)采用拒絕策略來(lái)拒絕這些任務(wù)。

注意:當(dāng)阻塞隊(duì)列中已經(jīng)沒(méi)有任務(wù)了,同時(shí)該線程池中的線程也執(zhí)行完任務(wù)了,它會(huì)進(jìn)入到相應(yīng)的Condition等待集合中進(jìn)行阻塞,直到等到下一個(gè)任務(wù)提交時(shí)會(huì)直接進(jìn)入到阻塞隊(duì)列,便會(huì)喚醒Condition集合中的某一個(gè)線程。如果是非核心線程,它阻塞了一段時(shí)間后便會(huì)自動(dòng)回收。

7、關(guān)于線程池任務(wù)提交的總結(jié):

1、兩種提交方式:submit與execute
2、submit有三種方式,無(wú)論哪種方式,最終都是將傳遞進(jìn)來(lái)的任務(wù)轉(zhuǎn)換為一個(gè)Callable對(duì)象進(jìn)行處理。submit方法可以取代execute方法,因?yàn)樗瓤梢越邮誄allable任務(wù),也可以接收Runnable任務(wù)
3、當(dāng)Callable對(duì)象構(gòu)造完畢后,最終都會(huì)調(diào)用Executor接口中聲明的execute方法進(jìn)行統(tǒng)一的處理

submit的三種方法

方式一:接收Runnable對(duì)象,會(huì)轉(zhuǎn)變?yōu)镃allable對(duì)象,執(zhí)行完后,返回null

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

方式二:接收Runnable對(duì)象,會(huì)轉(zhuǎn)變?yōu)镃allable對(duì)象,執(zhí)行完后,返回指定的結(jié)果result

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

方式三:接收Callable對(duì)象,執(zhí)行完后,返回任務(wù)處理后的結(jié)果

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

8、線程池的狀態(tài)

對(duì)于線程池來(lái)說(shuō),存在兩個(gè)狀態(tài)需要維護(hù):

1、線程池本身的狀態(tài):ctl 的高3位來(lái)表示
2、線程池中正在運(yùn)行的線程的數(shù)量:ctl 的其余29位來(lái)表示
注意:其中 ctl 是AtomicInteger類型的

線程池一共存在5種狀態(tài)

1、RUNNING:線程池可以接收新的任務(wù)提交,并且還可以正常處理阻塞隊(duì)列中的任務(wù)
2、SHUTDOWN:不再接受新的任務(wù)提交,不過(guò)線程可以繼續(xù)處理阻塞隊(duì)列中的任務(wù)
3、STOP:不再接收新的任務(wù),同時(shí)還會(huì)丟棄阻塞隊(duì)列中的既有任務(wù);此外,它還會(huì)中斷正在處理中的任務(wù)
4、TIDYING:所有的任務(wù)都執(zhí)行完畢后(同時(shí)也涵蓋了阻塞隊(duì)列中的任務(wù)),當(dāng)前線程池中的活動(dòng)的線程數(shù)量降為0,將會(huì)調(diào)用terminated方法
5、TERMINATED:線程池的終止?fàn)顟B(tài),當(dāng)terminated方法執(zhí)行完畢后,線程池將會(huì)處于該狀態(tài)之下

  • RUNNING -> SHUTDOWN:當(dāng)調(diào)用了線程池中的shutdown方法時(shí),或者當(dāng)finalize方法被隱式調(diào)用后(該方法內(nèi)部會(huì)調(diào)用shutdown方法)
  • RUNNING, SHUTDOWN -> STOP:當(dāng)調(diào)用了線程池中的shutdownNow方法時(shí),丟棄阻塞隊(duì)列中的既有任務(wù) 和 中斷正在處理中的任務(wù),調(diào)用了shutdownNow方法會(huì)返回阻塞隊(duì)列中的任務(wù)的一個(gè)新鏈表
  • SHUTDOWN -> TIDYING:在線程池與阻塞隊(duì)列均變?yōu)榭諘r(shí)
  • STOP -> TIDYING:在線程池變?yōu)榭諘r(shí)
  • TIDYING -> TERMINATED:在terminated方法被執(zhí)行完畢后

9、execute()

execute方法,它會(huì)執(zhí)行給定的任務(wù)在未來(lái)的某個(gè)時(shí)間。這個(gè)任務(wù)可能在新的線程當(dāng)中執(zhí)行,也可能在存在的線程池中執(zhí)行。如果一個(gè)任務(wù)不能提交進(jìn)行執(zhí)行,有兩種情況之一,①線程池已經(jīng)關(guān)閉了 ,②阻塞隊(duì)列容量已經(jīng)滿了,當(dāng)滿足其中一個(gè),當(dāng)前任務(wù)就會(huì)被拒絕策略處理

ThreadPoolExecutor#execute 源碼分析
 public void execute(Runnable command) {
     // 如果傳入的Runnable的空,就拋出異常
     if (command == null)
         throw new NullPointerException();
     int c = ctl.get();
     // 線程池中的線程比核心線程數(shù)少 
     if (workerCountOf(c) < corePoolSize) {
         // 新建一個(gè)核心線程執(zhí)行任務(wù)
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // 核心線程已滿,但是任務(wù)隊(duì)列未滿,添加到隊(duì)列中
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         // 任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
         if (! isRunning(recheck) && remove(command))
             // 如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
             reject(command);
         else if (workerCountOf(recheck) == 0)
             // 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
             addWorker(null, false);
     }
     // 核心線程池已滿,隊(duì)列已滿,嘗試創(chuàng)建一個(gè)非核心新的線程
     else if (!addWorker(command, false))
         // 如果創(chuàng)建新線程失敗,說(shuō)明線程池關(guān)閉或者線程池滿了,拒絕任務(wù)
         reject(command);
 }

其他代碼比較容易看懂,這里的代碼需要解釋一下

            else if (workerCountOf(recheck) == 0)
                // 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
                addWorker(null, false);

進(jìn)入這個(gè) else 說(shuō)明前面判斷到線程池狀態(tài)為 Running,那么當(dāng)任務(wù)被添加進(jìn)來(lái)之后就需要防止沒(méi)有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了),所以此時(shí)如果檢查當(dāng)前線程數(shù)為 0,也就是 workerCountOf(recheck) == 0,那就執(zhí)行 addWorker() 方法新建一個(gè)非核心線程。

addWorker源碼分析

addWorker 方法的主要作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行傳入的任務(wù),如果返回 true 代表添加成功,如果返回 false 代表添加失敗。

第一個(gè)參數(shù)firstTask表示新創(chuàng)建的線程首先執(zhí)行的任務(wù),也可以任務(wù)為null,
第二個(gè)參數(shù)是個(gè)布爾值,如果布爾值傳入 true 代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程(核心線程),大于等于則不增加;同理,如果傳入 false 代表增加線程時(shí)判斷當(dāng)前線程是否少于 maximumPoolSize,小于則增加新線程(非核心線程),大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進(jìn)行是否新增線程

//線程啟動(dòng)的時(shí)候
private boolean addWorker(Runnable firstTask, boolean core) {
  //代碼 通過(guò)CAS操作對(duì)線程數(shù)量加1,
  w = new Worker(firstTask);
  final Thread t = w.thread;
  ...//代碼
  t.start();
  ...//代碼
}
runWorker源碼分析

線程池中的線程是封裝在Worker類中的,啟動(dòng)線程時(shí)調(diào)用start執(zhí)行run方法,執(zhí)行runWorker,當(dāng)?shù)谝粋€(gè)線程不為空 或者 阻塞隊(duì)列中能夠獲取到線程(getTask()為true)時(shí),線程會(huì)一直執(zhí)行下去,即同一個(gè)線程不斷會(huì)執(zhí)行任務(wù),從而達(dá)到了線程復(fù)用的效果

//1,線程啟動(dòng)的時(shí)候jvm會(huì)執(zhí)行它
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    final Thread thread;
    Runnable firstTask;
    public void run() {
       runWorker(this);
    }
}


//2,繼續(xù)看里面的runWorker(this);很顯然this是內(nèi)部類Work對(duì)象本身
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        ../代碼
        try {
            while (task != null || (task = getTask()) != null) {
            try{
                ../代碼
                task.run();
                }catch(){
                ../代碼
                } finally {
                    ../代碼
                }
            }
            ../代碼
        } finally {
            ../代碼
        }
 }
當(dāng)線程池中的線程已經(jīng)沒(méi)有任務(wù)可以執(zhí)行了會(huì)怎么樣???會(huì)自動(dòng)回收嗎?

答案:當(dāng)阻塞隊(duì)列中已經(jīng)沒(méi)有任務(wù)了,同時(shí)該線程池中的線程也執(zhí)行完任務(wù)了,它會(huì)進(jìn)入到相應(yīng)的Condition等待集合中進(jìn)行阻塞,直到等到下一個(gè)任務(wù)提交時(shí)會(huì)直接進(jìn)入到阻塞隊(duì)列,便會(huì)喚醒Condition集合中的某一個(gè)線程。如果是非核心線程,它阻塞了一段時(shí)間后便會(huì)自動(dòng)回收

(1)線程進(jìn)入Condition等待集合的源碼

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

以阻塞隊(duì)列是ArrayBlockQueue為例

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

分析:其實(shí)在getTask()方法的時(shí)候,如果該線程已經(jīng)空閑了,就會(huì)調(diào)用workQueue.take()方法,會(huì)調(diào)用notEmpty.await()方法從而進(jìn)入阻塞,其中notEmpty是Condition類型
(2)Condition等待集合的線程被喚醒

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

分析:當(dāng)線程數(shù)量達(dá)到一定的核心線程數(shù)時(shí),有任務(wù)提交時(shí)在execute方法中會(huì)調(diào)用workQueue.offer(command)加入任務(wù),同時(shí)如果會(huì)喚醒阻塞中的核心線程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容