簡易線程池

來自《Java并發(fā)編程的藝術》

一個簡易的線程池的實現(xiàn)。三個類:
ThreadPool線程池接口
DefaultThreadPool線程池接口實現(xiàn)
Worker工作線程

線程池的工作邏輯:

image.png

全部代碼

package ConcurrencyArt;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

interface ThreadPool<Job extends Runnable> {
    void execute(Job job);//執(zhí)行一個Job,這個Job需要實現(xiàn)Runnable

    void shutdown();//關閉線程池

    void addWorkers(int m);//增加工作者線程

    void removeWorkers(int m);//減少工作者線程

    int getJobSize();//得到正在等待執(zhí)行的任務數(shù)量
}

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    private static final int MAX_WORKER_NUMBERS = 10;//線程池最大限制數(shù)
    private static final int DEFAULT_WORKER_NUMBERS = 5;//線程池默認的數(shù)量
    private static final int MIN_WORKER_NUMBERS = 1;//線程池最小的數(shù)量

    private final LinkedList<Job> jobs = new LinkedList<>();//工作列表

    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());//工作者列表

    private int workerNum = DEFAULT_WORKER_NUMBERS;//工作者線程的數(shù)量
    private AtomicLong threadNum = new AtomicLong();//線程編號生成


    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }

    //初始化線程工作者
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();//在worker的run方法中的while循環(huán)中,調(diào)用了jobs.wait();
            }
        }
    }

    @Override
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int m) {
        synchronized (jobs) {
            if (m + this.workerNum > MAX_WORKER_NUMBERS) {
                m = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(m);
            this.workerNum += m;
        }
    }

    @Override
    public void removeWorkers(int m) {
        synchronized (jobs) {
            if (m >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            int count = 0;
            while (count < m) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }


    //工作者,負責消費任務。
    class Worker implements Runnable {
        private volatile boolean running = true;//在這里通過volatile boolean而不是interrupt方法來安全地中斷線程

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    while (jobs.isEmpty()) {//當工作隊列為空時,所有的工作者線程均等待在工作隊列上。
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;//感知到外部對工作者線程的中斷操作,返回。
                        }
                    }
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        //job,作為一個Runnable的實現(xiàn)類,在這里并未將其變成線程并start(),
                        // 而是將其的run方法放在工作者線程中執(zhí)行,這樣真正的多線程是指工作者線程,
                        //而提交進來的Job只是作為一個應該異步執(zhí)行的任務。
                        job.run();
                    } catch (Exception ex) {
                        //忽略job執(zhí)行中的exception
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容