Java線程池的使用

image.png

使用線程池的原因

  1. 無線創(chuàng)建線程的不足

    在生產(chǎn)環(huán)境中,為每一個任務(wù)都分配一個線程這種方法存在一些缺陷:

    • 線程生命周期的開銷:線程的創(chuàng)建與銷毀都會消耗大量資源,頻繁創(chuàng)建與銷毀線程會帶來很大的資源開銷

    • 資源消耗:活躍的線程會消耗系統(tǒng)資源。如果可運行的線程數(shù)量大于可用的處理器數(shù)量,閑置的線程會占用許多內(nèi)存,并且頻繁的線程上下文切換也會帶來很大的性能開銷

    • 穩(wěn)定性:操作系統(tǒng)在可創(chuàng)建的線程數(shù)量上有一個限制。在高負載情況下,應(yīng)用程序很有可能突破這個限制,資源耗盡后很可能拋出OutOfMemoryError異常

  2. 提高響應(yīng)速度

    任務(wù)到達時,不再需要創(chuàng)建線程就可以立即執(zhí)行

  3. 線程池提供了管理線程的功能

    比如,可以統(tǒng)計任務(wù)的完成情況,統(tǒng)計活躍線程與閑置線程的數(shù)量等

使用場景

  • 不適用場合
  1. 依賴性任務(wù)

    在線程池中,如果任務(wù)依賴于其他任務(wù),那么可能產(chǎn)生死鎖。比如,在單線程的Executor中,如果一個任務(wù)將另一個任務(wù)提交到同一個Executor,并且等待這個被提交任務(wù)的結(jié)果,那么通常會引發(fā)死鎖

  2. 使用ThreadLocal的任務(wù)

    ThreadLocal可以存儲線程級變量,將變量封閉到特定的線程當中。然而使用線程池時,這些線程都會被自由的重用,在線程池的線程中不應(yīng)該使用ThreadLocal在任務(wù)之間傳遞值。

    當線程本地值的生命周期受限于任務(wù)的生命周期時,可以在線程池的線程中使用ThreadLocal,任務(wù)結(jié)束后調(diào)用ThreadLocal.remove方法將已存儲的值清除。

  3. 使用線程封閉機制的任務(wù)

    在單線程應(yīng)用程序中,不用考慮對象的并發(fā)安全問題,他們都被很好的封閉在單個線程當中。如果將單線程的環(huán)境換成線程池,那么這些對象有可能造成并發(fā)安全問題,失去線程安全性

  4. 不同類型或運行時長差異較大的任務(wù)

    不同類型任務(wù)之間很可能存在依賴,并且他們執(zhí)行的時長也不相同,在線程池中運行時很有可能造成擁塞,甚至死鎖

  • 適用場合

    當任務(wù)是同類型且相互獨立時,線程池的性能可以達到最佳

    網(wǎng)頁服務(wù)器、文件服務(wù)器、郵件服務(wù)器,他們的請求往往是同類型且相互獨立的

架構(gòu)

線程池異常處理方案這篇博客中已經(jīng)提到了線程池的架構(gòu),如圖:

image.png

Executor:異步任務(wù)執(zhí)行框架的基礎(chǔ)

public interface Executor {
    void execute(Runnable command);
}

通過使用Executor,將請求處理任務(wù)的提交與任務(wù)的實際執(zhí)行解耦,只需要采用另一種不同的Executor實現(xiàn),就可以改變服務(wù)器的行為。比如:

// 為每個任務(wù)分配一個線程
public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
// 以同步的方式執(zhí)行每個任務(wù)
public class WithinThreadExecutor implements Executor{
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService:ExecutorService擴展了Executor接口,添加了一些用于管理生命周期和任務(wù)提交的方法

public interface ExecutorService extends Executor {
    // 生命周期管理
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 任務(wù)提交
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的生命周期有3中狀態(tài):運行、關(guān)閉、終止。ExecutorService在初始創(chuàng)建時處于運行狀態(tài)。shutdown方法執(zhí)行平緩的關(guān)閉過程:不再接受新任務(wù),同時等待已提交的任務(wù)執(zhí)行完成,包括在任務(wù)隊列中尚未開始的任務(wù)。shutdownNow方法將嘗試取消所有運行中的任務(wù),并不再啟動隊列中尚未執(zhí)行的任務(wù)。

所有任務(wù)完成后,ExecutorService將轉(zhuǎn)入終止狀態(tài)??梢哉{(diào)用awaitTermination來等待ExecutorService到達終止狀態(tài),或者通過輪詢isTerminated來判斷ExecutorService是否終止。

AbstractExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor: 線程池的實現(xiàn)

ThreadPoolExecutor擴展了ExecutorService接口,是線程池的具體實現(xiàn)。ScheduledThreadPoolExecutor支持定時以及周期性任務(wù)的執(zhí)行。

ThreadPoolExecutor支持兩種方式的任務(wù)提交:exec.execute(Runnable r)以及exec.submit(Runnable r)。關(guān)于任務(wù)的這兩種提交方式在線程池異常處理方案已經(jīng)提到過了,不再贅述。

定制線程池

先來了解一下線程池的創(chuàng)建:

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

以上是ThreadPoolExecutor的構(gòu)造函數(shù),看一下每個參數(shù)的含義:

  1. corePoolSize

corePoolSize(線程池的基本大?。寒斕峤灰粋€任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads方法,線程池會提前創(chuàng)建并啟動所有基本線程。

  1. runnableTaskQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列。 可以選擇以下幾個阻塞隊列。

    • ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。

    • LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。

    • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。

    • PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。

  2. maximumPoolSize(線程池最大大?。壕€程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果。

  3. ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字。

  4. RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。下面會有介紹幾種飽和策略。

  5. keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,可以調(diào)大這個時間,提高線程的利用率。

  6. TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

image.png

設(shè)置線程池的大小

線程池過大,會導致大量的線程在很少的cpu和內(nèi)存資源上發(fā)生競爭,頻繁的線程上下文切換也會帶來額外的性能開銷。線程池過小,導致許多空閑的處理器無法執(zhí)行工作,降低吞吐率。

  1. cpu密集型

對于計算密集型的任務(wù),當系統(tǒng)擁有n個處理器時,將線程池大小設(shè)置為n+1通常可以實現(xiàn)最優(yōu)利用率。

  1. io密集型

對于包含io操作或其他阻塞操作的任務(wù),由于線程不會一直執(zhí)行,因此線程池的規(guī)模應(yīng)該更大。有這么一個簡單的公式:

N[threads] = N[cpu] * U[cpu] * (1 + W/C)

其中,N[threads]是線程池的大小,U[cpu]是cpu的利用率,W/C是任務(wù)等待時間與任務(wù)執(zhí)行時間的比值。

可以通過一些監(jiān)控工具獲得cpu利用率等,Runtime.getRuntime().availableProcessors()返回cpu的數(shù)目

  1. 資源依賴

如果任務(wù)還依賴一些其他的有限資源,比如數(shù)據(jù)庫連接,文件句柄等,那么這些資源也會影響線程池的大?。河嬎忝總€任務(wù)對該資源的需求量,用該資源的可用總量除以每個任務(wù)的需求量,所得的結(jié)果就是線程池大小的上限。

Executors

Executors提供了許多靜態(tài)工廠方法來創(chuàng)建一個線程池:

newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。

具體情況可以結(jié)合Executors源碼和ThreadPoolExecutor的構(gòu)造函數(shù)查看。我們也可以模仿Executors的這幾個工廠方法來定制自己的線程池執(zhí)行策略。

擴展ThreadPoolExecutor

  1. 線程池異常處理方案這篇總結(jié)曾經(jīng)提到重寫ThreadPoolExecutor的afterExecute方法來處理未檢測異常,這就是擴展ThreadPoolExecutor的一個例子。除此之外,還可以在這些方法中添加日志、計時、監(jiān)視等功能。

線程池完成關(guān)閉操作后會調(diào)用方法terminated。terminated可以用來釋放Executor在其生命周期中分配的各種資源,以及執(zhí)行發(fā)送通知、記錄日志等操作。

下面編寫一個利用beforeExecute、afterExecute和terminated添加日志記錄和統(tǒng)計信息收集的擴展ThreadPoolExecutor。

public class TimingThreadPool extends ThreadPoolExecutor{
    // 使用ThreadLocal存儲任務(wù)起始時間,在beforeExecute設(shè)置起始時間,在afterExecute中可以看到這個值
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final Logger logger = Logger.getLogger(TimingThreadPool.class.getName());
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
    
    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        logger.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            logger.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
    
    @Override
    protected void terminated() {
        try {
            logger.fine(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get()));
        } finally {
            super.terminated();
        }
    }

}
  1. 擴展ThreadPoolExecutor的newTaskFor方法可以修改通過submit方法返回的默認Future實現(xiàn)FutureTask為自己的實現(xiàn)。在我們自己實現(xiàn)Future的類中可以針對任務(wù)做一些操作,比如定制任務(wù)的取消行為:
public class CacellingExecutor extends ThreadPoolExecutor {

    public CacellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask) {
            return ((CancellableTask<T>)callable).newTask();
        }
        return super.newTaskFor(callable);
    }

}
interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}

abstract class SocketUsingTask<T> implements CancellableTask<T> {
    private Socket socket;
    public SocketUsingTask(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void cancel() {在并發(fā)應(yīng)用程序中,線程池是很重要的一塊。讀完《java并發(fā)編程實戰(zhàn)》以及研究了一遍jdk源代碼之后,總結(jié)一下線程池方面的知識~
        try {
            this.socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this){
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

異常處理

異常處理這部分,在前面的博客中已經(jīng)總結(jié)過了:線程池異常處理方案

飽和策略

當線程池達到飽和以后(maximumPoolSzie),飽和策略開始發(fā)揮作用。ThreadPoolExecutor的飽和策略可以通過setRejectedExecutionHandler來修改。當某個任務(wù)被提交到一個已經(jīng)關(guān)閉的Executor時,也會用到飽和策略。jdk提供了幾種不同的RejectedExecutionHandler實現(xiàn):

  1. AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

AbortPolicy是默認的飽和策略,該飽和策略將拋出未檢查的RejectedExecutionException。調(diào)用者可以處理這個異常。

  1. CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

CallerRunsPolicy將任務(wù)回退到調(diào)用者,他不會在線程池的某個線程中提交任務(wù),而是在調(diào)用execute的線程中運行,從而降低新任務(wù)的流量。

  1. DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

DiscardPolicy會悄悄拋棄任務(wù),什么也不做。

  1. DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardOldestPolicy會拋棄下一個將被執(zhí)行的任務(wù),然后重新嘗試提交任務(wù)。

其他

  1. CompletionService

如果向Executor提交了一組計算任務(wù),并希望在計算完成后獲取結(jié)果,那么可以保留與每個任務(wù)關(guān)聯(lián)的Future,然后輪詢這些future的get方法,判斷任務(wù)是否完成。這種方法雖然可行,但是有些繁瑣。

CompletionService將Executor和BlockingQueue的功能融合在一起,可以將任務(wù)提交給他執(zhí)行,然后使用類似于隊列的take或poll方法獲取已完成結(jié)果。

ExecutorCompletionService 實現(xiàn)了CompletionService,他的實現(xiàn)很簡單,在構(gòu)造函數(shù)中創(chuàng)建一個BlockingQueue來保存計算完成的結(jié)果。當提交某個任務(wù)時,該任務(wù)首先包裝成為一個QueueingFuture,這是FutureTask的一個子類,他改寫了done方法,將結(jié)果放入BlockingQueue中。ExecutorCompletionService的take和poll方法委托給了BlockingQueue。

  1. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor以延遲或定時的方式執(zhí)行任務(wù),類似于Timer。由于Timer的一些缺陷,可以使用ScheduledThreadPoolExecutor來代替Timer。

Timer在執(zhí)行所有的定時任務(wù)時只會創(chuàng)建一個線程,如果某個任務(wù)執(zhí)行時間過長,就會破壞其他TimerTask的定時準確性。TimerTask拋出異常后,Timer線程也不會捕獲這個異常,從而終止定時線程。尚未執(zhí)行的TimerTask不會再執(zhí)行,新的任務(wù)也不會被調(diào)度。

參考

java并發(fā)編程實戰(zhàn)

聊聊并發(fā)(三)——JAVA線程池的分析和使用

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容