Java 如何開(kāi)發(fā)一個(gè)自定義線程池
每一個(gè)線程的啟動(dòng)和結(jié)束都是比較消耗時(shí)間和占用資源的。
如果在系統(tǒng)中用到了很多的線程,大量的啟動(dòng)和結(jié)束動(dòng)作會(huì)導(dǎo)致系統(tǒng)的性能變卡,響應(yīng)變慢。
為了解決這個(gè)問(wèn)題,引入線程池這種設(shè)計(jì)思想。
線程池的模式很像 生產(chǎn)者消費(fèi)者模式,消費(fèi)的對(duì)象是一個(gè)一個(gè)的能夠運(yùn)行的任務(wù)
步驟 1 : 線程池設(shè)計(jì)思路
線程池的思路和生產(chǎn)者消費(fèi)者模型是很接近的。
- 準(zhǔn)備一個(gè)任務(wù)容器
- 一次性啟動(dòng)10個(gè) 消費(fèi)者線程
- 剛開(kāi)始任務(wù)容器是空的,所以線程都wait在上面。
- 直到一個(gè)外部線程往這個(gè)任務(wù)容器中扔了一個(gè)“任務(wù)”,就會(huì)有一個(gè)消費(fèi)者線程被喚醒notify
- 這個(gè)消費(fèi)者線程取出“任務(wù)”,并且執(zhí)行這個(gè)任務(wù),執(zhí)行完畢后,繼續(xù)等待下一次任務(wù)的到來(lái)。
- 如果短時(shí)間內(nèi),有較多的任務(wù)加入,那么就會(huì)有多個(gè)線程被喚醒,去執(zhí)行這些任務(wù)。
在整個(gè)過(guò)程中,都不需要?jiǎng)?chuàng)建新的線程,而是循環(huán)使用這些已經(jīng)存在的線程
步驟 2 : 開(kāi)發(fā)一個(gè)自定義線程池
這是一個(gè)自定義的線程池,雖然不夠完善和健壯,但是已經(jīng)足以說(shuō)明線程池的工作原理
緩慢的給這個(gè)線程池添加任務(wù),會(huì)看到有多條線程來(lái)執(zhí)行這些任務(wù)。
線程7執(zhí)行完畢任務(wù)后,又回到池子里,下一次任務(wù)來(lái)的時(shí)候,線程7又來(lái)執(zhí)行新的任務(wù)。
package multiplethread;
import java.util.LinkedList;
public class ThreadPool {
// 線程池大小
int threadPoolSize;
// 任務(wù)容器
LinkedList<Runnable> tasks = new LinkedList<Runnable>();
// 試圖消費(fèi)任務(wù)的線程
public ThreadPool() {
threadPoolSize = 10;
// 啟動(dòng)10個(gè)任務(wù)消費(fèi)者線程
synchronized (tasks) {
for (int i = 0; i < threadPoolSize; i++) {
new TaskConsumeThread("任務(wù)消費(fèi)者線程 " + i).start();
}
}
}
public void add(Runnable r) {
synchronized (tasks) {
tasks.add(r);
// 喚醒等待的任務(wù)消費(fèi)者線程
tasks.notifyAll();
}
}
class TaskConsumeThread extends Thread {
public TaskConsumeThread(String name) {
super(name);
}
Runnable task;
public void run() {
System.out.println("啟動(dòng): " + this.getName());
while (true) {
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
task = tasks.removeLast();
// 允許添加任務(wù)的線程可以繼續(xù)添加任務(wù)
tasks.notifyAll();
}
System.out.println(this.getName() + " 獲取到任務(wù),并執(zhí)行");
task.run();
}
}
}
}
.
package multiplethread;
public class TestThread {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool();
for (int i = 0; i < 5; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
//System.out.println("執(zhí)行任務(wù)");
//任務(wù)可能是打印一句話
//可能是訪問(wèn)文件
//可能是做排序
}
};
pool.add(task);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
步驟 3 : 測(cè)試線程池
創(chuàng)造一個(gè)情景,每個(gè)任務(wù)執(zhí)行的時(shí)間都是1秒
剛開(kāi)始是間隔1秒鐘向線程池中添加任務(wù)
然后間隔時(shí)間越來(lái)越短,執(zhí)行任務(wù)的線程還沒(méi)有來(lái)得及結(jié)束,新的任務(wù)又來(lái)了。
就會(huì)觀察到線程池里的其他線程被喚醒來(lái)執(zhí)行這些任務(wù)

