Java多線程 -- 05 線程通信

導(dǎo)讀目錄:
  • 傳統(tǒng)的線程通信
  • 使用Condition控制線程通信
  • 使用阻塞隊(duì)列(BlockingQueue)控制線程通信

線程的調(diào)度具有一定的透明性,程序通常無法準(zhǔn)確控制線程的輪換執(zhí)行,但Java也提供了一些機(jī)制來保證線程協(xié)調(diào)運(yùn)行

1.傳統(tǒng)的線程通信

要實(shí)現(xiàn)不同線程之間彼此通信,可借助Object類的方法:

public final native void notify();
public final native void notifyAll();
public final native void wait(long timeout) throws InterruptedException; //等待timeout時(shí)間后自動(dòng)蘇醒
public final void wait(long timeout, int nanos)throws InterruptedException {...} //等待timeout時(shí)間后自動(dòng)蘇醒
public final void wait() throws InterruptedException {}//一直等待

wait():導(dǎo)致當(dāng)前線程等待(會(huì)釋放對(duì)同步監(jiān)視器的鎖),直到其他線程調(diào)用該同步監(jiān)視器的notify()方法或notifyAll()方法來喚醒該線程。
notify():喚醒正在此同步監(jiān)視器上等待的單個(gè)線程。如果所有的線程都在此同步監(jiān)視器上等待,則會(huì)喚醒其中一個(gè)線程,選擇是任意的。
notifyAll():喚醒在此同步監(jiān)視器上等待的所有線程,只有當(dāng)前線程放棄對(duì)該同步監(jiān)視器的鎖定,才可以執(zhí)行被喚醒的線程。

事例

