多線程與并發(fā)(九):線程池相關(guān)

為什么說頻繁的創(chuàng)建和銷毀線程會浪費大量的系統(tǒng)資源?

線程的創(chuàng)建需要開辟虛擬機棧、本地方法棧、程序計數(shù)器等線程私有的內(nèi)存空間。在線程銷毀時需要回收這些系統(tǒng)資源。頻繁的創(chuàng)建和銷毀線程會浪費大量的系統(tǒng)資源

線程池的作用

  • 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等
  • 實現(xiàn)任務(wù)線程隊列緩存策略和拒絕機制
  • 實現(xiàn)某些與時間相關(guān)的功能,如定時執(zhí)行、周期執(zhí)行等
  • 隔離線程環(huán)境。通過配置獨立的線程池,將一些服務(wù)隔開,避免個服務(wù)相互影響

合理的使用線程池能夠帶來三個好處

  • 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀的消耗
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行
  • 提高線程的可管理性。

1. 自定義線程池-阻塞隊列

自定義線程池.jpg
public class ThreadPool {
    public static void main(String[] args) {
        Pool pool = new Pool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
            //拒絕策略
            //queue.put(task); //死等
            //queue.offer(task,500,TimeUnit.MILLISECONDS); //帶超時等待
            //System.out.println("放棄");
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " execute " + j);

            });
        }
    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

class Pool {
    private BlockingQueue<Runnable> taskQueue;

    //線程集合
    private HashSet<Worker> workers = new HashSet<>();

    //線程數(shù)
    private int coreSize;

    //獲取任務(wù)的超時時間
    private long timeout;

    private TimeUnit unit;

    private RejectPolicy<Runnable> rejectPolicy;

    public Pool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.unit = unit;
        taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    //執(zhí)行的方法
    public void execute(Runnable task) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println(Thread.currentThread().getName() + " add workers" + task);
                workers.add(worker);
                worker.start();
            } else {
                //taskQueue.put(task);

                //策略模式
                //1 死等 2. 帶超時等待 3. 放棄任務(wù)執(zhí)行 4. 拋出異常 5. 讓調(diào)用者自己執(zhí)行任務(wù)
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task) {

            this.task = task;
        }

        @Override
        public void run() {
            //while (task!= null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 執(zhí)行 " + task );
                    task.run();
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println(Thread.currentThread().getName() + " 移除");
                workers.remove(this);
            }
        }
    }

}

class BlockingQueue<T> {
    Deque<T> queue = new ArrayDeque<>();

    int capacity;

    Lock lock = new ReentrantLock();

    Condition emptyWaitSet = lock.newCondition();
    Condition fullWaitSet = lock.newCondition();

    public BlockingQueue(int queueCapacity) {
        this.capacity = queueCapacity;
    }

    //添加帶超時的等待
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    //返回的是剩余的時間
                    if (nanos <= 0) {
                        System.out.println(Thread.currentThread().getName() + " 沒等到,返回");
                        return null; //沒等到
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出隊" + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //取任務(wù)
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 隊列為空,等待");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出隊 " + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //添加任務(wù)
    public void put(T task) {

        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 隊列已滿,等待進入隊列");
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    //帶超時的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    System.out.println(Thread.currentThread().getName() + " 隊列已滿,等待進入隊列");
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }


    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判斷隊列是否已滿
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                queue.addLast(task);
                emptyWaitSet.signal();
                System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
            }
        } finally {
            lock.unlock();
        }
    }
}

執(zhí)行結(jié)果:
(放棄任務(wù))


線程池.png

  • 自定義拒絕策略接口
@FunctionalInterface
interface RejectPolicy{
    void reject(BlockingQueue<T> queue,  T task);
}

2. ThreadPoolExecutor

2.1線程池狀態(tài)

ThreadPoolExecutor 使用int 高三位來表示線程池狀態(tài),低29位表示線程池數(shù)量

狀態(tài)名 高3位 接受新任務(wù) 處理阻塞隊列任務(wù) 說明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不會接受新任務(wù),會處理阻塞隊列剩余任務(wù)
STOP 001 N N 會中斷正在執(zhí)行的任務(wù),并拋棄阻塞隊列任務(wù)
TIDYING 010 - - 任務(wù)全執(zhí)行完畢,活動線程為0即將進入終結(jié)
TERMINATED 011 - - 終結(jié)狀態(tài)

這些信息存儲在一個原子變量ctl中,目的是將線程池狀態(tài)與線程個數(shù)合二為一,這樣就可以用一次CAS原子操作進行賦值

