多線程并發(fā)編程15-DelayQueue源碼剖析

? ? 今天來說一說DelayQueue,DelayQueue并發(fā)隊列是一個無界阻塞延遲隊列,隊列中的每個元素都有一個過期時間,當從隊列獲取元素時,只有過期元素才會出隊列,不允許存放null元素。隊列頭元素是最快要過期的元素。

? ??DelayQueue內(nèi)部有一個PriorityQueue優(yōu)先隊列,存入到該隊列的元素都實現(xiàn)Delayed接口,由于每個原始都有一個過期時間,所以要實現(xiàn)獲取當前元素還剩多少時間就過期的接口,由于元素是存放在PriorityQueue優(yōu)先隊列中的,所以需要實現(xiàn)元素之間相互比較的接口。

? ??DelayQueue內(nèi)部實現(xiàn)了一個Leader-Follower的模式,用于盡量減少不必要的線程等待。當一個線程調(diào)用隊列的take方法變?yōu)閘eader線程后,它會調(diào)用條件變量available.awaitNanos(delay)等待delay時間,但其他線程(follower線程)則會調(diào)用available.await()進行無限等待。leader線程延遲時間過期后,會退出take方法,并通過調(diào)用available.signal()方法喚醒一個follower線程,被喚醒的follower線程被選舉為leader線程。當向隊列插入一個過期時間比頭元素過期時間還短的元素時,leader會被重置為null。

????DelayQueue內(nèi)部主要的成員變量:

private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

????下面對主要函數(shù)原理進行講解。

offer(E e)

? ? 插入元素到隊列,當元素e為null時會拋出NullPointException異常,否則由于是無界隊列,所以一直返回true。插入的元素需要實現(xiàn)Delayed接口。

public boolean offer(E e) {

//(1)嘗試獲取獨占鎖。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

//(2)將元素放入PriorityQueue優(yōu)先隊列中,如果e為null則會拋出NullPointException異常。

? ? ? ? q.offer(e);

//(3)如果當前插入的元素成為隊列中的第一個節(jié)點(即插入元素的過期時間比隊列中的所有元素的過期時間都小)則將leader設(shè)置為null,并調(diào)用available.signal()方法激活available條件隊列中的一個線程。

? ? ? ? if (q.peek() == e) {

? ? ? ? ? ? leader = null;

? ? ? ? ? ? available.signal();

? ? ? ? }

? ? ? ? return true;

? ? } finally {

//(4)釋放鎖。

? ? ? ? lock.unlock();

? ? }

}

take()

? ? 從隊列中獲取并移除延遲時間過期的元素,當隊列中沒有元素或沒有延遲時間過去的元素則會進行阻塞。

public E take() throws InterruptedException {

//(1)嘗試獲取獨占鎖 ,調(diào)用的是lockInterruptibly 方法,所以會被中斷。

? ? final ReentrantLock lock = this.lock;

? ? lock.lockInterruptibly();

? ? try {

? ? ? ? for (;;) {

//(2)如果隊列中沒有元素則調(diào)用available.await()進行阻塞。

? ? ? ? ? ? E first = q.peek();

? ? ? ? ? ? if (first == null)

? ? ? ? ? ? ? ? available.await();

? ? ? ? ? ? else {

//(3)如果獲取隊列的第一個元素的延遲時間已到期,則返回該元素并從隊列中移除。

? ? ? ? ? ? ? ? long delay = first.getDelay(NANOSECONDS);

? ? ? ? ? ? ? ? if (delay <= 0)

? ? ? ? ? ? ? ? ? ? return q.poll();

//(4)這里有個小細節(jié),當要進行阻塞等待的時候?qū)irst 變量置為null不引用隊列中的元素。

? ? ? ? ? ? ? ? first = null;?

//(5)如果當前線程不是leader線程,則調(diào)用available.await()方法持續(xù)阻塞等待,直到被喚醒。

? ? ? ? ? ? ? ? if (leader != null)

? ? ? ? ? ? ? ? ? ? available.await();

? ? ? ? ? ? ? ? else {

//(6)如果當前線程是leader線程,而調(diào)用available.awaitNanos(delay)方法,阻塞等待指定的過期時間。

? ? ? ? ? ? ? ? ? ? Thread thisThread = Thread.currentThread();

? ? ? ? ? ? ? ? ? ? leader = thisThread;

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? available.awaitNanos(delay);

? ? ? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? ? ? if (leader == thisThread)

? ? ? ? ? ? ? ? ? ? ? ? ? ? leader = null;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? } finally {

? ? ? ? if (leader == null && q.peek() != null)

//(7)如果代碼執(zhí)行到這,說明當前線程從隊列中移除過期元素了,但是隊列中還存在元素,則需要調(diào)用available.signal() 方法,喚醒available條件隊列中阻塞的線程去獲取隊列中的元素。

? ? ? ? ? ? available.signal();

//(8)釋放鎖。

? ? ? ? lock.unlock();

? ? }

}

poll()

? ? 嘗試獲取并移除優(yōu)先級隊列中的頭元素,如果優(yōu)先級隊列為空或者頭元素的延遲時間還沒有過期則返回null,否則返回頭元素并從優(yōu)先級隊列中移除。

public E poll() {

//(1)嘗試獲取獨占鎖。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

//(2)獲取優(yōu)先級隊列中的第一個元素。

? ? ? ? E first = q.peek();

//(3)如果第一個元素為null(即優(yōu)先級隊列中無元素)或第一個元素的延遲時間沒有過期則返回null,否則返回頭元素并將頭元素從優(yōu)先級隊列中移除。

? ? ? ? if (first == null || first.getDelay(NANOSECONDS) > 0)

? ? ? ? ? ? return null;

? ? ? ? else

? ? ? ? ? ? return q.poll();

? ? } finally {

//(4)釋放鎖。

? ? ? ? lock.unlock();

? ? }

}

size()

? ? 獲取隊列中的元素個數(shù),由于在獲取元素個數(shù)前需要先獲取鎖,所有size()方法返回的元素個數(shù)是精準的。

public int size() {

//(1)嘗試獲取獨占鎖。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

//(2)獲取優(yōu)先級隊列中的元素個數(shù)。

? ? ? ? return q.size();

? ? } finally {

//(3)釋放鎖。

? ? ? ? lock.unlock();

? ? }

}

? ??DelayQueue阻塞無界延遲隊列,內(nèi)部使用PriorityQueue優(yōu)先級隊列進行存儲元素,根據(jù)元素的延遲時間進行排序,使用ReentrantLock獨占鎖實現(xiàn)線程同步。

????今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容