Java線程池原理以及自定義線程池

當(dāng)你需要同時限制應(yīng)用程序中運行的線程數(shù)時,線程池非常有用。 啟動新線程會產(chǎn)生性能開銷,每個線程也會為其堆棧等分配一些內(nèi)存。

可以將任務(wù)傳遞給線程池,而不是為每個任務(wù)啟動并發(fā)執(zhí)行的新線程。 只要線程池有任何空閑線程,任務(wù)就會分配給其中一個線程并執(zhí)行。 在內(nèi)部,任務(wù)被插入到阻塞隊列中,池中的線程從該阻塞隊列中出隊。 當(dāng)新任務(wù)插入隊列時,其中一個空閑線程將成功將其出列并執(zhí)行它。 線程池中的其余空閑線程將被阻塞,等待出列任務(wù)。

從上述所知,一個基本的線程池需要具有

  1. 一個存儲線程的容器(容器可以使用隊列,鏈表等數(shù)據(jù)結(jié)構(gòu)),當(dāng)有任務(wù)時,就從容器中拿出一個線程,來執(zhí)行任務(wù)。
  2. 一個存儲任務(wù)的阻塞隊列。(阻塞隊列可以控制任務(wù)提交的最大數(shù))
  3. 線程池對外暴露一個execute(Runnable task)方法,用以外界向線程池中提交任務(wù)。

自定義阻塞隊列

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue<T> {

    /**
     *     使用鏈表實現(xiàn)一個阻塞隊列(數(shù)據(jù)結(jié)構(gòu)定義數(shù)據(jù)存儲和獲取方式,所以只要滿足這兩點,阻塞隊列可以用鏈表,也可以使用數(shù)組等來實現(xiàn))
     */
    private List<T> queue = new LinkedList();
    /**
     * limit用來限制提交任務(wù)的最大數(shù),默認10
     */
    private int limit = 10;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    /**
     *
     * @param item
     *
     *  enqueue是一個同步方法,當(dāng)任務(wù)到達上限,便會調(diào)用wait方法進行阻塞,否則將任務(wù)放入隊列中,并喚醒dequeue()任務(wù)線程
     */
    public synchronized void enqueue(T item){
        while (this.queue.size() == this.limit) {
            this.wait();
        }
        if (this.queue.size() <= limit) {
            this.notifyAll();
        }
        this.queue.add(item);
    }


    /**
     *
     * @return
     *
     *     dequeue也是一個同步方法,當(dāng)隊列中沒有任務(wù)時便會調(diào)用wait方法進入阻塞,當(dāng)任務(wù)到達最大容量是喚醒其他dequeue()線程
     *     ,并出列一個任務(wù)。
     */
    public synchronized T dequeue() {
        while (this.queue.size() == 0) {
            this.wait();
        }
        if (this.queue.size() == this.limit) {
            this.notifyAll();
        }

        return this.queue.remove(0);
    }

 public synchronized int size(){
        return queue.size();
    }
}



新建一個線程池線程類,用來執(zhí)行提交的任務(wù)。結(jié)構(gòu)體中傳入任務(wù)隊列,run()方中發(fā)現(xiàn)taskQueue有任務(wù)時,獲取任務(wù)并執(zhí)行,沒有任務(wù)就阻塞。

public class PoolThread extends Thread {


    private  BlockingQueue taskQueue = null;

    private boolean isStopped = false;

