深入Java線程池講解

@[toc]

一、什么是線程池


線程池就是創(chuàng)建若干個(gè)可執(zhí)行的線程放入一個(gè)池(容器)中,有任務(wù)需要處理時(shí),會(huì)提交到線程池中的任務(wù)隊(duì)列,處理完之后線程并不會(huì)被銷毀,而是仍然在線程池中等待下一個(gè)任務(wù)。

Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開(kāi)發(fā)過(guò)程中,合理地使用線程池能夠帶來(lái)以下下好處

  1. <font color = "orange">降低資源消耗 </font>,通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
  2. <font color = "orange">提高響應(yīng)速度</font> ,當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
  3. <font color = "orange">提高線程的可管理性</font>,線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控

二、Executor 框架


我們知道線程池就是線程的集合,下提供了集中管理、線程重用、降低資源消耗、提高響應(yīng)速度等 從 JDK 1.5之后。為了把工作單元與執(zhí)行機(jī)制分開(kāi),<font color = "orange">Executor</font> 框架誕生了,他是一個(gè)用于統(tǒng)一創(chuàng)建與運(yùn)行的接口。<font color = "orange">Executor</font> 框架實(shí)現(xiàn)的就是線程池的功能。<font color = "orange">Executor</font> 框架不僅包括了線程池的管理,還提供了線程工廠、隊(duì)列以及拒絕策略等,<font color = "orange">Executor</font> 框架讓并發(fā)編程變得更加簡(jiǎn)單。


2.1 Executor 框架組成




1. 任務(wù) (Runnable / Callable)

執(zhí)行任務(wù)需要實(shí)現(xiàn) Runnable 接口或者 Callable 接口

2. 任務(wù)等執(zhí)行 (Executor)

包括任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的 ExecutorService 接口。Executor框架有兩個(gè)關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor


    ExecutorService executorService = Executors.newFixedThreadPool(5);
    

我們實(shí)現(xiàn)線程池通過(guò) Executors 實(shí)現(xiàn) ExecutorService 接口,ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個(gè)關(guān)鍵類實(shí)現(xiàn)了 ExecutorService 接口

ThreadPoolExecutor 類描述 :


    //AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口
    public class ThreadPoolExecutor extends AbstractExecutorService 

    public abstract class AbstractExecutorService implements ExecutorService 
    

ScheduledThreadPoolExecutor 類描述:


    //繼承ThreadPoolExecutor
    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService
            

3. 異步計(jì)算的結(jié)果 (Future)

包括Future和實(shí)現(xiàn)Future接口的FutureTask類。


2.2 Executor 結(jié)構(gòu)


在這里插入圖片描述
  • <font color = "orange">Executor</font> :是一個(gè)接口,他是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離。
  • <font color = "orange">ThreadPoolExecutor</font> :是線程池的核心實(shí)現(xiàn)類,用來(lái)執(zhí)行被提交的任務(wù)。
  • <font color = "orange">ScheduledThreadPoolExcecutor</font> 是一個(gè)實(shí)現(xiàn)類,可以在給定等延遲后運(yùn)行命令,或者定期執(zhí)行命令,ScheduledThreadPoolExcecutoe比 Timer 更靈活,功能更強(qiáng)大
  • <font color = "orange">Future</font> Future接口和它的實(shí)現(xiàn)FutureTask類,代表異步計(jì)算的結(jié)果。
  • <font color = "orange">ExecutorService</font> :是一個(gè)線程池的實(shí)現(xiàn)

2.2 Executor 使用

  1. 祝線程首先要?jiǎng)?chuàng)建實(shí)現(xiàn) Runnable 或者 Callable 接口的任務(wù)對(duì)象
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程 :" + Thread.currentThread().getName());
            }
        }).start();
    }
  1. 把創(chuàng)建完成實(shí)現(xiàn) Runnable 或者 Callable 接口對(duì)象直接交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或者也可以把 Runnable 對(duì)象或Callable 對(duì)象提交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.submit(Callable <T> task))</font>
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
            }
        });
        Future<?> submit = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
            }
        });
  
    }
  1. 如果 執(zhí)行 ExecutorService.submit(…),將會(huì)返回一個(gè) Future<?> 對(duì)象
  2. 祝線程可以執(zhí)行 Future.get() 方法來(lái)等待任務(wù)執(zhí)行完成 也可以執(zhí)行 FutureTask.cancel(boolean mayInterruptIfRunning)來(lái)取消此任務(wù)的執(zhí)行。

