Java基礎(chǔ)-線程池

Android知識總結(jié)

一、線程池

Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過程中,合理地使用線程池能夠帶來3個好處。
1)、使用線程池有哪些優(yōu)點

  • 1、 降低資源消耗
      通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。因為CPU創(chuàng)建和銷毀經(jīng)上下文切換,會消耗資源。
  • 2、 提高響應(yīng)速度,減少CPU的調(diào)度
      當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
      假設(shè)一個服務(wù)器完成一項任務(wù)所需時間為:T1 創(chuàng)建線程時間,T2 在線程中執(zhí)行任務(wù)的時間,T3 銷毀線程時間。如果:T1 + T3 遠大于 T2,則可以采用線程池,以提高服務(wù)器性能。線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時間的技術(shù),從而提高服務(wù)器程序性能的。
      它把T1,T3分別安排在服務(wù)器程序的啟動和結(jié)束的時間段或者一些空閑的時間段,這樣在服務(wù)器程序處理客戶請求時,不會有T1,T3的開銷了。
  • 3、提高線程的可管理性,來維護我們的線程
      線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

2)、實現(xiàn)線程池

  BlockingQueue blockingQueue = new ArrayBlockingQueue(4);
  private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
 //corePoolSize :線程池中核心線程數(shù)的最大值, 核心線程處于運行態(tài),避免線程重建提高效率
 //maximumPoolSize :線程池中能擁有最多線程數(shù)
 //workQueue:用于緩存任務(wù)的阻塞隊列
 //keepAliveTime :表示空閑線程的存活時間,超過這個時間就會消毀
 //TimeUnit unit :表示keepAliveTime的單位。
 //RejectedExecutionHandler:拒絕策略,當(dāng)超過最大線程數(shù)時,用來拒絕新的任務(wù)
  ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
//JDK 提供的獲取CPU可用的核心數(shù)
                Runtime.getRuntime().availableProcessors(), 
//因為操作系統(tǒng)引入了虛擬內(nèi)存技術(shù),可能出現(xiàn)也缺失,再頁缺失時為了把cpu填滿所以加1
                Runtime.getRuntime().availableProcessors() + 1,
                60,
                TimeUnit.SECONDS,
                blockingQueue,
                defaultHandler);
   poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3 * 1000);
                    System.out.println("-------------helloworld_001---------------" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

3)、線程池的工作機制

  • 1)如果當(dāng)前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
  • 2)如果運行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
  • 3)如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)。
  • 4)如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

4)、線程池執(zhí)行示意圖

線程池執(zhí)行圖

5)、線程池中各個參數(shù)

  • corePoolSize
      線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;
      如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;
      如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
  • maximumPoolSize
      線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize。
  • keepAliveTime
      線程空閑時的存活時間,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時才有用。
  • TimeUnit
    keepAliveTime的時間單位
  • workQueue
      workQueue必須是BlockingQueue阻塞隊列。當(dāng)線程池中的線程數(shù)超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實現(xiàn)了阻塞功能。
      一般來說,我們應(yīng)該盡量使用有界隊列,因為使用無界隊列作為工作隊列會對線程池帶來如下影響。
    1)、當(dāng)線程池中的線程數(shù)達到corePoolSize后,新任務(wù)將在無界隊列中等待,因此線程池中的線程數(shù)不會超過corePoolSize。
    2)、由于1,使用無界隊列時maximumPoolSize將是一個無效參數(shù)。
    3)、由于1和2,使用無界隊列時keepAliveTime將是一個無效參數(shù)。
    4)、更重要的,使用無界queue可能會耗盡系統(tǒng)資源,有界隊列則有助于防止資源耗盡,同時即使使用有界隊列,也要盡量控制隊列的大小在一個合適的范圍。
  • threadFactory
      創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設(shè)置一個具有識別度的線程名,當(dāng)然還可以更加自由的對線程做更多的設(shè)置,比如設(shè)置所有的線程為守護線程。
      Executors靜態(tài)工廠里默認(rèn)的threadFactory,線程的命名規(guī)則是“pool-數(shù)字-thread-數(shù)字”。
  • RejectedExecutionHandler
      線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù)。

