假如我們現(xiàn)在有這樣的需求,有一個(gè)倉(cāng)庫(kù),我們可以存東西和取東西,倉(cāng)庫(kù)有存儲(chǔ)上限。當(dāng)倉(cāng)庫(kù)已滿的時(shí)候,存東西的人就必須等待,直到有人取走東西。當(dāng)倉(cāng)庫(kù)為空的時(shí)候,取東西的人必須等待,直到有人存入東西。
這是一個(gè)典型的生產(chǎn)者消費(fèi)者問(wèn)題。這里有兩個(gè)條件,倉(cāng)庫(kù)已滿的條件和倉(cāng)庫(kù)為空的條件,用條件讓線程等待,這個(gè)讓我們想到了并發(fā)框架下的Condition。
自己實(shí)現(xiàn)這個(gè)功能也不難,但是java中給我們提供了很好的實(shí)現(xiàn),就是并發(fā)集合中的阻塞隊(duì)列BlockingQueue。
// 阻塞隊(duì)列的接口
public interface BlockingQueue<E> extends Queue<E> {}
阻塞隊(duì)列BlockingQueue繼承自Queue隊(duì)列接口。而Queue接口我們?cè)?a href="http://www.itdecent.cn/p/199eed7938cc" target="_blank">LinkedList(源碼解析)文章中詳細(xì)介紹過(guò)了,這里先回顧一下。
一. Queue接口
隊(duì)列是一種FIFO(先入先出)的數(shù)據(jù)結(jié)構(gòu),也就是說(shuō)每次插入元素都是插入在隊(duì)列尾,每次取出元素都是在隊(duì)列頭取出。
所以隊(duì)列應(yīng)該有三個(gè)重要方法:
- boolean offer(E e); 向隊(duì)列尾添加元素。
- E poll(); 移除隊(duì)列頭元素,并返回它。
- E peek(); 查看隊(duì)列頭元素。
下面我們來(lái)看一下Queue接口的代碼
public interface Queue<E> extends Collection<E> {
// 向隊(duì)列末尾新添加元素,返回true表示添加成功
// 不會(huì)返回false,因?yàn)樘砑邮≈苯訏伋鯥llegalStateException異常。
// 一般調(diào)用offer方法實(shí)現(xiàn)。
boolean add(E e);
// 向隊(duì)列末尾新添加元素,返回true表示添加成功,返回false,添加失敗
boolean offer(E e);
// 這個(gè)與Collection中的remove方法不一樣,因?yàn)镃ollection中的remove方法都要提供一個(gè)元素或者集合,用于刪除。
// 這里不穿任何參數(shù),就是代表刪除隊(duì)列第一個(gè)元素(即隊(duì)列頭),并返回它
// 還需要注意的時(shí),如果隊(duì)列是空的,即隊(duì)列頭是null,這個(gè)方法會(huì)拋出NoSuchElementException異常。
E remove();
// 這個(gè)方法也是刪除隊(duì)列第一個(gè)元素(即隊(duì)列頭),并返回它
// 但是它和remove()方法不同的時(shí),如果隊(duì)列是空的,即隊(duì)列頭是null,它不會(huì)拋出異常,而是會(huì)返回null。
E poll();
// 查看隊(duì)列頭的元素,如果隊(duì)列是空的,就拋出異常
E element();
// 查看隊(duì)列頭的元素。如果隊(duì)列是空的,不會(huì)拋出異常,而是返回null
E peek();
}
可以看出繼承自Collection接口,但是Queue接口還提供了是三個(gè)好像重復(fù)的方法。
- 向隊(duì)列尾添加元素的方法:add(E e)與offer(E e)。區(qū)別就是隊(duì)列是滿的,添加失敗時(shí),add方法會(huì)拋出異常,而offer方法只會(huì)返回false。
- 移除隊(duì)列頭元素的方法:remove()與poll()。區(qū)別就是隊(duì)列為空的時(shí)候,remove方法會(huì)拋出異常,poll方法只會(huì)返回null。
- 查看隊(duì)列頭元素的方法:element()與peek()。區(qū)別就是隊(duì)列為空的時(shí)候,element方法會(huì)拋出異常,peek方法只會(huì)返回null。
二. BlockingQueue接口
阻塞隊(duì)列BlockingQueue與普通隊(duì)列Queue相比較,不同有兩點(diǎn):
- 當(dāng)隊(duì)列已滿的時(shí)候,當(dāng)前線程會(huì)阻塞等待,直到隊(duì)列有空余位置,當(dāng)前線程會(huì)被喚醒,將元素插入隊(duì)列尾。
- 當(dāng)隊(duì)列為空的時(shí)候,當(dāng)前線程會(huì)阻塞等待,直到隊(duì)列中有元素,當(dāng)前線程會(huì)被喚醒,從隊(duì)列頭取出元素。
在BlockingQueue中有這樣兩個(gè)方法:
// 向隊(duì)列末尾新添加元素,如果隊(duì)列已滿,當(dāng)前線程就等待。響應(yīng)中斷異常
void put(E e) throws InterruptedException;
/**
* 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。響應(yīng)中斷異常
*/
E take() throws InterruptedException;
但是有這樣一個(gè)問(wèn)題,當(dāng)隊(duì)列已滿的時(shí)候,當(dāng)前線程會(huì)阻塞等待,它只能祈禱有別的線程取出隊(duì)列中的元素,如果一直沒有線程取走元素,那么當(dāng)前線程就會(huì)一直阻塞等待,整個(gè)過(guò)程對(duì)當(dāng)前線程是不可控的。
如果我們希望對(duì)等待時(shí)間有一定的控制,不能出現(xiàn)一直等待的現(xiàn)象,那么就要增加超時(shí)時(shí)間,所以要添加有超時(shí)時(shí)間的對(duì)應(yīng)方法。
/**
* 向隊(duì)列末尾新添加元素,如果隊(duì)列中沒有可用空間,當(dāng)前線程就等待,
* 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示添加失敗
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。
* 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示獲取元素失敗
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
所以與Queue接口進(jìn)行對(duì)比,BlockingQueue阻塞隊(duì)列的插入、移除和查看的方法不同:
向隊(duì)列尾添加元素的方法:
1). add(E e)與offer(E e)方法,和Queue接口作用一樣,只不過(guò)加了多線程同步安全的操作。
2). put(E e)方法,添加元素時(shí),當(dāng)前線程可能會(huì)阻塞等待。
3). offer(E e, long timeout, TimeUnit unit)方法,與put(E e)方法作用相同,只不過(guò)加了超時(shí)時(shí)間。(其實(shí)這個(gè)方法名用put更好,用offer容易讓人和offer(E e)方法產(chǎn)生混淆)移除隊(duì)列頭元素的方法:
1). remove()與poll()方法,和Queue接口作用一樣,只不過(guò)加了多線程同步安全的操作。(注:這個(gè)沒有在BlockingQueue接口中重新聲明)
2). take()方法,移除元素時(shí),當(dāng)前線程可能會(huì)阻塞等待。
3). poll(long timeout, TimeUnit unit) 方法,與take()方法作用相同,只不過(guò)加了超時(shí)時(shí)間。(其實(shí)這個(gè)方法名用take更好,用poll容易讓人和poll()方法產(chǎn)生混淆)查看隊(duì)列頭元素的方法:BlockingQueue接口沒有添加新的關(guān)于查看隊(duì)列頭的新方法,因?yàn)椴榭丛夭粫?huì)改變隊(duì)列,所以不需要增加阻塞的方法。還是element()與peek()這兩個(gè)方法。
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
// 阻塞隊(duì)列的接口
public interface BlockingQueue<E> extends Queue<E> {
/**
* 向隊(duì)列尾添加元素。如果隊(duì)列已滿,添加失敗,拋出IllegalStateException異常
*/
boolean add(E e);
// 向隊(duì)列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會(huì)拋出異常
boolean offer(E e);
// 向隊(duì)列末尾新添加元素,如果隊(duì)列已滿,當(dāng)前線程就等待。響應(yīng)中斷異常
void put(E e) throws InterruptedException;
/**
* 向隊(duì)列末尾新添加元素,如果隊(duì)列中沒有可用空間,當(dāng)前線程就等待,
* 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示添加失敗
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。響應(yīng)中斷異常
*/
E take() throws InterruptedException;
/**
* 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。
* 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示獲取元素失敗
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回隊(duì)列剩余空間的個(gè)數(shù)。(即還能添加多少個(gè)元素)
int remainingCapacity();
// 移除隊(duì)列中包含的所有o對(duì)象。返回true表示隊(duì)列有變化,即刪除了元素,返回false表示沒有刪除任何元素。
boolean remove(Object o);
// 隊(duì)列中是否包含對(duì)象o
public boolean contains(Object o);
/**
* 移除阻塞隊(duì)列中所有有效元素,把它們添加到另一個(gè)集合c中。返回值轉(zhuǎn)移的元素個(gè)數(shù)。
* @param c
* @return 實(shí)際轉(zhuǎn)移的元素個(gè)數(shù)。
*/
int drainTo(Collection<? super E> c);
/**
* 最多移除阻塞隊(duì)列中maxElements個(gè)數(shù)的有效元素,把它們添加到另一個(gè)集合c中。
* 返回值實(shí)際轉(zhuǎn)移的元素個(gè)數(shù)。
* @param c
* @param maxElements 最多添加的元素個(gè)數(shù)
* @return 實(shí)際轉(zhuǎn)移的元素個(gè)數(shù)。
*/
int drainTo(Collection<? super E> c, int maxElements);
}