juc之二:生產(chǎn)者與消費者(wait,notify,signal,join等)

1.使用等待喚醒機制模擬生產(chǎn)者和消費者模式

package com.zy.tools.undefined.concurrent.v2.consumerproducer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerAndProducer01 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Clerk clerk = new Clerk();

        for (int i = 0; i < 20; i++) {
            executorService.submit(new Consumer(clerk));
        }

        for (int i = 0; i < 40; i++) {
            executorService.submit(new Producer(clerk));
        }
    }


    private static class Clerk {
        private int count = 0;
        private final Object lock = new Object();

        public void goodsOut() {
            // synchronized (this) { // this 與 任意對象鎖 (此處是 lock 對象), 結(jié)果稍有不同, 自行體會
            // 當(dāng)進入 synchronized 塊時, 表示當(dāng)前線程獲取到對象鎖
            // 如果沒有進入 synchronized 塊, 則表示獲取對象鎖失敗, 線程狀態(tài)為 BLOCKED
            synchronized (lock) {
                // 為了避免虛假喚醒問題,應(yīng)該總是使用在while循環(huán)中,此處不能用if
                while (count <= 0) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " ==> 庫存不足, 請等待");
                        // 當(dāng)調(diào)用 wait 方法(不帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 當(dāng)調(diào)用 wait 方法(帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 TIME_WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 調(diào)用完 wait 方法后, 當(dāng)前線程掛起, 需[等待其他線程]調(diào)用 notify/notifyAll 方法, 才能[喚起此線程], 執(zhí)行 wait 之后的代碼
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count--;
                System.out.println(Thread.currentThread().getName() + " ==> 消費了商品, 當(dāng)前剩余庫存: " + count);
                // 調(diào)用完 notifyAll 方法后, 喚起所有 當(dāng)前對象鎖下 線程狀態(tài)為 WAITING/TIME_WAITING 的線程
                notifyAll();
            }
        }

        public void goodsIn() {
            // synchronized (this) { // this 與 任意對象鎖 (此處是 lock 對象), 結(jié)果稍有不同, 自行體會
            // 當(dāng)進入 synchronized 塊時, 表示當(dāng)前線程獲取到對象鎖
            // 如果沒有進入 synchronized 塊, 則表示獲取對象鎖失敗, 線程狀態(tài)為 BLOCKED
            synchronized (lock) {
                // 為了避免虛假喚醒問題,應(yīng)該總是使用在while循環(huán)中,此處不能用if
                while (count >= 30) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " ==> 庫存已達到 30, 停止進貨");
                        // 當(dāng)調(diào)用 wait 方法(不帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 當(dāng)調(diào)用 wait 方法(帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 TIME_WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 調(diào)用完 wait 方法后, 當(dāng)前線程掛起, 需[等待其他線程]調(diào)用 notify/notifyAll 方法, 才能[喚起此線程], 執(zhí)行 wait 之后的代碼
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count++;
                System.out.println(Thread.currentThread().getName() + " ==> 生產(chǎn)了商品, 當(dāng)前剩余庫存: " + count);
                // 調(diào)用完 notifyAll 方法后, 喚起所有 當(dāng)前對象鎖下 線程狀態(tài)為 WAITING/TIME_WAITING 的線程
                notifyAll();
            }
        }
    }

    private static class Producer implements Runnable {

        private final Clerk clerk;

        private Producer(Clerk clerk) {
            this.clerk = clerk;
        }

        @Override
        public void run() {
            clerk.goodsIn();
        }
    }

    private static class Consumer implements Runnable {

        private final Clerk clerk;

        private Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        @Override
        public void run() {
            clerk.goodsOut();
        }
    }
}

2.通過Lock及Condition來實現(xiàn)生產(chǎn)者消費者模型

? Condition 接口描述了可能會與鎖有關(guān)聯(lián)的條件變量。
        這些變量在用法上與使用 Object.wait 訪問的隱式監(jiān)視器類似,但提供了更強大的功能。
        需要特別指出的是,單個 Lock 可能與多個 Condition 對象關(guān)聯(lián)。
        為了避免兼容性問題, Condition 方法的名稱與對應(yīng)的 Object 版本中的不同。
? 在 Condition 對象中,
        與  wait、  notify 和 notifyAll 方法對應(yīng)的分別是:
            await、 signal 和 signalAll。
