來自《Java并發(fā)編程的藝術》
一個簡易的線程池的實現(xiàn)。三個類:
ThreadPool線程池接口
DefaultThreadPool線程池接口實現(xiàn)
Worker工作線程
線程池的工作邏輯:

image.png
全部代碼
package ConcurrencyArt;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
interface ThreadPool<Job extends Runnable> {
void execute(Job job);//執(zhí)行一個Job,這個Job需要實現(xiàn)Runnable
void shutdown();//關閉線程池
void addWorkers(int m);//增加工作者線程
void removeWorkers(int m);//減少工作者線程
int getJobSize();//得到正在等待執(zhí)行的任務數(shù)量
}
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
private static final int MAX_WORKER_NUMBERS = 10;//線程池最大限制數(shù)
private static final int DEFAULT_WORKER_NUMBERS = 5;//線程池默認的數(shù)量
private static final int MIN_WORKER_NUMBERS = 1;//線程池最小的數(shù)量
private final LinkedList<Job> jobs = new LinkedList<>();//工作列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());//工作者列表
private int workerNum = DEFAULT_WORKER_NUMBERS;//工作者線程的數(shù)量
private AtomicLong threadNum = new AtomicLong();//線程編號生成
public DefaultThreadPool() {
initializeWorkers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initializeWorkers(workerNum);
}
//初始化線程工作者
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}
@Override
public void execute(Job job) {
if (job != null) {
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();//在worker的run方法中的while循環(huán)中,調(diào)用了jobs.wait();
}
}
}
@Override
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void addWorkers(int m) {
synchronized (jobs) {
if (m + this.workerNum > MAX_WORKER_NUMBERS) {
m = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(m);
this.workerNum += m;
}
}
@Override
public void removeWorkers(int m) {
synchronized (jobs) {
if (m >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
int count = 0;
while (count < m) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
//工作者,負責消費任務。
class Worker implements Runnable {
private volatile boolean running = true;//在這里通過volatile boolean而不是interrupt方法來安全地中斷線程
@Override
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
while (jobs.isEmpty()) {//當工作隊列為空時,所有的工作者線程均等待在工作隊列上。
try {
jobs.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;//感知到外部對工作者線程的中斷操作,返回。
}
}
job = jobs.removeFirst();
}
if (job != null) {
try {
//job,作為一個Runnable的實現(xiàn)類,在這里并未將其變成線程并start(),
// 而是將其的run方法放在工作者線程中執(zhí)行,這樣真正的多線程是指工作者線程,
//而提交進來的Job只是作為一個應該異步執(zhí)行的任務。
job.run();
} catch (Exception ex) {
//忽略job執(zhí)行中的exception
}
}
}
}
public void shutdown() {
running = false;
}
}
}