Java 線程池的使用 ThreadPoolExecutor

多線程

創(chuàng)建一個多線程,需要重寫 Runnable run() 方法, 然后通過 Thread.start() 進行執(zhí)行,
如果頻繁需要使用到多線程,我們一個一個的創(chuàng)建開銷會很大,而線程池就是解決這個問題的,就像【數(shù)據(jù)庫的連接池】一樣。

Thread thread = new Thread(() -> System.out.println("Thread Hello"));
thread.start();

Java 線程池

線程池 管理線程,包括創(chuàng)建,檢測,替換,關閉等。

java.util.concurrent 在并發(fā)編程中很常用的實用工具類

ThreadPoolExecutor 最基礎的父類,幾種線程池的操作都在該類中

Executor 執(zhí)行已提交的 Runnable 任務的對象。

ExecutorService 接口(實現(xiàn)了Executor) 提供管理終止 Executor 的方法。

Executors 工廠類,提供 Exexutor,ExecutorService,ScheduleExecutorService,ThreadFactory,Callable類的工廠和實用方法。

ThreadFactory 創(chuàng)建 Thread 的工廠類

ExecutorService 四種線程池類型

CachedThreadPool

Executors.newCachedThreadPool() 緩存線程池 (重用已經(jīng)構(gòu)造過的線程池, 適用于 執(zhí)行許多短期異步任務的程序)

ScheduledThreadPool

Executors.newScheduledThreadPool() 定時(調(diào)度)線程池 (可以在 線程執(zhí)行前后 進行時間延遲, scheduleAtFixedRate 方法, scheduleWithFixedDelay 方法)

FixedThreadPool

Executors.newFixedThreadPool() 重用固定數(shù)量的線程池 (創(chuàng)建的時候固定線程池數(shù)量, 線程都處于活動狀態(tài), 會等待線程完成, 線程失敗或終止了, 將會替換一個新的線程)

SingleThreadExecutor

Executors.newSingleThreadExecutor() 單個線程的線程池 (只創(chuàng)建一個單獨的線程, 可適用于處理時間比較長的方法)

Executors Code
import org.apache.commons.lang3.time.DateFormatUtils;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.concurrent.*;

/**
 * ThreadPoolExecutor
 *
 * 四種線程池的特性和使用方式
 *      1. SingleThreadExecutor
 *      2. FixedThreadExecutor
 *      3. CacheThreadExecutor
 *      4. ScheduledThreadExecutor
 *
 * @author zhou
 */
public class TestExecutorService {