5)、線程池的飽和策略

  • 1)AbortPolicy
    直接拋出異常,默認(rèn)策略;
  • 2)CallerRunsPolicy
    用調(diào)用者所在的線程來執(zhí)行任務(wù)
  • 3)DiscardOldestPolicy
    丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)
  • 4)DiscardPolicy
    直接丟棄加入線程池的任務(wù)

當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。

二、 Java通過Executors提供創(chuàng)建四種線程池的靜態(tài)方法

newCachedThreadPool 創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
適用:執(zhí)行很多短期異步的小程序或者負(fù)載較輕的服務(wù)器

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newFixedThreadPool 創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
適用:執(zhí)行長期的任務(wù),性能好很多

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newScheduledThreadPool 創(chuàng)建一個定時線程池,支持定時及周期性任務(wù)執(zhí)行。
適用:一個任務(wù)一個任務(wù)執(zhí)行的場景

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

newSingleThreadExecutor 創(chuàng)建一個單利的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
適用:周期性執(zhí)行任務(wù)的場景

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

例子

    ExecutorService executorService = Executors.newCachedThreadPool();  
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

二、 ExecutorService 的submit() 與execute()區(qū)別

  • 1、接收的參數(shù)不一樣
    submit()可以接受runnable和callable 有返回值
    execute()接受runnable 無返回值,所以無法判斷任務(wù)是否被線程池執(zhí)行成功。

  • 2、submit()方法用于提交需要返回值的任務(wù)
      線程池會返回一個future類型的對象,通過這個future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完

  • 3、submit方便Exception處理
      意思就是如果你在你的task里會拋出checked或者unchecked exception,而你又希望外面的調(diào)用者能夠感知這些exception并做出及時的處理,那么就需要用到submit,通過捕獲Future.get拋出的異常。

三、關(guān)閉線程池

可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠無法終止。

  • 1、shutdown() 只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

  • 2、shutdownNow() 首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。

只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

一般分兩個階段關(guān)閉 ExecutorService。第一階段調(diào)用 shutdown 拒絕傳入任務(wù),然后調(diào)用 shutdownNow(如有必要)取消所有遺留的任務(wù)

    // 啟動一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。
    threadPool.shutdown();

四、常用阻塞隊列 BlockingQueue

什么是阻塞隊列

  • 1)支持阻塞的插入方法:意思是當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡?br> 在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。

入對和出隊方法

對應(yīng)關(guān)系表
  • 拋出異常:當(dāng)隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException("Queuefull")異常。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常。
  • 返回特殊值:當(dāng)往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列里取出一個元素,如果沒有則返回null。
  • 一直阻塞:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程,直到隊列可用或者響應(yīng)中斷退出。當(dāng)隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞住消費者線程,直到隊列不為空。
  • 超時退出:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里插入元素,隊列會阻塞生產(chǎn)者線程一段時間,如果超過了指定的時間,生產(chǎn)者線程就會退出。

有界無界

有限隊列就是長度有限,滿了以后生產(chǎn)者會阻塞;無界隊列就是里面能放無數(shù)的東西而不會因為隊列長度限制被阻塞,當(dāng)然空間限制來源于系統(tǒng)資源的限制,如果處理不及時,導(dǎo)致隊列越來越大越來越大,超出一定的限制致使內(nèi)存超限,操作系統(tǒng)或者JVM幫你解決煩惱,直接把你 OOM 省事了。

無界也會阻塞,為何?因為阻塞不僅僅體現(xiàn)在生產(chǎn)者放入元素時會阻塞,消費者拿取元素時,如果沒有元素,同樣也會阻塞。

1、ArrayBlockingQueue
ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列,其內(nèi)部按先進先出(FIFO)的原則對元素進行排序,默認(rèn)情況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,可以按照阻塞的先后順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當(dāng)隊列可用時,阻塞的線程都可以爭奪訪問隊列的資格,有可能先阻塞的線程最后才訪問隊列。初始化時有參數(shù)可以設(shè)置。

//默認(rèn)非公平阻塞隊列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞隊列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);

2、LinkedBlockingQueue
LinkedBlockingQueue是一個由鏈表實現(xiàn)的有界隊列阻塞隊列,但大小默認(rèn)值為Integer.MAX_VALUE,此隊列按照先進先出的原則對元素進行排序。所以我們在使用LinkedBlockingQueue時建議手動傳值,為其提供我們所需的大小,避免隊列過大造成機器負(fù)載或者內(nèi)存爆滿等情況