    public PoolThread(BlockingQueue taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run(){
        while(!isStopped() && !Thread.currentThread().isInterrupted()){
            try{
                //從任務(wù)隊列獲取任務(wù)并執(zhí)行
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch(Exception e){
                isStopped = true;
                break;
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        this.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}

新建線程池類

public interface Service {

    //關(guān)閉線程池
    void shutdown();

    //查看線程池是否已經(jīng)被shutdown
    boolean isShutdown();

  //提交任務(wù)到線程池
    void execute(Runnable runnable);
}
import java.util.ArrayDeque;
import java.util.Queue;

public class ThreadPool  implements Service {

    /**
     * 任務(wù)隊列,用來存儲提交的任務(wù)
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 線程池中存儲線程的容器。
     */
    private Queue<PoolThread> threads = new ArrayDeque<PoolThread>();
    
    private boolean isShutdown = false;
    
    public ThreadPool(int initSize, int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);

        //初始化線程池
        for (int i = 0; i < initSize; i++) {
            threads.add(new PoolThread(taskQueue));
        }
        
        //啟動線程池線程
        threads.forEach(thread -> thread.start());
    }
    
     @Override
    public synchronized void execute(Runnable task)  {
        if (this.isStopped){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任務(wù)入列
        taskQueue.enqueue(task);
    }
    
    @Override
    public synchronized void shutdown(){
        this.isShutdown= true;
        threads.forEach(thread -> thread.doStop());
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }
}

至此,一個簡單的線程池便完成。新建一個線程池測試類

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {

        final ThreadPool threadPool = new ThreadPool(5 , 20);

        //定義20個任務(wù)并且提交到線程池
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() ->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " is running add done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true){
            System.out.println("---------------------------------");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

打印每次輸出5條記錄,共輸出4次

---------------------------------
---------------------------------
Thread-3 is running add done
Thread-1 is running add done
Thread-0 is running add done
Thread-4 is running add done
Thread-2 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-4 is running add done
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
---------------------------------
---------------------------------
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-2 is running add done
Thread-4 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-0 is running add done
Thread-4 is running add done
---------------------------------

當(dāng)執(zhí)行完任務(wù)后,使用visualvm工具或jstack命令獲取線程快照,可以看到有5個線程池中的線程

"Thread-4" #16 prio=5 os_prio=0 tid=0x00000000207b0000 nid=0x2b7c in Object.wait() [0x000000002141e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-3" #15 prio=5 os_prio=0 tid=0x00000000207ad000 nid=0x56d0 in Object.wait() [0x000000002131f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-2" #14 prio=5 os_prio=0 tid=0x00000000207ab800 nid=0x4cbc in Object.wait() [0x000000002121f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-1" #13 prio=5 os_prio=0 tid=0x00000000207a9800 nid=0x3670 in Object.wait() [0x000000002111f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

   Locked ownable synchronizers:
        - None

"Thread-0" #12 prio=5 os_prio=0 tid=0x00000000207a9000 nid=0x4d84 in Object.wait() [0x000000002101f000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at java.lang.Object.wait(Object.java:502)
        at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
        - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
        at com.customthreadpool.PoolThread.run(PoolThread.java:18)

從線程快照可以看到,線程池的線程名稱使用系統(tǒng)默認名稱,但在實際編碼中通常都會按我們規(guī)范定義系統(tǒng)名稱,所以我們使用工廠模式對線程的創(chuàng)建進行重構(gòu)。

使用工廠模式有一下好處

  1. 對象的創(chuàng)建如果比較復(fù)雜,需要經(jīng)過一系列的初始化。使用工廠模式,可以屏蔽這過程。
  2. 把同一類事物歸于一個框架之下。比如A和B,他們需要自己定義線程池線程創(chuàng)建,但規(guī)定他們都要實現(xiàn)工廠接口,便可以把他們控制在同一框架之下。
  3. 解耦。(只要是不直接創(chuàng)建目標(biāo)對象,基本上都可以叫解耦或者對修改關(guān)閉對擴展開放)

新建線程工廠接口

@FunctionalInterface
public interface ThreadFactory {
    Thread createThread(Runnable runnable);
}

重構(gòu)后的線程池類如下:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool  implements Service {

    /**
     * 任務(wù)隊列,用來存儲提交的任務(wù)
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 線程池中存儲線程的容器。
     */
    private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();


    /**
     * 默認線程工廠
     */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

    private boolean isShutdown = false;


    public ThreadPool(int initSize, int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);

        //初始化線程池
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

    private void newThread(){
        PoolThread poolThread = new PoolThread(taskQueue);
        Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
        ThreadTask threadTask = new ThreadTask(thread , poolThread);
        threads.add(threadTask);
        thread.start();
    }
    /**
     * 工廠模式屏蔽對象創(chuàng)建的過程
     */
    private static class DefaultThreadFactory implements ThreadFactory{

        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

        private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());

        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
        }
    }

    /**
     * ThreadTask 只是PoolThread和Thread的組合,因為后面關(guān)閉線程還需要用到poolThread的doStop方法
     */
    private static class ThreadTask{

        Thread thread;
        PoolThread poolThread;

        public ThreadTask(Thread thread , PoolThread poolThread){
            this.thread = thread;
            this.poolThread = poolThread;
        }
    }

     @Override
    public synchronized void execute(Runnable task) {
        if (this.isShutdown){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任務(wù)入列
        taskQueue.enqueue(task);
    }

  @Override
    public synchronized void shutdown(){
        this.isShutdown = true;
        threads.forEach(threadTask -> threadTask.poolThread.doStop());
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }
}

運行測試類,結(jié)果如下圖所示


image.png

dump文件如下所示


image.png

到目前為如果線程任務(wù)隊列到達上限,便會調(diào)用wait方法進行阻塞,我們可以自定義拒接策略,使處理更靈活。

public interface DenyPolicy<T> {

    void reject(T runnable, ThreadPool threadPool);

    //該拒接策略會直接將任務(wù)丟棄
    class DiscardDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            System.out.println(runnable + "do nothing");
        }
    }

    //該拒絕策略會向任務(wù)提交者拋出異常
    class AbortDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RunnbaleDenyException("The runnbale " + runnable + " will be abort.");
        }
    }

    //該拒絕策略會使用任務(wù)在提交者所在的線程中執(zhí)行任務(wù)
    class RunnerDenyPolicy implements DenyPolicy<Runnable>{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            if (!threadPool.isShutdown()){
                runnable.run();
            }
        }
    }
}
public class RunnbaleDenyException extends RuntimeException {

