前言
面試中經(jīng)常會(huì)有考官問道,讓你自己手寫是實(shí)現(xiàn)一個(gè)線程池。這里我就按照網(wǎng)上的一些參考來進(jìn)行實(shí)現(xiàn)一個(gè)簡單的線程池。主要目的是為了理解和記憶實(shí)現(xiàn)過程中遇到的類 以及實(shí)現(xiàn)過程。
首先我們來看一些具體的成員變量有哪些:

終于知道為什么面試官喜歡這個(gè)了吧,有并發(fā)鎖,有阻塞隊(duì)列,有volatile關(guān)鍵字。很多和多線程并發(fā)的東西都會(huì)在線程池中涉及。
那么面試官就會(huì)圍繞這些知識(shí)點(diǎn)去展開,線程池可能只是開始(套路,全是套路?。?/p>
RUNNING的作用是標(biāo)記線程池的整體狀態(tài)是否在工作
lock是為了在線程池內(nèi)部的一些操作上加上并發(fā)鎖,來保證程序不出錯(cuò)。
workers是一個(gè)工作集,用來存放工人。而且是hashSet類型,這就代表是一個(gè)沒有重復(fù)worker的集合。
queue 阻塞隊(duì)列,用來存放線程池將用執(zhí)行的任務(wù)。使用的是并發(fā)包下的阻塞隊(duì)列,可以保證在任務(wù)的存取上是線程安全的。
threads是一個(gè)簡易的線程工廠,源碼中相對復(fù)雜。用來存放生成的線程
poolsize代表核心線程數(shù) 就是這個(gè)線程池中主要大部分情況下有多少線程
coreSize 代表正在線程池中工作的線程數(shù)
關(guān)于這些Size的變量,源碼中比這個(gè)多。而且不好理解,這里就簡單的這樣認(rèn)為。
shutdown是標(biāo)記線程池停止運(yùn)行的標(biāo)記
下面是整個(gè)線程池的最主要方法 execute,是執(zhí)行任務(wù)的入口

這個(gè)方法我們看,當(dāng)任務(wù)為空,拋出異常。如果當(dāng)前線程池中的空閑線程小于核心線程數(shù)的話就增加線程進(jìn)入addThread方法,否則就會(huì)直接加入阻塞隊(duì)列去等待。
下面是addThread方法

我們看到,這里創(chuàng)建一個(gè)工人進(jìn)行工作,然后把工人加入到工作集中。創(chuàng)建一個(gè)線程去執(zhí)行工人的工作。線程啟動(dòng)。整個(gè)過程在并發(fā)鎖的保護(hù)下進(jìn)行。
下面是shutdown方法

首先把運(yùn)行標(biāo)志記為false,然后把工作集中的工人都停止手中工作 阻塞掉。然后阻塞完成,改變線程池的停止?fàn)顟B(tài)為真。
下面是線程池的內(nèi)部類 worker的簡易實(shí)現(xiàn)

這里我直接把工人獲得的任務(wù)放入阻塞隊(duì)列中,然后每次執(zhí)行都從里面去拿。和源碼的實(shí)現(xiàn)有點(diǎn)不一樣(源碼會(huì)先去執(zhí)行每個(gè)工人自己拿到的任務(wù),之后才去阻塞隊(duì)列中拿?。?/p>
worker的run方法

在線程池正常運(yùn)行的狀態(tài)下,一直獲取阻塞隊(duì)列中的任務(wù)并且執(zhí)行
下面是用于線程池停止時(shí)的方法

當(dāng)線程池停止工作時(shí),就對所有線程發(fā)出阻塞指令不再繼續(xù)工作。
完整代碼:
package com.Thread.ThreadPoolExecutor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
public class MyThreadPoolExecutor {
private volatile boolean RUNNING = true;// 是否正在運(yùn)行
private final ReentrantLock lock = new ReentrantLock();// 并發(fā)鎖
private final HashSet<Worker> workers = new HashSet<>();// 不重復(fù)的工作集
private static BlockingQueue<Runnable> queue = null;// 任務(wù)阻塞隊(duì)列
private final ArrayList<Thread> threads = new ArrayList<>();// 線程工廠
private volatile int poolsize;// 線程池的核心線程數(shù)
private volatile int coresize;// 當(dāng)前線程池中的線程數(shù)
private volatile boolean shutdown = false;// 是否停止工作
public MyThreadPoolExecutor(int poolsize) {
// TODO Auto-generated constructor stub
this.poolsize = poolsize;
queue = new ArrayBlockingQueue<>(poolsize);
}
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
if (coresize < poolsize) {
addThread(command);
} else {
try {
queue.put(command);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void addThread(Runnable task) {
lock.lock();
try {
coresize++;
Worker worker = new Worker(task);
workers.add(worker);
Thread thread = new Thread(worker);
threads.add(thread);
thread.start();
} finally {
// TODO: handle finally clause
lock.unlock();
}
}
public void shutdown() {
RUNNING = false;
if (!workers.isEmpty()) {
for (Worker worker : workers) {
worker.interruptIfIdle();
}
}
shutdown = true;
Thread.currentThread().interrupt();
}
private final class Worker implements Runnable {
public Worker(Runnable task) {
// TODO Auto-generated constructor stub
queue.offer(task);
}
public Runnable getTask() throws InterruptedException {
return queue.take();
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true && RUNNING) {
if (shutdown) {
Thread.interrupted();
}
Runnable task = null;
try {
task = getTask();
task.run();
} catch (InterruptedException e) {
}
}
}
public void interruptIfIdle() {
for (Thread thread : threads) {
System.out.println(thread.getName() + " interrupt");
thread.interrupt();
}
}
}
public static void main(String[] args) {
}
}
class Main {
public static void main(String[] args) {
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(3);
for (int i = 0; i < 10; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("線程" + Thread.currentThread().getName() + "在工作....");
}
});
}
executor.shutdown();
}
}