導(dǎo)讀目錄:
- 傳統(tǒng)的線程通信
- 使用Condition控制線程通信
- 使用阻塞隊(duì)列(BlockingQueue)控制線程通信
線程的調(diào)度具有一定的透明性,程序通常無法準(zhǔn)確控制線程的輪換執(zhí)行,但Java也提供了一些機(jī)制來保證線程協(xié)調(diào)運(yùn)行
1.傳統(tǒng)的線程通信
要實(shí)現(xiàn)不同線程之間彼此通信,可借助Object類的方法:
public final native void notify();
public final native void notifyAll();
public final native void wait(long timeout) throws InterruptedException; //等待timeout時(shí)間后自動(dòng)蘇醒
public final void wait(long timeout, int nanos)throws InterruptedException {...} //等待timeout時(shí)間后自動(dòng)蘇醒
public final void wait() throws InterruptedException {}//一直等待
wait():導(dǎo)致當(dāng)前線程等待(會(huì)釋放對(duì)同步監(jiān)視器的鎖),直到其他線程調(diào)用該同步監(jiān)視器的notify()方法或notifyAll()方法來喚醒該線程。
notify():喚醒正在此同步監(jiān)視器上等待的單個(gè)線程。如果所有的線程都在此同步監(jiān)視器上等待,則會(huì)喚醒其中一個(gè)線程,選擇是任意的。
notifyAll():喚醒在此同步監(jiān)視器上等待的所有線程,只有當(dāng)前線程放棄對(duì)該同步監(jiān)視器的鎖定,才可以執(zhí)行被喚醒的線程。
事例
//取錢
public synchronized void draw(double drawAmount) {
try {
// 如果flag為假,表明賬戶中還沒有人存錢進(jìn)去,取錢方法阻塞
if (!flag) {
wait();
} else {
// 執(zhí)行取錢
...
flag = false;
// 喚醒其他線程
notifyAll();
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
//存錢
public synchronized void deposit(double depositAmount){
try {
// 如果flag為真,表明賬戶中已有人存錢進(jìn)去,則存錢方法阻塞
if (flag) {
wait();
} else {
// 執(zhí)行存款
...
flag = true;
// 喚醒其他線程
notifyAll();
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
2.使用Condition控制線程通信
如果不使用synchronized的關(guān)鍵字來保證同步,而使直接使用Lock對(duì)象來保證同步,則系統(tǒng)中就不存在隱式的同步監(jiān)視器,也就不能使用wait(),notify(),notifyAll()方法來通信了
使用Lock來保證同步時(shí),需要借助Condition類來協(xié)調(diào)通信。Condition可以讓那些已經(jīng)得到Lock對(duì)象卻無法繼續(xù)執(zhí)行的線程釋放Lock對(duì)象,也可以喚醒其他處于等待狀態(tài)的線程
Condition實(shí)例被綁定在一個(gè)Lock對(duì)象上,通過Lock對(duì)象來獲得Condition對(duì)象,該類提供了如下三類方法:
void await()
boolean await(long time, TimeUnit unit)
long awaitNanos(long nanosTimeout)
void awaitUninterruptibly()
boolean awaitUntil(Date deadline)
void signal()
void signalAll()
(1)await():類似于隱式同步監(jiān)視器的wait()方法,導(dǎo)致當(dāng)前線程等待
(2)signal():喚醒在此Lock對(duì)象上等待的當(dāng)個(gè)線程。如果有多個(gè)線程在該Lock上等待,則會(huì)選擇喚醒其中一個(gè)線程,選擇是任意的
(3)signalAll():喚醒在此Lock對(duì)象上等到的所有線程。
事例
//取錢
public void draw(double drawAmount){
// 加鎖
lock.lock();
try
{
// 如果flag為假,表明賬戶中還沒有人存錢進(jìn)去,取錢方法阻塞
if (!flag) {
cond.await();
}
else {
...
flag = false;
// 喚醒其他線程
cond.signalAll();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
// 使用finally塊來釋放鎖
lock.unlock();
}
}
//存錢
public void deposit(double depositAmount) {
lock.lock();
try {
// 如果flag為真,表明賬戶中已有人存錢進(jìn)去,則存錢方法阻塞
if (flag) {
cond.await();
}
else {
// 執(zhí)行存款
...
flag = true;
// 喚醒其他線程
cond.signalAll();
}
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
// 使用finally塊來釋放鎖
lock.unlock();
}
}
3.使用阻塞隊(duì)列(BlockingQueue)控制線程通信
Java5中提供了一個(gè)BlockingQueue接口,是Queue的子接口,但它的主要作用是用于線程同步,而非作為容器
BlockingQueue接口有一個(gè)特性,即當(dāng)生產(chǎn)者線程試圖向隊(duì)列中放元素已滿時(shí),該線程會(huì)被阻塞;當(dāng)消費(fèi)者線程試圖從隊(duì)列中取出元素時(shí),如果隊(duì)列為空,則該線程會(huì)被阻塞
BlockingQueue中提供的兩個(gè)阻塞方法:
(1)put(E e); //隊(duì)列裝滿時(shí),會(huì)阻塞
(2)take(); //隊(duì)列為空時(shí),會(huì)阻塞
由于BlockingQueue接口也是Queue的子接口,因此也擁有Queue的方法,總結(jié)如下
┌───────────────────┬─────────────┬──────────┬──────────┬───────────────────┐
│ │ 拋出異常 │ 返回值 │ 阻塞線程 │指定超時(shí)時(shí)長(zhǎng) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│隊(duì)尾插入元素(滿時(shí)) │ add(e) │ offer(e) │ put(e) │offer(r,time,unit) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│隊(duì)頭刪除元素(空時(shí)) │ remove() │ poll() │ take() │poll(time, unit) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│獲取不刪除(空時(shí)) │ element() │ peek() │ 無 │無 │
└───────────────────┴─────────────┴──────────┴──────────┴───────────────────┘
BlockingQueue接口的實(shí)現(xiàn)類:
ArrayBlockingQueue: 基于數(shù)組實(shí)現(xiàn)
LinkedBlockingQueue: 基于鏈表實(shí)現(xiàn)
PriorityBlockingQueue:不是標(biāo)準(zhǔn)的阻塞隊(duì)列。與前面集合中講的PriorityQueue類似,取出隊(duì)列中的最小的元素
SynchronousQueue: 同步隊(duì)列,對(duì)該隊(duì)列的存、取操作必須交替執(zhí)行
DelayQueue: 它是一個(gè)特殊的BlockingQueue,底層是基于PriorityBlockingQueue實(shí)現(xiàn)的
例子:
//生產(chǎn)者
class Producer extends Thread {
...
public void run() {
...
for (int i = 0 ; i < 999999999 ; i++ ) {
System.out.println(getName() + "生產(chǎn)者準(zhǔn)備生產(chǎn)集合元素!");
try {
Thread.sleep(200);
// 嘗試放入元素,如果隊(duì)列已滿,線程被阻塞
bq.put(strArr[i % 3]);
}
...
}
}
}
// 消費(fèi)者
class Consumer extends Thread {
...
public void run() {
while(true) {
...
try {
Thread.sleep(200);
// 嘗試取出元素,如果隊(duì)列已空,線程被阻塞
bq.take();
}
...
}
}
}
public class BlockingQueueTest2 {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)容量為1的BlockingQueue
BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
// 啟動(dòng)3條生產(chǎn)者線程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
// 啟動(dòng)一條消費(fèi)者線程
new Consumer(bq).start();
}
}