????????對(duì)于服務(wù)端的程序,經(jīng)常面對(duì)的是客戶端傳入的短?。▓?zhí)行時(shí)間短、工作內(nèi)容較為單一)任務(wù),需要服務(wù)端快速處理并返回結(jié)果。如果服務(wù)端每次接受到一個(gè)任務(wù),創(chuàng)建一個(gè)線程,然后進(jìn)行執(zhí)行,這在原型階段是個(gè)不錯(cuò)的選擇,但是面對(duì)成千上萬(wàn)的任務(wù)遞交進(jìn)服務(wù)器時(shí),如果還是采用一個(gè)任務(wù)一個(gè)線程的方式,那么將會(huì)創(chuàng)建數(shù)以萬(wàn)記的線程,這不是一個(gè)好的選擇。因?yàn)檫@會(huì)使操作系統(tǒng)頻繁的進(jìn)行線程上下文切換,無(wú)故增加系統(tǒng)的負(fù)載,而線程的創(chuàng)建和消亡都是需要耗費(fèi)系統(tǒng)資源的,也無(wú)疑浪費(fèi)了系統(tǒng)資源。
????????線程池技術(shù)能夠很好地解決這個(gè)問(wèn)題,它預(yù)先創(chuàng)建了若干數(shù)量的線程,并且不能由用戶直接對(duì)線程的創(chuàng)建進(jìn)行控制,在這個(gè)前提下重復(fù)使用固定或較為固定數(shù)目的線程來(lái)完成任務(wù)的執(zhí)行。這樣做的好處是,一方面,消除了頻繁創(chuàng)建和消亡線程的系統(tǒng)資源開(kāi)銷,另一方面,面對(duì)過(guò)量任務(wù)的提交能夠平緩的劣化。
一、線程池接口定義
public interface ThreadPool <Job extends Runnable>{
????????//向線程池提交任務(wù)
????????void execute(Job job);
????????//關(guān)閉線程池
????????void shutdown();
????????//增加工作者線程
????????void addWorkers(int num);
????????//減少工作者線程
????????void removeWorker(int num);
????????//獲取等待執(zhí)行的任務(wù)數(shù)
????????int getJobSize();
}
二、線程池實(shí)現(xiàn)
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{
????????//最大工作者線程數(shù)量
????????private static final int MAX_WORKER_NUMBERS = 10;
????????//默認(rèn)工作者線程數(shù)量
????????private static final int DEFAULT_WORKER_NUMBERS = 5;
????????//最小工作者線程數(shù)量
????????private static final int MIN_WORKER_NUMBERS = 1;
????????//工作任務(wù)列表
????????private final LinkedList<Job> jobs = new LinkedList<Job>();
????????//工作者線程列表
????????private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
????????//工作者線程數(shù)量
????????private int workerNum = DEFAULT_WORKER_NUMBERS;
????????//線程編號(hào)生成
????????private AtomicLong threadNum = new AtomicLong();
????????public DefaultThreadPool() {
? ? ? ? ? ? ? ? //初始化并啟動(dòng)工作者線程
????????????????initializeWokers(DEFAULT_WORKER_NUMBERS);
????????}
????????public DefaultThreadPool(int num) {
????????????????workerNum = num > MAX_WORKER_NUMBERS? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS? ????????????????MIN_WORKER_NUMBERS : num;
????????????????initializeWokers(workerNum);
????????}
? ? ? ? //向線程池提交任務(wù)?
????????public void execute(Job job) {
????????????????if (job != null) {
????????????????????//提交任務(wù)后,向等待的在工作任務(wù)列表的線程發(fā)送喚醒通知?
????????????????????synchronized (jobs) {
????????????????????????????jobs.addLast(job);
????????????????????????????jobs.notify();
????????????????????}
????????????????}
????????}
????????public void shutdown() {
????????????????for (Worker worker : workers) {
????????????????????worker.shutdown();
????????????????}
????????}
????????//增加工作者線程
????????public void addWorkers(int num) {
????????????????synchronized (jobs) {
????????????????????????if (num + this.workerNum > MAX_WORKER_NUMBERS) {
????????????????????????????????num = MAX_WORKER_NUMBERS - this.workerNum;
????????????????????????}
????????????????????????initializeWokers(num);
????????????????????????this.workerNum += num;
????????????????}
????????}
????????//減少工作者線程
????????public void removeWorker(int num) {
????????????????synchronized (jobs) {
????????????????????????if (num >= this.workerNum) {
????????????????????????????????throw new IllegalArgumentException("beyond workNum");
????????????????????????}
????????????????????????int count = 0;
????????????????????????while (count < num) {
????????????????????????????????Worker worker = workers.get(count);
????????????????????????????????if (workers.remove(worker)) {
????????????????????????????????????????worker.shutdown();
????????????????????????????????????????count++;
????????????????????????????????}
????????????????????????}
????????????????????????this.workerNum -= count;
????????????????}
????????}
????????//獲取等待執(zhí)行的任務(wù)數(shù)
????????public int getJobSize() {
????????????????????return jobs.size();
????????}
????????//初始化工作者線程列表
????????private void initializeWokers(int num) {
????????????????for (int i = 0; i < num; i++) {
????????????????????Worker worker = new Worker();
????????????????????workers.add(worker);
????????????????????Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
????????????????????incrementAndGet());
????????????????????thread.start();
????????????}
????????}
????????//工作者線程定義
????????class Worker implements Runnable {
? ? ? ? ? ? ? ?//是否工作
????????????????private volatile boolean running = true;
????????????????public void run() {
????????????????????????while (running) {
????????????????????????????Job job = null;
????????????????????????????synchronized (jobs) {
????????????????????????????????????//如果任務(wù)列表中沒(méi)有任務(wù)則等待
????????????????????????????????????while (jobs.isEmpty()) {
????????????????????????????????????????????try {
????????????????????????????????????????????????????jobs.wait();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?????????} catch (InterruptedException ex) {
????????????????????????????????????????????????????Thread.currentThread().interrupt();
????????????????????????????????????????????????????return;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
????????????????????????????????????}
????????????????????????????????????//獲取任務(wù)執(zhí)行
????????????????????????????????????job = jobs.removeFirst();
????????????????????????????????????if(job != null) {
????????????????????????????????????????try {
????????????????????????????????????????????job.run();
????????????????????????????????????????} catch (Exception ex) {
????????????????????????????????????????}
????????????????????????????????????}
????????????????????????}
????????????????}
????????????}
????????????public void shutdown() {
????????????????????running = false;
????????????????}
????????}
}
? ? ? ? 當(dāng)客戶端調(diào)用execute(Job)方法時(shí),會(huì)不斷地向任務(wù)列表jobs中添加Job,而每個(gè)工作者線程當(dāng)客戶端調(diào)用execute(Job)方法時(shí),會(huì)不斷地向任務(wù)列表jobs中添加Job,而每個(gè)工作者線程會(huì)不斷地從jobs上取出一個(gè)job進(jìn)行執(zhí)行,當(dāng)jobs為空時(shí),工作者線程進(jìn)入等到狀態(tài)。
? ? ? ? 添加一個(gè)job后,對(duì)工作隊(duì)列jobs調(diào)用了notify()方法,而不是notifyAll()方法,因?yàn)槟軌虼_定有工作者線程被喚醒,這時(shí)使用notify()方法將會(huì)比notifyAll()方法獲得更小的開(kāi)銷(避免將等待隊(duì)列中的線程全部移到阻塞隊(duì)列中)。
? ? ? ? 可以看到線程池的本質(zhì)是使用了一個(gè)線程安全的工作隊(duì)列連接工作者線程和客戶端線程,客戶端線程將任務(wù)放入工作隊(duì)列后邊返回,而工作者線程則不斷地從工作任務(wù)隊(duì)列取出工作并執(zhí)行。當(dāng)工作隊(duì)列為空時(shí),所有的工作者線程均等待在工作隊(duì)列上,當(dāng)有客戶端提交了一個(gè)任務(wù)之后會(huì)通知任意一個(gè)工作者線程,隨著大量任務(wù)的提交,更多的工作者線程會(huì)被喚醒。