[轉(zhuǎn)] Java 無(wú)界阻塞隊(duì)列 DelayQueue 入門(mén)實(shí)戰(zhàn)

原文出處:http://cmsblogs.com/chenssy

DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊(duì)列里面沒(méi)有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說(shuō)只有在延遲期到時(shí)才能夠從隊(duì)列中取元素。

DelayQueue主要用于兩個(gè)方面:

  • 緩存:清掉緩存中超時(shí)的緩存數(shù)據(jù)
  • 任務(wù)超時(shí)處理

DelayQueue

DelayQueue實(shí)現(xiàn)的關(guān)鍵主要有如下幾個(gè):

  1. 可重入鎖ReentrantLock
  2. 用于阻塞和通知的Condition對(duì)象
  3. 根據(jù)Delay時(shí)間排序的優(yōu)先級(jí)隊(duì)列:PriorityQueue
  4. 用于優(yōu)化阻塞通知的線程元素leader

ReentrantLock、Condition這兩個(gè)對(duì)象就不需要闡述了,他是實(shí)現(xiàn)整個(gè)BlockingQueue的核心。PriorityQueue是一個(gè)支持優(yōu)先級(jí)線程排序的隊(duì)列(參考【死磕Java并發(fā)】-----J.U.C之阻塞隊(duì)列:PriorityBlockingQueue),leader后面闡述。這里我們先來(lái)了解Delay,他是實(shí)現(xiàn)延時(shí)操作的關(guān)鍵。

Delayed

Delayed接口是用來(lái)標(biāo)記那些應(yīng)該在給定延遲時(shí)間之后執(zhí)行的對(duì)象,它定義了一個(gè)long getDelay(TimeUnit unit)方法,該方法返回與此對(duì)象相關(guān)的的剩余時(shí)間。同時(shí)實(shí)現(xiàn)該接口的對(duì)象必須定義一個(gè)compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

如何使用該接口呢?上面說(shuō)的非常清楚了,實(shí)現(xiàn)該接口的getDelay()方法,同時(shí)定義compareTo()方法即可。

內(nèi)部結(jié)構(gòu)

先看DelayQueue的定義:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
            implements BlockingQueue<E> {
        /** 可重入鎖 */
        private final transient ReentrantLock lock = new ReentrantLock();
        /** 支持優(yōu)先級(jí)的BlockingQueue */
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        /** 用于優(yōu)化阻塞 */
        private Thread leader = null;
        /** Condition */
        private final Condition available = lock.newCondition();

        /**
         * 省略很多代碼
         */
    }

看了DelayQueue的內(nèi)部結(jié)構(gòu)就對(duì)上面幾個(gè)關(guān)鍵點(diǎn)一目了然了,但是這里有一點(diǎn)需要注意,DelayQueue的元素都必須繼承Delayed接口。同時(shí)也可以從這里初步理清楚DelayQueue內(nèi)部實(shí)現(xiàn)的機(jī)制了:以支持優(yōu)先級(jí)無(wú)界隊(duì)列的PriorityQueue作為一個(gè)容器,容器里面的元素都應(yīng)該實(shí)現(xiàn)Delayed接口,在每次往優(yōu)先級(jí)隊(duì)列中添加元素時(shí)以元素的過(guò)期時(shí)間作為排序條件,最先過(guò)期的元素放在優(yōu)先級(jí)最高。

offer()

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向 PriorityQueue中插入元素
            q.offer(e);
            // 如果當(dāng)前元素的對(duì)首元素(優(yōu)先級(jí)最高),leader設(shè)置為空,喚醒所有等待線程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            // 無(wú)界隊(duì)列,永遠(yuǎn)返回true
            return true;
        } finally {
            lock.unlock();
        }
    }

offer(E e)就是往PriorityQueue中添加元素,具體可以參考(【死磕Java并發(fā)】-----J.U.C之阻塞隊(duì)列:PriorityBlockingQueue)。整個(gè)過(guò)程還是比較簡(jiǎn)單,但是在判斷當(dāng)前元素是否為對(duì)首元素,如果是的話則設(shè)置leader=null,這是非常關(guān)鍵的一個(gè)步驟,后面闡述。

take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 對(duì)首元素
                E first = q.peek();
                // 對(duì)首為空,阻塞,等待off()操作喚醒
                if (first == null)
                    available.await();
                else {
                    // 獲取對(duì)首元素的超時(shí)時(shí)間
                    long delay = first.getDelay(NANOSECONDS);
                    // <=0 表示已過(guò)期,出對(duì),return
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader != null 證明有其他線程在操作,阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 否則將leader 設(shè)置為當(dāng)前線程,獨(dú)占
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 超時(shí)阻塞
                            available.awaitNanos(delay);
                        } finally {
                            // 釋放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 喚醒阻塞線程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

首先是獲取對(duì)首元素,如果對(duì)首元素的延時(shí)時(shí)間 delay <= 0 ,則可以出對(duì)了,直接return即可。否則設(shè)置first = null,這里設(shè)置為null的主要目的是為了避免內(nèi)存泄漏。如果 leader != null 則表示當(dāng)前有線程占用,則阻塞,否則設(shè)置leader為當(dāng)前線程,然后調(diào)用awaitNanos()方法超時(shí)等待。

first = null

這里為什么如果不設(shè)置first = null,則會(huì)引起內(nèi)存泄漏呢?線程A到達(dá),列首元素沒(méi)有到期,設(shè)置leader = 線程A,這是線程B來(lái)了因?yàn)閘eader != null,則會(huì)阻塞,線程C一樣。假如線程阻塞完畢了,獲取列首元素成功,出列。這個(gè)時(shí)候列首元素應(yīng)該會(huì)被回收掉,但是問(wèn)題是它還被線程B、線程C持有著,所以不會(huì)回收,這里只有兩個(gè)線程,如果有線程D、線程E...呢?這樣會(huì)無(wú)限期的不能回收,就會(huì)造成內(nèi)存泄漏。

這個(gè)入隊(duì)、出對(duì)過(guò)程和其他的阻塞隊(duì)列沒(méi)有很大區(qū)別,無(wú)非是在出對(duì)的時(shí)候增加了一個(gè)到期時(shí)間的判斷。同時(shí)通過(guò)leader來(lái)減少不必要阻塞。

本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容