阻塞隊列(二)(ArrayBlockingQueue )

前言

ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列,其大小在構(gòu)造函數(shù)來決定,確認之后就不能改變了,其內(nèi)部按照先進先出的原則對元素進行排序,其中put方法和take方法 為添加和刪除的阻塞方法。下面通過一個生產(chǎn)者-消費者模式來了解其使用方式:

生產(chǎn)者-消費者模型

public class BlockQueueDemo {

    private static final BlockingQueue<Person> QUEUE = new ArrayBlockingQueue<>(50);

    public static void main(String[] args) {
        new Thread(new Producer(QUEUE)).start();
        new Thread(new Consumer(QUEUE)).start();
        System.out.println("主線程結(jié)束-----------------");
    }
}

class Person {

    private String name;

    private String cardno;

    public Person(String name, String cardno) {
        this.name = name;
        this.cardno = cardno;
    }

    @Override
    public String toString() {
        return "name:" + this.name + ",cardno:" + this.cardno;
    }
}

class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                //阻塞放入隨機產(chǎn)生的對象Person
                queue.put(new Person(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
                System.out.println("Produce Person-------Queue Size : " + queue.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {

    private BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            //睡眠等待隊列被生產(chǎn)者填充完畢
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            try {
                //阻塞獲取隊列中的Person對象
                System.out.println("Consumer Person :" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

ArrayBlockingQueue的線程阻塞是通過重入鎖ReenterLock和Condition條件隊列實現(xiàn)的, 所以ArrayBlockingQueue 阻塞等待隊列 存在公平訪問與非公平訪問的區(qū)別,對于公平訪問隊列,被阻塞的線程可以按照阻塞的先后順序訪問隊列,即先阻塞的線程先訪問隊列,而非公平鎖,當(dāng)隊列可用時,阻塞的線程將進入爭奪資源的競爭中,也就是誰先搶到誰先執(zhí)行,沒有固定的先后順序,

ArrayBlockingQueue原理概要

ArrayBlockingQueue 的內(nèi)部通過一個可重入鎖ReentrantLock 和兩個Condition 條件對象來實現(xiàn)阻塞,內(nèi)部成員變量如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    //底層存儲數(shù)組
    final Object[] items;
    //獲取數(shù)據(jù)索引
    int takeIndex;
    //添加數(shù)據(jù)索引
    int putIndex;
    //隊列元素個數(shù)
    int count;
    //控制并發(fā)訪問鎖
    final ReentrantLock lock;
    //notEmpty條件對象,用于通知take方法隊列已有元素,可執(zhí)行獲取操作 
    private final Condition notEmpty;
    //notFull條件對象,用于通知put方法隊列未滿,可執(zhí)行添加操作
    private final Condition notFull;
    //迭代器
    transient Itrs itrs = null;
}

從成員變量上看,ArrayBlockingQueue 內(nèi)部通過數(shù)組對象items來存儲數(shù)據(jù)。通過ReentrantLock 來同時控制添加線程和移除線程的并發(fā)訪問。而對于notEmpty 條件對象則是用于存放等待或喚醒take 方法的線程。告訴他們隊列已有元素,可以執(zhí)行獲取元素。

下面分析一下 阻塞的添加方法:

  public void put(E e) throws InterruptedException {
      //判斷新增元素不為null
        checkNotNull(e);
      //獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//該方法可中斷
        try {
            //元素個數(shù)等于數(shù)組長度無法添加
            while (count == items.length)
                //當(dāng)前線程掛起,添加到等待隊列等待喚起
                notFull.await();
            //直接進去隊列
            enqueue(e);
        } finally {
            //釋放鎖
            lock.unlock();
        }
  }

put 方法是一個阻塞方法,如果隊列元素已滿,那么當(dāng)前線程會被notFull 條件對象掛起加到等待隊列中,直到隊列有空才會喚起添加操作。

1.png

其他方法的分析 同理。。。。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容