一步一步實(shí)現(xiàn)線程池

一、線程池

背景

  • 線程屬于一個(gè)系統(tǒng)資源,在高并發(fā)場景下,每一個(gè)任務(wù)都創(chuàng)建一個(gè)線程的話,對于內(nèi)存的占用是相當(dāng)大的。
  • 線程不一定越多越好,一般情況下CPU核心并不多,如果線程過多,CPU只能讓大部分線程陷入阻塞,引起線程上下文頻繁的切換。上下文切換越頻繁,對CPU性能影響就越大

解決方案:線程池

利用享元模式的思想,充分利用已有線程的資源,來處理不同的任務(wù)。從而減少線程的數(shù)量,避免頻繁的上下文切換

二、自定義線程池實(shí)現(xiàn)

該實(shí)現(xiàn)是跟著黑馬程序員的《全面深入學(xué)習(xí)java并發(fā)編程,java基礎(chǔ)進(jìn)階中級必會(huì)教程》實(shí)現(xiàn)的。自己簡單做了一個(gè)總結(jié),看官們?nèi)绻惺裁唇ㄗh,也可以盡情提出。

自定義線程池
  • 按照上面的圖來實(shí)現(xiàn)自定義線程池
  • 根據(jù)生產(chǎn)者消費(fèi)者模式的思想創(chuàng)建一個(gè)BlockingQueue阻塞隊(duì)列來平衡生產(chǎn)者創(chuàng)建任務(wù)和消費(fèi)者處理任務(wù)速度之間的差異,生產(chǎn)者不斷的創(chuàng)建新的任務(wù),消費(fèi)者(線程池)不斷來消費(fèi)。
  • 如果生產(chǎn)者沒有創(chuàng)建新的任務(wù),那么任務(wù)隊(duì)列就是空的,消費(fèi)者就需要等待生產(chǎn)者創(chuàng)建任務(wù)。
  • 如果消費(fèi)者來不及消費(fèi)任務(wù),那么任務(wù)隊(duì)列可能就滿了,生產(chǎn)者就需要等待任務(wù)的消費(fèi)。

實(shí)現(xiàn)

1. 任務(wù)隊(duì)列BlockingQueue

Tips:

  • 任務(wù)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)使用雙向鏈表ArrayDeque實(shí)現(xiàn),ArrayDequeLinkedList都是雙向鏈表的實(shí)現(xiàn),大多情況下ArrayDeque性能優(yōu)于LinkedList。
  • 為了通用給BlockingQueue加了一個(gè)范型
  • 運(yùn)行過程中可能存在多個(gè)線程來隊(duì)列中消費(fèi)任務(wù)或者生產(chǎn)任務(wù),所以必須用鎖保護(hù)任務(wù)隊(duì)列隊(duì)頭和隊(duì)尾,鎖可以使用較為靈活的ReentrantLock
  • 任務(wù)隊(duì)列是有容量限制的,不可能無限制的向任務(wù)隊(duì)列中推送任務(wù),那么任務(wù)隊(duì)列滿了的時(shí)候,生產(chǎn)者就必須等待。同時(shí)消費(fèi)者把任務(wù)隊(duì)列消費(fèi)完了以后,消費(fèi)者就必須等待生產(chǎn)者推送任務(wù),可以使用ReentrantLock的條件變量。
/**
 * 阻塞隊(duì)列
 * @param <T>
 */
@Slf4j(topic = "c.BlockingQueue") // logback日志
class BlockingQueue<T> {
    // 任務(wù)隊(duì)列(雙向鏈表)
    // ArrayDeque性能優(yōu)于LinkedList
    private Deque<T> queue = new ArrayDeque<>();

    // 隊(duì)列容量
    private int capacity;

    // 鎖
    private ReentrantLock lock = new ReentrantLock();

    // 鎖的條件變量(生產(chǎn)者)
    // 任務(wù)隊(duì)列滿了的時(shí)候 生產(chǎn)者要等待
    private Condition prodWaitSet = lock.newCondition();

    // 鎖的條件變量(消費(fèi)者)
    // 任務(wù)隊(duì)列空時(shí),消費(fèi)者要等待
    private Condition consWaitSet = lock.newCondition();
    
    public BlockingQueue(int capacity) { this.capacity = capacity; }
    
