線程應(yīng)用實(shí)例--簡(jiǎn)單線程池實(shí)現(xiàn)

????????對(duì)于服務(wù)端的程序,經(jīng)常面對(duì)的是客戶端傳入的短?。▓?zhí)行時(shí)間短、工作內(nèi)容較為單一)任務(wù),需要服務(wù)端快速處理并返回結(jié)果。如果服務(wù)端每次接受到一個(gè)任務(wù),創(chuàng)建一個(gè)線程,然后進(jìn)行執(zhí)行,這在原型階段是個(gè)不錯(cuò)的選擇,但是面對(duì)成千上萬(wàn)的任務(wù)遞交進(jìn)服務(wù)器時(shí),如果還是采用一個(gè)任務(wù)一個(gè)線程的方式,那么將會(huì)創(chuàng)建數(shù)以萬(wàn)記的線程,這不是一個(gè)好的選擇。因?yàn)檫@會(huì)使操作系統(tǒng)頻繁的進(jìn)行線程上下文切換,無(wú)故增加系統(tǒng)的負(fù)載,而線程的創(chuàng)建和消亡都是需要耗費(fèi)系統(tǒng)資源的,也無(wú)疑浪費(fèi)了系統(tǒng)資源。

????????線程池技術(shù)能夠很好地解決這個(gè)問(wèn)題,它預(yù)先創(chuàng)建了若干數(shù)量的線程,并且不能由用戶直接對(duì)線程的創(chuàng)建進(jìn)行控制,在這個(gè)前提下重復(fù)使用固定或較為固定數(shù)目的線程來(lái)完成任務(wù)的執(zhí)行。這樣做的好處是,一方面,消除了頻繁創(chuàng)建和消亡線程的系統(tǒng)資源開(kāi)銷,另一方面,面對(duì)過(guò)量任務(wù)的提交能夠平緩的劣化。

一、線程池接口定義

public interface ThreadPool <Job extends Runnable>{

????????//向線程池提交任務(wù)

????????void execute(Job job);

????????//關(guān)閉線程池

????????void shutdown();

????????//增加工作者線程

????????void addWorkers(int num);

????????//減少工作者線程

????????void removeWorker(int num);

????????//獲取等待執(zhí)行的任務(wù)數(shù)

????????int getJobSize();

}

二、線程池實(shí)現(xiàn)

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{

????????//最大工作者線程數(shù)量

????????private static final int MAX_WORKER_NUMBERS = 10;

????????//默認(rèn)工作者線程數(shù)量

????????private static final int DEFAULT_WORKER_NUMBERS = 5;

????????//最小工作者線程數(shù)量

????????private static final int MIN_WORKER_NUMBERS = 1;

????????//工作任務(wù)列表

????????private final LinkedList<Job> jobs = new LinkedList<Job>();

????????//工作者線程列表

????????private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

????????//工作者線程數(shù)量

????????private int workerNum = DEFAULT_WORKER_NUMBERS;

????????//線程編號(hào)生成

????????private AtomicLong threadNum = new AtomicLong();

????????public DefaultThreadPool() {

? ? ? ? ? ? ? ? //初始化并啟動(dòng)工作者線程

????????????????initializeWokers(DEFAULT_WORKER_NUMBERS);

????????}

????????public DefaultThreadPool(int num) {

????????????????workerNum = num > MAX_WORKER_NUMBERS? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS? ????????????????MIN_WORKER_NUMBERS : num;

????????????????initializeWokers(workerNum);

????????}

? ? ? ? //向線程池提交任務(wù)?

????????public void execute(Job job) {

????????????????if (job != null) {

????????????????????//提交任務(wù)后,向等待的在工作任務(wù)列表的線程發(fā)送喚醒通知?

????????????????????synchronized (jobs) {

????????????????????????????jobs.addLast(job);

????????????????????????????jobs.notify();

????????????????????}

????????????????}

????????}

????????public void shutdown() {

????????????????for (Worker worker : workers) {

????????????????????worker.shutdown();

????????????????}

????????}

????????//增加工作者線程

????????public void addWorkers(int num) {

????????????????synchronized (jobs) {

????????????????????????if (num + this.workerNum > MAX_WORKER_NUMBERS) {

????????????????????????????????num = MAX_WORKER_NUMBERS - this.workerNum;

????????????????????????}

????????????????????????initializeWokers(num);

????????????????????????this.workerNum += num;

????????????????}

????????}

????????//減少工作者線程

????????public void removeWorker(int num) {

????????????????synchronized (jobs) {

????????????????????????if (num >= this.workerNum) {

????????????????????????????????throw new IllegalArgumentException("beyond workNum");

????????????????????????}

????????????????????????int count = 0;

????????????????????????while (count < num) {

????????????????????????????????Worker worker = workers.get(count);

????????????????????????????????if (workers.remove(worker)) {

????????????????????????????????????????worker.shutdown();

????????????????????????????????????????count++;

????????????????????????????????}

????????????????????????}

????????????????????????this.workerNum -= count;

????????????????}

????????}

????????//獲取等待執(zhí)行的任務(wù)數(shù)

????????public int getJobSize() {

????????????????????return jobs.size();

????????}

????????//初始化工作者線程列表

????????private void initializeWokers(int num) {

????????????????for (int i = 0; i < num; i++) {

????????????????????Worker worker = new Worker();

????????????????????workers.add(worker);

????????????????????Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.

????????????????????incrementAndGet());

????????????????????thread.start();

????????????}

????????}