3、SynchronousQueue
是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續(xù)添加元素。SynchronousQueue可以看成是一個傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合傳遞性場景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

4、DelayQueue
是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現(xiàn)。隊列中的元素必須實現(xiàn)Delayed接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。只有在延遲期滿時才能從隊列中提取元素。

  • DelayQueue非常有用,可以將DelayQueue運用在以下應(yīng)用場景。
    緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

5、PriorityBlockingQueue
一個支持優(yōu)先級排序的無界阻塞隊列。默認(rèn)情況下元素采取自然順序升序排列。也可以自定義類實現(xiàn)compareTo()方法來指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進行排序。需要注意的是不能保證同優(yōu)先級元素的順序。

6、LinkedTransferQueue
一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。

多了tryTransfer和transfer方法

  • (1)transfer方法
    如果當(dāng)前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節(jié)點,并等到該元素被消費者消費了才返回。
  • (2)tryTransfer方法
    tryTransfer方法是用來試探生產(chǎn)者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區(qū)別是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法是必須等到消費者消費了才返回。

7、LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。

多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結(jié)尾的方法,表示插入、獲?。╬eek)或移除雙端隊列的第一個元素。以Last單詞結(jié)尾的方法,表示插入、獲取或移除雙端隊列的最后一個元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst,不知道是不是JDK的bug,使用時還是用帶有First和Last后綴的方法更清楚。在初始化LinkedBlockingDeque時可以設(shè)置容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。

以上的阻塞隊列都實現(xiàn)了BlockingQueue接口,也都是線程安全的。

五、合理地配置線程池

要想合理地配置線程池,就必須首先分析任務(wù)特性

要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析。

  • 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
  • 任務(wù)的優(yōu)先級:高、中和低。
  • 任務(wù)的執(zhí)行時間:長、中和短。
  • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
  CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu核心數(shù)+1個線程的線程池。(執(zhí)行純計算任務(wù);因為操作系統(tǒng)引入了虛擬內(nèi)存技術(shù),可能出現(xiàn)頁缺失,再頁缺失時為了把cpu填滿所以加1)
  IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu核心數(shù)個線程的線程池。(如從網(wǎng)絡(luò)、磁盤獲取資源;因為其遠遠慢于比CPU的執(zhí)行速度,沒有獲取數(shù)據(jù)時CPU處于等待狀態(tài)。所以一般設(shè)置為二倍的CPU核心數(shù))
  混合型的任務(wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解??梢酝ㄟ^Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)
  優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行。
  執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
  建議使用有界隊列。有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點兒,比如幾千。
  如果當(dāng)時我們設(shè)置成無界隊列,那么線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。

六、線程池的簡單實現(xiàn)

class ExecutorText {
    private val mWorkThreads: MutableList<WorkThread> = ArrayList()
    val mBlockingDeque: BlockingDeque<Runnable>
    var isWorking: Boolean = true

    constructor(threadCount: Int, taskRunnableCount: Int) {
        this.mBlockingDeque = LinkedBlockingDeque(taskRunnableCount)
        for (i in 0..threadCount) {
            val work = WorkThread()
            work.start()
            this.mWorkThreads.add(work)
        }
        Runtime.getRuntime().availableProcessors()
    }


    fun execute(runnable: Runnable): Boolean {
        //把execute任務(wù)加入隊列中
        return this.mBlockingDeque.offer(runnable)
    }


    fun shutDown() {
        isWorking = false
    }

    inner class WorkThread : Thread() {
        override fun run() {
            super.run()
            while (isWorking || mBlockingDeque.size > 0) {
                var task = mBlockingDeque.poll()
                task?.run()
                task = null
            }
        }

        //中斷線超
        fun stopWork() {
            interrupt()
        }
    }

    fun onDestroy() {
        for (i in 0..mWorkThreads.size){
            mWorkThreads[i].stopWork()
        }
        mBlockingDeque.clear()
    }

}

調(diào)用