//取錢
public synchronized void draw(double drawAmount) {
    try {
        // 如果flag為假,表明賬戶中還沒有人存錢進(jìn)去,取錢方法阻塞
        if (!flag) {
            wait();
        } else {
            // 執(zhí)行取錢
            ...
            flag = false;
            // 喚醒其他線程
            notifyAll();
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
//存錢
public synchronized void deposit(double depositAmount){
    try {
        // 如果flag為真,表明賬戶中已有人存錢進(jìn)去,則存錢方法阻塞
        if (flag)  {
            wait();
        } else {
            // 執(zhí)行存款
            ...
            flag = true;
            // 喚醒其他線程
            notifyAll();
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
2.使用Condition控制線程通信

如果不使用synchronized的關(guān)鍵字來保證同步,而使直接使用Lock對(duì)象來保證同步,則系統(tǒng)中就不存在隱式的同步監(jiān)視器,也就不能使用wait(),notify(),notifyAll()方法來通信了

使用Lock來保證同步時(shí),需要借助Condition類來協(xié)調(diào)通信。Condition可以讓那些已經(jīng)得到Lock對(duì)象卻無法繼續(xù)執(zhí)行的線程釋放Lock對(duì)象,也可以喚醒其他處于等待狀態(tài)的線程

Condition實(shí)例被綁定在一個(gè)Lock對(duì)象上,通過Lock對(duì)象來獲得Condition對(duì)象,該類提供了如下三類方法:

void await() 
boolean await(long time, TimeUnit unit) 
long awaitNanos(long nanosTimeout) 
void awaitUninterruptibly()
boolean awaitUntil(Date deadline)
void signal() 
void signalAll()  

(1)await():類似于隱式同步監(jiān)視器的wait()方法,導(dǎo)致當(dāng)前線程等待
(2)signal():喚醒在此Lock對(duì)象上等待的當(dāng)個(gè)線程。如果有多個(gè)線程在該Lock上等待,則會(huì)選擇喚醒其中一個(gè)線程,選擇是任意的
(3)signalAll():喚醒在此Lock對(duì)象上等到的所有線程。

事例

//取錢
public void draw(double drawAmount){
    // 加鎖
    lock.lock();
    try
    {
        // 如果flag為假,表明賬戶中還沒有人存錢進(jìn)去,取錢方法阻塞
        if (!flag) {
            cond.await();
        }
        else {
            ...
            flag = false;
            // 喚醒其他線程
            cond.signalAll();
        }
    }
    catch (InterruptedException ex) {
        ex.printStackTrace();
    } finally {
        // 使用finally塊來釋放鎖
        lock.unlock();
    }
}

//存錢
public void deposit(double depositAmount) {
    lock.lock();
    try {
        // 如果flag為真,表明賬戶中已有人存錢進(jìn)去,則存錢方法阻塞
        if (flag) {
            cond.await();
        }
        else {
            // 執(zhí)行存款
            ...
            flag = true;
            // 喚醒其他線程
            cond.signalAll();
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    } finally {
        // 使用finally塊來釋放鎖
        lock.unlock();
    }
}
3.使用阻塞隊(duì)列(BlockingQueue)控制線程通信

Java5中提供了一個(gè)BlockingQueue接口,是Queue的子接口,但它的主要作用是用于線程同步,而非作為容器

BlockingQueue接口有一個(gè)特性,即當(dāng)生產(chǎn)者線程試圖向隊(duì)列中放元素已滿時(shí),該線程會(huì)被阻塞;當(dāng)消費(fèi)者線程試圖從隊(duì)列中取出元素時(shí),如果隊(duì)列為空,則該線程會(huì)被阻塞

BlockingQueue中提供的兩個(gè)阻塞方法:
(1)put(E e); //隊(duì)列裝滿時(shí),會(huì)阻塞
(2)take(); //隊(duì)列為空時(shí),會(huì)阻塞

由于BlockingQueue接口也是Queue的子接口,因此也擁有Queue的方法,總結(jié)如下
┌───────────────────┬─────────────┬──────────┬──────────┬───────────────────┐
│ │ 拋出異常 │ 返回值 │ 阻塞線程 │指定超時(shí)時(shí)長(zhǎng) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│隊(duì)尾插入元素(滿時(shí)) │ add(e) │ offer(e) │ put(e) │offer(r,time,unit) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│隊(duì)頭刪除元素(空時(shí)) │ remove() │ poll() │ take() │poll(time, unit) │
├───────────────────┼─────────────┼──────────┼──────────┼───────────────────┤
│獲取不刪除(空時(shí)) │ element() │ peek() │ 無 │無 │
└───────────────────┴─────────────┴──────────┴──────────┴───────────────────┘

BlockingQueue接口的實(shí)現(xiàn)類:
ArrayBlockingQueue: 基于數(shù)組實(shí)現(xiàn)
LinkedBlockingQueue: 基于鏈表實(shí)現(xiàn)
PriorityBlockingQueue:不是標(biāo)準(zhǔn)的阻塞隊(duì)列。與前面集合中講的PriorityQueue類似,取出隊(duì)列中的最小的元素
SynchronousQueue: 同步隊(duì)列,對(duì)該隊(duì)列的存、取操作必須交替執(zhí)行
DelayQueue: 它是一個(gè)特殊的BlockingQueue,底層是基于PriorityBlockingQueue實(shí)現(xiàn)的

例子:

//生產(chǎn)者
class Producer extends Thread {
    ...
    public void run() {
        ...
        for (int i = 0 ; i < 999999999 ; i++ ) {
            System.out.println(getName() + "生產(chǎn)者準(zhǔn)備生產(chǎn)集合元素!");
            try {
                Thread.sleep(200);
                // 嘗試放入元素,如果隊(duì)列已滿,線程被阻塞
                bq.put(strArr[i % 3]);
            }
            ...
        }
    }
}
// 消費(fèi)者
class Consumer extends Thread {
    ...
    public void run() {
        while(true) {
            ...
            try {
                Thread.sleep(200);
                // 嘗試取出元素,如果隊(duì)列已空,線程被阻塞
                bq.take();
            }
            ...
        }
    }
}
public class BlockingQueueTest2 {
    public static void main(String[] args) {
        // 創(chuàng)建一個(gè)容量為1的BlockingQueue
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
        // 啟動(dòng)3條生產(chǎn)者線程
        new Producer(bq).start();
        new Producer(bq).start();
        new Producer(bq).start();
        // 啟動(dòng)一條消費(fèi)者線程
        new Consumer(bq).start();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容