//c為舊值,ctlOf返回的結(jié)果為新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
//rs為高三位代表線程池狀態(tài),wc為低29位代表線程個數(shù),ctl是合并它們
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.2 構(gòu)造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 核心線程數(shù)目
  • maximumPoolSize 最大線程數(shù)目
  • keepAliveTime 生存時間--針對救急線程
  • unit 時間單位--針對救急線程
  • workQueue 阻塞隊列
  • threadFactory 線程工廠--可以為線程創(chuàng)建時起名字
  • handler 拒絕策略

最大線程數(shù)= 核心線程數(shù)+ 救急線程數(shù)

  • 線程池中剛開始沒有線程,當(dāng)一個任務(wù)提交給線程池后,線程池會創(chuàng)建一個新線程來執(zhí)行任務(wù)

  • 當(dāng)線程數(shù)達到corePoolSize 并沒有線程空閑,這時再加入任務(wù),新加的任務(wù)會被加入workQueue隊列排隊,直到有空閑的線程

  • 如果隊列選擇了有界隊列,那么任務(wù)超過了隊列大小時,會創(chuàng)建maximunPoolSize-corePoolSize數(shù)目的線程來救急

  • 如果線程達到了maximunPoolSize 仍然有新任務(wù)這時會執(zhí)行拒絕策略。拒絕策略jdk提供了四種實現(xiàn),其實著名框架也提供了實現(xiàn)

    • AbortPolicy 讓調(diào)用者拋出RejectExecutionException異常,這是默認策略
    • CallerRunsPolicy 讓調(diào)用者運行任務(wù)
    • DiscardPolicy 放棄本次任務(wù)
    • DiscardOldestPolicy 放棄隊列中最早的任務(wù),本任務(wù)取而代之
    • Dubbo 的實現(xiàn),在拋出 RejectExecutionException 異常之前會記錄日志,并dump線程棧信息,方便定位問題
    • Netty 的實現(xiàn),是創(chuàng)建一個新線程來執(zhí)行任務(wù)
    • ActiveMQ的實現(xiàn),帶超時等待(60s)嘗試放入隊列,類似我們之前自定義的拒絕策略
    • PinPoint 的實現(xiàn),它使用了一個拒絕策略鏈,會逐一嘗試策略鏈中的每種拒絕策略
  • 當(dāng)高峰過去后,超過coreSize的救急線程如果一段時間沒有任務(wù)做,需要結(jié)束資源,這個時間由keepAliveTime 和 unit來控制。

根據(jù)這個構(gòu)造方法,JDK Executors類中提供了眾多工廠方法來創(chuàng)建各種用途的線程池

2.3 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

特點:

  • 核心線程數(shù)== 最大線程數(shù)(沒有救急線程被創(chuàng)建),因此無需超時時間
  • 阻塞隊列是無界的,可以放任意數(shù)量的任務(wù)

評價:

  • 適合于任務(wù)量已知,相對耗時的任務(wù)

2.4 newCacheThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

特點:

  • 核心線程數(shù)是0,最大線程數(shù)是Integer.MAX_VALUE,救急線程的空閑生存時間是60s,意味著
    • 全部是救急線程(60s后可以回收)
    • 救急線程可以無限創(chuàng)建
  • 隊列采用了SynchronousQueue實現(xiàn)的特點是,它沒有容量,沒有線程來取是放不進去的(一手交錢一手交貨)

評價:

  • 整個線程池表現(xiàn)為線程數(shù)會根據(jù)任務(wù)量不斷增長,沒有上限,當(dāng)任務(wù)執(zhí)行完畢,空閑1分鐘后釋放線程。
  • 適合任務(wù)數(shù)比較密集,但每個任務(wù)執(zhí)行時間短的情況

2.5 newSingleThreadPool

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

使用場景:
希望多個任務(wù)排隊執(zhí)行。線程數(shù)固定為1,任務(wù)數(shù)多于1時,會放入無界隊列排隊。任務(wù)執(zhí)行完成,這唯一的線程不會被釋放

區(qū)別:

  • 自己創(chuàng)建一個單線程串行執(zhí)行任務(wù),如果任務(wù)執(zhí)行失敗而終止那么沒有任何補救措施,而線程池還會新建一個線程,保證池的正常工作
  • 線程個數(shù)始終為1,不能修改
    • FinalizableDelegateExecutorService應(yīng)用的是裝飾器模式,只對外暴露了ExecutorService接口,因此不能調(diào)用ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始為1,后面還可以修改
    • 對外暴露的是ThreadPoolExecutor對象,可以強轉(zhuǎn)后調(diào)用setCorePoolSize等方法進行修改

2.6 提交任務(wù)