    public RunnbaleDenyException(String message) {
        super(message);
    }
}

  • reject 為拒接方法
  • DiscardDenyPolicy 策略會直接丟棄掉Runnable任務(wù)。
  • AbortDenyPolicy 策略會拋出RunnbaleDenyException異常。
  • RunnerDenyPolicy 策略,交給調(diào)用者的線程直接運行runnable,而不會被加入到線程池中。

重構(gòu)阻塞隊列,當(dāng)隊列中的值超出最大容量時使用拒接策略。

重構(gòu)后的阻塞隊列

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue<T> {

    /**
     *     使用鏈表實現(xiàn)一個阻塞隊列(數(shù)據(jù)結(jié)構(gòu)定義數(shù)據(jù)存儲和獲取方式,所以只要滿足這兩點,阻塞隊列可以用鏈表,也可以使用數(shù)組等來實現(xiàn))
     */
    private List<T> queue = new LinkedList();
    /**
     * limit用來限制提交任務(wù)的最大數(shù),默認10
     */
    private int limit = 10;


    /**
     * 拒接策略
     */
    private DenyPolicy denyPolicy;

    private ThreadPool threadPool;


    public BlockingQueue(int limit , DenyPolicy denyPolicy , ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }

    /**
     *
     * @param item
     *  enqueue是一個同步方法,當(dāng)任務(wù)到達上限,便會調(diào)用wait方法進行阻塞,否則將任務(wù)放入隊列中,并喚醒dequeue()任務(wù)線程
     */
    public synchronized void enqueue(T item) {
        //若果隊列到達最大容量,調(diào)用拒接策略
        if (this.queue.size() >= this.limit) {
            denyPolicy.reject(item , threadPool);
        }
        if (this.queue.size() <= limit) {
            this.notifyAll();
        }
        this.queue.add(item);
    }


    /**
     *
     * @return
     *
     *     dequeue也是一個同步方法,當(dāng)隊列中沒有任務(wù)時便會調(diào)用wait方法進入阻塞,當(dāng)任務(wù)到達最大容量是喚醒其他dequeue()線程
     *     ,并出列一個任務(wù)。
     */
    public synchronized T dequeue(){
        while (this.queue.size() == 0) {
            this.wait();
        }
        if (this.queue.size() == this.limit) {
            this.notifyAll();
        }
        return this.queue.remove(0);
    }

 public synchronized int size(){
        return queue.size();
    }
}

線程池類修改如下兩點ThreadPool.class

...
public class ThreadPool implements Service{
 /**
     * 默認使用丟棄策略
     */
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();

    public ThreadPool(int noOfThreads , int maxNoOfTasks){
        taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);

        //初始化線程池
        for (int i = 0; i < noOfThreads; i++) {
            newThread();
        }
    }
}
...

運行測試類測試類,可以看到當(dāng)任務(wù)到達最大容量時,就會有任務(wù)被拋棄


image.png

目前初始化線程池時,只指定了初始線程數(shù)init,并不能很好的管理線程池線程數(shù)量。繼續(xù)對線程池進行擴展。

  • 新增兩個控制線程池線程數(shù)量的參數(shù)。線程池自動擴充時最大的線程池數(shù)量max,線程池空閑時需要釋放線程但是也要維護一定數(shù)量的活躍線程數(shù)量或者核心數(shù)量core。有了這init , max , core三個參數(shù)就能很好的控制線程池中線程數(shù)量,三者之間的關(guān)系init <= core <= max。
  • 新增參數(shù)Keepedalive時間,該時間主要決定線程各個重要參數(shù)自動維護的時間間隔。

