DelayQueue 是一個無界延時阻塞隊列,元素順序按照過期時間排序,通過顯式鎖 ReentrantLock 保證并發(fā)安全,隊列中的存儲的元素必須實現(xiàn) Delayed 接口,也就是說只允許放入可以“延期”的元素。獲取元素時,只有當元素期滿之后才可獲取。DelayQueue 相對比較簡單,不過在我們之后線程池源碼分析中會遇到,所以這里也簡單介紹一下。
概述
DelayQueue 的延時策略可以總結(jié)為以下幾點:
- 存儲元素必須實現(xiàn)
Delayed接口 - 內(nèi)部持有一個
ReentrantLock保證線程安全 - 使用優(yōu)先級隊列
PriorityQueue實現(xiàn)元素存儲 - 持有一個優(yōu)化內(nèi)部阻塞通知的線程
leader - 用于實現(xiàn)阻塞的
Condition對象
關(guān)于 Delayed 和 PriorityQueue:
- Delayed 是一個具有過期時間的元素類型
- PriorityQueue 是二叉堆實現(xiàn)的根據(jù)隊列里元素的某些屬性排序的的優(yōu)先級隊列,內(nèi)部持有一個比較器comparator(參考JUC源碼分析-集合篇(七):PriorityBlockingQueue)
DelayQueue 其實就是在每次往優(yōu)先級隊列中添加元素,然后以元素的 delay(過期值)作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列里取出來都是最先要過期的元素。
數(shù)據(jù)結(jié)構(gòu)及核心參數(shù)

//鎖
private final transient ReentrantLock lock = new ReentrantLock();
//內(nèi)部使用PriorityQueue存儲元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
//等待獲取隊列頭元素的線程
private Thread leader = null;
//當一個新任務在隊列的頭部可用,或者新線程可能需要成為leader時,喚醒等待條件
private final Condition available = lock.newCondition();
這里我們主要介紹一下leader變量:
leader是等待獲取隊列頭元素的線程,應用主從式設計減少不必要的等待。當一個線程為leader,它只會等待下一個延遲屆期,但是其他線程的等待是不確定的。在從take()或poll()獲取數(shù)據(jù)返回前,leader線程必須喚醒其他等待的線程,除非其他線程在這期間變成leader。如果隊列頭被一個有著更快過期時間的元素替換掉,leader將會被設置為null而失效,并喚醒其他等待線程(不一定是當前leader線程)。所以等待線程在等待期間必須時刻準備獲取或失去 leader 權(quán)限。
源碼解析
offer(E e)
DelayQueue 添加或入列操作方法包括put、add、offer,都是通過offer方法實現(xiàn),所以我們這里只對offer進行解析:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//調(diào)用priorityQueue的offer出列
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
說明:首先執(zhí)行鎖操作,把元素添加到優(yōu)先級隊列priorityQueue中;如果當前元素是隊列的頭元素,則設置leader為空,并喚醒所有等待available的線程;最后釋放鎖。
這里用到了PriorityQueue的offer()和peek()方法。PriorityQueue 的入列操作與 PriorityBlockingQueue 基本一致,這里大家可以參考筆者的另一篇文章JUC源碼分析-集合篇(七):PriorityBlockingQueue,查看 PriorityBlockingQueue 的入列方式,本篇就不多贅述了。
take()
DelayQueue 的出列或獲取元素方法包括poll、take、peek,poll直接獲取元素,如果隊列中沒有屆期元素返回null;take會一直等待元素可用;peek只獲取但不移除元素。相對來說,take方法內(nèi)包括了另外兩個方法的邏輯,所以這里我們只針對take方法進行分析:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
E first = q.peek();
if (first == null)//首節(jié)點為空,等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);//延時時間
if (delay <= 0)
return q.poll();
//釋放first的引用,避免內(nèi)存泄漏
first = null; // don't retain ref while waiting
if (leader != null)//leader不為空,證明有其他線程已經(jīng)獲取到leader,加入條件隊列等到延時結(jié)束
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//leader指向當前線程
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)//檢查是否被其他線程改變,沒有就重置,再次循環(huán)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)//leader為空并且隊列不空,說明沒有其他線程在等待,那就通知條件隊列
available.signal();
lock.unlock();
}
}
說明:函數(shù)執(zhí)行邏輯如下:
- 首先對隊列加(響應中斷)鎖;
- 調(diào)用
priorityQueue.peek()方法獲取首節(jié)點; - 首節(jié)點為空則調(diào)用
available.await()等待被喚醒; - 首節(jié)點不為空,并且延遲時間為0則執(zhí)行出列操作;
- 首節(jié)點不為空但是尚未屆期,則首先釋放 first 的引用,防止內(nèi)存泄漏,然后執(zhí)行如下邏輯:
-
leader不為空,證明有其他線程已經(jīng)獲取到leader,當前線程加入等待條件隊列; -
leader為空,使leader指向當前線程,等待節(jié)點的 delay 時間;
- 處理結(jié)束,如果隊列中還有節(jié)點就喚醒在
available條件上等待的線程。 - unlock
小結(jié)
如果你理解了我們前幾章的內(nèi)容,會發(fā)現(xiàn) DelayQueue 還是比較簡單的。
本章重點:DelayQueue 的延時策略。