ArrayBlockingQueue源碼分析

前言

ArrayBlockingQueue以數(shù)組的方式實(shí)現(xiàn)了BlockingQueue接口。其是線程安全的,提供了offer, put,plll,take,offer(E,long,TimeUnit),poll(E,long,TimeUnit)方法。

源碼分析

  /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

可以看到成員變量有一個(gè)數(shù)組Object[] ,ReentrantLock lock,Condition notEmpty, Condition notFull。

成員變量的作用

  • lock ,因?yàn)锳rrayBlockingQueue是線程安全的,需要一把鎖。
  • notEmpty 維護(hù)競(jìng)爭(zhēng)鎖的條件為ArrayBlockingQueue為非空的線程集合,當(dāng)線程運(yùn)行的條件為ArrayBlockingQueue為非空,但是此時(shí)條件為空時(shí),線程會(huì)被掛起,并線程會(huì)被添加到notEmpty中的隊(duì)列中。當(dāng)條件變?yōu)榉强諘r(shí),可以通知里面線程隊(duì)列,讓其開始競(jìng)爭(zhēng)鎖。
  • notFull 維護(hù)競(jìng)爭(zhēng)鎖的條件為ArrayBlockingQueue為有剩余空間的線程集合,當(dāng)線程運(yùn)行的條件為ArrayBlockingQueue為,但是此時(shí)條件為位有剩余空間時(shí),線程會(huì)被掛起,并線程會(huì)被添加到未有剩余空間中的隊(duì)列中。當(dāng)條件變?yōu)橛惺S嗫臻g時(shí),可以通知里面線程隊(duì)列,讓其開始競(jìng)爭(zhēng)鎖。

方法實(shí)現(xiàn)分析

  • offer(E e)
public boolean offer(E e) {
       checkNotNull(e);
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           if (count == items.length)
               return false;
           else {
               enqueue(e);
               return true;
           }
       } finally {
           lock.unlock();
       }
   }

將元素添加到隊(duì)列中,并且整個(gè)過程時(shí)線程安全的。添加之后將通知到notEmpty隊(duì)列。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
  • poll()方法
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

從隊(duì)列中取出元素,線程安全,取出之后通知需要滿足有存儲(chǔ)空間的隊(duì)列。

 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
  • offer(E e, long timeout, TimeUnit unit)
    嘗試將元素插入到隊(duì)列尾部,直接指定的時(shí)間不再成功便返回。
  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
  • public E poll(long timeout, TimeUnit unit)
    嘗試取出一個(gè)元素,直到指定的時(shí)間返回。
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

總結(jié)

利用兩個(gè)隊(duì)列(Condition)掛在條件不同的線程,不同的條件喚醒,比同一個(gè)隊(duì)列效率更高,減少了不必要的自旋鎖。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容