    /**
     * 單線程執(zhí)行器
     *
     * 只有一個線程的線程池
     * 執(zhí)行結(jié)果: 需要等待 僅有的線程 執(zhí)行完, 才能執(zhí)行下一次遍歷。
     */
    @Test
    public void testSingleThreadExecutor() throws InterruptedException, ExecutionException {
        // new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println("thread-name: " + Thread.currentThread().getName() + ", time: " + DateFormatUtils.format(new Date(), "HH:mm:ss"));
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 可重用固定線程數(shù)的線程池
     *
     * 初始化時創(chuàng)建固定數(shù)量的線程, 之后不再創(chuàng)建, 除非 有線程不可用, 就進行創(chuàng)建并替換。
     * 執(zhí)行結(jié)果: 會重復使用線程池中的線程, 而不會再創(chuàng)建新的線程, 沒有線程可用時, 會一直等待有空閑的線程, 并不會重新創(chuàng)建。
     */
    @Test
    public void testFixedThreadExecutor() throws InterruptedException {
        // new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println("thread-name: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 緩存線程的線程池
     *
     * 由于使用的是 SynchronousQueue, 本身只維護一個線程, 插入的時候必須有線程進行移除操作才能進行插入
     * 終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
     * 初始化為 0, 最大為 Integer.MAX_VALUE (2147483647)
     *
     * 執(zhí)行結(jié)果: 沒有線程可用就創(chuàng)建, 并不會進行等待。
     */
    @Test
    public void testCacheThreadExecutorService() throws InterruptedException {
        // new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 定時(調(diào)度)線程池
     *
     */
    @Test
    public void testScheduledThreadExecutor() throws InterruptedException, ExecutionException, TimeoutException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // 下方 四個方法都能獲取 線程 返回的數(shù)據(jù)。

        // 1. schedule() 延遲啟動時間
//        System.out.println("start");
//        for (int i = 0; i < 3; i++) {
//            executor.schedule(() -> System.out.println("運行線程"), 1, TimeUnit.SECONDS);
//        }
//        Thread.sleep(3000);

        // 2. schedule() 延遲啟動時間, 通過 <V> 泛型來指定返回的參數(shù)的類型
//        executor.schedule(() -> {
//            System.out.println("test");
//        }, 1, TimeUnit.SECONDS);
//        Thread.sleep(4000);

        // 3. scheduleAtFixedRate() 初始化延遲和定時調(diào)度延遲
        // command 運行的方法
        // initialDelay 初始化延遲時間
        // period  線程執(zhí)行前延遲時間
        // unit 統(tǒng)一的時間單位
        // 如果第一個線程的執(zhí)行時間超過第二個線程延遲的時間, 則等待第一個線程執(zhí)行完, 再執(zhí)行第二個線程。
//        executor.scheduleAtFixedRate(() -> {
//            System.out.println(System.currentTimeMillis());
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }, 0, 1, TimeUnit.SECONDS);
//
//        Thread.sleep(5000);

        // 4. scheduleWithFixedDelay() 固定延遲調(diào)度
        // command 運行的方法
        // initialDelay 初始化延遲時間
        // delay 線程延期時間
        // unit 統(tǒng)一的時間單位
        // 和上方方法唯一不同的就是 delay 的處理
        // 如果第一個線程的執(zhí)行時間超過第二個線程延遲的時間, 則第二個線程的執(zhí)行時間為 (第一個線程的執(zhí)行時間 + delay)
//        executor.scheduleWithFixedDelay(() -> {
//            System.out.println(System.currentTimeMillis());
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }, 0, 1, TimeUnit.SECONDS);
//        Thread.sleep(5000);

        // 5. 需要獲取線程執(zhí)行完后返回的參數(shù)的對象/方法解析
        ScheduledFuture<String> schedule = executor.schedule(() -> {
            return String.valueOf(System.currentTimeMillis());
        }, 2, TimeUnit.SECONDS);

        // getDelay(計算時間單位) 執(zhí)行到該方法的時候, 執(zhí)行的線程還有多久執(zhí)行完, 返回 0 或者 負數(shù)的時候代表線程已經(jīng)執(zhí)行完成。
        // System.out.println(schedule.getDelay(TimeUnit.SECONDS));

        // get()   獲取線程執(zhí)行完后的返回值
        // System.out.println(schedule.get());

        // get(超時時間, 時間單位)  最長等待多長時間接收到線程執(zhí)行完后的返回值
        // 拋出四個異常
        // 線程取消異常
        // 線程執(zhí)行異常
        // 線程等待時被中斷異常
        // 線程超時異常 (線程還在執(zhí)行中, 我們就去獲取的情況)
        // System.out.println(schedule.get(1, TimeUnit.SECONDS));
        // Thread.sleep(4000);


    }

}

ThreadPoolExecutor

ThreadPoolExecutor 參數(shù)

 1. corePoolSize 線程池中核心線程數(shù)
 2. maximumPoolSize  線程池中最大線程數(shù)
 3. keepAliveTime 空閑時間, 當線程池數(shù)量超過核心線程池時, 多余的空閑線程存活的時間
 4. unit 空閑時間單位
 5. workQueue 等待隊列, 線程池中的線程數(shù)超過核心線程數(shù)時, 任務將放在等待隊列中
 6. threadFactory 線程工廠
 7. handler 拒絕策略, 當線程池和等待隊列都滿了之后, 需要通過該對象的回調(diào)函數(shù)進行回調(diào)處理

workQueue 等待隊列

 1. ArrayBlockingQueue 隊列是有界的, 需要初始化隊列數(shù)量, 基于數(shù)組實現(xiàn)的阻塞隊列
 2. LinkedBlockingQueue 隊列可以有界和無界, 基于鏈表實現(xiàn)的阻塞隊列
 3. SynchronousQueue 不存儲元素的阻塞隊列, 每個插入操作必須等到另一個線程調(diào)用移除操作, 否則插入操作一直處于阻塞狀態(tài)
 4. PriorityBlockingQueue 帶優(yōu)先級的無界阻塞隊列
 5. LinkedBlockingDeque 隊列可有界和無界, 基于雙向鏈表實現(xiàn)的阻塞隊列
 