object Text {
    @JvmStatic
    fun main(argc: Array<String>) {
        val executorText = ExecutorText(3, 6)
        for (i in 0..10) {
            executorText.execute(Runnable {
                println(Thread.currentThread().name + "任務(wù)開始執(zhí)行" + i)
            })
        }
        executorText.shutDown()

    }
   
}

七、阻塞隊列(DelayQueue)

1)、訂單的實體類

public class Order {
    private final String orderNo;//訂單的編號
    private final double orderMoney;//訂單的金額
    
    public Order(String orderNo, double orderMoney) {
        super();
        this.orderNo = orderNo;
        this.orderMoney = orderMoney;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public double getOrderMoney() {
        return orderMoney;
    }
}

2)、存放的隊列的元素

public class ItemVo<T> implements Delayed {
    //到期時間,但傳入的數(shù)值代表過期的時長,傳入單位毫秒
    private long activeTime;
    private T data;//業(yè)務(wù)數(shù)據(jù),泛型

    //傳入過期時長,單位秒,內(nèi)部轉(zhuǎn)換
    public ItemVo(long expirationTime, T data) {
        this.activeTime = expirationTime * 1000 + System.currentTimeMillis();
        this.data = data;
    }

    public long getActiveTime() {
        return activeTime;
    }

    public T getData() {
        return data;
    }

    /*
     * 這個方法返回到激活日期的剩余時間,時間單位由單位參數(shù)指定。
     */
    public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.activeTime
                - System.currentTimeMillis(), unit);
        return d;
    }

    /*
     *Delayed接口繼承了Comparable接口,按剩余時間排序,實際計算考慮精度為納秒數(shù)
     */
    @Override
    public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.MILLISECONDS)
                - o.getDelay(TimeUnit.MILLISECONDS));
        if (d == 0) {
            return 0;
        } else {
            if (d < 0) {
                return -1;
            } else {
                return 1;
            }
        }
    }
}

3)、將訂單推入隊列

public class PutOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;

    public PutOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        //5秒后到期
        Order orderTb = new Order("Tb12345", 366);
        ItemVo<Order> itemTb = new ItemVo<Order>(5, orderTb);
        queue.offer(itemTb);
        System.out.println("訂單5秒后超時:" + orderTb.getOrderNo() + ";"
                + orderTb.getOrderMoney());
        //8秒后到期
        Order orderJd = new Order("Jd54321", 366);
        ItemVo<Order> itemJd = new ItemVo<Order>(8, orderJd);
        queue.offer(itemJd);
        System.out.println("訂單8秒后超時:" + orderJd.getOrderNo() + ";"
                + orderJd.getOrderMoney());

    }
}

4)、取出到期的訂單的功能

public class FetchOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;
    
    public FetchOrder(DelayQueue<ItemVo<Order>> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true) {
            try {
                ItemVo<Order> item = queue.take();
                Order order = (Order)item.getData();
                System.out.println("Get From Queue:"+"data="
                +order.getOrderNo()+";"+order.getOrderMoney());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }   
}

5)、延時隊列測試程序

public class Test {
    public static void main(String[] args) throws InterruptedException {

        DelayQueue<ItemVo<Order>> queue = new DelayQueue<ItemVo<Order>>();//延時隊列

        new Thread(new PutOrder(queue)).start();
        new Thread(new FetchOrder(queue)).start();

        //每隔500毫秒,打印個數(shù)字
        for (int i = 1; i < 15; i++) {
            Thread.sleep(500);
            System.out.println(i * 500);
        }
    }
}

八、beforeExecute 和 afterExecute

在線程池執(zhí)行某個任務(wù)前會調(diào)用beforeExecute()方法,在任務(wù)結(jié)束后(任務(wù)異常退出)會執(zhí)行afterExecute()方法。

查看ThreadPoolExecutor源碼,在該類中定義了一個內(nèi)部類Worker, ThreadPoolExecutor線程池中的工作線程就是Worker類的實例,Worker實例在執(zhí)行時會調(diào)用beforeExecute()與afterExecute()方法。

    public static void main(String[] args) {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println("任務(wù)數(shù)量---");
            }
        };

        // 創(chuàng)建線程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)){
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("renwu 666");
            }

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("jiueshu");
            }
        };

        for (int i = 0; i < 10; i++) {
            executor.submit(r);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容