重構(gòu)后的線程池類

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool implements Service{

    /**
     * 初始化線程數(shù)量
     */
    private final int initSize;

    /**
     *   線程池最大線程數(shù)量
     */
    private final int maxSzie;

    /**
     *     線程池核心線程數(shù)量
     */
    private final int coreSize;

    /**
     *   當(dāng)前活躍的線程數(shù)量
     */
    private int activeCount;

    private final long keepAliveTime;

    private final TimeUnit timeUnit;

    private  InternalTask internalTask;

    /**
     *     創(chuàng)建線程所需的工廠
     */
    private final ThreadFactory threadFactory;


    /**
     * 任務(wù)隊列,用來存儲提交的任務(wù)
     */
    private BlockingQueue<Runnable> taskQueue = null;

    /**
     * 線程池中存儲線程的容器。
     */
    private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();


    /**
     * 默認線程工廠
     */
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

    private boolean isShutdown = false;

    /**
     * 默認使用丟棄策略
     */
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();


    public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
        this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
    }

    public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                      , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){

        this.initSize = initSize;
        this.maxSzie = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;

        init();

    }



    private void init(){
        //初始化線程池
        for (int i = 0; i < initSize; i++) {
            newThread();
        }

        //啟動內(nèi)部維護線程
        internalTask =  new InternalTask();
        internalTask.start();
    }

    private void newThread(){
        PoolThread poolThread = new PoolThread(taskQueue);
        Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
        ThreadTask threadTask = new ThreadTask(thread , poolThread);
        activeCount++;
        threads.add(threadTask);
        thread.start();
    }

    private void removeThread(){
        //從線程池中移除某個線程
        ThreadTask threadTask = threads.remove();
        threadTask.poolThread.stop();
        this.activeCount--;
    }
    /**
     * 工廠模式屏蔽對象創(chuàng)建的過程
     */
    private static class DefaultThreadFactory implements ThreadFactory{

        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

        private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());

        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
        }
    }

    /**
     * ThreadTask 只是PoolThread和Thread的組合,因為后面關(guān)閉線程還需要用到poolThread的doStop方法
     */
    private static class ThreadTask{

        Thread thread;
        PoolThread poolThread;

        public ThreadTask(Thread thread , PoolThread poolThread){
            this.thread = thread;
            this.poolThread = poolThread;
        }
    }


    @Override
    public synchronized void execute(Runnable task)  {
        if (this.isShutdown){
            throw new IllegalStateException("ThreadPool is stopped");
        }
        //任務(wù)入列
        taskQueue.enqueue(task);
    }

    @Override
    public int getInitSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.maxSzie;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return taskQueue.size();
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

    @Override
    public synchronized void shutdown(){
        this.isShutdown = true;
        threads.forEach(threadTask -> threadTask.poolThread.doStop());
        internalTask.interrupt();
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }



    class InternalTask extends Thread{
        @Override
        public void run() {
            //run方法繼承自Thread,主要用于維護線程數(shù)量,比如擴容,回收等工作
            while (!isShutdown&&!isInterrupted()){
                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutdown = true;
                    break;
                }
                synchronized (ThreadPool.this){
                    if (isShutdown){
                        break;
                    }
                    //當(dāng)前隊列中任務(wù)尚未處理,并且activeCount< coreSize則繼續(xù)擴容
                    if (taskQueue.size() > 0 && activeCount <coreSize){
                        for (int i = initSize; i < coreSize ; i++){
                            newThread();
                        }
                        //continue的目的在于不想讓線程的擴容直接打到maxsize
                        continue;
                    }

                    //當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount < maxSize則繼續(xù)擴容
                    if (taskQueue.size() > 0 && activeCount < maxSzie){
                        for (int i = coreSize; i < maxSzie ; i++){
                            newThread();
                        }
                    }

                    //如果任務(wù)隊列中沒有任務(wù),則需要回收,回收至coreSize即可
                    if (taskQueue.size() == 0 && activeCount > coreSize ){
                        for (int i = coreSize ; i < activeCount ; i++){
                            removeThread();
                        }
                    }
                }
            }
        }
    }
}

線程池類中主要新增了如下參數(shù)

  /**
     * 初始化線程數(shù)量
     */
    private final int initSize;

    /**
     *   線程池最大線程數(shù)量
     */
    private final int maxSzie;

    /**
     *     線程池核心線程數(shù)量
     */
    private final int coreSize;

    /**
     *   當(dāng)前活躍的線程數(shù)量
     */
    private int activeCount;

    private final long keepAliveTime;

    private final TimeUnit timeUnit;

    /**
     *     創(chuàng)建線程所需的工廠
     */
    private final ThreadFactory threadFactory;

   private  InternalTask internalTask;

