定義
一個由數(shù)組支持的有界阻塞隊列。此隊列按FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素。隊列的尾部是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。
模型
這是一個典型的 “有界緩存區(qū)”,固定大小的數(shù)組在其中保持生產(chǎn)者插入的元素和使用者提取的元素。一旦創(chuàng)建了這樣的緩存區(qū),就不能再增加其容量。試圖向已滿隊列中放入元素會導(dǎo)致操作受阻塞;試圖向空隊列中提取元素將導(dǎo)致類似阻塞。
策略
此類支持對等待的生產(chǎn)者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設(shè)置為 true 而構(gòu)造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
具體實現(xiàn)

如上,我們知道ArrayBlockingQueue繼承于AbstractQueue,并實現(xiàn)了BlockingQueue和Serializable接口:
public class MyArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>,Serializable

- 變量和常量的定義:
//用于存儲數(shù)據(jù)的數(shù)組
final Object[] items;
//移除的位置
int takeIndex;
//添加的位置
int putIndex;
//隊列的大小
int count;
//互斥鎖
final ReentrantLock lock;
//保證不為空的情況下進行消費
private final Condition notEmpty;
//保證隊列未滿的情況下生產(chǎn)
private final Condition notFull;
- 構(gòu)造函數(shù)的實現(xiàn):
//構(gòu)造函數(shù),提供設(shè)置隊列的大小以及鎖的公平性設(shè)置
public MyArrayBlockingQueue(int capacity, boolean fair){
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public MyArrayBlockingQueue(int capacity){
this(capacity,false);
}
- 添加操作的實現(xiàn):
//設(shè)置添加操作
public boolean add(E e){
return super.add(e);
}
/**
* 實現(xiàn)插入元素到隊列的尾部,若隊列未滿,則插入成功,否則插入失敗,屬于非阻塞式插入
* @param e
* @return
*/
@Override
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else{
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//檢查插入的元素是否為空,若為空,則拋出異常
private static void checkNotNull(Object v){
if (v == null)
throw new NullPointerException();
}
//在持有鎖的情況下,進行插入元素
private void enqueue(E e){
final Object[] items = this.items;
items[putIndex] = e;
//若達到數(shù)組尾部,則回到首部,因為這里使用的是循環(huán)數(shù)組
if (++putIndex == items.length)
putIndex = 0;
count ++;
//生產(chǎn)了一個元素,可喚醒一個消費者進行消費
notEmpty.signal();
}
//在限定時間內(nèi)插入元素操作,若插入成功,則返回true,否則返回false
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
//進行單位轉(zhuǎn)換
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中斷加鎖
lock.lockInterruptibly();
try {
while (count == items.length){
//等待超時的結(jié)果
if (nanos <= 0)
return false;
//造成當前線程在接到信號,被中斷或到達指定等待時間之間一直處于等待狀態(tài)
//該方法會返回一個估計值,以等待鎖提供的等待時間,若超時,則會返回一個負數(shù),否則繼續(xù)下一次等待
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
//實現(xiàn)阻塞式插入,若隊列未滿,則直接插入,否則等待隊列未滿,再插入。
@Override
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//等待隊列未被填滿
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
- 查找操作的實現(xiàn):
//實現(xiàn)在隊列中查找元素是否存在
public boolean contains(Object o){
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0){
//獲取到移除位置和插入位置
final int putIndex = this.putIndex;
int i = takeIndex;
//從頭部開始遍歷直到到達尾部
do {
//找到則返回true
if (o.equals(items[takeIndex]))
return true;
//到達數(shù)組尾部,則從頭部繼續(xù)開始
if (++ i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//阻塞式獲取移除并獲取頭部元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//阻塞直到隊列不為空
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//移除隊列中的頭部元素,并返回移除的元素
@Override
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//若隊列為空,則返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//實現(xiàn)在限定時間內(nèi)移除頭部元素,若超時,則返回null,否則返回頭部元素
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0){
//超時,返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//限定時間內(nèi),移除元素
return dequeue();
} finally {
lock.unlock();
}
}
//在持有鎖的情況下,移除并返回隊列中的頭部
private E dequeue(){
final Object[] items = this.items;
//獲取到要被移除的元素
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//移除元素
items[takeIndex] = null;
//判斷是否到達數(shù)組末尾
if (++ takeIndex == items.length)
takeIndex = 0;
count --;
/*
if (itrs != null)
itrs.elementDequeued();
*/
//移除了一個元素,可以喚醒一個生產(chǎn)者工作
notFull.signal();
return x;
}
//返回隊列中的頭部元素
@Override
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
}finally {
lock.unlock();
}
}
@SuppressWarnings("uncheked")
private E itemAt(int i){
return (E)items[i];
}
- 刪除操作的實現(xiàn):
//實現(xiàn)清空隊列的操作:從頭部開始遍歷直到尾部,逐個刪除
public void clear(){
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0){
//獲取到頭部和尾部位置
final int putIndex = this.putIndex;
int i = takeIndex;
//遍歷,刪除每個元素
do {
items[i] = null;
if (++ i == items.length)
i = 0;
} while (i != putIndex);
//重置隊列大小和頭部位置
takeIndex = putIndex;
count = 0;
/*
if (itrs != null)
itrs.queueIsEmpty();
*/
//釋放生產(chǎn)者信號量,所釋放的個數(shù)與隊列的大小一致(前提是必須有生產(chǎn)者在等待)
for (; k > 0 && lock.hasWaiters(notFull); k --)
notFull.signal();
}
} finally {
lock.unlock();
}
}
//實現(xiàn)從隊列中刪除指定的某個元素
public boolean remove(Object o){
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0){
//獲取到頭部和尾部
final int putIndex = this.putIndex;
int i = takeIndex;
//遍歷隊列直到找到指定元素
do {
//找到指定元素,進行刪除該元素
if (o.equals(items[i])){
removeAt(i);
return true;
}
if (++ i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//實現(xiàn)刪除數(shù)組中指定位置的元素,注意不在首尾時,數(shù)組的移動
void removeAt(final int removeIndex){
final Object[] items = this.items;
//刪除的元素位于頭部,則直接刪除即可
if (removeIndex == takeIndex){
items[takeIndex] = null;
//注意刪除頭部后,是頭部位置加一,而不是減一?。?!
//結(jié)構(gòu):頭部<----尾部
if (++ takeIndex == items.length)
takeIndex = 0;
count --;
/*
if (itrs != null)
itrs.elementDequeued();
*/
} else {
final int putIndex = this.putIndex;
//從要刪除的元素開始遍歷直到尾部
for (int i = removeIndex;;){
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex){
items[i] = items[next];
i = next;
} else {
//達到尾部,更新尾部位置
items[i] = null;
this.putIndex = i;
break;
}
}
count --;
/*
if (itrs != null)
itrs.removedAt(removeIndex);
*/
}
//刪除后將釋放一個生產(chǎn)者
notFull.signal();
}
//移除隊列中所有可用的元素,并將它們添加到給定collection中
@Override
public int drainTo(Collection<? super E> c) {
return drainTo(c,Integer.MAX_VALUE);
}
//最多從此隊列中移除給定的數(shù)量的可用元素,并將這些元素添加到給定collection中
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
//如果c和當前隊列相同,則沒有必要復(fù)制給c
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//獲取到要刪除的元素個數(shù)
int n = Math.min(maxElements, count);
//從頭部開始
int take = takeIndex;
int i = 0;
try {
while (i < n){
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++ take == items.length)
take = 0;
i ++;
}
return n;
} finally {
if (i > 0){
//更新隊列中的大小和頭部位置
count -= i;
takeIndex = take;
/*
if (itrs != null){
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
*/
//釋放相應(yīng)的鎖
for (; i > 0 && lock.hasWaiters(notFull); i --)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
- 其他方法的實現(xiàn):
//返回隊列的大小
@Override
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
}finally {
lock.unlock();
}
}
//返回在無阻塞的理想情況下,此隊列能接受的其他元素的數(shù)量
@Override
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
//將隊列轉(zhuǎn)換為數(shù)組形式返回
@Override
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
//計算數(shù)組尾部到隊列頭部的距離
int n = items.length - takeIndex;
//若隊列尾部未滿數(shù)組的長度,則直接整體復(fù)制,否則分為前后兩部分分別復(fù)制
if (count <= n)
System.arraycopy(items, takeIndex, a, 0 ,count);
else{
System.arraycopy(items,takeIndex,a,0,n);
System.arraycopy(items,0,a,n,count-n);
}
} finally {
lock.unlock();
}
return a;
}
//返回一個按適當順序包含此隊列中所有元素的數(shù)組
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
//若傳入的數(shù)組的大小不夠裝入隊列,則利用反射創(chuàng)建一個足夠大的空間
if (len < count)
a = (T[]) java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0 ,count);
else{
System.arraycopy(items,takeIndex,a,0,n);
System.arraycopy(items,0,a,n,count-n);
}
//設(shè)置最后的位置為null,這里不知用意為何?
if (len > count)
a[count] = null;
} finally {
lock.unlock();
}
return a;
}
//返回此collection的字符串表示形式
@Override
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//若隊列為空
if (k == 0)
return "[]";
final Object[] items = this.items;
//通過StringBuilder來構(gòu)造字符串形式
StringBuilder sb = new StringBuilder();
sb.append("[");
for (int i = takeIndex;;){
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
//構(gòu)造結(jié)束
if (-- k == 0)
return sb.append("]").toString();
sb.append(",").append(' ');
if (++ i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
- 迭代器實現(xiàn):
Iterator是其創(chuàng)建時隊列的一個快照,它所持有的關(guān)于queue的狀態(tài)信息,只來自于創(chuàng)建的時刻,至于之后隊列是否發(fā)生變化,迭代器并不關(guān)心。
這個類提供的iterator是具有弱一致性,同時它也僅僅代表iterator被創(chuàng)建的時刻的queue的狀態(tài):
// 構(gòu)造方法
Itr() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
lastRet = -1;
// 在iterator 被創(chuàng)建的時刻的狀態(tài)
// remaining = count
// nextItem = itemAt(nextIndex = takeIndex)
// 有可能在這個iterator被創(chuàng)建之后,當前
// queue中元素又增加了,count變大了
// 而這里的 remaining 維持的還是原來的count
// 在iterator被創(chuàng)建之后新增加的元素,將不會被
// next方法返回。
if ((remaining = count) > 0)
nextItem = itemAt(nextIndex = takeIndex);
} finally {
lock.unlock();
}
}
// next 方法
public E next() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
E x = itemAt(nextIndex); // check for fresher value
if (x == null) {
// 即使當前值已經(jīng)被修改
// next 方法依舊返回快照元素
// 而不是 null
x = nextItem; // we are forced to report old value
lastItem = null; // but ensure remove fails
}
else
lastItem = x;
// 跳過所有Null元素,注意 remaining 也會
// 相應(yīng)減少,所以 next 能夠執(zhí)行的次數(shù)一定是
// <= iterator 創(chuàng)建時刻的queue的count的。
while (--remaining > 0 && // skip over nulls
(nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
;
return x;
} finally {
lock.unlock();
}
}
由 next 方法實現(xiàn)可以確定,這個iterator返回的是queue的快照元素,因為在并發(fā)的情況下,nextItem 記錄的元素很有可能已經(jīng)被消費,而 next 方法卻依舊會返回它。
這也說 iterator 是弱一致性的,iterator在循環(huán)過程中可以容忍并發(fā)地對 queue 進行修改,而不會拋出ConcurrentModificationException。
- 注意:
ArrayBlockingQueue類沒有重寫 addAll, containsAll, retainAll and removeAll 這四個批量操作方法,所有雖然其中的 add, contains 方法是原子操作,但是這些批量操作方法卻是通過循環(huán)來完成,所以它們并不是原子操作。