一、隊(duì)列的概念
| 定義 | 特點(diǎn) | |
|---|---|---|
| 隊(duì)列 | 一端刪除(頭)另一端添加(尾) | First In First Out |
| 雙端隊(duì)列 | 兩端都可以刪除、添加 | |
| 優(yōu)先級(jí)隊(duì)列 | 優(yōu)先級(jí)高者先出隊(duì) | |
| 延時(shí)隊(duì)列 | 根據(jù)延時(shí)時(shí)間確定優(yōu)先級(jí) | |
| 并發(fā)非阻塞隊(duì)列 | 隊(duì)列空或滿(mǎn)時(shí)不阻塞 | |
| 并發(fā)阻塞隊(duì)列 | 隊(duì)列空時(shí)刪除阻塞、隊(duì)列滿(mǎn)時(shí)添加阻塞 |
二、實(shí)現(xiàn)雙端隊(duì)列
定義接口
public interface Deque<E> {
/**
* 隊(duì)首添加元素
* @param e
* @return
*/
boolean offerFirst(E e);
/**
* 隊(duì)尾添加元素
* @param e
* @return
*/
boolean offerLast(E e);
/**
* 隊(duì)首移除元素并返回
* @return
*/
E pollFirst();
/**
* 隊(duì)尾移除元素并返回
* @return
*/
E pollLast();
/**
* 獲取隊(duì)首元素
* @return
*/
E peekFirst();
/**
* 獲取隊(duì)尾元素
* @return
*/
E peekLast();
/**
* 判斷隊(duì)列是否為空
* @return
*/
boolean isEmpty();
/**
* 判斷隊(duì)列是否已滿(mǎn)
* @return
*/
boolean isFull();
}
2.1 基于數(shù)組實(shí)現(xiàn):
package com.hcx.algorithm.queue;
import java.util.Iterator;
/**
* @Title: CircularArrayDeque.java
* @Package com.hcx.algorithm.queue
* @Description: 基于循環(huán)數(shù)組實(shí)現(xiàn)雙端隊(duì)列
* tail指向的位置不存儲(chǔ)元素,每次都指向待存儲(chǔ)的位置
* head操作:先往前走一個(gè),然后設(shè)置元素
* tail操作:先設(shè)置值,然后往后走一個(gè)
* @Author: hongcaixia
* @Date: 2025/1/12 15:04
* @Version V1.0
*/
public class CircularArrayDeque<E> implements Deque<E>,Iterable<E> {
E[] array;
int head;
int tail;
public CircularArrayDeque(int capacity) {
array = (E[]) new Object[capacity + 1];
}
@Override
public boolean offerFirst(E e) {
if (isFull()) {
return false;
}
// head指針先往前走一個(gè)位置,然后設(shè)置值
head = head - 1;
if (head < 0) {
head = array.length - 1;
}
array[head] = e;
return true;
}
@Override
public boolean offerLast(E e) {
if (isFull()) {
return false;
}
array[tail] = e;
tail = tail + 1;
if (tail >= array.length) {
tail = 0;
}
return true;
}
@Override
public E pollFirst() {
if (isEmpty()) {
return null;
}
E e = array[head];
// help gc
array[head] = null;
head = head + 1;
if (head >= array.length) {
head = 0;
}
return e;
}
@Override
public E pollLast() {
if (isEmpty()) {
return null;
}
tail = tail - 1;
if (tail < 0) {
tail = array.length - 1;
}
E e = array[tail];
// help gc
array[tail] = null;
return e;
}
@Override
public E peekFirst() {
if (isEmpty()) {
return null;
}
return array[head];
}
@Override
public E peekLast() {
if (isEmpty()) {
return null;
}
tail = tail - 1;
if (tail < 0) {
tail = array.length - 1;
}
return array[tail];
}
@Override
public boolean isEmpty() {
return head == tail;
}
@Override
public boolean isFull() {
if (tail > head) {
return tail - head == array.length - 1;
} else if (tail < head) {
return head - tail == 1;
} else {
return false;
}
}
@Override
public Iterator<E> iterator() {
return new Iterator<E>() {
int pointer = head;
@Override
public boolean hasNext() {
return pointer != tail;
}
@Override
public E next() {
E e = array[pointer];
pointer = pointer + 1;
if (pointer >= array.length) {
pointer = 0;
}
return e;
}
@Override
public void remove() {
}
};
}
}
2.2 基于鏈表實(shí)現(xiàn)
package com.hcx.algorithm.queue;
/**
* @Title: CircularLinkedListQueue.java
* @Package com.hcx.algorithm.queue
* @Description: Leetcode622.設(shè)計(jì)循環(huán)隊(duì)列
* @Author: hongcaixia
* @Date: 2025/1/11 14:57
* @Version V1.0
*/
public class CircularLinkedListQueue {
private static class Node {
int value;
Node next;
Node(int value, Node next) {
this.value = value;
this.next = next;
}
}
private final Node head = new Node(-1, null);
private Node tail = head;
private int size = 0;
private int capacity = 0;
{
tail.next = head;
}
public CircularLinkedListQueue(int capacity) {
this.capacity = capacity;
}
public boolean enQueue(int value) {
if(isFull()) {
return false;
}
Node added = new Node(value, head);
tail.next = added;
tail = added;
size++;
return true;
}
public boolean deQueue() {
if(isEmpty()) {
return false;
}
Node first = head.next;
head.next = first.next;
if (first == tail) {
tail = head;
}
size--;
return true;
}
public int Front() {
if(isEmpty()) {
return -1;
}
return head.next.value;
}
public int Rear() {
if(isEmpty()) {
return -1;
}
return tail.value;
}
public boolean isEmpty() {
return head == tail;
}
public boolean isFull() {
return size == capacity;
}
}
三、實(shí)現(xiàn)阻塞隊(duì)列
大部分場(chǎng)景要求分離生產(chǎn)者(向隊(duì)列放入)和消費(fèi)者(從隊(duì)列拿出)兩個(gè)角色、它們由不同的線(xiàn)程來(lái)承擔(dān),前面的實(shí)現(xiàn)沒(méi)有考慮線(xiàn)程安全問(wèn)題。
隊(duì)列為空,前面的實(shí)現(xiàn)里會(huì)返回 null,如果就是要拿到一個(gè)元素只能不斷循環(huán)嘗試。
隊(duì)列滿(mǎn),前面的實(shí)現(xiàn)里會(huì)返回 false,如果就是要放入一個(gè)元素只能不斷循環(huán)嘗試。
實(shí)現(xiàn)方案:
- 用鎖保證線(xiàn)程安全
- 用條件變量讓等待非空線(xiàn)程與等待不滿(mǎn)線(xiàn)程進(jìn)入等待狀態(tài),而不是不斷循環(huán)嘗試,讓 CPU 空轉(zhuǎn)。
1.阻塞隊(duì)列接口
package com.hcx.algorithm.queue;
/**
* @Title: BlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 阻塞隊(duì)列接口
* @Author: hongcaixia
* @Date: 2025/1/13 15:16
* @Version V1.0
*/
public interface BlockingQueue<E> {
/**
* 阻塞入隊(duì)元素
* 當(dāng)隊(duì)列滿(mǎn)時(shí),阻塞,直到隊(duì)列不滿(mǎn)時(shí),把元素入隊(duì)
*
* @param e
* @throws InterruptedException
*/
void offer(E e) throws InterruptedException;
/**
* 超時(shí)阻塞入隊(duì)元素
* 當(dāng)隊(duì)列滿(mǎn)時(shí),阻塞,直到隊(duì)列不滿(mǎn)時(shí),把元素入隊(duì),如果中間超時(shí)時(shí)間達(dá)到,則放棄入隊(duì),返回false
*
* @param e
* @param timeout
* @return
* @throws InterruptedException
*/
void offer(E e, long timeout) throws InterruptedException;
/**
* 元素出隊(duì)
*
* @return
* @throws InterruptedException
*/
E poll() throws InterruptedException;
/**
* 判斷隊(duì)列是否為空
*
* @return
*/
boolean isEmpty();
/**
* 判斷隊(duì)列是否已滿(mǎn)
*
* @return
*/
boolean isFull();
}
2.單鎖實(shí)現(xiàn)
package com.hcx.algorithm.queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 單鎖實(shí)現(xiàn)阻塞隊(duì)列
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class SingleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 隊(duì)列大小
int size = 0;
// 頭指針
int head = 0;
// 尾指針
int tail = 0;
// 保證線(xiàn)程安全操作隊(duì)列時(shí)需要加鎖
ReentrantLock lock = new ReentrantLock();
// 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
Condition tailWait = lock.newCondition();
// 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
Condition headWait = lock.newCondition();
public SingleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
// tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size++;
// 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
headWait.signal();
} finally {
lock.unlock();
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly();
try {
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待時(shí)間
time = tailWait.awaitNanos(time);
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size++;
// 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
headWait.signal();
} finally {
lock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly();
try {
// 喚醒后應(yīng)該重新檢查條件
while (isEmpty()) {
headWait.await();
}
E e = array[head];
array[head] = null;
head++;
if (head == array.length) {
head = 0;
}
size--;
// 此時(shí)隊(duì)列中已經(jīng)有元素出隊(duì),那么隊(duì)列中已經(jīng)有空間可以再入隊(duì)了,喚醒在入隊(duì)時(shí)因?yàn)闆](méi)空間阻塞的等待隊(duì)列tailWait
tailWait.signal();
return e;
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public boolean isFull() {
return size == array.length;
}
}
2.1 lock.lockInterruptibly();
加鎖的過(guò)程中在阻塞時(shí)可以打斷(如果是lock方法就不會(huì)被打斷,會(huì)一直等下去),提前喚醒,不需要在阻塞下去了,但此時(shí)也不一定能獲取到鎖,除非持鎖線(xiàn)程已經(jīng)解鎖了。
2.2 隊(duì)列滿(mǎn)時(shí)的操作
讓offer線(xiàn)程進(jìn)入阻塞狀態(tài)。不繼續(xù)執(zhí)行添加操作。
使用lock的條件變量存儲(chǔ)阻塞的offer線(xiàn)程。
tailWait.await(); 將當(dāng)前offer線(xiàn)程加入tailWait集合中,當(dāng)前線(xiàn)程阻塞住。
await()方法會(huì)釋放鎖。
2.3 隊(duì)列不滿(mǎn)時(shí)的操作
喚醒阻塞的offer線(xiàn)程,讓阻塞的線(xiàn)程繼續(xù)往后執(zhí)行,完成入隊(duì)操作。
tailWait.signal();
調(diào)用signal()方法之前需要先獲取鎖。
2.4 處理虛假喚醒
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待時(shí)間
time = tailWait.awaitNanos(time);
}
這里判斷隊(duì)列是否是滿(mǎn)的要使用while,不能只使用if:
當(dāng)多線(xiàn)程并發(fā)的情況下,隊(duì)列只有一個(gè)空位時(shí),喚醒了阻塞的線(xiàn)程,可能這個(gè)時(shí)候恰好又有一個(gè)線(xiàn)程也要操作入隊(duì),恰好比他這個(gè)剛被喚醒的線(xiàn)程搶先一步拿到了鎖,發(fā)現(xiàn)隊(duì)列沒(méi)有滿(mǎn),就添加元素,隊(duì)列又滿(mǎn)了,執(zhí)行完釋放鎖;此時(shí)對(duì)于剛被喚醒的線(xiàn)程來(lái)說(shuō),再拿到了鎖,繼續(xù)執(zhí)行,那么就會(huì)覆蓋掉隊(duì)列的元素了,所以需要再判斷一次隊(duì)列是否已滿(mǎn)。
這里注意被喚醒的阻塞線(xiàn)程,也是要重新?lián)屾i的(await方法會(huì)釋放鎖的)沒(méi)搶到,就還是被阻塞著。
2.5 處理帶超時(shí)時(shí)間的虛假喚醒
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待時(shí)間 返回的就是剩余等待時(shí)間。
time = tailWait.awaitNanos(time);
}
假設(shè)offer線(xiàn)程超時(shí)等待的時(shí)間為5s,時(shí)間過(guò)去了2s時(shí),poll線(xiàn)程取走了一個(gè)元素,喚醒了在等待的offer線(xiàn)程;此時(shí)又來(lái)了一個(gè)offer線(xiàn)程,搶到了鎖,又把隊(duì)列放滿(mǎn)了,那么被喚醒的線(xiàn)程此次為虛假喚醒,但是不能再繼續(xù)等待5s了,已經(jīng)等待了2s,只需要再等待3s;所以需要重新更新等待時(shí)間。
3.雙鎖實(shí)現(xiàn)
針對(duì)單鎖的實(shí)現(xiàn),出隊(duì)和入隊(duì)都使用了一把鎖,實(shí)際出隊(duì)和入隊(duì)是不沖突的,所以為了提高性能,這里出隊(duì)使用一把鎖,入隊(duì)使用另一把鎖。針對(duì)共享變量size的操作,使用原子類(lèi)來(lái)保證安全。
- 入隊(duì)使用tailLock鎖住隊(duì)尾,阻塞時(shí)加入tailWait等待
- 出隊(duì)使用headLock鎖住隊(duì)頭,阻塞時(shí)加入headWait等待
入隊(duì)offer邏輯
- 1.加tailLock鎖
- 2.如果隊(duì)列滿(mǎn),進(jìn)入tailWait等待
- 3.隊(duì)列不滿(mǎn),執(zhí)行入隊(duì)操作,同時(shí)喚醒在headWait等待的出隊(duì)線(xiàn)程(因?yàn)橛性厝腙?duì)了,就代表可以有元素出隊(duì),之間因?yàn)闆](méi)有元素而阻塞的等待出隊(duì)的線(xiàn)程就可以喚醒來(lái)執(zhí)行了)
出隊(duì)poll邏輯
- 1.加headLock鎖
- 2.如果隊(duì)列空,進(jìn)入headWait等待;
- 3.隊(duì)列不空,執(zhí)行出隊(duì)操作,同時(shí)喚醒在tailWait等待的入隊(duì)線(xiàn)程(因?yàn)橛性爻鲫?duì)了,就代表有位置給元素入隊(duì),之間因?yàn)闆](méi)有位置而阻塞的等待入隊(duì)的線(xiàn)程就可以喚醒來(lái)執(zhí)行了)
signal和await使用
- 必須配合鎖來(lái)使用,調(diào)用前加鎖,調(diào)用后解鎖
- 必須配對(duì)使用:
- headWait調(diào)用await和signal方法,必須加的是headLock鎖。(headWait是由headLock創(chuàng)建的)
- tailWait調(diào)用await和signal方法,必須加的是tailLock鎖。(tailWait是由tailLock創(chuàng)建的)
基礎(chǔ)版:
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 雙鎖實(shí)現(xiàn)阻塞隊(duì)列
* 一把鎖保護(hù) tail,一把鎖保護(hù) head,提升性能
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 隊(duì)列大小
AtomicInteger size = new AtomicInteger(0);
// 頭指針
int head = 0;
// 尾指針
int tail = 0;
// 出隊(duì)鎖
ReentrantLock headLock = new ReentrantLock();
// 入隊(duì)鎖
ReentrantLock tailLock = new ReentrantLock();
// 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
Condition tailWait = tailLock.newCondition();
// 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
Condition headWait = headLock.newCondition();
public DoubleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size.getAndIncrement();
// 隊(duì)列從空到非空,喚醒等待出隊(duì)的poll線(xiàn)程
headLock.lock();
try {
headWait.signal();
}finally {
headLock.unlock();
}
} finally {
tailLock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 隊(duì)列空等待
while (isEmpty()) {
headWait.await();
}
// 不空則出隊(duì)
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
// 隊(duì)列從滿(mǎn)到不滿(mǎn),喚醒等待入隊(duì)的offer線(xiàn)程
tailLock.lock();
try {
tailWait.signal();
} finally {
tailLock.unlock();
}
} finally {
headLock.unlock();
}
return e;
}
@Override
public boolean isEmpty() {
return size.get() == 0;
}
@Override
public boolean isFull() {
return size.get() == array.length;
}
}
上述代碼,兩把鎖嵌套使用,非常容易出現(xiàn)死鎖:
- poll線(xiàn)程獲取了head鎖,準(zhǔn)備去獲取tail鎖
- offer線(xiàn)程獲取了tail鎖,準(zhǔn)備去獲取head鎖
兩個(gè)線(xiàn)程各自持有著一把鎖,都準(zhǔn)備去獲取對(duì)方的鎖,誰(shuí)也不讓誰(shuí),就出現(xiàn)了死鎖。
改進(jìn):不嵌套獲取,把喚醒線(xiàn)程的操作放在上一個(gè)鎖解鎖之后:
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size.getAndIncrement();
} finally {
tailLock.unlock();
}
// 隊(duì)列從空到非空,喚醒等待出隊(duì)的poll線(xiàn)程
headLock.lock();
try {
headWait.signal();
} finally {
headLock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 隊(duì)列空等待
while (isEmpty()) {
headWait.await();
}
// 不空則出隊(duì)
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
} finally {
headLock.unlock();
}
// 隊(duì)列從滿(mǎn)到不滿(mǎn),喚醒等待入隊(duì)的offer線(xiàn)程
tailLock.lock();
try {
tailWait.signal();
} finally {
tailLock.unlock();
}
return e;
}
優(yōu)化版:使用級(jí)聯(lián)通知完成喚醒操作
因?yàn)樵诓僮鱫ffer和poll時(shí),不僅要拿到當(dāng)前要操作的鎖,還要拿對(duì)方的鎖來(lái)喚醒對(duì)方。所以效率降低了。
針對(duì)offer線(xiàn)程來(lái)說(shuō),入隊(duì)用的是tailLock鎖,喚醒用的是headLock鎖,要盡可能減少去獲取headLock鎖的次數(shù)。
場(chǎng)景:
剛開(kāi)始隊(duì)列為空:
來(lái)了3個(gè)poll線(xiàn)程操作,因?yàn)殛?duì)列沒(méi)有元素,所以都進(jìn)入了headWait中等待。
來(lái)了3個(gè)offer線(xiàn)程,分別往隊(duì)伍加入了3個(gè)元素,現(xiàn)在的代碼,對(duì)于每一個(gè)offer線(xiàn)程,他都會(huì)去獲取鎖執(zhí)行喚醒操作;為了減少offer線(xiàn)程取獲取headLock鎖,只讓其中一個(gè)offer線(xiàn)程去獲取鎖執(zhí)行喚醒操作,而對(duì)于headWait中剩下2個(gè)poll線(xiàn)程,則由已經(jīng)被offer線(xiàn)程喚醒的那一個(gè)poll線(xiàn)程來(lái)喚醒。(condition里的結(jié)構(gòu)是一個(gè)鏈表結(jié)構(gòu))
具體實(shí)現(xiàn):
offer操作的headWait的喚醒邏輯:隊(duì)列空時(shí)在headWait等待出隊(duì)的tail線(xiàn)程
- 針對(duì)offer線(xiàn)程:只讓剛?cè)腙?duì)第一個(gè)元素的offer線(xiàn)程(此時(shí)隊(duì)列大小size為0)去執(zhí)行喚醒操作,其他喚醒操作交給被喚醒的poll線(xiàn)程來(lái)實(shí)現(xiàn)級(jí)聯(lián)喚醒。
- 針對(duì)poll線(xiàn)程:當(dāng)headWait中的線(xiàn)程被喚醒之后,判斷隊(duì)列中是不是還有多的元素,如果有超過(guò)1個(gè),那么就自己來(lái)喚醒在headWait中等待出隊(duì)的線(xiàn)程。
poll操作的tailWait的喚醒邏輯:隊(duì)列滿(mǎn)時(shí)在tailWait中等待入隊(duì)的offer線(xiàn)程
- 針對(duì)poll線(xiàn)程:取走了一個(gè)元素,那么隊(duì)列就有空位了,可以喚醒在tailWait中等待的線(xiàn)程。不是所有的poll都執(zhí)行,只在隊(duì)列剛從滿(mǎn)變成不滿(mǎn)的時(shí)候(取走元素之前隊(duì)列size和數(shù)組長(zhǎng)度一樣)由poll來(lái)喚醒,其他喚醒交給被喚醒的offer線(xiàn)程來(lái)實(shí)現(xiàn)級(jí)聯(lián)喚醒。
- 針對(duì)offer線(xiàn)程:當(dāng)tailWait中的offer線(xiàn)程被喚醒之后,判斷隊(duì)列是不是還有多的空位,如果空位超過(guò)1個(gè),那么就自己來(lái)喚醒在tailWait中等待入隊(duì)的線(xiàn)程。
完整代碼:
package com.hcx.algorithm.queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 雙鎖實(shí)現(xiàn)阻塞隊(duì)列
* 一把鎖保護(hù) tail,一把鎖保護(hù) head,提升性能
* 在喚醒的時(shí)候進(jìn)行了優(yōu)化,只有當(dāng)隊(duì)列處于臨界條件的時(shí)候才由另一個(gè)線(xiàn)程喚醒,否則自己?jiǎn)拘眩?jí)聯(lián)通知)
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 隊(duì)列大小
AtomicInteger size = new AtomicInteger(0);
// 頭指針
int head = 0;
// 尾指針
int tail = 0;
// 出隊(duì)鎖
ReentrantLock headLock = new ReentrantLock();
// 入隊(duì)鎖
ReentrantLock tailLock = new ReentrantLock();
// 當(dāng)隊(duì)列滿(mǎn)時(shí),線(xiàn)程先進(jìn)入等待隊(duì)列阻塞(此時(shí)會(huì)釋放鎖),等到有元素出隊(duì)之后,再被其他線(xiàn)程喚醒,然后重新執(zhí)行元素入隊(duì)操作。
Condition tailWait = tailLock.newCondition();
// 當(dāng)隊(duì)列空時(shí),線(xiàn)程需要進(jìn)入等待隊(duì)列(此時(shí)會(huì)釋放鎖),需要等到有元素入隊(duì)之后,再被其他線(xiàn)程喚醒,然后執(zhí)行元素出隊(duì)操作。
Condition headWait = headLock.newCondition();
public DoubleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
// 隊(duì)列當(dāng)前大小
int beforeAdd;
tailLock.lockInterruptibly();
try {
// tailWait中的線(xiàn)程被喚醒之后,會(huì)與其他線(xiàn)程一起爭(zhēng)搶鎖,如果搶不到,那么就還需要再次進(jìn)入等待隊(duì)列,所以需要使用while循環(huán)判斷,確保隊(duì)列不是滿(mǎn)的才往下執(zhí)行
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
// 返回增加之前的值
beforeAdd = size.getAndIncrement();
if (beforeAdd + 1 < array.length) {
// 說(shuō)明隊(duì)列還沒(méi)有滿(mǎn),喚醒阻塞的offer線(xiàn)程
tailWait.signal();
}
} finally {
tailLock.unlock();
}
// 再offer這里加了鎖去喚醒非空的poll線(xiàn)程,會(huì)導(dǎo)致正常的poll線(xiàn)程也阻塞,搶不到鎖,所以需要減少喚醒時(shí)的加鎖次數(shù)
/**
* 當(dāng)隊(duì)列原本為空,剛好入隊(duì)了一個(gè)元素時(shí),此時(shí)執(zhí)行喚醒操作,對(duì)于剩余的其他阻塞了的poll線(xiàn)程,交給poll線(xiàn)程自己來(lái)喚醒
*/
if (beforeAdd == 0) {
headLock.lock();
try{
// 隊(duì)列從空變化到不空,會(huì)喚醒一個(gè)等待的 poll 線(xiàn)程,為了避免死鎖,需要等tailLock釋放了之后才開(kāi)始加(條件變量的 await(), signal() 等方法需要先獲得與之關(guān)聯(lián)的鎖)
headWait.signal();
}finally {
headLock.unlock();
}
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
int beforeAdd;
tailLock.lockInterruptibly();
try {
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 喚醒后應(yīng)該重新檢查條件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待時(shí)間
time = tailWait.awaitNanos(time);
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
beforeAdd = size.getAndIncrement();
if (beforeAdd + 1 < array.length) {
// 如果入隊(duì)之前+1 還有剩余空間,說(shuō)明隊(duì)列中元素還有剩余,可以喚醒阻塞的polll線(xiàn)程
tailWait.signal();
}
} finally {
tailLock.unlock();
}
// 如果入隊(duì)之前隊(duì)列時(shí)空的,那么此時(shí)只有一個(gè)元素,就喚醒阻塞的poll線(xiàn)程
if (beforeAdd == 0) {
headLock.lock();
try {
// 此時(shí)入隊(duì)成功 喚醒在headWait中阻塞的線(xiàn)程(headWait中的線(xiàn)程是因?yàn)殛?duì)列沒(méi)有元素所以阻塞,此時(shí)隊(duì)列中成功入隊(duì)了一個(gè)元素,喚醒線(xiàn)程去取隊(duì)列的元素)
headWait.signal();
} finally {
headLock.unlock();
}
}
}
@Override
public E poll() throws InterruptedException {
E e;
// size減之前的值
int beforeDecrement;
headLock.lockInterruptibly();
try {
// 喚醒后應(yīng)該重新檢查條件
while (isEmpty()) {
headWait.await();
}
e = array[head];
array[head] = null;
head++;
if (head == array.length) {
head = 0;
}
beforeDecrement = size.getAndDecrement();
if (beforeDecrement > 1) {
// 說(shuō)明隊(duì)列中還有剩余元素(并非當(dāng)前poll完了之后就為空了),所以還可以喚醒阻塞的poll線(xiàn)程
headWait.signal();
}
} finally {
headLock.unlock();
}
// 隊(duì)列剛從滿(mǎn)變成不滿(mǎn)的時(shí)候,去喚醒等待隊(duì)列中的offer線(xiàn)程,剩余阻塞的offer線(xiàn)程由offer線(xiàn)程自己?jiǎn)拘? if (beforeDecrement == array.length) {
tailLock.lock();
try {
// 隊(duì)列從滿(mǎn)變化到不滿(mǎn),喚醒等待隊(duì)列中的offer線(xiàn)程
tailWait.signal();
} finally {
tailLock.unlock();
}
}
return e;
}
@Override
public boolean isEmpty() {
return size.get() == 0;
}
@Override
public boolean isFull() {
return size.get() == array.length;
}
}