//執(zhí)行任務(wù)
void execute(Runnable command);
//提交任務(wù)task, 用返回值Future 獲得任務(wù)執(zhí)行結(jié)果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任務(wù)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//提交tasks中所有任務(wù),帶超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
//提交tasks中所有任務(wù),哪個任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其他任務(wù)取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
//提交tasks中所有任務(wù),哪個任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其他任務(wù)取消,帶超時時間
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

2.7 關(guān)閉線程池

shutdown

/**
 * 線程池狀態(tài)變?yōu)镾HUTDOWN
 * 不會接收新任務(wù)
 * 但已提交任務(wù)會執(zhí)行完
 * 此方法不會阻塞調(diào)用線程的執(zhí)行
 */
void shutdown();
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改線程池狀態(tài)
        advanceRunState(SHUTDOWN);
        //僅會打斷空閑線程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //嘗試終結(jié)(沒有運行的線程可以立刻終結(jié),如果還有運行的線程也不會等)
    tryTerminate();
}

shutdownNow

/**
 * 線程池狀態(tài)變?yōu)镾TOP
 * 不會接收新任務(wù)
 * 會將隊列中的任務(wù)返回
 * 并用 interrupt 的方式中斷正在執(zhí)行的任務(wù)
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改線程池狀態(tài)
        advanceRunState(STOP);
        //打斷所有線程
        interruptWorkers();
        //獲取隊列中剩余任務(wù)
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

其他方法

//不在RUNNING 狀態(tài)的線程池,此方法返回true
boolean isShutDown();

//線程池狀態(tài)是否是TERMINATED
boolean isTerminated();

//調(diào)用shutdown 后,由于調(diào)用線程并不會等待所有任務(wù)運行結(jié)束,因此如果它想在線程池TERMINATED 后做一些事情,可以用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        pool.shutdown();

    }
}

執(zhí)行結(jié)果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        try {
            pool.awaitTermination(3, TimeUnit.SECONDS); //不用,因為不知道等多久合適
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("other...");

    }
}

執(zhí)行結(jié)果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
other...


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdownNow");

        List<Runnable> task = pool.shutdownNow();//返回隊列中任務(wù)

    }
}

執(zhí)行結(jié)果:
shutdownNow
running 1
running 2


2.8. 池大小

  • 過小會導(dǎo)致程序不能充分利用系統(tǒng)資源、容易導(dǎo)致饑餓
  • 過大會導(dǎo)致更多的線程上下文切換,占用更多的資源

2.8.1 CPU密集型

通常采用 CPU核數(shù) + 1 能夠?qū)崿F(xiàn)最優(yōu)的CPU利用率,+1 是保證當(dāng)線程由于頁缺失故障(操作系統(tǒng))或其他原因?qū)е聲和r,額外的這個線程就能頂上去,保證CPU時鐘周期不被浪費

2.8.2 I/O密集型

CPU不總是處于繁忙狀態(tài),例如,當(dāng)你執(zhí)行業(yè)務(wù)計算時,這時候會使用CPU資源,但當(dāng)你執(zhí)行I/O操作時,遠程RPC調(diào)用時,包括進行數(shù)據(jù)庫操作時,這時候CPU就閑下來了,你可以利用多線程提高他的利用率

經(jīng)驗公式如下
線程數(shù) = 核數(shù) * CPU利用率 * 總時間(CPU計算時間 + 等待時間)/ CPU計算時間

例如:4核CPU計算時間是50%,其他等待時間是50%,期望CPU被100%利用,套用公式

4*100% *100% / 50% = 8

2.9 任務(wù)調(diào)度線程池

在任務(wù)調(diào)度線程池功能加入之前,可以使用java.util.Timer來實現(xiàn)定時功能,Timer 的優(yōu)點在于簡單易用,但由于所有的任務(wù)都是由同一個線程來調(diào)度,因此所有任務(wù)都是串行執(zhí)行的,同一時間只能有一個任務(wù)在執(zhí)行,前一個任務(wù)的延遲或異常都將會影響到之后的任務(wù)。

public class TimerTest {

    public static void main(String[] args){
        Timer timer = new Timer();

        TimerTask timerTask1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task1");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        TimerTask timerTask2 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //使用Timer添加兩個任務(wù),希望他們都在 1s 后執(zhí)行
        //Timer 內(nèi)只有一個線程來順序執(zhí)行隊列中的任務(wù)

        timer.schedule(timerTask1, 1000);
        timer.schedule(timerTask2, 1000);
    }
}

2.9.1 使用任務(wù)調(diào)度線程池改進

public class TimerTest {

    public static void main(String[] args){
        
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        pool.schedule(()->{
            System.out.println(" task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

        pool.schedule(()->{
            System.out.println(" task2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

    }
}

3. Fork/Join

Fork/Join 是JDK 1.7 新加入的線程池實現(xiàn),

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容