線程池使用與機(jī)制

之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 這四個(gè)方法。
當(dāng)然 Executors 也是用不同的參數(shù)去 new ThreadPoolExecutor 實(shí)現(xiàn)的,本文先分析前四種線程創(chuàng)建方式,后在分析 new ThreadPoolExecutor 創(chuàng)建方式

使用 Executors 創(chuàng)建線程池
1.newFixedThreadPool()
由于使用了LinkedBlockingQueue所以maximumPoolSize沒用,當(dāng)corePoolSize滿了之后就加入到LinkedBlockingQueue隊(duì)列中。
每當(dāng)某個(gè)線程執(zhí)行完成之后就從LinkedBlockingQueue隊(duì)列中取一個(gè)。
所以這個(gè)是創(chuàng)建固定大小的線程池。

源碼分析

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

2.newSingleThreadPool()
創(chuàng)建線程數(shù)為1的線程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 沒用,corePoolSize為1表示線程數(shù)大小為1,滿了就放入隊(duì)列中,執(zhí)行完了就從隊(duì)列取一個(gè)。

源碼分析

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (
                    new ThreadPoolExecutor(
                            1,
                            1,
                            0L,
                            TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>())
            );
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

3.newCachedThreadPool()
創(chuàng)建可緩沖的線程池。沒有大小限制。由于corePoolSize為0所以任務(wù)會(huì)放入SynchronousQueue隊(duì)列中,SynchronousQueue只能存放大小為1,所以會(huì)立刻新起線程,由于maxumumPoolSize為Integer.MAX_VALUE所以可以認(rèn)為大小為2147483647。受內(nèi)存大小限制。

源碼分析

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

使用 ThreadPoolExecutor 創(chuàng)建線程池
源碼分析 ,ThreadPoolExecutor 的構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構(gòu)造函數(shù)參數(shù)
1、corePoolSize 核心線程數(shù)大小,當(dāng)線程數(shù) < corePoolSize ,會(huì)創(chuàng)建線程執(zhí)行 runnable

2、maximumPoolSize 最大線程數(shù), 當(dāng)線程數(shù) >= corePoolSize的時(shí)候,會(huì)把 runnable 放入 workQueue中

3、keepAliveTime 保持存活時(shí)間,當(dāng)線程數(shù)大于corePoolSize的空閑線程能保持的最大時(shí)間。

4、unit 時(shí)間單位

5、workQueue 保存任務(wù)的阻塞隊(duì)列

6、threadFactory 創(chuàng)建線程的工廠

7、handler 拒絕策略

任務(wù)執(zhí)行順序
1、當(dāng)線程數(shù)小于 corePoolSize時(shí),創(chuàng)建線程執(zhí)行任務(wù)。

2、當(dāng)線程數(shù)大于等于 corePoolSize并且 workQueue 沒有滿時(shí),放入workQueue中

3、線程數(shù)大于等于 corePoolSize并且當(dāng) workQueue 滿時(shí),新任務(wù)新建線程運(yùn)行,線程總數(shù)要小于 maximumPoolSize

4、當(dāng)線程總數(shù)等于 maximumPoolSize 并且 workQueue 滿了的時(shí)候執(zhí)行 handler 的 rejectedExecution。也就是拒絕策略。

四個(gè)拒絕策略
ThreadPoolExecutor默認(rèn)有四個(gè)拒絕策略:

1、ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException

2、ThreadPoolExecutor.CallerRunsPolicy() 直接調(diào)用run方法并且阻塞執(zhí)行

3、ThreadPoolExecutor.DiscardPolicy() 直接丟棄后來的任務(wù)

4、ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊(duì)列中隊(duì)首的任務(wù)

當(dāng)然可以自己繼承RejectedExecutionHandler來寫拒絕策略.

TestThreadPoolExecutor 示例
TestThreadPoolExecutor.java
package io.ymq.thread.TestThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:39
 **/
public class TestThreadPoolExecutor {
    public static void main(String[] args) {

        long currentTimeMillis = System.currentTimeMillis();

        // 構(gòu)造一個(gè)線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
        );

