ArrayBlockingQueue
1、基于數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列(FIFO先進(jìn)先出隊(duì)列)
2、其數(shù)據(jù)結(jié)構(gòu)為數(shù)組,是一個(gè)環(huán)形數(shù)組結(jié)構(gòu)。
3、其實(shí)現(xiàn)了生產(chǎn)-消費(fèi)隊(duì)列功能
4、基于非公平鎖的鎖
一、對(duì)象屬性
final Object[] items;// 存儲(chǔ)數(shù)據(jù)載體,環(huán)形數(shù)組(首尾相連)
int takeIndex;// 下一個(gè)出隊(duì)列元素的數(shù)組下標(biāo)
int putIndex;// 下一個(gè)入隊(duì)列元素的數(shù)組下標(biāo)
int count;// 統(tǒng)計(jì)數(shù)組中的元素個(gè)數(shù)
final ReentrantLock lock;// 鎖,保證隊(duì)列數(shù)據(jù)安全線(基于AQS、CAS)
private final Condition notFull;// 條件鎖
private final Condition notEmpty;// 條件鎖
二、構(gòu)造器
// 有界隊(duì)列的"有界",即隊(duì)列最大容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 默認(rèn)非公平鎖
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 默認(rèn)非公平鎖
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
...
// (重要) 構(gòu)成了環(huán)形數(shù)組(元素等于數(shù)組長度的時(shí)候,從頭開始),記錄下一次入隊(duì)列的下標(biāo)
putIndex = (i == capacity) ? 0 : i;
...
}
三、核心方法
// 入隊(duì)列
private void enqueue(E x) {
final Object[] items = this.items;
// putIndex為下一個(gè)入隊(duì)列元素的數(shù)組下標(biāo)
items[putIndex] = x;
// 生成下一個(gè)入隊(duì)列元素的下標(biāo)。隊(duì)列如果滿了,下標(biāo)從0開始,構(gòu)成了環(huán)形數(shù)組
if (++putIndex == items.length)
putIndex = 0;
count++;// 統(tǒng)計(jì)數(shù)組元素?cái)?shù)量
// 隊(duì)列增加數(shù)據(jù),喚醒因隊(duì)列為空而阻塞的線程
notEmpty.signal();
}
// 出隊(duì)列
private E dequeue() {
final Object[] items = this.items;
// takeIndex為下一個(gè)出隊(duì)列元素的數(shù)組下標(biāo)
E x = (E) items[takeIndex];
items[takeIndex] = null;// help GC
// 生成下一個(gè)出隊(duì)列元素的下標(biāo),隊(duì)列滿了,下標(biāo)從0開始,構(gòu)成了環(huán)形數(shù)組
if (++takeIndex == items.length)
takeIndex = 0;
count--;// 統(tǒng)計(jì)數(shù)組元素?cái)?shù)量
if (itrs != null)
itrs.elementDequeued();
// 隊(duì)列減少數(shù)據(jù),喚醒因隊(duì)列滿了而阻塞的線程
notFull.signal();
return x;
}
// 隊(duì)列中刪除數(shù)據(jù)
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 移除數(shù)據(jù)的下標(biāo)剛好等于出隊(duì)列的下標(biāo),按照出隊(duì)列方式處理
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
// 移動(dòng)removeIndex后的數(shù)據(jù)位置,保持隊(duì)列的順序
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;// 最后的數(shù)據(jù)置為null
this.putIndex = i;// putIndex - 1
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 喚醒線程
notFull.signal();
}
四、普通方法(都是基于核心方法之上的操作)
// 入隊(duì)列
public boolean add(E e) {
return super.add(e);
}
// 抽象父類的方法,調(diào)用offer方法
public boolean add(E e) {
if (offer(e))
return true;
else// 隊(duì)列滿了會(huì)拋異常
throw new IllegalStateException("Queue full");
}
// 入隊(duì)列,隊(duì)列滿了返回false
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);// 核心入隊(duì)列方法,外層判斷隊(duì)列是否滿了
return true;
}
} finally {
lock.unlock();
}
}
// 入隊(duì)列
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 線程中途停止會(huì)拋異常
try {// 隊(duì)列滿了,會(huì)阻塞,等待隊(duì)列消費(fèi)數(shù)據(jù)后,會(huì)喚醒noFull
while (count == items.length)
notFull.await();
enqueue(e);// 核心入隊(duì)列方法,外層判斷隊(duì)列是否滿了
} finally {
lock.unlock();
}
}
// 出隊(duì)列
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();// 出隊(duì)列核心方法
} finally {
lock.unlock();
}
}
// 出隊(duì)列
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 隊(duì)列為空,會(huì)進(jìn)行阻塞,等待隊(duì)列加入數(shù)據(jù),會(huì)喚醒notEmpty
while (count == 0)
notEmpty.await();
return dequeue();// 出隊(duì)列核心方法
} finally {
lock.unlock();
}
}