三、ThreadPoolExecutor 解析


3.1 構(gòu)造方法


    public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)
                              int maximumPoolSize, //最大線程數(shù)
                              long keepAliveTime, //線程數(shù)大于核心時(shí),多余線程存活時(shí)間
                              TimeUnit unit, //時(shí)間單位
                              BlockingQueue<Runnable> workQueue, //工作隊(duì)列
                              ThreadFactory threadFactory, //線程工廠
                              RejectedExecutionHandler handler //拒絕策略 ,線程過(guò)多的處理
                              ) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

參數(shù)詳解 :

  • <font color = "orange">corePoolSize </font> :核心池的大小。 當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中
  • <font color = "orange">maximumPoolSize </font> :當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線程數(shù)量變?yōu)樽畲缶€程數(shù)
  • <font color = "orange">keepAliveTime</font> :當(dāng)線程池中的線程數(shù)量大于 <font color = "orange">corePoolSize </font> 的時(shí)候,如果這時(shí)候沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立刻銷毀,而是等待 時(shí)間超過(guò)來(lái) <font color = "orange">keepAliveTime</font> 才會(huì)被銷毀
  • <font color = "orange">unit</font> :<font color = "orange">keepAliveTime</font> 參數(shù)的時(shí)間單位。
  • <font color = "orange">workQueue </font> : 工作隊(duì)列 , 當(dāng)新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心心線程數(shù),如果達(dá)到的話,新任務(wù)就會(huì)被存放在隊(duì)列中
  • <font color = "orange">threadFactory </font> : 線程工廠,用來(lái)創(chuàng)建線程
  • <font color = "orange">handler </font> :拒絕策略,提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),我們可以定制策略來(lái)處理任務(wù)
在這里插入圖片描述

為什么要這么設(shè)計(jì)呢 有了最大線程數(shù),為什么要設(shè)計(jì)核心池大小呢?

  1. 如果當(dāng)前線程池中的線程數(shù) < <font color = "orange">corePoolSize </font> 則每來(lái)一個(gè)任務(wù),就會(huì)超級(jí)愛(ài)你一個(gè)線程去執(zhí)行這個(gè)任務(wù)
  2. 如果當(dāng)前線程池中的線程數(shù) >= <font color = "orange">corePoolSize </font> , 則每來(lái)一個(gè)任務(wù),會(huì)將其添加到工作隊(duì)列中,若添加成功,則等待 核心線程空閑將其取出執(zhí)行,若添加失敗 (一般隊(duì)列已滿)則在總數(shù) 不大于 <font color = "orange">maximumPoolSize </font> 的前提下,創(chuàng)建新的線程
  3. 如果當(dāng)前線程池中的線程數(shù)達(dá)到 <font color = "orange">maximumPoolSize </font> ,則會(huì)才用拒絕策略進(jìn)行處理
  4. 補(bǔ)充 : 如果當(dāng)前線程池的數(shù)量大于 <font color = "orange">corePoolSize </font> 時(shí),如果某個(gè)線程空閑時(shí)間超過(guò)<font color = "orange">keepAliveTime</font> ,線程將被銷毀,直至線程池中的線程數(shù)目不大于 <font color = "orange">corePoolSize </font>


ThreadPoolExecutor 拒絕策略

  • <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> :拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理
  • <font color = "orange">ThreadPoolExecutor.CallerRunsPolicy</font> :調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù),也就是直接在調(diào)用 execute 方法的線程運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。因此這種策略會(huì)降低對(duì)于新任務(wù)的提交速度,影響程序的整體性能。
  • <font color = "orange">ThreadPoolExecutor.DiscardPolicy</font> :不處理新任務(wù),直接丟棄掉。
  • <font color = "orange">ThreadPoolExecutor.DiscardOldestPolicy</font> :此策略將丟棄最早的未處理的任務(wù)請(qǐng)求