????????//工作者線程定義

????????class Worker implements Runnable {

? ? ? ? ? ? ? ?//是否工作

????????????????private volatile boolean running = true;

????????????????public void run() {

????????????????????????while (running) {

????????????????????????????Job job = null;

????????????????????????????synchronized (jobs) {

????????????????????????????????????//如果任務(wù)列表中沒(méi)有任務(wù)則等待

????????????????????????????????????while (jobs.isEmpty()) {

????????????????????????????????????????????try {

????????????????????????????????????????????????????jobs.wait();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?????????} catch (InterruptedException ex) {

????????????????????????????????????????????????????Thread.currentThread().interrupt();

????????????????????????????????????????????????????return;

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}

????????????????????????????????????}

????????????????????????????????????//獲取任務(wù)執(zhí)行

????????????????????????????????????job = jobs.removeFirst();

????????????????????????????????????if(job != null) {

????????????????????????????????????????try {

????????????????????????????????????????????job.run();

????????????????????????????????????????} catch (Exception ex) {

????????????????????????????????????????}

????????????????????????????????????}

????????????????????????}

????????????????}

????????????}

????????????public void shutdown() {

????????????????????running = false;

????????????????}

????????}

}

? ? ? ? 當(dāng)客戶端調(diào)用execute(Job)方法時(shí),會(huì)不斷地向任務(wù)列表jobs中添加Job,而每個(gè)工作者線程當(dāng)客戶端調(diào)用execute(Job)方法時(shí),會(huì)不斷地向任務(wù)列表jobs中添加Job,而每個(gè)工作者線程會(huì)不斷地從jobs上取出一個(gè)job進(jìn)行執(zhí)行,當(dāng)jobs為空時(shí),工作者線程進(jìn)入等到狀態(tài)。

? ? ? ? 添加一個(gè)job后,對(duì)工作隊(duì)列jobs調(diào)用了notify()方法,而不是notifyAll()方法,因?yàn)槟軌虼_定有工作者線程被喚醒,這時(shí)使用notify()方法將會(huì)比notifyAll()方法獲得更小的開(kāi)銷(避免將等待隊(duì)列中的線程全部移到阻塞隊(duì)列中)。

? ? ? ? 可以看到線程池的本質(zhì)是使用了一個(gè)線程安全的工作隊(duì)列連接工作者線程和客戶端線程,客戶端線程將任務(wù)放入工作隊(duì)列后邊返回,而工作者線程則不斷地從工作任務(wù)隊(duì)列取出工作并執(zhí)行。當(dāng)工作隊(duì)列為空時(shí),所有的工作者線程均等待在工作隊(duì)列上,當(dāng)有客戶端提交了一個(gè)任務(wù)之后會(huì)通知任意一個(gè)工作者線程,隨著大量任務(wù)的提交,更多的工作者線程會(huì)被喚醒。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • ~家像個(gè)垃圾堆那樣~~ ~~有點(diǎn)原則,有點(diǎn)規(guī)律,有點(diǎn)思想好不好~~ ~~搞得家像垃圾堆那樣~~ ~不要太隨意,點(diǎn)點(diǎn)...
    爛筆頭韋閱讀 134評(píng)論 0 0
  • 不要輕易去依賴一個(gè)人,它會(huì)成為你的習(xí)慣,當(dāng)分別來(lái)臨,你失去的不是某個(gè)人,而是你精神的支柱。無(wú)論何時(shí)何地,都要學(xué)會(huì)獨(dú)...
    鹿哈鹿哈閱讀 584評(píng)論 0 0
  • 參考http://stackoverflow.com/questions/29505622/spannablest...
    小面包屑閱讀 1,950評(píng)論 0 2
  • 水逆啊諸事不順。又是丟東西又是生病的。。
    晨曦iuiu閱讀 199評(píng)論 0 0

友情鏈接更多精彩內(nèi)容