ThreadPoolExecutor

阿里Java開(kāi)發(fā)手冊(cè)明確java開(kāi)發(fā)使用 ThreadPoolExecutor 的方式創(chuàng)建線程池

    3. 【強(qiáng)制】線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
        說(shuō)明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷,解決資源不足的問(wèn)題。
        如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者 “過(guò)度切換”的問(wèn)題。
    4. 【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,
        這樣的處理方式讓寫(xiě)的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
        說(shuō)明: Executors 返回的線程池對(duì)象的弊端如下:
        1) FixedThreadPool 和 SingleThreadPool :
            允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE ,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM 。
        2) CachedThreadPool 和 ScheduledThreadPool :
            允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM 。

java五種常見(jiàn)線程池

    threadPool = Executors.newCachedThreadPool();//有緩沖的線程池,線程數(shù) JVM 控制
    threadPool = Executors.newFixedThreadPool(3);//固定大小的線程池
    threadPool = Executors.newScheduledThreadPool(2);//定時(shí)任務(wù)線程數(shù)
    threadPool = Executors.newSingleThreadExecutor();//單線程的線程池,只有一個(gè)線程在工作
    threadPool = new ThreadPoolExecutor(...);//默認(rèn)線程池,可控制參數(shù)比較多   

ThreadPoolExecutor創(chuàng)建線程池

//五個(gè)參數(shù)的構(gòu)造函數(shù),少了ThreadFactory 和 拒絕策略,使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
 
//六個(gè)參數(shù)的構(gòu)造函數(shù),少了拒絕策略,即使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)
 
//六個(gè)參數(shù)的構(gòu)造函數(shù),少了threadFactory,即使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
 
//七個(gè)參數(shù)的構(gòu)造函數(shù),全
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

corePoolSize - 線程池大小,即核心線程的最大數(shù)量

默認(rèn)線程池創(chuàng)建之后,線程池中的線程數(shù)為0,當(dāng)任務(wù)過(guò)來(lái)就會(huì)創(chuàng)建一個(gè)核心線程去執(zhí)行,當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有空閑線程,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理。直到核心線程數(shù)達(dá)到corePoolSize 之后,就會(huì)被到達(dá)的任務(wù)放在緩存隊(duì)列中。(注意是到達(dá)的任務(wù))。
核心線程會(huì)一直存活,即使沒(méi)有任務(wù)需要執(zhí)行
如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。

maximumPoolSize - 線程池的最大線程數(shù),即核心線程+非核心線程的最大數(shù)量。
keepAliveTime - 針對(duì)非核心線程,當(dāng)非核心線程的空閑時(shí)間超過(guò)keepAliveTime,則會(huì)被銷毀
unit - keepAliveTime 的時(shí)間單位。
workQueue - 用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列。
threadFactory - 線程工廠。
handler - 拒絕策略。
(本人拙見(jiàn):核心線程:體制員工 ,非核心線程:合同工)


流程


  • 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意這一個(gè)步驟需要獲取全局鎖)。
  • 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
  • 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(注意這一個(gè)步驟需要獲取全局鎖)。
  • 如果創(chuàng)建的新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被執(zhí)行飽和(拒絕)策略。ThreadPoolExecutor 采用上述的設(shè)計(jì)思路,是為執(zhí)行execute()方法時(shí),盡可能避免獲取全局鎖(一個(gè)嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后,幾乎所有的execute()方法調(diào)用都是在執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

workQueue(用于保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列)

  • ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列,按FIFO排序任務(wù)。
new ArrayBlockingQueue<Runnable>(100)
  • LinkedBlockingQueue : 一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQueue。
new LinkedBlockingQueue<>();
  • SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常高于LinkedBlockingQueue。
  • PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。

threadFactory (線程工廠)
自定義ThreadFactory可以實(shí)現(xiàn)自定義線程的名稱、組以及優(yōu)先級(jí)等信息,甚至可以任性的將線程設(shè)置為守護(hù)線程??傊?,自定義ThreadFactory可以更加自由的設(shè)置線程池中所有線程的狀態(tài)。

class MyTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

handler (處理并發(fā)量大的情況下,拒絕策略對(duì)程序健壯性非常有用)
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy ();

  • AbortPolicy (默認(rèn)): 丟棄任務(wù)并拋出RejectedExecutionException異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " +e.toString());
} 
  • DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
} 
  • CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

如果你要自定義,添加寫(xiě)日志記錄啥的

public static class MyIgnorePolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }
        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 可做日志記錄等
            System.err.println( r.toString() + " rejected");
//          System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
        }
    }

