Java并發(fā)集合_ArrayBlockingQueue原理分析

上一章中,我們介紹了阻塞隊列BlockingQueue,下面我們介紹它的常用實現(xiàn)類ArrayBlockingQueue。

一. 用數(shù)組來實現(xiàn)隊列

因為隊列這種數(shù)據(jù)結(jié)構(gòu)的特殊要求,所以它天然適合用鏈表的方式來實現(xiàn),用兩個變量分別記錄鏈表頭和鏈表尾,當(dāng)刪除或插入隊列時,只要改變鏈表頭或鏈表尾就可以了,而且鏈表使用引用的方式鏈接的,所以它的容量幾乎是無限的。
那么怎么使用數(shù)組來實現(xiàn)隊列,我們需要四個變量:Object[] array來存儲隊列中元素,headIndex和tailIndex分別記錄隊列頭和隊列尾,count記錄隊列的個數(shù)。

  1. 因為數(shù)組的長度是固定,所以當(dāng)count==array.length時,表示隊列已經(jīng)滿了,當(dāng)count==0的時候,表示隊列是空的。
  2. 當(dāng)添加元素的時候,將array[tailIndex] = e將tailIndex位置設(shè)置成新元素,之后將tailIndex++自增,然后將count++自增。但是有兩點需要注意,在添加之前必須先判斷隊列是否已滿,不然會出現(xiàn)覆蓋已有元素。當(dāng)tailIndex的值等于數(shù)組最后一個位置的時候,需要將tailIndex=0,循環(huán)利用數(shù)組
  3. 當(dāng)刪除元素的時候,將先記錄下array[headIndex] 元素,之后將headIndex++自增,然后將count--自減。但是有兩點需要注意要注意,在刪除之前,必須先判斷隊列是否為空,不然可能會刪除已刪除的元素。

這里用了一個很巧妙的方式,我們知道當(dāng)向隊列中插入一個元素,那么就占用了數(shù)組的一個位置,當(dāng)刪除一個元素的時候,那么其實數(shù)組的這個位置就空閑出來了,表示這個位置又可以插入新元素了。
所以我們插入新元素前,必須檢查隊列是否已滿,刪除元素之前,必須檢查隊列是否為空。

二. ArrayBlockingQueue中重要成員變量

 /** 儲存隊列的中元素 */
    final Object[] items;

    /** 隊列頭的位置 */
    int takeIndex;

    /** 隊列尾的位置 */
    int putIndex;

    /** 當(dāng)前隊列擁有的元素個數(shù) */
    int count;

    /** 用來保證多線程操作共享變量的安全問題 */
    final ReentrantLock lock;

    /** 當(dāng)隊列為空時,就會調(diào)用notEmpty的wait方法,讓當(dāng)前線程等待 */
    private final Condition notEmpty;

    /** 當(dāng)隊列為滿時,就會調(diào)用notFull的wait方法,讓當(dāng)前線程等待 */
    private final Condition notFull;

就是多了lock、notEmpty、notFull變量來實現(xiàn)多線程安全和線程等待條件的,它們?nèi)齻€是怎么操作的呢?

2.1 lock的作用

因為ArrayBlockingQueue是在多線程下操作的,所以修改items、takeIndex、putIndex和count這些成員變量時,必須要考慮多線程安全問題,所以這里使用lock獨占鎖,來保證并發(fā)操作的安全。

2.2 notEmpty與notFull的作用

因為阻塞隊列必須實現(xiàn),當(dāng)隊列為空或隊列已滿的時候,隊列的讀取或插入操作要等待。所以我們想到了并發(fā)框架下的Condition對象,使用它來控制。
AQS中,我們介紹了這個類的作用:

  1. await系列方法,會釋放當(dāng)前鎖,并讓當(dāng)前線程等待。
  2. signal與signalAll方法,會喚醒當(dāng)前線程。其實它并不是喚醒當(dāng)前線程,而是將在這個Condition條件上等待的線程,添加到lock鎖上的等待線程池中,所以當(dāng)鎖被釋放時,會喚醒lock鎖上的等待線程池中一個線程。具體在AQS中有源碼分析。

三. 添加元素方法

