? ? 今天來說一說DelayQueue,DelayQueue并發(fā)隊列是一個無界阻塞延遲隊列,隊列中的每個元素都有一個過期時間,當從隊列獲取元素時,只有過期元素才會出隊列,不允許存放null元素。隊列頭元素是最快要過期的元素。
? ??DelayQueue內(nèi)部有一個PriorityQueue優(yōu)先隊列,存入到該隊列的元素都實現(xiàn)Delayed接口,由于每個原始都有一個過期時間,所以要實現(xiàn)獲取當前元素還剩多少時間就過期的接口,由于元素是存放在PriorityQueue優(yōu)先隊列中的,所以需要實現(xiàn)元素之間相互比較的接口。
? ??DelayQueue內(nèi)部實現(xiàn)了一個Leader-Follower的模式,用于盡量減少不必要的線程等待。當一個線程調(diào)用隊列的take方法變?yōu)閘eader線程后,它會調(diào)用條件變量available.awaitNanos(delay)等待delay時間,但其他線程(follower線程)則會調(diào)用available.await()進行無限等待。leader線程延遲時間過期后,會退出take方法,并通過調(diào)用available.signal()方法喚醒一個follower線程,被喚醒的follower線程被選舉為leader線程。當向隊列插入一個過期時間比頭元素過期時間還短的元素時,leader會被重置為null。
????DelayQueue內(nèi)部主要的成員變量:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
????下面對主要函數(shù)原理進行講解。
offer(E e)
? ? 插入元素到隊列,當元素e為null時會拋出NullPointException異常,否則由于是無界隊列,所以一直返回true。插入的元素需要實現(xiàn)Delayed接口。
public boolean offer(E e) {
//(1)嘗試獲取獨占鎖。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
//(2)將元素放入PriorityQueue優(yōu)先隊列中,如果e為null則會拋出NullPointException異常。
? ? ? ? q.offer(e);
//(3)如果當前插入的元素成為隊列中的第一個節(jié)點(即插入元素的過期時間比隊列中的所有元素的過期時間都小)則將leader設(shè)置為null,并調(diào)用available.signal()方法激活available條件隊列中的一個線程。
? ? ? ? if (q.peek() == e) {
? ? ? ? ? ? leader = null;
? ? ? ? ? ? available.signal();
? ? ? ? }
? ? ? ? return true;
? ? } finally {
//(4)釋放鎖。
? ? ? ? lock.unlock();
? ? }
}
take()
? ? 從隊列中獲取并移除延遲時間過期的元素,當隊列中沒有元素或沒有延遲時間過去的元素則會進行阻塞。
public E take() throws InterruptedException {
//(1)嘗試獲取獨占鎖 ,調(diào)用的是lockInterruptibly 方法,所以會被中斷。
? ? final ReentrantLock lock = this.lock;
? ? lock.lockInterruptibly();
? ? try {
? ? ? ? for (;;) {
//(2)如果隊列中沒有元素則調(diào)用available.await()進行阻塞。
? ? ? ? ? ? E first = q.peek();
? ? ? ? ? ? if (first == null)
? ? ? ? ? ? ? ? available.await();
? ? ? ? ? ? else {
//(3)如果獲取隊列的第一個元素的延遲時間已到期,則返回該元素并從隊列中移除。
? ? ? ? ? ? ? ? long delay = first.getDelay(NANOSECONDS);
? ? ? ? ? ? ? ? if (delay <= 0)
? ? ? ? ? ? ? ? ? ? return q.poll();
//(4)這里有個小細節(jié),當要進行阻塞等待的時候?qū)irst 變量置為null不引用隊列中的元素。
? ? ? ? ? ? ? ? first = null;?
//(5)如果當前線程不是leader線程,則調(diào)用available.await()方法持續(xù)阻塞等待,直到被喚醒。
? ? ? ? ? ? ? ? if (leader != null)
? ? ? ? ? ? ? ? ? ? available.await();
? ? ? ? ? ? ? ? else {
//(6)如果當前線程是leader線程,而調(diào)用available.awaitNanos(delay)方法,阻塞等待指定的過期時間。
? ? ? ? ? ? ? ? ? ? Thread thisThread = Thread.currentThread();
? ? ? ? ? ? ? ? ? ? leader = thisThread;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? available.awaitNanos(delay);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? if (leader == thisThread)
? ? ? ? ? ? ? ? ? ? ? ? ? ? leader = null;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? } finally {
? ? ? ? if (leader == null && q.peek() != null)
//(7)如果代碼執(zhí)行到這,說明當前線程從隊列中移除過期元素了,但是隊列中還存在元素,則需要調(diào)用available.signal() 方法,喚醒available條件隊列中阻塞的線程去獲取隊列中的元素。
? ? ? ? ? ? available.signal();
//(8)釋放鎖。
? ? ? ? lock.unlock();
? ? }
}
poll()
? ? 嘗試獲取并移除優(yōu)先級隊列中的頭元素,如果優(yōu)先級隊列為空或者頭元素的延遲時間還沒有過期則返回null,否則返回頭元素并從優(yōu)先級隊列中移除。
public E poll() {
//(1)嘗試獲取獨占鎖。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
//(2)獲取優(yōu)先級隊列中的第一個元素。
? ? ? ? E first = q.peek();
//(3)如果第一個元素為null(即優(yōu)先級隊列中無元素)或第一個元素的延遲時間沒有過期則返回null,否則返回頭元素并將頭元素從優(yōu)先級隊列中移除。
? ? ? ? if (first == null || first.getDelay(NANOSECONDS) > 0)
? ? ? ? ? ? return null;
? ? ? ? else
? ? ? ? ? ? return q.poll();
? ? } finally {
//(4)釋放鎖。
? ? ? ? lock.unlock();
? ? }
}
size()
? ? 獲取隊列中的元素個數(shù),由于在獲取元素個數(shù)前需要先獲取鎖,所有size()方法返回的元素個數(shù)是精準的。
public int size() {
//(1)嘗試獲取獨占鎖。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
//(2)獲取優(yōu)先級隊列中的元素個數(shù)。
? ? ? ? return q.size();
? ? } finally {
//(3)釋放鎖。
? ? ? ? lock.unlock();
? ? }
}
? ??DelayQueue阻塞無界延遲隊列,內(nèi)部使用PriorityQueue優(yōu)先級隊列進行存儲元素,根據(jù)元素的延遲時間進行排序,使用ReentrantLock獨占鎖實現(xiàn)線程同步。
????今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。