重寫了兩個構(gòu)造函數(shù)

 public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
        this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
    }

    public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                      , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){

        this.initSize = initSize;
        this.maxSzie = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;

        init();
       
    }

新增一個線程類,用于維護內(nèi)部狀態(tài)

 class InternalTask extends Thread{
        @Override
        public void run() {
            //run方法繼承自Thread,主要用于維護線程數(shù)量,比如擴容,回收等工作
            while (!isShutdown&&!isInterrupted()){
                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutdown = true;
                    break;
                }
                synchronized (ThreadPool.this){
                    if (isShutdown){
                        break;
                    }
                    //當(dāng)前隊列中任務(wù)尚未處理,并且activeCount< coreSize則繼續(xù)擴容
                    if (taskQueue.size() > 0 && activeCount <coreSize){
                        for (int i = initSize; i < coreSize ; i++){
                            newThread();
                        }
                        //continue的目的在于不想讓線程的擴容直接打到maxsize
                        continue;
                    }

                    //當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount < maxSize則繼續(xù)擴容
                    if (taskQueue.size() > 0 && activeCount < maxSzie){
                        for (int i = coreSize; i < maxSzie ; i++){
                            newThread();
                        }
                    }

                    //如果任務(wù)隊列中沒有任務(wù),則需要回收,回收至coreSize即可
                    if (taskQueue.size() == 0 && activeCount > coreSize ){
                        for (int i = coreSize ; i < activeCount ; i++){
                            removeThread();
                        }
                    }
                }
            }
        }
    }

以及一系列輔助方法

public interface Service {
.....
  //獲取線程池的初始化大小
    int getInitSize();

    //獲取線程池最大的線程數(shù)
    int getMaxSize();

    //獲取線程池核心線程梳理
    int getCoreSize();

    //獲取線程池中活躍線程的數(shù)量大小
    int getQueueSize();

    //獲取線程池中用于緩存任務(wù)隊列的大小
    int getActiveCount();
.....
}
@Override
    public int getInitSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.maxSzie;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return taskQueue.size();
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

執(zhí)行測試類

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {

        final ThreadPool threadPool = new ThreadPool(2 , 6 , 4 , 1000);

        //定義20個任務(wù)并且提交到線程池
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() ->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " is running add done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true){
            System.out.println("getActiveCount: " + threadPool.getActiveCount());
            System.out.println("getQueueSize: " + threadPool.getQueueSize());
            System.out.println("getCoreSize: " + threadPool.getCoreSize());
            System.out.println("getMaxSize: "+ threadPool.getMaxSize());
            System.out.println("======================================");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

會有如下輸出,activeCount數(shù)量會增長到與maxSize一直,最后會保持與coreSize相等

getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--4 is running add done
thread-pool--3 is running add done
thread-pool--5 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--4 is running add done
thread-pool--5 is running add done
thread-pool-0 is running add done
thread-pool--1 is running add done
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================

到這里,一個功能比較完善的線程池就已經(jīng)完成了
代碼地址: github

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程池中有一定數(shù)量的工作線程,工作線程會循環(huán)從任務(wù)隊列中獲取任務(wù),并執(zhí)行這個任務(wù)。那么怎么去停止這些工作線程呢?這...
    wo883721閱讀 1,749評論 0 14
  • 第一部分 來看一下線程池的框架圖,如下: 1、Executor任務(wù)提交接口與Executors工具類 Execut...
    壓抑的內(nèi)心閱讀 4,394評論 1 24
  • 【JAVA 線程】 線程 進程:是一個正在執(zhí)行中的程序。每一個進程執(zhí)行都有一個執(zhí)行順序。該順序是一個執(zhí)行路徑,或者...
    Rtia閱讀 2,893評論 2 20
  • 感恩路上 文/熙云閣 朵朵云兒漫步在蔚藍的天空 感謝風(fēng)兒讓她們相聚 羊群悠閑地唱著小曲 感恩大地帶給它...
    熙云閣閱讀 223評論 0 0
  • 站內(nèi)優(yōu)化思路(未來幾天會不定時的分享一下我的站內(nèi)優(yōu)化詳細筆記,今天整理了一下優(yōu)化思路,覺得有用的朋友可以關(guān)注收藏一...
    簡小貓閱讀 507評論 0 4

友情鏈接更多精彩內(nèi)容