? Condition 實例實質(zhì)上被綁定到一個鎖上。
        要為特定 Lock 實例獲得Condition 實例,請使用其 newCondition() 方法。
package com.zy.tools.undefined.concurrent.v2.consumerproducer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerAndProducer02 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Clerk clerk = new Clerk();

        for (int i = 0; i < 20; i++) {
            executorService.submit(new Consumer(clerk));
        }

        for (int i = 0; i < 40; i++) {
            executorService.submit(new Producer(clerk));
        }
    }

    private static class Clerk {
        private int count = 0;
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();

        public void goodsOut() {
            lock.lock();
            // 當(dāng)進入 lock 塊時, 表示當(dāng)前線程[獲取到對象鎖]
            // 如果沒有進入 lock 塊, 則表示獲取對象鎖失敗, 線程狀態(tài)為 BLOCKED
            try {
                // 為了避免虛假喚醒問題,應(yīng)該總是使用在while循環(huán)中,此處不能用if
                while (count <= 0) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " ==> 庫存不足, 請等待");
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count--;
                System.out.println(Thread.currentThread().getName() + " ==> 消費了商品, 當(dāng)前剩余庫存: " + count);
                // 當(dāng)調(diào)用 await 方法(不帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 WAITING, 其他線程可繼續(xù)爭搶鎖
                // 當(dāng)調(diào)用 await 方法(帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 TIME_WAITING, 其他線程可繼續(xù)爭搶鎖
                // 調(diào)用完 await 方法后, 當(dāng)前線程掛起, 需等待[其他線程調(diào)用 signal/signalAll 方法], 才能[喚起此線程], 執(zhí)行 wait 之后的代碼
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }

        public void goodsIn() {
            lock.lock();
            // 當(dāng)進入 lock 塊時, 表示當(dāng)前線程[獲取到對象鎖]
            // 如果沒有進入 lock 塊, 則表示獲取對象鎖失敗, 線程狀態(tài)為 BLOCKED
            try {
                // 為了避免虛假喚醒問題,應(yīng)該總是使用在while循環(huán)中,此處不能用if
                while (count >= 30) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " ==> 庫存已達到 30, 停止進貨");
                        // 當(dāng)調(diào)用 await 方法(不帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 當(dāng)調(diào)用 await 方法(帶超時時間)時, 當(dāng)前線程[釋放此對象鎖], 線程狀態(tài)為 TIME_WAITING, 其他線程可繼續(xù)爭搶鎖
                        // 調(diào)用完 await 方法后, 當(dāng)前線程掛起, 需等待[其他線程調(diào)用 signal/signalAll 方法], 才能[喚起此線程], 執(zhí)行 wait 之后的代碼
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count++;
                System.out.println(Thread.currentThread().getName() + " ==> 生產(chǎn)了商品, 當(dāng)前剩余庫存: " + count);
                // 調(diào)用完 signalAll 方法后, 喚起所有 當(dāng)前對象鎖下 線程狀態(tài)為 WAITING/TIME_WAITING 的線程
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    private static class Producer implements Runnable {

        private final Clerk clerk;

        private Producer(Clerk clerk) {
            this.clerk = clerk;
        }

        @Override
        public void run() {
            clerk.goodsIn();
        }
    }

    private static class Consumer implements Runnable {

        private final Clerk clerk;

        private Consumer(Clerk clerk) {
            this.clerk = clerk;
        }

        @Override
        public void run() {
            clerk.goodsOut();
        }
    }
}

3.交替打印

編寫一個程序,開啟 3 個線程,這三個線程的 ID 分別為
A、 B、 C,每個線程將自己的 ID 在屏幕上打印 10 遍,要
求輸出的結(jié)果必須按順序顯示。
如: ABBCCCABBCCC…… 依次遞歸
package com.zy.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestABC {
    public static void main(String[] args) {

        PrintABC printABC = new PrintABC();

        new Thread(() -> {
            for (int i = 1; i <= 10; i ++){
                printABC.printA(i);
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i ++){
                printABC.printB(i);
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i ++){
                printABC.printC(i);
            }
        }, "C").start();
    }
}

class PrintABC {

    private int no = 1;

    private Lock lock = new ReentrantLock();

    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    // 打印A的方法
    public void printA(int count){
        lock.lock();
        try {
            // 判斷是否到A
            while (no != 1){
                try {
                    condition1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 打印A
            for (int i = 0; i < 1; i ++){
                System.out.println(Thread.currentThread().getName() + "==========" + i + "=====" + count);
            }
            // 喚醒B
            no = 2;
            condition2.signal();
        }finally {
            lock.unlock();
        }
    }

    // 打印B的方法
    public void printB(int count){
        lock.lock();
        try{
            // 判斷是否到B
            while (no != 2) {
                try {
                    condition2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 打印B
            for (int i = 0; i < 2; i ++){
                System.out.println(Thread.currentThread().getName() + "==========" + i + "=====" + count);
            }
            // 喚醒C
            no = 3;
            condition3.signal();
        }finally {
            lock.unlock();
        }
    }

    // 打印C的方法
    public void printC(int count){
        lock.lock();
        try{
            while (no != 3){
                try {
                    condition3.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 打印C
            for (int i = 0; i < 3; i ++){
                System.out.println(Thread.currentThread().getName() + "==========" + i + "=====" + count);
            }
            System.out.println(">>>>>>>>>>>>>>>第"+ count + "輪結(jié)束<<<<<<<<<<<<<<<<<<<<<");
            // 喚醒A
            no = 1;
            condition1.signal();
        } finally {
            lock.unlock();
        }
    }
}

4.交替打印ABC的反例

package com.zy.tools.undefined.concurrent.v2.consumerproducer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 這里旨在通過一個反例, 演示線程使用錯誤可能帶來的問題
 */
public class PrintABC {

    /**
     * 前置條件:
     * 1.啟動 3 個打印 A,B,C 的延時任務(wù), 分別延時 1s, 2s, 3s
     * 2.這里設(shè)置核心線程數(shù)為 2, 小于 3
     * 3.運行 main 方法
     *
     * 現(xiàn)象:
     * 1.大多數(shù)時候, 交替打印 A,B,C 幾輪后(或者只打印了一輪 A,B), 線程卡住, 不再打印任何一個字母
     * 2.線程卡住后,  main 方法里的 while(true){} 打印 flag 為 3
     * A
     * B
     * C
     * A
     * B
     * C
     * A
     * B
     * flag is: 3
     * C
     * A
     * B
     * flag is: 3
     * flag is: 3
     * ...
     *
     * 問題: 為何線程會卡住?
     *
     * 分析:
     * 1.由于 3 個延時任務(wù)的延遲時間不同, 3個延時任務(wù)對應(yīng)的處理調(diào)用的頻率不同
     * 2.當(dāng)執(zhí)行到某一次時, 假定 flag 為 3
     * 2.1 線程1 執(zhí)行 printA 并獲取到鎖, 但 flag 不為 1, 則執(zhí)行 await 方法, 線程掛起
     * 2.2 線程2 執(zhí)行 printB 并獲取到鎖, 但 flag 不為 2, 則執(zhí)行 await 方法, 線程掛起
     * 2.3 由于線程池中共兩個核心線程, 此時這倆線程掛起, 沒有其他線程來喚起這倆線程. 所以就造成線程卡死的現(xiàn)象.
     *
     * 分析工具及步驟:
     * 1.通過 jps 查看這個進程號為: 6136
     * 2.通過 jstack -l 6136 > /printABCDump.txt
     * 3.啟動 IBM-TMDA 工具: java -jar jca467.jar
     * 4.導(dǎo)入上一步生成的線程 dump文件
     * 5.分析可知:
     * 這里看到, 這兩個線程狀態(tài)均是 waiting on condition, 調(diào)用了 LockSupport.park方法, 即線程掛起了,
     * 通過堆棧信息可知, 分別是在 printA 和 printB 方法中的 await 方法所致.
     * 這倆線程均在等待被其他線程 喚起, 但是, 沒有多余的線程來喚起他們了 ... so, 杯具了
     * @param args
     */
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        Worker worker = new Worker();
        scheduledExecutorService.scheduleWithFixedDelay(worker::printA, 0L, 1L, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleWithFixedDelay(worker::printB, 0L, 2L, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleWithFixedDelay(worker::printC, 0L, 3L, TimeUnit.SECONDS);

        while (true) {
            try {
                TimeUnit.SECONDS.sleep(5L);
                System.out.println("flag is: " + worker.getContent());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Worker {
        private int flag = 1;
        private final Lock lock = new ReentrantLock();
        private final Condition conditionA = lock.newCondition();
        private final Condition conditionB = lock.newCondition();
        private final Condition conditionC = lock.newCondition();

        public void printA() {
            boolean b = lock.tryLock();
            if (!b) {
                return;
            }
            try {
                while (flag != 1) {
                    conditionA.await();
                }
                System.out.println("A");
                flag = 2;
                conditionB.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void printB() {
            boolean b = lock.tryLock();
            if (!b) {
                return;
            }
            try {
                while (flag != 2) {
                    conditionB.await();
                }
                System.out.println("B");
                flag = 3;
                conditionC.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public void printC() {
            boolean b = lock.tryLock();
            if (!b) {
                return;
            }
            try {
                while (flag != 3) {
                    conditionC.await();
                }
                System.out.println("C");
                flag = 1;
                conditionA.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public int getContent() {
            return flag;
        }

    }

}
兩個線程均掛起等待被喚醒.png

5.thread.join

package com.zy.tools.undefined.concurrent.v2.thread;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * t.join 阻塞的是主線程, 適于通過join方法來等待線程執(zhí)行的結(jié)果的應(yīng)用,其實有點類似future/callable的功能。
 */
public class ThreadJoin {
    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            flag.set(true);
            System.out.println("線程 t 執(zhí)行完畢");
        });
        t.start();
        try {
            // 這里阻塞的是主線程, t 線程的內(nèi)容, 永遠在 t.join 后面的代碼之前執(zhí)行
            // 如果把 t.join 注釋掉, 可以發(fā)現(xiàn), 后面的代碼可能先于 t 線程里的內(nèi)容執(zhí)行
            t.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 這里在 t.join 完成后, 執(zhí)行
        System.out.println("flag value is: " + flag.get());
    }
}

6.關(guān)于線程掛起與喚醒

6.1 wait 和 notify

#wait方法
wait是要釋放對象鎖,進入等待池。
既然是釋放對象鎖,那么肯定是先要獲得鎖。
所以wait必須要寫在synchronized代碼塊中,否則會報異常。

#notify方法
notify方法也需要寫在synchronized代碼塊中, 調(diào)用對象的這兩個方法也需要先獲得該對象的鎖.
notify,notifyAll, 喚醒等待該對象同步鎖的線程,并放入該對象的鎖池中.
對象的鎖池中線程可以去競爭得到對象鎖,然后開始執(zhí)行.
>> ?如果是通過notify來喚起的線程, 那進入wait的線程會被<<隨機>>喚醒;
>> 如果是通過notifyAll喚起的線程, 默認(rèn)情況是最后進入的會先被喚起來,即LIFO的策略;

#注意
notify()或者notifyAll()調(diào)用時并不會真正釋放對象鎖, 必須等到synchronized方法或者語法塊執(zhí)行完才真正釋放鎖.
如下, 雖然調(diào)用了notifyAll, 但是緊接著進入了一個死循環(huán)。
這會導(dǎo)致一直不能出臨界區(qū), 一直不能釋放對象鎖。
所以,即使它把所有在等待池中的線程都喚醒放到了對象的鎖池中,
但是鎖池中的所有線程都不會運行,因為他們始終拿不到鎖。
public void hello()
{
    Object object = new Object();
    synchronized (object){
        object.notifyAll();
        while (true){
        }
    }
}

參考資源
http://www.itdecent.cn/p/fc51be7e5bc0
http://www.itdecent.cn/p/ffc0c755fd8d

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,062評論 25 709
  • 我見過三十個春天 十個是模糊的 十個是清晰的 十個沒一丁點記憶 那棵摔痛過我的桃樹 我早已原諒你 你的身軀彎曲得 ...
    冷冬年閱讀 235評論 16 29
  • 12.8號,第一次用簡書,定下個小目標(biāo),一個禮拜讀一本書,每天隨意寫兩筆。
    非煙zi閱讀 269評論 4 1
  • 父親越來越把家。 孫子女回家來看他,一進門,拉著手熱熱火火了敘說,爺長爺短地叫個不停,到走的時候,母親執(zhí)意把自家栽...
    風(fēng)_行水上閱讀 743評論 0 3

友情鏈接更多精彩內(nèi)容