    /**
     * 生產(chǎn)者向任務(wù)隊(duì)列推送任務(wù)
     * 隊(duì)列滿時(shí)等待推送任務(wù)無超時(shí)時(shí)間限制
     * @param t 任務(wù)
     */
     public void put(T t) {
        lock.lock();
        // 加鎖
        try {
            // 判斷隊(duì)列是否滿了
            // log.debug("queue.size(): {},capacity: {}", queue.size(), capacity);
            while (queue.size() == capacity) {
                // 隊(duì)列滿了需要讓線程進(jìn)入等待
                // 任務(wù)數(shù)超過了任務(wù)隊(duì)列容量,生產(chǎn)者生產(chǎn)的任務(wù)就會(huì)一直處于阻塞狀態(tài)
                log.debug("任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!");
                prodWaitSet.await();
            }
            // 隊(duì)列沒有滿,推送任務(wù)
            log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者向任務(wù)隊(duì)列推送任務(wù): {}", t);
            queue.addLast(t);
            // 喚醒消費(fèi)者等待中的線程
            consWaitSet.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 消費(fèi)者消費(fèi)任務(wù)
     * 隊(duì)列為空時(shí),無限制等待
     */
    public T take() {
        lock.lock();
        try {
            // 判斷生產(chǎn)者隊(duì)列是否為空
            while (queue.isEmpty()) {
                // 消費(fèi)者線程進(jìn)入等待
                try {
                    consWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 喚醒生產(chǎn)者等待的線程
            prodWaitSet.signalAll();
            // 隊(duì)列不為空,移除隊(duì)列第一個(gè)元素
            return queue.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 獲取任務(wù)隊(duì)列容量
     */
    public int size() {
        lock.lock();
        try {
            return this.capacity;
        } finally {
            lock.unlock();
        }
    }

}

改進(jìn)BlockingQueuetake方法

消費(fèi)者消費(fèi)任務(wù)時(shí),如果任務(wù)隊(duì)列為空,那么take方法將無限制的等待下去,可以設(shè)計(jì)poll方法,該方法設(shè)置超時(shí)時(shí)間,讓消費(fèi)者不會(huì)在任務(wù)隊(duì)列為空的情況下無限等待。

/**
 * 消費(fèi)者消費(fèi)任務(wù)(有超時(shí)時(shí)間的等待)
 *
 * @return
 */
public T poll(long timeout, TimeUnit timeUnit) {
    lock.lock();
    try {
        // 判斷生產(chǎn)者隊(duì)列是否為空
        long nanos = timeUnit.toNanos(timeout); // 將超時(shí)時(shí)間單位轉(zhuǎn)換為納秒
        while (queue.isEmpty()) {
            try {
                // awaitNanos方法在等待過程中被喚醒,返回值是等待的剩余時(shí)間
                if (nanos <= 0) {
                    // 避免虛假喚醒的超時(shí)時(shí)間重置
                    // 如果等待的剩余時(shí)間小于等于0,說明等待超時(shí)了,直接返回null
                    log.debug("消費(fèi)超時(shí)了!");
                    return null;
                }
                // 消費(fèi)者線程進(jìn)入等待
                nanos = consWaitSet.awaitNanos(nanos);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 喚醒生產(chǎn)者等待的線程
        prodWaitSet.signalAll();
        // 隊(duì)列不為空,移除隊(duì)頭元素
        log.debug("消費(fèi)完成!從任務(wù)隊(duì)列中移除該任務(wù)");
        return queue.removeFirst();
    } finally {
        lock.unlock();
    }
}

2. 線程池ThreadPool

tips

  • 線程池集合HashSet范型不使用Thread而是Worker類,方便任務(wù)的處理
  • 線程池處理任務(wù)時(shí),要判斷線程集合的容量有沒有超過核心線程數(shù)
    • 線程集合容量超過核心線程數(shù),需要將任務(wù)放入任務(wù)隊(duì)列中等待消費(fèi)者消費(fèi)
    • 線程集合容量未超過核心線程數(shù),將當(dāng)前任務(wù)交給新建的Worker執(zhí)行即可
/**
 * 線程池
 */
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    /**
     * 任務(wù)隊(duì)列
     */
    private BlockingQueue<Runnable> queue;

    /**
     * 線程集合
     */
    private HashSet<Worker> workers = new HashSet<>();

    /**
     * 核心線程數(shù)
     * 線程池中的線程數(shù)量
     */
    private int coreSize;

    /**
     * 消費(fèi)等待的超時(shí)時(shí)間
     */
    private long timeout;

    /**
     * 超時(shí)時(shí)間單位
     */
    private TimeUnit timeUnit;
    
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.queue = new BlockingQueue<>(queueSize); // 構(gòu)建任務(wù)隊(duì)列
    }
    
    /**
     * 執(zhí)行任務(wù)
     *
     * @param task 任務(wù)
     */
    public void executeTask(Runnable task) {
        if (workers.size() < coreSize) {
            // 核心線程數(shù)未超過線程集合的容量
            // 將當(dāng)前任務(wù)交給Worker執(zhí)行
            Worker worker = new Worker(task);
            // 將worker加入線集合
            log.debug("核心線程數(shù)未滿,剩余:{},新增Worker執(zhí)行任務(wù)", coreSize - workers.size());
            workers.add(worker);
            // 啟動(dòng)線程
            worker.start();
        } else {
            // 核心線程數(shù)超過線程集合的容量,將當(dāng)前任務(wù)加入任務(wù)隊(duì)列等待消費(fèi)
            queue.put(task); 
        }
    }
    
    /**
     * 線程包裝類
     */
    class Worker extends Thread {
        private Runnable task; // 任務(wù)

        public Worker(Runnable task) {
            this.task = task;
        }
        /**
         * 執(zhí)行任務(wù)
         */
        @Override
        public void run() {
            // 執(zhí)行任務(wù)
            // 1)當(dāng)前task不為空,直接執(zhí)行任務(wù)即可 (該情況出現(xiàn)于核心線程數(shù)還沒有超過線程集合的容量,新建的線程的task肯定不為null)
            // 2)當(dāng)前task為空,去任務(wù)隊(duì)列中看有沒有待執(zhí)行的任務(wù)(該情況出現(xiàn)于核心線程數(shù)已超過線程集合的容量)
            while (task != null || (task = queue.take()) != null) {
                try {
                    log.debug("worker執(zhí)行任務(wù)");
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 退出循環(huán)意味著沒有任務(wù)處理,從workers中移除該Worker
            synchronized (workers) {
                log.debug("任務(wù)處理完畢,移除該worker");
                workers.remove(this);
            }
        }
    }
}

測試

任務(wù)數(shù)未超過任務(wù)隊(duì)列的最大容量

@Slf4j(topic = "c.TestThreadPool")
public class TestThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 測試:
        // 創(chuàng)建一個(gè)線程池
        // 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 10
        ThreadPool threadPool = new ThreadPool(3, 5000, TimeUnit.MILLISECONDS, 10);
        // 任務(wù)數(shù)沒有超過任務(wù)隊(duì)列容量的情況
        log.debug("--------------------任務(wù)數(shù):5---------------------");
        for (int i = 0; i < 5; i++) {
            int tmpI = i;
            threadPool.executeTask(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
            });
        }
    }
}

結(jié)果輸出:

12:09:17 [main] c.TestThreadPool - --------------------任務(wù)數(shù):5---------------------
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:3,新增Worker執(zhí)行任務(wù)
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:2,新增Worker執(zhí)行任務(wù)
12:09:17 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:1,新增Worker執(zhí)行任務(wù)
12:09:17 [Thread-1] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@56cbfb61
12:09:17 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1134affc
12:09:18 [Thread-0] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,0
12:09:18 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:18 [Thread-2] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,2
12:09:18 [Thread-1] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,1
12:09:18 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:19 [Thread-0] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,3
12:09:19 [Thread-2] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,4

核心線程數(shù) 3,任務(wù)數(shù) 5,前三個(gè)任務(wù)不會(huì)放入任務(wù)隊(duì)列,而是交給線程池里的線程立即執(zhí)行了,剩余的兩個(gè)任務(wù)放入任務(wù)隊(duì)列等待線程池中的線程空閑后再消費(fèi)。

所以可以看見日志:09:18時(shí)執(zhí)行了任務(wù)0,1,2,09:19時(shí)執(zhí)行了任務(wù)3,4

同時(shí)我們可以注意到:Worker調(diào)用的是BlockingQueuetake方法,take方法在任務(wù)隊(duì)列為空時(shí)仍然在無限等待。所以如果想要優(yōu)化,我們可以改成poll方法,即有超時(shí)時(shí)間的等待。
任務(wù)隊(duì)列空閑5s后,退出循環(huán),并移除線程池中的worker

任務(wù)數(shù)超過任務(wù)隊(duì)列的最大容量

@Slf4j(topic = "c.TestThreadPool")
public class TestThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 測試:
        // 創(chuàng)建一個(gè)線程池
        // 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 5
        ThreadPool threadPool = new ThreadPool(3, 5000, TimeUnit.MILLISECONDS, 5);
        // 任務(wù)數(shù)超過任務(wù)隊(duì)列容量的情況
        // Thread.sleep(1000);
        log.debug("--------------------任務(wù)數(shù):9---------------------");
        for (int i = 0; i < 9; i++) {
            int tmpI = i;
            threadPool.executeTask(() -> {
                try {
                    // 讓任務(wù)執(zhí)行時(shí)間延長
                    Thread.sleep(200000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
            });
        }
    }
}

測試中,我們讓任務(wù)執(zhí)行的時(shí)間延長足夠長時(shí)間
結(jié)果:

13:45:33 [main] c.TestThreadPool - --------------------任務(wù)數(shù):9---------------------
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:3,新增Worker執(zhí)行任務(wù)
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:2,新增Worker執(zhí)行任務(wù)
13:45:33 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:1,新增Worker執(zhí)行任務(wù)
13:45:33 [Thread-1] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@56cbfb61
13:45:33 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1134affc
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@d041cf
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@129a8472
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1b0375b3
13:45:33 [main] c.BlockingQueue - queue.size(): 5,capacity: 5
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!

此時(shí),由于消費(fèi)者遲遲沒有將任務(wù)執(zhí)行完,所以生產(chǎn)者一直向任務(wù)隊(duì)列中添加任務(wù),導(dǎo)致任務(wù)隊(duì)列被填滿,其余的任務(wù)無法再添加到任務(wù)隊(duì)列中,導(dǎo)致生產(chǎn)者線程也阻塞住了。這樣對生產(chǎn)者線程不夠友好。應(yīng)該給生產(chǎn)者線程提供選擇,讓它選擇是繼續(xù)等待下去呢,還是選擇做其他操作。

優(yōu)化

1. 給生產(chǎn)者添加一個(gè)等待超時(shí)的選擇

類似于poll()

/**
 * 帶超時(shí)時(shí)間的阻塞添加
 *
 * @param t        任務(wù)
 * @param timeout  超時(shí)時(shí)間
 * @param timeUnit 時(shí)間單位
 * @return false 添加超時(shí) true 添加成功
 */
public boolean offer(T t, long timeout, TimeUnit timeUnit) {
    lock.lock();
    // 加鎖
    try {
        // 判斷隊(duì)列是否滿了
        long nanos = timeUnit.toNanos(timeout);
        while (queue.size() == capacity) {
            // 隊(duì)列滿了需要讓線程進(jìn)入等待
            // 任務(wù)數(shù)超過了任務(wù)隊(duì)列容量,生產(chǎn)者生產(chǎn)的任務(wù)就會(huì)一直處于阻塞狀態(tài)
            try {
                log.debug("任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!");
                if (nanos <= 0) {
                    // 生產(chǎn)者添加任務(wù)超時(shí)了
                    return false;
                }
                nanos = prodWaitSet.awaitNanos(nanos);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 隊(duì)列沒有滿,推送任務(wù)
        log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): {}", t);
        queue.addLast(t);
        // 喚醒消費(fèi)者等待中的線程
        consWaitSet.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
}

2. 生產(chǎn)者還有其他選擇

  1. 死等
  2. 超時(shí)放棄
  3. 直接放棄
  4. 拋出異常
  5. 生產(chǎn)者自行處理
  6. 其他

我們不可能有需求是就往線程池的executeTask()方法中添加邏輯分支,這樣我們的代碼可擴(kuò)展性就非常差。我們可以利用策略模式把選擇的權(quán)利下放給生產(chǎn)者。

實(shí)現(xiàn)

將處理方法抽象為一個(gè)接口的方法
/**
 * 利用策略模式,讓生產(chǎn)者自行決定如果任務(wù)隊(duì)列滿了的時(shí)候 該選擇何種策略來處理該情況
 * 1)死等
 * 2)超時(shí)拒絕
 * 3)直接拒絕
 * 4)拋出異常
 * 5)自行執(zhí)行
 * 6)...其他
 * 方便擴(kuò)展
 */
@FunctionalInterface
interface RejectPolicy<T> {
    /**
     * 拒絕策略
     *
     * @param queue 任務(wù)隊(duì)列
     * @param task  任務(wù)
     */
    void reject(BlockingQueue<T> queue, T task);
}
修改任務(wù)隊(duì)列類
  • 添加tryPut()方法,在任務(wù)隊(duì)列容量等于最大容量, 將執(zhí)行策略下放給調(diào)用方
public void tryPut(RejectPolicy rejectPolicy, T t) {
    lock.lock();
    try {
        if (capacity == queue.size()) {
            // 任務(wù)隊(duì)列容量等于最大容量, 執(zhí)行策略下放給調(diào)用方
            rejectPolicy.reject(this, t);
        } else {
            // 隊(duì)列沒有滿,推送任務(wù)
            log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): {}", t);
            queue.addLast(t);
            // 喚醒消費(fèi)者等待中的線程
            consWaitSet.signalAll();
        }
    } finally {
        lock.unlock();
    }
}
修改線程池類
  • 添加拒絕策略的成員變量
private RejectPolicy<Runnable> rejectPolicy;
  • 構(gòu)造方法
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
      this.coreSize = coreSize;
      this.timeout = timeout;
      this.timeUnit = timeUnit;
      this.queue = new BlockingQueue<>(queueSize); // 構(gòu)建任務(wù)隊(duì)列
      this.rejectPolicy = rejectPolicy; // 拒絕策略
  }
  • executeTask()在核心線程數(shù)超過線程集合的容量時(shí),調(diào)用BlockingQueuetryPut()
public void executeTask(Runnable task) {
    if (workers.size() < coreSize) {
        // 核心線程數(shù)未超過線程集合的容量
        // 將當(dāng)前任務(wù)交給Worker執(zhí)行
        Worker worker = new Worker(task);
        // 將worker加入線集合
        log.debug("核心線程數(shù)未滿,剩余:{},新增Worker執(zhí)行任務(wù)", coreSize - workers.size());
        workers.add(worker);
        // 啟動(dòng)線程
        worker.start();
    } else {
        // 核心線程數(shù)超過線程集合的容量
        // 選擇策略模式,讓調(diào)用者自行選擇超時(shí)策略
        queue.tryPut(rejectPolicy, task);
    }
}

再來測一哈

public static void main(String[] args) throws InterruptedException {
    // 測試:
    // 創(chuàng)建一個(gè)線程池
    // 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 5
    ThreadPool threadPool = new ThreadPool(3,
            5000,
            TimeUnit.MILLISECONDS,
            5,
            (queue, task) -> {
                // 1) 死等
                // queue.put(task);
                // 2) 超時(shí)放棄
                // boolean timeoutFlag = queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                // if (!timeoutFlag) {
                //     log.debug("生產(chǎn)者推送任務(wù)超時(shí)!");
                // }
                // 3) 直接放棄
                // log.debug("放棄:{}", task);
                // 4) 拋出異常
                // throw new RuntimeException("任務(wù)推送失敗! 任務(wù): " + task);
            });

    // 任務(wù)數(shù)超過任務(wù)隊(duì)列容量的情況
    log.debug("--------------------任務(wù)數(shù):9---------------------");
    for (int i = 0; i < 10; i++) {
        int tmpI = i;
        threadPool.executeTask(() -> {
            try {
                Thread.sleep(200000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
        });
    }
}

死等

測試結(jié)果見上面的日志輸出

超時(shí)放棄

14:17:46 [main] c.TestThreadPool - 生產(chǎn)者推送任務(wù)超時(shí)!

直接放棄

14:19:15 [main] c.TestThreadPool - 放棄:com.example.pool.TestThreadPool$$Lambda$2/1537358694@2f7c7260

拋出異常

Exception in thread "main" java.lang.RuntimeException: 任務(wù)推送失敗! 任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@2f7c7260
    at com.example.pool.TestThreadPool.lambda$main$0(TestThreadPool.java:39)
    at com.example.pool.BlockingQueue.tryPut(TestThreadPool.java:386)
    at com.example.pool.ThreadPool.executeTask(TestThreadPool.java:163)
    at com.example.pool.TestThreadPool.main(TestThreadPool.java:60)

【注意】如果后面還有任務(wù)要推送,也會(huì)推送失敗

三、總結(jié)

  • 利用生產(chǎn)者消費(fèi)者模式平衡了線程池消費(fèi)任務(wù)和任務(wù)生產(chǎn)速度不一致的問題。
  • 利用享元模式充分利用已有線程的資源,來處理不同的任務(wù)。從而減少線程的數(shù)量,避免頻繁的發(fā)生上下文切換
  • 利用策略模式讓生產(chǎn)者自行決定如果任務(wù)隊(duì)列滿了的時(shí)候 該選擇何種策略處理該情況。增加了線程池的可擴(kuò)展性
  • 總而言之,言而總之,收獲頗豐。如果實(shí)現(xiàn)過程中有明顯的硬傷,希望各位看官指出,粉腸感謝!
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容