ArrayBlockingQueue

ArrayBlockingQueue

1、基于數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列(FIFO先進(jìn)先出隊(duì)列)
2、其數(shù)據(jù)結(jié)構(gòu)為數(shù)組,是一個(gè)環(huán)形數(shù)組結(jié)構(gòu)。
3、其實(shí)現(xiàn)了生產(chǎn)-消費(fèi)隊(duì)列功能
4、基于非公平鎖的鎖

一、對(duì)象屬性
      final Object[] items;// 存儲(chǔ)數(shù)據(jù)載體,環(huán)形數(shù)組(首尾相連)
      int takeIndex;// 下一個(gè)出隊(duì)列元素的數(shù)組下標(biāo)
      int putIndex;// 下一個(gè)入隊(duì)列元素的數(shù)組下標(biāo)
      int count;// 統(tǒng)計(jì)數(shù)組中的元素個(gè)數(shù)
      final ReentrantLock lock;// 鎖,保證隊(duì)列數(shù)據(jù)安全線(基于AQS、CAS)
      private final Condition notFull;// 條件鎖
      private final Condition notEmpty;// 條件鎖
二、構(gòu)造器
      // 有界隊(duì)列的"有界",即隊(duì)列最大容量
      public ArrayBlockingQueue(int capacity) {
         this(capacity, false);
      }
      // 默認(rèn)非公平鎖
      public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 默認(rèn)非公平鎖
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
     }

      public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
        this(capacity, fair);
        ...
        // (重要)  構(gòu)成了環(huán)形數(shù)組(元素等于數(shù)組長度的時(shí)候,從頭開始),記錄下一次入隊(duì)列的下標(biāo)
        putIndex = (i == capacity) ? 0 : i;
        ...
}
三、核心方法
       // 入隊(duì)列
      private void enqueue(E x) {
          final Object[] items = this.items;
          // putIndex為下一個(gè)入隊(duì)列元素的數(shù)組下標(biāo)
          items[putIndex] = x;
         // 生成下一個(gè)入隊(duì)列元素的下標(biāo)。隊(duì)列如果滿了,下標(biāo)從0開始,構(gòu)成了環(huán)形數(shù)組
          if (++putIndex == items.length)
              putIndex = 0;
          count++;// 統(tǒng)計(jì)數(shù)組元素?cái)?shù)量
          // 隊(duì)列增加數(shù)據(jù),喚醒因隊(duì)列為空而阻塞的線程
          notEmpty.signal();
     }
       // 出隊(duì)列
      private E dequeue() {
          final Object[] items = this.items;
        // takeIndex為下一個(gè)出隊(duì)列元素的數(shù)組下標(biāo)
          E x = (E) items[takeIndex];
          items[takeIndex] = null;// help GC
      // 生成下一個(gè)出隊(duì)列元素的下標(biāo),隊(duì)列滿了,下標(biāo)從0開始,構(gòu)成了環(huán)形數(shù)組
          if (++takeIndex == items.length)
              takeIndex = 0;
          count--;// 統(tǒng)計(jì)數(shù)組元素?cái)?shù)量
          if (itrs != null)
              itrs.elementDequeued();
          // 隊(duì)列減少數(shù)據(jù),喚醒因隊(duì)列滿了而阻塞的線程
          notFull.signal();
          return x;
      }
       // 隊(duì)列中刪除數(shù)據(jù)
      void removeAt(final int removeIndex) {
              final Object[] items = this.items;
              // 移除數(shù)據(jù)的下標(biāo)剛好等于出隊(duì)列的下標(biāo),按照出隊(duì)列方式處理
              if (removeIndex == takeIndex) {
                  items[takeIndex] = null;
                  if (++takeIndex == items.length)
                      takeIndex = 0;
                  count--;
                  if (itrs != null)
                      itrs.elementDequeued();
              } else {
                  final int putIndex = this.putIndex;
                  for (int i = removeIndex;;) {
                     // 移動(dòng)removeIndex后的數(shù)據(jù)位置,保持隊(duì)列的順序
                      int next = i + 1;
                      if (next == items.length)
                          next = 0;
                      if (next != putIndex) {
                          items[i] = items[next];
                          i = next;
                      } else {
                          items[i] = null;// 最后的數(shù)據(jù)置為null
                            this.putIndex = i;// putIndex - 1
                        break;
                      }
                  }
                  count--;
                  if (itrs != null)
                     itrs.removedAt(removeIndex);
              }  
              // 喚醒線程
              notFull.signal();
          }
   四、普通方法(都是基于核心方法之上的操作)
         // 入隊(duì)列
         public boolean add(E e) {
            return super.add(e);
        }
         // 抽象父類的方法,調(diào)用offer方法
         public boolean add(E e) {
            if (offer(e))
            return true;
            else// 隊(duì)列滿了會(huì)拋異常
            throw new IllegalStateException("Queue full");
        }
        // 入隊(duì)列,隊(duì)列滿了返回false
        public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          if (count == items.length)
            return false;
          else {
            enqueue(e);// 核心入隊(duì)列方法,外層判斷隊(duì)列是否滿了
            return true;
           }
        } finally {
        lock.unlock();
        }
      }
      // 入隊(duì)列
     public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();// 線程中途停止會(huì)拋異常
        try {// 隊(duì)列滿了,會(huì)阻塞,等待隊(duì)列消費(fèi)數(shù)據(jù)后,會(huì)喚醒noFull
            while (count == items.length)
                notFull.await();
            enqueue(e);// 核心入隊(duì)列方法,外層判斷隊(duì)列是否滿了
        } finally {
            lock.unlock();
        }
     }
    // 出隊(duì)列
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();// 出隊(duì)列核心方法
        } finally {
             lock.unlock();
        }
    }
    // 出隊(duì)列
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
       try {
         // 隊(duì)列為空,會(huì)進(jìn)行阻塞,等待隊(duì)列加入數(shù)據(jù),會(huì)喚醒notEmpty
           while (count == 0)
              notEmpty.await();
           return dequeue();// 出隊(duì)列核心方法
      } finally {
          lock.unlock();
      }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容