ArrayBlockingQueue

簡(jiǎn)單介紹

ArrayBlockingQueue 是基于數(shù)組的有界阻塞隊(duì)列。

  • 有界
    指它不能夠存儲(chǔ)無(wú)限多數(shù)量的元素,在創(chuàng)建 ArrayBlockingQueue 時(shí),必須要給它指定一個(gè)隊(duì)列的大小
  • 阻塞
    指在添加 / 取走元素時(shí),當(dāng)隊(duì)列 沒有空間 / 為空的時(shí)候會(huì)阻塞,知道隊(duì)列有空間 / 有新的元素加入時(shí)再繼續(xù)
源碼解讀

屬性

  • 隊(duì)列集合,是一個(gè)數(shù)組,用來(lái)存放元素
/** The queued items */
final Object[] items;
  • 調(diào)用 takepoll, peek 或者 remove 方法所取元素的下標(biāo)位置
/** items index for next take, poll, peek or remove */
int takeIndex;
  • 調(diào)用 put, offer 或者 add 方法添加元素時(shí),所添加的位置
/** items index for next put, offer, or add */
int putIndex;
  • 隊(duì)列的所有元素?cái)?shù)目
/** Number of elements in the queue */
int count;
  • 全局鎖,掌管所有訪問操作
/** Main lock guarding all access */
final ReentrantLock lock;
  • 取元素操作的等待條件,如果隊(duì)列中沒有元素,會(huì)調(diào)用 notEmpty.await() 方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting takes */
private final Condition notEmpty;
  • 新增元素操作的等待條件,如果隊(duì)列中已經(jīng)滿元素,會(huì)調(diào)用 notFull.await() 方法讓當(dāng)前線程處于等待狀態(tài)
/** Condition for waiting puts */
private final Condition notFull;
  • 這個(gè) Itrs 是迭代器和隊(duì)列之間數(shù)據(jù)共享的工具類(這塊代碼太多了有空再看把 - -||| )
transient Itrs itrs = null;

其他方法

  • 參數(shù)自減的方法,就是當(dāng)傳入的參數(shù) i0 時(shí)返回 數(shù)組長(zhǎng)度減 1,否則返回 i1
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}
  • 返回對(duì)應(yīng)位置上的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
    return (E) items[i];
}
  • 參數(shù) v 的非空判斷
private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
核心方法

向隊(duì)列中添加元素
add(e) 、offer(e)put(e) 都是添加元素的方法, add(e)offer(e) 是無(wú)阻塞的添加, put(e) 是阻塞添加

  • add(e) 方法,實(shí)際上調(diào)用了 offer(e) 方法
public boolean add(E e) {
    return super.add(e);
}
  • offe(e) 方法,將元素添加到 BlockingQueue 里,如果可以容納返回 true 否則返回 false
public boolean offer(E e) {
    checkNotNull(e); // 檢查元素是否為 null
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖
    try {
        if (count == items.length) // 如果隊(duì)列已經(jīng)滿了返回 false
            return false;
        else { // 隊(duì)列還沒有滿,則添加到隊(duì)列中
            enqueue(e); // 進(jìn)隊(duì)
            return true;
        }
    } finally {
        lock.unlock(); // 釋放鎖
    }
}
  • put(e) 方法,將元素添加到 BlockingQueue 里,如果 BlockQueue 沒有空間,則調(diào)用此方法的線程被阻塞,直到 BlockingQueue 里面有空間
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 檢查元素是否為 null
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖
    try {
        while (count == items.length)
            notFull.await(); //如果隊(duì)列已經(jīng)滿了,就阻塞(添加到 notFull 條件隊(duì)列中等待喚醒)
        enqueue(e); // 如果隊(duì)列沒有滿直接添加
    } finally {
        lock.unlock(); // 釋放鎖
    }
}
  • enqueue 進(jìn)隊(duì)操作
private void enqueue(E x) {
    // 獲取當(dāng)前數(shù)組
    final Object[] items = this.items;
    // 通過索引賦值
    items[putIndex] = x;
    // 如果當(dāng)前添加對(duì)象的位置 +1 等于 數(shù)組的長(zhǎng)度,也就是當(dāng)前對(duì)象的位置在數(shù)組的最后一個(gè)
    // 那么下一個(gè)應(yīng)該從數(shù)組的第一個(gè)添加
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 喚醒正在等待獲取對(duì)象的線程
    notEmpty.signal();
}
  • 除了以上三種常用的添加方法之外,還有個(gè)帶超時(shí)時(shí)間的添加方法 offer(e, timeout, unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

從隊(duì)列中取元素

  • poll() 方法,取隊(duì)頭(首個(gè))元素并刪除,沒有則返回 null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊(duì)列中有元素,則執(zhí)行 dequeue 操作,否則返回 null
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
  • take() 方法,如果隊(duì)列中有元素,則獲取并刪除,如果沒有元素,則阻塞等待
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); 
    try {
        while (count == 0)
            notEmpty.await(); // 如果隊(duì)列中沒有元素,則添加到 notEmpty 條件隊(duì)列中等待
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • poll(timeout, unit)帶阻塞超時(shí)的取首個(gè)元素的方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • peek()方法,只取不刪,當(dāng)隊(duì)列中沒有元素時(shí)會(huì)返回 null
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
  • dequeue 出隊(duì)操作
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 根據(jù)索引獲取對(duì)象
    E x = (E) items[takeIndex];
    // 當(dāng)前位置的對(duì)象被取走,位置就騰出來(lái)了
    items[takeIndex] = null;
    // 如果被取走的是數(shù)組的最后一個(gè),那下一個(gè)要從第一個(gè)取
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 喚醒正在等待添加對(duì)象的線程
    notFull.signal();
    return x;
}

刪除隊(duì)列中某個(gè)元素

  • remove(o) 方法,如果隊(duì)列為空或者沒有找到該元素返回 false,否則刪除元素并且返回 true
public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex; // 從取得位置開始,到添加的位置
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length) // 若果判斷的位置到了隊(duì)列最末尾,那下一個(gè)從第一個(gè)判斷
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
  • removeAt(index) 方法,刪除指定位置上的元素
void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) { // 當(dāng)刪除的元素是下次取操作要取到的元素時(shí),既隊(duì)頭元素
        items[takeIndex] = null;  // 刪除隊(duì)頭元素,并且 takeIndex 加 1
        if (++takeIndex == items.length) // 如果刪除得是數(shù)組最后一個(gè)元素,則 takeIndex 從數(shù)組第一個(gè)元素開始
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // 如果刪除的不是隊(duì)頭元素
        // 則從刪除元素的后面一直到添加元素的位置(removeIndex ~ putIndex)期間的元素都要往前挪一個(gè)位置
        // 取 putIndex 作為循環(huán)結(jié)束判斷條件
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) { // 順序往前挪一個(gè)位置
            int next = i + 1;
            if (next == items.length)// 當(dāng)循環(huán)到數(shù)組最后一個(gè)元素,下一個(gè)元素應(yīng)該是數(shù)組第一個(gè)元素
                next = 0;
            if (next != putIndex) { // 如果查找的索引不等于要添加元素的索引,說明元素可以再移動(dòng)
                items[i] = items[next];
                i = next;
            } else { // 在 removeIndex 索引之后的元素都往前移動(dòng)完畢后清空最后一個(gè)元素
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

啊,累了,不寫了
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容