簡單實(shí)現(xiàn)一個(gè)初級(jí)線程池

前言

面試中經(jīng)常會(huì)有考官問道,讓你自己手寫是實(shí)現(xiàn)一個(gè)線程池。這里我就按照網(wǎng)上的一些參考來進(jìn)行實(shí)現(xiàn)一個(gè)簡單的線程池。主要目的是為了理解和記憶實(shí)現(xiàn)過程中遇到的類 以及實(shí)現(xiàn)過程。

首先我們來看一些具體的成員變量有哪些:

成員變量

終于知道為什么面試官喜歡這個(gè)了吧,有并發(fā)鎖,有阻塞隊(duì)列,有volatile關(guān)鍵字。很多和多線程并發(fā)的東西都會(huì)在線程池中涉及。
那么面試官就會(huì)圍繞這些知識(shí)點(diǎn)去展開,線程池可能只是開始(套路,全是套路?。?/p>

RUNNING的作用是標(biāo)記線程池的整體狀態(tài)是否在工作
lock是為了在線程池內(nèi)部的一些操作上加上并發(fā)鎖,來保證程序不出錯(cuò)。
workers是一個(gè)工作集,用來存放工人。而且是hashSet類型,這就代表是一個(gè)沒有重復(fù)worker的集合。
queue 阻塞隊(duì)列,用來存放線程池將用執(zhí)行的任務(wù)。使用的是并發(fā)包下的阻塞隊(duì)列,可以保證在任務(wù)的存取上是線程安全的。
threads是一個(gè)簡易的線程工廠,源碼中相對復(fù)雜。用來存放生成的線程
poolsize代表核心線程數(shù) 就是這個(gè)線程池中主要大部分情況下有多少線程
coreSize 代表正在線程池中工作的線程數(shù)

關(guān)于這些Size的變量,源碼中比這個(gè)多。而且不好理解,這里就簡單的這樣認(rèn)為。

shutdown是標(biāo)記線程池停止運(yùn)行的標(biāo)記

下面是整個(gè)線程池的最主要方法 execute,是執(zhí)行任務(wù)的入口

execute方法

這個(gè)方法我們看,當(dāng)任務(wù)為空,拋出異常。如果當(dāng)前線程池中的空閑線程小于核心線程數(shù)的話就增加線程進(jìn)入addThread方法,否則就會(huì)直接加入阻塞隊(duì)列去等待。

下面是addThread方法

增加線程

我們看到,這里創(chuàng)建一個(gè)工人進(jìn)行工作,然后把工人加入到工作集中。創(chuàng)建一個(gè)線程去執(zhí)行工人的工作。線程啟動(dòng)。整個(gè)過程在并發(fā)鎖的保護(hù)下進(jìn)行。

下面是shutdown方法


shutdown

首先把運(yùn)行標(biāo)志記為false,然后把工作集中的工人都停止手中工作 阻塞掉。然后阻塞完成,改變線程池的停止?fàn)顟B(tài)為真。

下面是線程池的內(nèi)部類 worker的簡易實(shí)現(xiàn)


worker

這里我直接把工人獲得的任務(wù)放入阻塞隊(duì)列中,然后每次執(zhí)行都從里面去拿。和源碼的實(shí)現(xiàn)有點(diǎn)不一樣(源碼會(huì)先去執(zhí)行每個(gè)工人自己拿到的任務(wù),之后才去阻塞隊(duì)列中拿?。?/p>

worker的run方法

run方法

在線程池正常運(yùn)行的狀態(tài)下,一直獲取阻塞隊(duì)列中的任務(wù)并且執(zhí)行

下面是用于線程池停止時(shí)的方法

interrupt

當(dāng)線程池停止工作時(shí),就對所有線程發(fā)出阻塞指令不再繼續(xù)工作。

完整代碼:

package com.Thread.ThreadPoolExecutor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPoolExecutor {
    private volatile boolean RUNNING = true;// 是否正在運(yùn)行
    private final ReentrantLock lock = new ReentrantLock();// 并發(fā)鎖
    private final HashSet<Worker> workers = new HashSet<>();// 不重復(fù)的工作集
    private static BlockingQueue<Runnable> queue = null;// 任務(wù)阻塞隊(duì)列
    private final ArrayList<Thread> threads = new ArrayList<>();// 線程工廠
    private volatile int poolsize;// 線程池的核心線程數(shù)
    private volatile int coresize;// 當(dāng)前線程池中的線程數(shù)
    private volatile boolean shutdown = false;// 是否停止工作

    public MyThreadPoolExecutor(int poolsize) {
        // TODO Auto-generated constructor stub
        this.poolsize = poolsize;
        queue = new ArrayBlockingQueue<>(poolsize);
    }

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        if (coresize < poolsize) {
            addThread(command);
        } else {
            try {
                queue.put(command);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private void addThread(Runnable task) {

        lock.lock();
        try {
            coresize++;
            Worker worker = new Worker(task);
            workers.add(worker);
            Thread thread = new Thread(worker);
            threads.add(thread);
            thread.start();
        } finally {
            // TODO: handle finally clause
            lock.unlock();
        }

    }

    public void shutdown() {
        RUNNING = false;
        if (!workers.isEmpty()) {
            for (Worker worker : workers) {
                worker.interruptIfIdle();
            }
        }
        shutdown = true;
        Thread.currentThread().interrupt();
    }

    private final class Worker implements Runnable {

        public Worker(Runnable task) {
            // TODO Auto-generated constructor stub
            queue.offer(task);
        }

        public Runnable getTask() throws InterruptedException {
            return queue.take();
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            while (true && RUNNING) {
                if (shutdown) {
                    Thread.interrupted();
                }
                Runnable task = null;
                try {
                    task = getTask();
                    task.run();
                } catch (InterruptedException e) {

                }
            }
        }

        public void interruptIfIdle() {
            for (Thread thread : threads) {
                System.out.println(thread.getName() + " interrupt");
                thread.interrupt();
            }
        }

    }

    public static void main(String[] args) {

    }
}

class Main {
    public static void main(String[] args) {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(3);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    System.out.println("線程" + Thread.currentThread().getName() + "在工作....");
                }
            });
        }

        executor.shutdown();
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容