@Test
public void testWorkQueue() {
    ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(20);
    LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
    SynchronousQueue synchronousQueue = new SynchronousQueue();
    PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
    LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            linkedBlockingDeque);

    executor.execute(() -> {
        System.out.println("test");
    });

}

handler 拒絕策略

 自帶四種拒絕策略, 可自定義拒絕策略
 1. AbortPolicy (默認) 處理程序遭到拒絕將拋出運行時異常 RejectedExecutionException
 2. CallerRunPolicy 由調(diào)用該方法的線程執(zhí)行 (比如: 我主線程調(diào)用的, 直接交給主線程執(zhí)行)
 3. DiscardPolicy 直接將任務刪除, 不做任何處理
 4. DiscardOldestPolicy 將位于工作隊列的頭部任務刪除, 再執(zhí)行當前任務 (再次失敗, 再次刪除和重試)
 5. 自定義 實現(xiàn) RejectedExecutionHandler 重寫 rejectedExecution 方法
 
@Test
public void handler() throws InterruptedException {
    ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
    ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    // 看時間戳的變化 (在兩秒鐘的時間內(nèi), 丟棄了很多線程)
    ThreadPoolExecutor.DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
    ThreadPoolExecutor.DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
    MyRejectedHandler myRejectedHandler = new MyRejectedHandler();

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(8), myRejectedHandler);

    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ":" + System.currentTimeMillis() + ":" + executor.getQueue().size());
        });
    }
    Thread.sleep(50000);
}

// 自定義拒絕策略
public static class MyRejectedHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("阻塞了");
    }
}

線程執(zhí)行前, 執(zhí)行后, 關閉和終止的方法回調(diào)

回調(diào)方法
 1. beforeExecute 執(zhí)行線程之前
 2. afterExecute  執(zhí)行線程之后
 3. terminated 線程池結(jié)束調(diào)用

 關閉線程池
 1. shutdown() 將線程池狀態(tài)置為 SHUTDOWN, 按過去已提交任務的順序發(fā)起一個有序的關閉。
 2. shutdownNow() 將線程池狀態(tài)置為 SHUTDOWN, 通過 Thread.interrupt() 方法停止所有線程, 并返回從未開始執(zhí)行的任務列表(也就是隊列中的線程),
                  interrupt() 方法不能保證能停止正在運行的任務, 但會盡力嘗試, 所以有些無法響應中斷的線程, 可能永遠無法終止。
 
 判斷線程池是否關閉
 1. isShutdown() 線程已關閉, 返回 true
 2. isTerminating() 當 shutdown 和 shutdownNow 正在終止但未完成中的過程中, 返回 true
 3. isTerminated() 如果關閉后所有任務都已完成,則返回 true。注意,除非首先調(diào)用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。

 
@Test
public void testAOPExecuteMethod() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(15)) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("執(zhí)行線程之前");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("執(zhí)行線程之后" + t);
        }

        @Override
        protected void terminated() {
            System.out.println("線程終止");
        }
    };
    executor.execute(() -> {
        System.out.println("執(zhí)行線程");
    });
    executor.shutdown();
    System.out.println(executor.isTerminated());
    System.out.println(executor.isShutdown());
    System.out.println(executor.isTerminating());

    // executor.shutdownNow();

    executor.execute(() -> {
        System.out.println("執(zhí)行線程");
    });
}

線程池統(tǒng)計信息

 1. getActiveCount() 返回處于活動狀態(tài)的線程的大致數(shù)量
 2. getCompletedTaskCount() 返回已完成執(zhí)行的任務的大致總數(shù)。
 3. getCorePoolSize() 返回核心線程數(shù)
 4. getKeepAliveTime(TimeUnit unit) 返回線程保持活動的時間, 這個就是 keepAliveTime
 5. getLargestPoolSize() 返回線程池中同時存在的最大線程數(shù)
 6. getMaximunPoolSize()  返回線程池中最大線程數(shù)
 7. getPoolSize() 返回線程池中的當前線程數(shù)。
 8. getQueue() 返回此執(zhí)行程序使用的任務隊列
 9. getRejectedExecutionHandler() 返回拒絕策略
 10. getTackCount() 返回已計劃執(zhí)行的任務的大致總數(shù)