3.2 自定義 ThreadPoolExecutor


在 《阿里巴巴 JAVA 開(kāi)發(fā)手冊(cè)》明確指出線程資源利用線程池,線程池不允許使用 Executors 去創(chuàng)建

<font color = 'red'>【強(qiáng)制】</font> 線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
說(shuō)明:線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時(shí)間以及系統(tǒng)資源的開(kāi)銷,解決資源不足的問(wèn)題。
如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者“過(guò)度切換”的問(wèn)題。

<font color = 'red'>【強(qiáng)制】</font> 線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,這
樣的處理方式讓寫(xiě)的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。

說(shuō)明:Executors 返回的線程池對(duì)象的弊端如下:

  • =1) FixedThreadPool 和 SingleThreadPool:
    允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM。
  • 2) CachedThreadPool:
    允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM。
public class test {


    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 20;
    private static final Long KEEP_ALIVE_TIME = 1L;


    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.AbortPolicy()
        );
        for (int i = 0; i < 300; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("當(dāng)前線程:"+ Thread.currentThread().getName());
                    System.out.println("當(dāng)前狀態(tài):"+ Thread.currentThread().getState());
                }
            });
        }
        //終止線程池
        threadPoolExecutor.shutdown();
    }



}

在這里插入圖片描述

因?yàn)槲覀兪褂玫? <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> 的拒絕任務(wù),所以被拋出異常

OOM 案例:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("開(kāi)始執(zhí)行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小時(shí)開(kāi)始");
                try {
                    TimeUnit.HOURS.sleep(1);
                }catch (Exception e){
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1,TimeUnit.HOURS);
    }

在這里插入圖片描述


3.3 源碼分析


線程池狀態(tài):利用低29位表示線程池中線程數(shù),通過(guò)高3位表示線程池的運(yùn)行狀態(tài):

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • <font color = "orange">RUNNING</font>:運(yùn)行狀態(tài),接受新的任務(wù)并且處理隊(duì)列中的任務(wù)。-
  • <font color = "orange">SHUTDOWN</font>:關(guān)閉狀態(tài)(調(diào)用了 shutdown 方法)。不接受新任務(wù),,但是要處理隊(duì)列
    中的任務(wù)。
  • <font color = "orange">STOP</font>:停止?fàn)顟B(tài)(調(diào)用了 shutdownNow 方法)。不接受新任務(wù),也不處理隊(duì)列中的
    任務(wù),并且要中斷正在處理的任務(wù)。
  • <font color = "orange">TIDYING</font>:所有的任務(wù)都已終止了,workerCount 為 0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)terminated() 方法進(jìn)入 TERMINATED 狀態(tài)。
  • <font color = "orange">TERMINATED</font>:終止?fàn)顟B(tài),terminated() 方法調(diào)用結(jié)束后的狀態(tài)。
//記錄線程池中線程狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//來(lái)獲取當(dāng)前線程數(shù)量
private static int workerCountOf(int c)  { return c & CAPACITY; }

//工作隊(duì)列
private final BlockingQueue<Runnable> workQueue;



