前言
ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列,其大小在構(gòu)造函數(shù)來決定,確認之后就不能改變了,其內(nèi)部按照先進先出的原則對元素進行排序,其中put方法和take方法 為添加和刪除的阻塞方法。下面通過一個生產(chǎn)者-消費者模式來了解其使用方式:
生產(chǎn)者-消費者模型
public class BlockQueueDemo {
private static final BlockingQueue<Person> QUEUE = new ArrayBlockingQueue<>(50);
public static void main(String[] args) {
new Thread(new Producer(QUEUE)).start();
new Thread(new Consumer(QUEUE)).start();
System.out.println("主線程結(jié)束-----------------");
}
}
class Person {
private String name;
private String cardno;
public Person(String name, String cardno) {
this.name = name;
this.cardno = cardno;
}
@Override
public String toString() {
return "name:" + this.name + ",cardno:" + this.cardno;
}
}
class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
//阻塞放入隨機產(chǎn)生的對象Person
queue.put(new Person(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
System.out.println("Produce Person-------Queue Size : " + queue.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
//睡眠等待隊列被生產(chǎn)者填充完畢
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
try {
//阻塞獲取隊列中的Person對象
System.out.println("Consumer Person :" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ArrayBlockingQueue的線程阻塞是通過重入鎖ReenterLock和Condition條件隊列實現(xiàn)的, 所以ArrayBlockingQueue 阻塞等待隊列 存在公平訪問與非公平訪問的區(qū)別,對于公平訪問隊列,被阻塞的線程可以按照阻塞的先后順序訪問隊列,即先阻塞的線程先訪問隊列,而非公平鎖,當(dāng)隊列可用時,阻塞的線程將進入爭奪資源的競爭中,也就是誰先搶到誰先執(zhí)行,沒有固定的先后順序,
ArrayBlockingQueue原理概要
ArrayBlockingQueue 的內(nèi)部通過一個可重入鎖ReentrantLock 和兩個Condition 條件對象來實現(xiàn)阻塞,內(nèi)部成員變量如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//底層存儲數(shù)組
final Object[] items;
//獲取數(shù)據(jù)索引
int takeIndex;
//添加數(shù)據(jù)索引
int putIndex;
//隊列元素個數(shù)
int count;
//控制并發(fā)訪問鎖
final ReentrantLock lock;
//notEmpty條件對象,用于通知take方法隊列已有元素,可執(zhí)行獲取操作
private final Condition notEmpty;
//notFull條件對象,用于通知put方法隊列未滿,可執(zhí)行添加操作
private final Condition notFull;
//迭代器
transient Itrs itrs = null;
}
從成員變量上看,ArrayBlockingQueue 內(nèi)部通過數(shù)組對象items來存儲數(shù)據(jù)。通過ReentrantLock 來同時控制添加線程和移除線程的并發(fā)訪問。而對于notEmpty 條件對象則是用于存放等待或喚醒take 方法的線程。告訴他們隊列已有元素,可以執(zhí)行獲取元素。
下面分析一下 阻塞的添加方法:
public void put(E e) throws InterruptedException {
//判斷新增元素不為null
checkNotNull(e);
//獲取鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//該方法可中斷
try {
//元素個數(shù)等于數(shù)組長度無法添加
while (count == items.length)
//當(dāng)前線程掛起,添加到等待隊列等待喚起
notFull.await();
//直接進去隊列
enqueue(e);
} finally {
//釋放鎖
lock.unlock();
}
}
put 方法是一個阻塞方法,如果隊列元素已滿,那么當(dāng)前線程會被notFull 條件對象掛起加到等待隊列中,直到隊列有空才會喚起添加操作。

1.png