JUC源碼分析-集合篇(十):DelayQueue

DelayQueue 是一個無界延時阻塞隊列,元素順序按照過期時間排序,通過顯式鎖 ReentrantLock 保證并發(fā)安全,隊列中的存儲的元素必須實現(xiàn) Delayed 接口,也就是說只允許放入可以“延期”的元素。獲取元素時,只有當元素期滿之后才可獲取。DelayQueue 相對比較簡單,不過在我們之后線程池源碼分析中會遇到,所以這里也簡單介紹一下。

概述

DelayQueue 的延時策略可以總結(jié)為以下幾點:

  1. 存儲元素必須實現(xiàn)Delayed接口
  2. 內(nèi)部持有一個ReentrantLock保證線程安全
  3. 使用優(yōu)先級隊列PriorityQueue實現(xiàn)元素存儲
  4. 持有一個優(yōu)化內(nèi)部阻塞通知的線程leader
  5. 用于實現(xiàn)阻塞的Condition對象

關(guān)于 Delayed 和 PriorityQueue:

DelayQueue 其實就是在每次往優(yōu)先級隊列中添加元素,然后以元素的 delay(過期值)作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列里取出來都是最先要過期的元素。

數(shù)據(jù)結(jié)構(gòu)及核心參數(shù)

DelayQueue 繼承關(guān)系
//鎖
private final transient ReentrantLock lock = new ReentrantLock();

//內(nèi)部使用PriorityQueue存儲元素
private final PriorityQueue<E> q = new PriorityQueue<E>();

//等待獲取隊列頭元素的線程
private Thread leader = null;

//當一個新任務在隊列的頭部可用,或者新線程可能需要成為leader時,喚醒等待條件
private final Condition available = lock.newCondition();

這里我們主要介紹一下leader變量:
leader是等待獲取隊列頭元素的線程,應用主從式設計減少不必要的等待。當一個線程為leader,它只會等待下一個延遲屆期,但是其他線程的等待是不確定的。在從take()poll()獲取數(shù)據(jù)返回前,leader線程必須喚醒其他等待的線程,除非其他線程在這期間變成leader。如果隊列頭被一個有著更快過期時間的元素替換掉,leader將會被設置為null而失效,并喚醒其他等待線程(不一定是當前leader線程)。所以等待線程在等待期間必須時刻準備獲取或失去 leader 權(quán)限。

源碼解析

offer(E e)

DelayQueue 添加或入列操作方法包括put、add、offer,都是通過offer方法實現(xiàn),所以我們這里只對offer進行解析:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //調(diào)用priorityQueue的offer出列
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

說明:首先執(zhí)行鎖操作,把元素添加到優(yōu)先級隊列priorityQueue中;如果當前元素是隊列的頭元素,則設置leader為空,并喚醒所有等待available的線程;最后釋放鎖。
這里用到了PriorityQueueoffer()peek()方法。PriorityQueue 的入列操作與 PriorityBlockingQueue 基本一致,這里大家可以參考筆者的另一篇文章JUC源碼分析-集合篇(七):PriorityBlockingQueue,查看 PriorityBlockingQueue 的入列方式,本篇就不多贅述了。

take()

DelayQueue 的出列或獲取元素方法包括poll、take、peek,poll直接獲取元素,如果隊列中沒有屆期元素返回null;take會一直等待元素可用;peek只獲取但不移除元素。相對來說,take方法內(nèi)包括了另外兩個方法的邏輯,所以這里我們只針對take方法進行分析:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //自旋
        for (;;) {
            E first = q.peek();
            if (first == null)//首節(jié)點為空,等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);//延時時間
                if (delay <= 0)
                    return q.poll();
                //釋放first的引用,避免內(nèi)存泄漏
                first = null; // don't retain ref while waiting
                if (leader != null)//leader不為空,證明有其他線程已經(jīng)獲取到leader,加入條件隊列等到延時結(jié)束
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;//leader指向當前線程
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)//檢查是否被其他線程改變,沒有就重置,再次循環(huán)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)//leader為空并且隊列不空,說明沒有其他線程在等待,那就通知條件隊列
            available.signal();
        lock.unlock();
    }
}

說明:函數(shù)執(zhí)行邏輯如下:

  1. 首先對隊列加(響應中斷)鎖;
  2. 調(diào)用priorityQueue.peek()方法獲取首節(jié)點;
  3. 首節(jié)點為空則調(diào)用available.await()等待被喚醒;
  4. 首節(jié)點不為空,并且延遲時間為0則執(zhí)行出列操作;
  5. 首節(jié)點不為空但是尚未屆期,則首先釋放 first 的引用,防止內(nèi)存泄漏,然后執(zhí)行如下邏輯:
  • leader不為空,證明有其他線程已經(jīng)獲取到leader,當前線程加入等待條件隊列;
  • leader為空,使leader指向當前線程,等待節(jié)點的 delay 時間;
  1. 處理結(jié)束,如果隊列中還有節(jié)點就喚醒在available條件上等待的線程。
  2. unlock

小結(jié)

如果你理解了我們前幾章的內(nèi)容,會發(fā)現(xiàn) DelayQueue 還是比較簡單的。
本章重點:DelayQueue 的延時策略。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容