        for (int i = 1; i <= 10; i++) {
            try {
                String task = "task=" + i;
                System.out.println("創(chuàng)建任務(wù)并提交到線程池中:" + task);
                threadPool.execute(new ThreadPoolTask(task));

                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            //等待所有線程執(zhí)行完畢當(dāng)前任務(wù)。
            threadPool.shutdown();

            boolean loop = true;
            do {
                //等待所有線程執(zhí)行完畢當(dāng)前任務(wù)結(jié)束
                loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
            } while (loop);

            if (loop != true) {
                System.out.println("所有線程執(zhí)行完畢");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("耗時(shí):" + (System.currentTimeMillis() - currentTimeMillis));
        }


    }
}

ThreadPoolTask.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.io.Serializable;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:40
 **/
public class ThreadPoolTask implements Runnable, Serializable {

    private Object attachData;

    ThreadPoolTask(Object tasks) {
        this.attachData = tasks;
    }

    public void run() {

        try {

            System.out.println("開始執(zhí)行任務(wù):" + attachData + "任務(wù),使用的線程池,線程名稱:" + Thread.currentThread().getName());

            System.out.println();

        } catch (Exception e) {
            e.printStackTrace();
        }
        attachData = null;
    }

}

遇到j(luò)ava.util.concurrent.RejectedExecutionException

第一

你的線程池 ThreadPoolExecutor 顯示的 shutdown() 之后,再向線程池提交任務(wù)的時(shí)候。 如果你配置的拒絕策略是 AbortPolicy 的話,這個(gè)異常就會(huì)拋出來。

第二

當(dāng)你設(shè)置的任務(wù)緩存隊(duì)列過小的時(shí)候,或者說, 你的線程池里面所有的線程都在干活(線程數(shù)== maxPoolSize),并且你的任務(wù)緩存隊(duì)列也已經(jīng)充滿了等待的隊(duì)列, 這個(gè)時(shí)候,你再向它提交任務(wù),則會(huì)拋出這個(gè)異常。

響應(yīng)

可以看到線程 pool-1-thread-1 到5 循環(huán)使用

創(chuàng)建任務(wù)并提交到線程池中:task=1
開始執(zhí)行任務(wù):task=1任務(wù),使用的線程池,線程名稱:pool-1-thread-1

創(chuàng)建任務(wù)并提交到線程池中:task=2
開始執(zhí)行任務(wù):task=2任務(wù),使用的線程池,線程名稱:pool-1-thread-2

創(chuàng)建任務(wù)并提交到線程池中:task=3
開始執(zhí)行任務(wù):task=3任務(wù),使用的線程池,線程名稱:pool-1-thread-3

創(chuàng)建任務(wù)并提交到線程池中:task=4
開始執(zhí)行任務(wù):task=4任務(wù),使用的線程池,線程名稱:pool-1-thread-4

創(chuàng)建任務(wù)并提交到線程池中:task=5
開始執(zhí)行任務(wù):task=5任務(wù),使用的線程池,線程名稱:pool-1-thread-5

創(chuàng)建任務(wù)并提交到線程池中:task=6
開始執(zhí)行任務(wù):task=6任務(wù),使用的線程池,線程名稱:pool-1-thread-1

創(chuàng)建任務(wù)并提交到線程池中:task=7
開始執(zhí)行任務(wù):task=7任務(wù),使用的線程池,線程名稱:pool-1-thread-2

創(chuàng)建任務(wù)并提交到線程池中:task=8
開始執(zhí)行任務(wù):task=8任務(wù),使用的線程池,線程名稱:pool-1-thread-3

創(chuàng)建任務(wù)并提交到線程池中:task=9
開始執(zhí)行任務(wù):task=9任務(wù),使用的線程池,線程名稱:pool-1-thread-4

創(chuàng)建任務(wù)并提交到線程池中:task=10
開始執(zhí)行任務(wù):task=10任務(wù),使用的線程池,線程名稱:pool-1-thread-5

所有線程執(zhí)行完畢
耗時(shí):1015
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容