概述
blockQueue 作為線程容器、阻塞隊列,多用于生產(chǎn)者、消費者的關(guān)系模式中,保障并發(fā)編程線程同步,線程池中被用于當作存儲任務(wù)的隊列,還可以保證線程執(zhí)行的有序性。
常用方法
生產(chǎn)
- add(obj):往隊列里面增加一個對象,如果隊列沒有空間拋出異常,反之返回true。
- offer(obj): 往隊列增加一個對象,返回true/false
- put(obj): 往隊列增加一個對象,如果沒有空間,則會阻塞改線程,直到有空間.
消費
- poll(time):取出排在首位的對象,如果在一定時間內(nèi)沒有返回,則會返回null
- take():取出排在首位的對象,如果隊列中沒有數(shù)據(jù),則會阻塞該線程直到有數(shù)據(jù)。
查詢
- contains(obj):查詢是否存在某個元素,返回true/false
- peek():返回隊列頭部的元素,無則返回null
特點
- 容量有限,可以限定隊列的長度,如果沒有主動顯示隊列長度的情況下,默認長度為Integer.MAX_VALUE
- 內(nèi)存一致性,遵循h(huán)append-before原則,即寫操作總是先于后面的讀操作。參考資料 happend-before
- 因為其繼承Collection接口,所以可以使用集合的接口,但某些接口并不保證立即執(zhí)行,因為其內(nèi)部維護著內(nèi)部鎖(ReentrantLock),所以只有在獲取鎖的情況下才會執(zhí)行對應(yīng)的代碼,以remove()源碼為例:
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
每次操作都會去獲取鎖,如果鎖被其他操作暫用,沒有獲取到鎖,則只能去排隊,所以上面代碼并不會立即執(zhí)行。
常用分類
前言
創(chuàng)建隊列時,可以添加fair參數(shù),用于聲明內(nèi)部鎖是否是公平鎖,公平鎖用于決定隊列里面的任務(wù)是否會按照順訊執(zhí)行。
公平鎖:
顯式聲明為公平鎖的任務(wù)執(zhí)行完全按照隊列的順序執(zhí)行,新的任務(wù)進來會存放在隊尾。
非公平鎖:
隊列里面的任務(wù)可以按照順序執(zhí)行,但是新的任務(wù)可能會與隊列爭搶CPU資源,不保證隊列外的順序。
- ArrayBlockingQueue,創(chuàng)建固定大小的隊列,內(nèi)部維護一個數(shù)組,遵循FIFO原則
- LinkedBlockingQueue,可以自定義隊列長度,無指定的情況下默認為Integer.MAX_VALUE,內(nèi)部維護著一個鏈表,遵循著FIFO原則
- PriorityBlockingQueue,類似ArrayBlockingQueue,內(nèi)部維護一個數(shù)組,但并不按照FIFO原則,其內(nèi)部有個compare屬性決定隊列任務(wù)的執(zhí)行順序。
- SynchronousQueue,特殊的隊列,內(nèi)部無存儲空間維護隊列,只有當生產(chǎn)者和消費者同時存在時,才會執(zhí)行,類似與管道。
例子
- 生產(chǎn)者與消費者案例,一個生產(chǎn)者和多個消費者。
public class BlockQueueDemo {
/**
* 生產(chǎn)者
*/
static class Productor implements Runnable{
private BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for(int i=0;i<100;i++){
try {
Thread.sleep(200);
blockingQueue.put(i);
System.out.println("生產(chǎn)者產(chǎn)品了產(chǎn)品"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費者
*/
static class Consumer implements Runnable{
public BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while(true){
try {
String name = Thread.currentThread().getName();
Integer queueData = blockingQueue.take();
System.out.println("消費者"+name+"消費了產(chǎn)品"+queueData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 一個生產(chǎn)者對應(yīng)多個消費者,采用BlockQueue作為緩沖區(qū)
* @param args
*/
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor productor = new Productor(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
Consumer consumer2 = new Consumer(blockingQueue);
new Thread(productor).start();
new Thread(consumer).start();
new Thread(consumer2).start();
}
}
執(zhí)行結(jié)果:
消費者Thread-2消費了產(chǎn)品21
生產(chǎn)者產(chǎn)品了產(chǎn)品22
消費者Thread-1消費了產(chǎn)品22
生產(chǎn)者產(chǎn)品了產(chǎn)品23
消費者Thread-2消費了產(chǎn)品23
生產(chǎn)者產(chǎn)品了產(chǎn)品24
消費者Thread-1消費了產(chǎn)品24
生產(chǎn)者產(chǎn)品了產(chǎn)品25
消費者Thread-2消費了產(chǎn)品25
生產(chǎn)者產(chǎn)品了產(chǎn)品26
消費者Thread-1消費了產(chǎn)品26
生產(chǎn)者產(chǎn)品了產(chǎn)品27
消費者Thread-2消費了產(chǎn)品27