package multiplethread;
public class TestThread {
public static void main(String[] args) {
ThreadPool pool= new ThreadPool();
int sleep=1000;
while(true){
pool.add(new Runnable(){
@Override
public void run() {
//System.out.println("執(zhí)行任務(wù)");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
try {
Thread.sleep(sleep);
sleep = sleep>100?sleep-100:sleep;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
步驟 4 : 使用java自帶線程池
java提供自帶的線程池,而不需要自己去開(kāi)發(fā)一個(gè)自定義線程池了。
線程池類ThreadPoolExecutor在包java.util.concurrent下
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
第一個(gè)參數(shù)10 表示這個(gè)線程池初始化了10個(gè)線程在里面工作
第二個(gè)參數(shù)15 表示如果10個(gè)線程不夠用了,就會(huì)自動(dòng)增加到最多15個(gè)線程
第三個(gè)參數(shù)60 結(jié)合第四個(gè)參數(shù)TimeUnit.SECONDS,表示經(jīng)過(guò)60秒,多出來(lái)的線程還沒(méi)有接到活兒,就會(huì)回收,最后保持池子里就10個(gè)
第四個(gè)參數(shù)TimeUnit.SECONDS 如上
第五個(gè)參數(shù) new LinkedBlockingQueue() 用來(lái)放任務(wù)的集合
execute方法用于添加新的任務(wù)
package multiplethread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.execute(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("任務(wù)1");
}
});
}
}
練習(xí): 借助線程池同步查找文件內(nèi)容
在 練習(xí)-同步查找文件內(nèi)容 ,如果文件特別多,就會(huì)創(chuàng)建很多的線程。 改寫(xiě)這個(gè)練習(xí),使用線程池的方式來(lái)完成。
初始化一個(gè)大小是10的線程池
遍歷所有文件,當(dāng)遍歷到文件是.java的時(shí)候,創(chuàng)建一個(gè)查找文件的任務(wù),把這個(gè)任務(wù)扔進(jìn)線程池去執(zhí)行,繼續(xù)遍歷下一個(gè)文件
答案 :

SearchFileTask.java
package multiplethread;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
public class SearchFileTask implements Runnable{
private File file;
private String search;
public SearchFileTask(File file,String search) {
this.file = file;
this.search= search;
}
public void run(){
String fileContent = readFileConent(file);
if(fileContent.contains(search)){
System.out.printf( "線程: %s 找到子目標(biāo)字符串%s,在文件:%s%n",Thread.currentThread().getName(), search,file);
}
}
public String readFileConent(File file){
try (FileReader fr = new FileReader(file)) {
char[] all = new char[(int) file.length()];
fr.read(all);
return new String(all);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
ThreadPool.java
package multiplethread;
import java.util.LinkedList;
public class ThreadPool {
// 線程池大小
int threadPoolSize;
// 任務(wù)容器
LinkedList<Runnable> tasks = new LinkedList<Runnable>();
// 試圖消費(fèi)任務(wù)的線程
public ThreadPool() {
threadPoolSize = 10;
// 啟動(dòng)10個(gè)任務(wù)消費(fèi)者線程
synchronized (tasks) {
for (int i = 0; i < threadPoolSize; i++) {
new TaskConsumeThread("任務(wù)消費(fèi)者線程 " + i).start();
}
}
}
public void add(Runnable r) {
synchronized (tasks) {
tasks.add(r);
// 喚醒等待的任務(wù)消費(fèi)者線程
tasks.notifyAll();
}
}
class TaskConsumeThread extends Thread {
public TaskConsumeThread(String name) {
super(name);
}
Runnable task;
public void run() {
while (true) {
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
task = tasks.removeLast();
// 允許添加任務(wù)的線程可以繼續(xù)添加任務(wù)
tasks.notifyAll();
}
task.run();
}
}
}
}
TestThread.java
package multiplethread;
import java.io.File;
public class TestThread {
static ThreadPool pool= new ThreadPool();
public static void search(File file, String search) {
if (file.isFile()) {
if(file.getName().toLowerCase().endsWith(".java")){
SearchFileTask task = new SearchFileTask(file, search);
pool.add(task);
}
}
if (file.isDirectory()) {
File[] fs = file.listFiles();
for (File f : fs) {
search(f, search);
}
}
}
public static void main(String[] args) {
File folder =new File("e:\\project");
search(folder,"Magic");
}
}