public void execute(Runnable command) {
        //如果任務(wù)為null,則拋出異常
        if (command == null)
            throw new NullPointerException();
        // 取的是記錄線程狀態(tài)
        int c = ctl.get();
        //判斷當(dāng)前線程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
        if (workerCountOf(c) < corePoolSize) {
        //小于的話,通過(guò)addWorker(command, true)新建一個(gè)線程,并將任務(wù)(command)添加到該線程中
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果當(dāng)前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時(shí)候就會(huì)走到這里
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次獲取線程池狀態(tài),如果線程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊(duì)列中移除任務(wù)
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果當(dāng)前線程池為空就新創(chuàng)建一個(gè)線程并執(zhí)行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 通過(guò)addWorker(command, false)新建一個(gè)線程
        // 如果addWorker(command, false)執(zhí)行失敗,則通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容。
        else if (!addWorker(command, false))
            reject(command);
    }
在這里插入圖片描述

addWorker() 方法



// Lock鎖
private final ReentrantLock mainLock = new ReentrantLock();

// 跟蹤線程池的最大大小
private int largestPoolSize;

// 工作線程集合
private final HashSet<Worker> workers = new HashSet<>();

//獲取線程池狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
//判斷線程池的狀態(tài)是否為 Running
private static boolean isRunning(int c) {
      return c < SHUTDOWN;
}

private boolean addWorker(Runnable firstTask, boolean core) {
        // CAS更新線程池?cái)?shù)量
        retry:
        for (;;) {
            //獲取線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 檢查queue是否為空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //獲取線程池中線程的數(shù)量
                int wc = workerCountOf(c);
                // core參數(shù)為true的話表明隊(duì)列也滿了,線程池大小變?yōu)?maximumPoolSize 
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // //原子操作將workcount的數(shù)量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果線程的狀態(tài)改變了就再次執(zhí)行上述操作
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 標(biāo)記工作線程是否啟動(dòng)成功
        boolean workerStarted = false;
        // 標(biāo)記工作線程是否創(chuàng)建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // rs < SHUTDOWN 如果線程池狀態(tài)依然為RUNNING,并且線程的狀態(tài)是存活的話,就會(huì)將工作線程添加到工作線程集合中
                    // 或者 rs == SHUTDOWN 傳入的firstTask == null 添加新的worker
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        // 更新當(dāng)前工作線程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 如果成功添加工作線程,則調(diào)用Worker內(nèi)部的線程實(shí)例t的Thread#start()方法啟動(dòng)真實(shí)的線程實(shí)例
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 失敗移除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

runWorker() 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 首先會(huì)通過(guò)run方法執(zhí)行firstTask,執(zhí)行完畢后會(huì)將task置為null
            // 那么task!=null的判斷條件肯定不通過(guò),它就會(huì)嘗試通過(guò)getTask(),從任務(wù)隊(duì)列中獲取任務(wù)。
            while (task != null || (task = getTask()) != null) {
                // 每一次任務(wù)的執(zhí)行都必須獲取鎖來(lái)保證下方臨界區(qū)代碼的線程安全
                w.lock();
                //如果狀態(tài)值大于等于STOP(狀態(tài)值是有序的,即STOP、TIDYING、TERMINATED)且當(dāng)前線程還沒(méi)有被中斷,則主動(dòng)中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執(zhí)行任務(wù)前處理操作,默認(rèn)是一個(gè)空實(shí)現(xiàn);在子類中可以通過(guò)重寫(xiě)來(lái)改變?nèi)蝿?wù)執(zhí)行前的處理行為
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任務(wù)后處理
                        afterExecute(task, thrown);
                    }
                } finally {
                    //將task 變?yōu)閚ull,已處理完成
                    task = null;
                    // ++操作,已完成任務(wù)數(shù)
                    w.completedTasks++;
                    //解鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 銷毀當(dāng)前的worker對(duì)象,并完成一些諸如完成任務(wù)數(shù)量統(tǒng)計(jì)之類的輔助性工作
            // 在線程池當(dāng)前狀態(tài)小于STOP的情況下會(huì)創(chuàng)建一個(gè)新的worker來(lái)替換被銷毀的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask() 方法

private Runnable getTask() {
        // 通過(guò)timeOut變量表示線程是否空閑時(shí)間超時(shí)了
        boolean timedOut = false; // Did the last poll() time out?

        // 死循環(huán)
        for (;;) {
            // 獲取線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果 線程池狀態(tài)>=STOP 則直接減少一個(gè)worker計(jì)數(shù)并返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //  獲取線程池中的worker計(jì)數(shù)
            int wc = workerCountOf(c);

            // 判斷當(dāng)前線程是否會(huì)被超時(shí)銷毀
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 如果當(dāng)前線程數(shù)大于最大線程數(shù) 或者超時(shí) 或者阻塞隊(duì)列為空 減少worker計(jì)數(shù)并返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //從阻塞隊(duì)列中取出一個(gè)任務(wù)(如果隊(duì)列為空會(huì)進(jìn)入阻塞等待狀態(tài))
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                 // 如果線程不等于null,直接返回
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


四、常用線程池


4.1 FixedThreadPool

  • <font color = "orange">FixedThreadPool</font> : 定長(zhǎng)線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">FixedThreadPool</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 nThreads, 固定長(zhǎng)度

在這里插入圖片描述

說(shuō)明:

  1. 如果當(dāng)前運(yùn)行的線程數(shù)小于 corePoolSize, 如果再來(lái)新任務(wù)的話,就創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)
  2. 當(dāng)前運(yùn)行的線程數(shù)等于 corePoolSize 后, 如果再來(lái)新任務(wù)的話,會(huì)將任務(wù)加入 LinkedBlockingQueue
  3. 線程池中的線程執(zhí)行完 手頭的任務(wù)后,會(huì)在循環(huán)中反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行

<font color ='red'>弊端 :

<font color = "orange">FixedThreadPool</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE),在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM


4.2 SingleThreadExecutor


  • <font color = "orange">SingleThreadExecutor</font> : 一個(gè)單線程化的線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
    }

從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">SingleThreadExecutor</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 1, 固定長(zhǎng)度,其他參數(shù)和 <font color = "orange">FixedThreadPool</font> 相同。

在這里插入圖片描述

說(shuō)明:

  1. 如果當(dāng)前運(yùn)行的線程數(shù)少于 corePoolSize,則創(chuàng)建一個(gè)新的線程執(zhí)行任務(wù)
  2. 當(dāng)前線程池中有一個(gè)運(yùn)行的線程后,將任務(wù)加入 LinkedBlockingQueue
  3. 線程執(zhí)行完當(dāng)前的任務(wù)后,會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行

<font color ='red'>弊端 :

<font color = "orange">SingleThreadExecutor</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE),在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM


4.3 CachedThreadPool

  • <font color = "orange">CachedThreadPool</font> : 緩存線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 的 corePoolSize 被設(shè)置為空(0),maximumPoolSize被設(shè)置為 Integer.MAX.VALUE,即它是無(wú)界的,這也就意味著如果主線程提交任務(wù)的速度高于 maximumPool 中線程處理任務(wù)的速度時(shí),CachedThreadPool 會(huì)不斷創(chuàng)建新的線程。極端情況下,這樣會(huì)導(dǎo)致耗盡 cpu 和內(nèi)存資源。

在這里插入圖片描述

說(shuō)明:

  1. 首先 SynchronousQueue 是一個(gè)生產(chǎn)消費(fèi)模式等阻塞任務(wù)隊(duì)列,只要有任務(wù)就需要有線程執(zhí)行,線程池中等線程可以重復(fù)使用

<font color ='red'>弊端 :

<font color = "orange">CachedThreadPool</font> 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM。


4.3 ScheduledThreadPoolExecutor

  • <font color = "orange">ScheduledThreadPoolExecutor</font> : 延遲線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        for (int i = 1; i < 5; i++) {
            scheduledExecutorService.schedule(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
                System.out.println("時(shí)間"+LocalDateTime.now());
            },3,TimeUnit.SECONDS);
        }
        scheduledExecutorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.125
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

<font color = "orange">ScheduledThreadPoolExecutor</font> 主要用來(lái)在給定的延遲后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)

<font color = "orange">ScheduledThreadPoolExecutor</font> 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueue,PriorityQueue 會(huì)對(duì)隊(duì)列中的任務(wù)進(jìn)行排序,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time 變量小的先執(zhí)行),如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。

在這里插入圖片描述

說(shuō)明:

  1. 延遲定時(shí)任務(wù)線程池,有點(diǎn)像我們的定時(shí)任務(wù)。同樣,它也是一個(gè)無(wú)限大小的線程池 ,Integer.MAX_VALUE。它提供的調(diào)用方法比較多,包括:scheduleAtFixedRate、scheduleWithFixedDelay,可以按需選擇延遲執(zhí)行方式。




個(gè)人博客地址:http://blog.yanxiaolong.cn ? |<font color = "orange"> 『縱有疾風(fēng)起,人生不言棄』</font>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容