3.1 add(E e)與offer(E e)方法:

   // 調(diào)用AbstractQueue父類中的方法。
   public boolean add(E e) {
        // 通過調(diào)用offer來時實現(xiàn)
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    //向隊列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會拋出異常
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lock();
        try {
            // 隊列已滿,添加元素失敗,返回false。
            if (count == items.length)
                return false;
            else {
                // 調(diào)用enqueue方法將元素插入隊列中
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

add方法是調(diào)用offer方法實現(xiàn)的。在offer方法中,必須先判斷隊列是否已滿,如果已滿就直接返回false,而不會阻塞當(dāng)前線程。如果不滿就調(diào)用enqueue方法將元素插入隊列中。

注意:這里使用lock.lock()是保證同一時間只有一個線程修改成員變量,防止出現(xiàn)并發(fā)操作問題。雖然它也會阻塞當(dāng)前線程,但是它并不是條件等待,只是因為鎖被其他線程持有,而ArrayBlockingQueue中方法操作時間都不長,這里相當(dāng)于不阻塞線程。

3.2 put方法

    // 向隊列末尾新添加元素,如果隊列已滿,當(dāng)前線程就等待。響應(yīng)中斷異常
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lockInterruptibly();
        try {
            // 隊列已滿,則調(diào)用notFull.await()方法,讓當(dāng)前線程等待,直到隊列不是滿的
            while (count == items.length)
                notFull.await();
            // 調(diào)用enqueue方法將元素插入隊列中
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

與offer方法大體流程一樣,只是當(dāng)隊列已滿的時候,會調(diào)用notFull.await()方法,讓當(dāng)前線程阻塞等待,直到隊列被別的線程移除了元素,隊列不滿的時候,會喚醒這個等待線程。

3.3 offer(E e, long timeout, TimeUnit unit)方法

/**
     * 向隊列末尾新添加元素,如果隊列中沒有可用空間,當(dāng)前線程就等待,
     * 如果等待時間超過timeout了,那么返回false,表示添加失敗
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        // 計算一共最多等待的時間值nanos
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lockInterruptibly();
        try {
            // 隊列已滿
            while (count == items.length) {
                // nanos <= 0表示最大等待時間已到,那么不用再等待了,返回false,表示添加失敗。
                if (nanos <= 0)
                    return false;
                // 調(diào)用notFull.awaitNanos(nanos)方法,超時nanos時間會被自動喚醒,
                // 如果被提前喚醒,那么返回剩余的時間
                nanos = notFull.awaitNanos(nanos);
            }
            // 調(diào)用enqueue方法將元素插入隊列中
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

與put的方法大體流程一樣,只不過是調(diào)用notFull.awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時間。

四. 刪除元素方法

4.1 remove()和poll()方法:

   // 調(diào)用AbstractQueue父類中的方法。
    public E remove() {
        // 通過調(diào)用poll來時實現(xiàn)
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

// 刪除隊列第一個元素(即隊列頭),并返回它。如果隊列是空的,它不會拋出異常,而是會返回null。
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lock();
        try {
            // 如果count == 0,列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

remove方法是調(diào)用poll()方法實現(xiàn)的。在 poll()方法中,如果列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素。

4.2 take()方法

    /**
     * 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。響應(yīng)中斷異常
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lockInterruptibly();
        try {
            // 如果隊列為空,就調(diào)用notEmpty.await()方法,讓當(dāng)前線程等待。
            // 直到有別的線程向隊列中插入元素,那么這個線程會被喚醒。
            while (count == 0)
                notEmpty.await();
            // 調(diào)用dequeue方法,返回列表頭元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

take()方法當(dāng)隊列為空的時候,當(dāng)前線程必須等待,直到有別的線程向隊列中插入新元素,那么這個線程會被喚醒。

4.3 poll(long timeout, TimeUnit unit)方法

    /**
     * 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。
     * 如果等待時間超過timeout了,那么返回false,表示獲取元素失敗
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // 計算一共最多等待的時間值nanos
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lockInterruptibly();
        try {
            // 隊列為空
            while (count == 0) {
                // nanos <= 0表示最大等待時間已到,那么不用再等待了,返回null。
                if (nanos <= 0)
                    return null;
                // 調(diào)用notEmpty.awaitNanos(nanos)方法讓檔期線程等待,并設(shè)置超時時間。
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 調(diào)用dequeue方法,返回列表頭元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

與take()方法流程一樣,只不過調(diào)用awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時間。

五. 查看元素的方法

5.1 element()與peek() 方法

    // 調(diào)用AbstractQueue父類中的方法。
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    // 查看隊列頭元素
    public E peek() {
        final ReentrantLock lock = this.lock;
        // 使用lock來保證,多線程修改成員屬性的安全
        lock.lock();
        try {
            // 返回當(dāng)前隊列頭的元素
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

六. 其他重要方法

6.1 enqueue和dequeue方法

    // 將元素x插入隊列尾
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null; //當(dāng)前putIndex位置元素一定是null
        final Object[] items = this.items;
        items[putIndex] = x;
        // 如果putIndex等于items.length,那么將putIndex重新設(shè)置為0
        if (++putIndex == items.length)
            putIndex = 0;
        // 隊列數(shù)量加一
        count++;
        // 因為插入一個元素,那么當(dāng)前隊列肯定不為空,那么喚醒在notEmpty條件下等待的一個線程
        notEmpty.signal();
    }

    // 刪除隊列頭的元素,返回它
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;

        final Object[] items = this.items;
        // 得到當(dāng)前隊列頭的元素
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // 將當(dāng)前隊列頭位置設(shè)置為null
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 隊列數(shù)量減一
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notFull條件下等待的一個線程
        notFull.signal();
        return x;
    }

這兩個方法分別代表,向隊列中插入元素和從隊列中刪除元素。而且它們要喚醒等待的線程。當(dāng)插入元素后,那么隊列一定不為空,那么就要喚醒在notEmpty條件下等待的線程。當(dāng)刪除元素后,那么隊列一定不滿,那么就要喚醒在notFull條件下等待的線程。

6.2 remove(Object o)方法

    // 刪除隊列中對象o元素,最多刪除一條
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        // 使用lock來保證,多線程修改成員屬性的安全
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 當(dāng)隊列中有值的時候,才進(jìn)行刪除。
            if (count > 0) {
                // 隊列尾下一個位置
                final int putIndex = this.putIndex;
                // 隊列頭的位置
                int i = takeIndex;
                do {
                    // 當(dāng)前位置元素與被刪除元素相同
                    if (o.equals(items[i])) {
                        // 刪除i位置元素
                        removeAt(i);
                        // 返回true
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                    // 當(dāng)i==putIndex表示遍歷完所有元素
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

從隊列中刪除指定對象o,那么就要遍歷隊列,刪除第一個與對象o相同的元素,如果隊列中沒有對象o元素,那么返回false刪除失敗。
這里有兩點需要注意:

  1. 如何遍歷隊列,就是從隊列頭遍歷到隊列尾。就要靠takeIndex和putIndex兩個變量了。
  2. 為什么Object[] items = this.items;這句代碼沒有放到同步鎖lock代碼塊內(nèi)。items是成員變量,那么多線程操作的時候,不會有并發(fā)問題么?
    這個是因為items是個引用變量,不是基本數(shù)據(jù)類型,而且我們對隊列的插入和刪除操作,都是針對這一個items數(shù)組,沒有改變數(shù)組的引用,所以在lock代碼中,items會得到其他線程對它最新的修改。但是如果這里將int putIndex = this.putIndex;方法lock代碼塊外面,就會產(chǎn)生問題。

removeAt(final int removeIndex)方法

    // 刪除隊列removeIndex位置的元素
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        // 表示刪除元素是列表頭,就容易多了,與dequeue方法流程差不多
        if (removeIndex == takeIndex) {
            // 移除removeIndex位置元素
            items[takeIndex] = null;
            // 到了數(shù)組末尾,就要轉(zhuǎn)到數(shù)組頭位置
            if (++takeIndex == items.length)
                takeIndex = 0;
            // 隊列數(shù)量減一
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                // 還沒有到隊列尾,那么就將后一個位置元素覆蓋前一個位置的元素
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    // 將隊列尾元素置位null
                    items[i] = null;
                    // 重新設(shè)置putIndex的值
                    this.putIndex = i;
                    break;
                }
            }
            // 隊列數(shù)量減一
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        // 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notFull條件下等待的一個線程
        notFull.signal();
    }

在隊列中刪除指定位置的元素。需要注意的是刪除之后的數(shù)組還能保持隊列形式,分為兩種情況:

  1. 如果刪除位置是隊列頭,那么簡單,只需要將隊列頭的位置元素設(shè)置為null,將將隊列頭位置加一。
  2. 如果刪除位置不是隊列頭,那么麻煩了,這個時候,我們就要將從removeIndex位置后的元素全部左移一位,覆蓋前一個元素。最后將原來隊列尾的元素置位null。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容