其他方法

 1. prestartAllCoreThreads() 啟動所有核心線程, 使其處于等待工作的空閑狀態(tài)  return 已啟動的線程數(shù)
 2. prestartCoreThread()     啟動核心線程(一個), 使其處于等待工作的空閑狀態(tài)      return 啟動了線程, 返回 true
 3. purge() 嘗試從工作隊列移除所有已取消的 Future 任務
 4. remove() 從執(zhí)行程序的內(nèi)部隊列中移除此任務 (如果存在), 如果尚未開始, 則其不再運行

運行流程解析

首先需要記住幾個基本參數(shù)和作用, 因為運行的過程都需要通過這些參數(shù)。

  1. coreSize 核心線程數(shù)量
  2. maximumPoolSize 最大線程數(shù)量
  3. workQueue 線程等待隊列
  4. keepAliveTime 線程過期時間
  5. handler 拒絕策略

核心參數(shù)

Worker 譯為: 工人, 維護運行任務的線程中斷控制狀態(tài)。

HashSet<Worker> workers 工人集合, 設置包含池中的所有工作線程。只有在持有主鎖時才能訪問

ReentrantLock mainLock 主鎖, 對工人集合和相關簿記的訪問保持鎖定。

線程狀態(tài)

RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED

execute(Runnable command) 方法解析

方法處理分為三種情況, 也就是針對 command 的處理。

  1. 添加到核心線程中 (當核心線程數(shù)足夠時)
  2. 添加到等待隊列中 (超出核心線程數(shù)時)
  3. 拒絕線程 (等待隊列滿了, 交給拒絕策略進行處理)

addWorker(Runnable firstTask, boolean core) 方法解析

firstTack 需要運行的線程
core 以 corePoolSize 為界限, 小于 corePoolSize = true, 大于 corePoolSize = false

返回值和處理分為以下情況:

  1. 如果池已停止或有資格關閉,則此方法返回false
  2. 如果線程工廠在被請求時沒有創(chuàng)建一個線程,那么它也會返回false
  3. 創(chuàng)建 Worker 存儲 firstTask, 并獲取鎖, 添加創(chuàng)建的 Worker 到 workers 中, 釋放鎖, 如果是添加成功, 則啟動線程。

BlockingQueue<Runnable> workQueue 譯為: 工作隊列
用于保存任務并將任務傳遞給工作線程的隊列。

運行流程

workers 核心線程數(shù)量是 coreSize, 最大的線程數(shù)量是 maximumPoolSize。

添加線程的正常流程:
創(chuàng)建 Worker 存儲線程, 并將 Worker 存儲到 workers 中, 啟動并運行線程。

添加線程的等待流程:
當運行的線程數(shù) 大于 maximumPoolSize 并 小于 workQueue 最大數(shù)量時, 將線程存儲到 workQueue 中, 等待線程池空閑時取出運行。

添加線程的拒絕流程:
當線程池運行的線程數(shù)等于 maximumPoolSize, workQueue 已經(jīng)超過限制的時候, 通過 handler 拒絕策略處理拒絕的線程。

參考博文

Java線程池實現(xiàn)原理及其在美團業(yè)務中的實踐 [好文章]
https://mp.weixin.qq.com/s?__biz=MjM5NjQ5MTI5OA==&mid=2651751537&idx=1&sn=c50a434302cc06797828782970da190e&chksm=bd125d3c8a65d42aaf58999c89b6a4749f092441335f3c96067d2d361b9af69ad4ff1b73504c&scene=0&xtrack=1&key=ae016389609d723a06c894a78ed921fef9476d4e06e1197bfef300aae587bd68cf0051ae83f3188aaca7aa78ea7c4f8521efe58e2b460cf3d745151a7dd649853fd77594f5a72eb20bd11dce97a2b8fb&ascene=1&uin=MTQ3NDMzNDk3NQ%3D%3D&devicetype=Windows+10&version=62080079&lang=zh_CN&exportkey=A0ndiTybr05ZZONXp8FpJ20%3D&pass_ticket=R39BuOO9md3f7V%2Bau%2F1g5tqzZnJX6A%2F%2BwVnp3tnVqZXp1bBf2H2yftgivyEwXllL

http://www.itdecent.cn/p/7ab4ae9443b9

https://www.cnblogs.com/dolphin0520/p/3932921.html

https://blog.csdn.net/qq_22764659/article/details/83620730

https://www.cnblogs.com/CarpenterLee/p/9558026.html

如果存在問題,請在評論告知我,我會及時改進。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容