線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)

  • shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)


  • 1、RUNNING
    狀態(tài)說(shuō)明:線程池處于RUNNING狀態(tài),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。
    狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說(shuō),線程池一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0。

  • 2、SHUTDOWN
    狀態(tài)說(shuō)明:線程池處于SHUTDOWN狀態(tài),不接收新任務(wù),能夠處理已經(jīng)添加的任務(wù)。
    狀態(tài)切換:調(diào)用shutdown()方法時(shí),線程池由RUNNING -> SHUTDOWN。

  • 3、STOP
    狀態(tài)說(shuō)明:線程池處于STOP狀態(tài),不接收新任務(wù),不處理已提交的任務(wù),并且會(huì)中斷正在處理的任務(wù)。
    狀態(tài)切換:調(diào)用線程池中的shutdownNow()方法時(shí),線程池由(RUNNING or SHUTDOWN) -> STOP。

  • 4、TIDYING
    狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已經(jīng)停止,ctl記錄“任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池處于TIDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù) terminated()。terminated()在ThreadPoolExecutor類中是空, 的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)處理,可以通過(guò)重載 terminated()函數(shù)來(lái)實(shí)現(xiàn)。
    狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行任務(wù)也為空時(shí),就會(huì)由SHUTDOWN -> TIDYING。當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時(shí),就會(huì)由STOP-> TIDYING。

  • 5、TERMINATED
    狀態(tài)說(shuō)明:線程池線程池徹底停止,線程池處于TERMINATED狀態(tài),
    狀態(tài)切換:線程池處于TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后, 就會(huì)由TIDYING->TERMINATED。


submit()和execute()的區(qū)別
任務(wù)分兩類:一類是實(shí)現(xiàn)了Runnable接口的類,一類是實(shí)現(xiàn)了Callable接口的類。兩者都可以被ExecutorService執(zhí)行,它們的區(qū)別是:

  • execute(Runnable x) 沒(méi)有返回值??梢詧?zhí)行任務(wù),但無(wú)法判斷任務(wù)是否成功完成?!獙?shí)現(xiàn)Runnable接口的任務(wù)

  • submit(Runnable x) 返回一個(gè)future??梢杂眠@個(gè)future來(lái)判斷任務(wù)是否成功完成?!獙?shí)現(xiàn)Callable接口和Runnable接口的任務(wù),如果為Runnable,則get得到的為null:

  • 使用submit方法還有一個(gè)特點(diǎn)就是,他的異??梢栽谥骶€程中catch到。而使用execute方法執(zhí)行任務(wù)是捕捉不到異常的。

future.get()方法是會(huì)有阻塞性,在調(diào)用submit提交任務(wù)之后,future.get()一直等到任務(wù)執(zhí)行完畢,拿到了返回的返回值,主線程才會(huì)繼續(xù)運(yùn)行。


設(shè)置合理的線程池大小
任務(wù)一般可分為:CPU密集型、IO密集型、混合型,對(duì)于不同類型的任務(wù)需要分配不同大小的線程池。

  • CPU密集型任務(wù)
    盡量使用較小的線程池,一般為CPU核心數(shù)+1。
    因?yàn)镃PU密集型任務(wù)使得CPU使用率很高,若開(kāi)過(guò)多的線程數(shù),只能增加上下文切換的次數(shù),因此會(huì)帶來(lái)額外的開(kāi)銷。
    IO密集型任務(wù)
    可以使用稍大的線程池,一般為2*CPU核心數(shù)。
  • IO密集型任務(wù)CPU使用率并不高,因此可以讓CPU在等待IO的時(shí)候去處理別的任務(wù),充分利用CPU時(shí)間。
  • 混合型任務(wù)
    可以將任務(wù)分成IO密集型和CPU密集型任務(wù),然后分別用不同的線程池去處理。
    只要分完之后兩個(gè)任務(wù)的執(zhí)行時(shí)間相差不大,那么就會(huì)比串行執(zhí)行來(lái)的高效。
    因?yàn)槿绻麆澐种髢蓚€(gè)任務(wù)執(zhí)行時(shí)間相差甚遠(yuǎn),那么先執(zhí)行完的任務(wù)就要等后執(zhí)行完的任務(wù),最終的時(shí)間仍然取決于后執(zhí)行完的任務(wù),而且還要加上任務(wù)拆分與合并的開(kāi)銷,得不償失。


public class ThreadTest {
    //自定義ThreadFactory 
    private static class NameTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }
    //自定義拒絕策略
    private static class MyIgnorePolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }
        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 日志記錄
            System.err.println( r.toString() + " rejected");
        }
    }

    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor
            (10, 30, 2000, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(100),new NameTreadFactory(),new MyIgnorePolicy());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //execute
        executor.execute(()->{
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("execute,無(wú)返回值");
        });
        //submit
        Future<String> future = executor.submit(()->{
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "submit,有返回值Future<String>";
        });
        System.out.println(future.get());
        System.out.println("結(jié)束");
    }
}

打印結(jié)果

my-thread-1 has been created
my-thread-2 has been created
execute,無(wú)返回值
submit,有返回值Future<String>
結(jié)束
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容