簡(jiǎn)單介紹
ArrayBlockingQueue 是基于數(shù)組的有界阻塞隊(duì)列。
-
有界
指它不能夠存儲(chǔ)無(wú)限多數(shù)量的元素,在創(chuàng)建ArrayBlockingQueue時(shí),必須要給它指定一個(gè)隊(duì)列的大小 -
阻塞
指在添加 / 取走元素時(shí),當(dāng)隊(duì)列 沒有空間 / 為空的時(shí)候會(huì)阻塞,知道隊(duì)列有空間 / 有新的元素加入時(shí)再繼續(xù)
源碼解讀
屬性
- 隊(duì)列集合,是一個(gè)數(shù)組,用來(lái)存放元素
/** The queued items */
final Object[] items;
- 調(diào)用
take,poll,peek或者remove方法所取元素的下標(biāo)位置
/** items index for next take, poll, peek or remove */
int takeIndex;
- 調(diào)用
put,offer或者add方法添加元素時(shí),所添加的位置
/** items index for next put, offer, or add */
int putIndex;
- 隊(duì)列的所有元素?cái)?shù)目
/** Number of elements in the queue */
int count;
- 全局鎖,掌管所有訪問操作
/** Main lock guarding all access */
final ReentrantLock lock;
- 取元素操作的等待條件,如果隊(duì)列中沒有元素,會(huì)調(diào)用
notEmpty.await()方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting takes */
private final Condition notEmpty;
- 新增元素操作的等待條件,如果隊(duì)列中已經(jīng)滿元素,會(huì)調(diào)用
notFull.await()方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting puts */
private final Condition notFull;
- 這個(gè)
Itrs是迭代器和隊(duì)列之間數(shù)據(jù)共享的工具類(這塊代碼太多了有空再看把 - -||| )
transient Itrs itrs = null;
其他方法
- 參數(shù)自減的方法,就是當(dāng)傳入的參數(shù)
i為0時(shí)返回 數(shù)組長(zhǎng)度減1,否則返回i減1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
- 返回對(duì)應(yīng)位置上的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
- 參數(shù)
v的非空判斷
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
核心方法
向隊(duì)列中添加元素
add(e) 、offer(e) 和 put(e) 都是添加元素的方法, add(e) 和 offer(e) 是無(wú)阻塞的添加, put(e) 是阻塞添加
-
add(e)方法,實(shí)際上調(diào)用了offer(e)方法
public boolean add(E e) {
return super.add(e);
}
-
offe(e)方法,將元素添加到BlockingQueue里,如果可以容納返回true否則返回false
public boolean offer(E e) {
checkNotNull(e); // 檢查元素是否為 null
final ReentrantLock lock = this.lock;
lock.lock(); // 加鎖
try {
if (count == items.length) // 如果隊(duì)列已經(jīng)滿了返回 false
return false;
else { // 隊(duì)列還沒有滿,則添加到隊(duì)列中
enqueue(e); // 進(jìn)隊(duì)
return true;
}
} finally {
lock.unlock(); // 釋放鎖
}
}
-
put(e)方法,將元素添加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻塞,直到BlockingQueue里面有空間
public void put(E e) throws InterruptedException {
checkNotNull(e); // 檢查元素是否為 null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加鎖
try {
while (count == items.length)
notFull.await(); //如果隊(duì)列已經(jīng)滿了,就阻塞(添加到 notFull 條件隊(duì)列中等待喚醒)
enqueue(e); // 如果隊(duì)列沒有滿直接添加
} finally {
lock.unlock(); // 釋放鎖
}
}
-
enqueue進(jìn)隊(duì)操作
private void enqueue(E x) {
// 獲取當(dāng)前數(shù)組
final Object[] items = this.items;
// 通過索引賦值
items[putIndex] = x;
// 如果當(dāng)前添加對(duì)象的位置 +1 等于 數(shù)組的長(zhǎng)度,也就是當(dāng)前對(duì)象的位置在數(shù)組的最后一個(gè)
// 那么下一個(gè)應(yīng)該從數(shù)組的第一個(gè)添加
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒正在等待獲取對(duì)象的線程
notEmpty.signal();
}
- 除了以上三種常用的添加方法之外,還有個(gè)帶超時(shí)時(shí)間的添加方法
offer(e, timeout, unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
從隊(duì)列中取元素
-
poll()方法,取隊(duì)頭(首個(gè))元素并刪除,沒有則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊(duì)列中有元素,則執(zhí)行 dequeue 操作,否則返回 null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
-
take()方法,如果隊(duì)列中有元素,則獲取并刪除,如果沒有元素,則阻塞等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 如果隊(duì)列中沒有元素,則添加到 notEmpty 條件隊(duì)列中等待
return dequeue();
} finally {
lock.unlock();
}
}
-
poll(timeout, unit)帶阻塞超時(shí)的取首個(gè)元素的方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
-
peek()方法,只取不刪,當(dāng)隊(duì)列中沒有元素時(shí)會(huì)返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
-
dequeue出隊(duì)操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 根據(jù)索引獲取對(duì)象
E x = (E) items[takeIndex];
// 當(dāng)前位置的對(duì)象被取走,位置就騰出來(lái)了
items[takeIndex] = null;
// 如果被取走的是數(shù)組的最后一個(gè),那下一個(gè)要從第一個(gè)取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒正在等待添加對(duì)象的線程
notFull.signal();
return x;
}
刪除隊(duì)列中某個(gè)元素
-
remove(o)方法,如果隊(duì)列為空或者沒有找到該元素返回false,否則刪除元素并且返回true
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex; // 從取得位置開始,到添加的位置
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length) // 若果判斷的位置到了隊(duì)列最末尾,那下一個(gè)從第一個(gè)判斷
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
-
removeAt(index)方法,刪除指定位置上的元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) { // 當(dāng)刪除的元素是下次取操作要取到的元素時(shí),既隊(duì)頭元素
items[takeIndex] = null; // 刪除隊(duì)頭元素,并且 takeIndex 加 1
if (++takeIndex == items.length) // 如果刪除得是數(shù)組最后一個(gè)元素,則 takeIndex 從數(shù)組第一個(gè)元素開始
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 如果刪除的不是隊(duì)頭元素
// 則從刪除元素的后面一直到添加元素的位置(removeIndex ~ putIndex)期間的元素都要往前挪一個(gè)位置
// 取 putIndex 作為循環(huán)結(jié)束判斷條件
final int putIndex = this.putIndex;
for (int i = removeIndex;;) { // 順序往前挪一個(gè)位置
int next = i + 1;
if (next == items.length)// 當(dāng)循環(huán)到數(shù)組最后一個(gè)元素,下一個(gè)元素應(yīng)該是數(shù)組第一個(gè)元素
next = 0;
if (next != putIndex) { // 如果查找的索引不等于要添加元素的索引,說明元素可以再移動(dòng)
items[i] = items[next];
i = next;
} else { // 在 removeIndex 索引之后的元素都往前移動(dòng)完畢后清空最后一個(gè)元素
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
啊,累了,不寫了 
汪