Java生產(chǎn)者消費(fèi)者的三種實(shí)現(xiàn)

一、使用synchronize以及wait()、notify() /notifyAll()

  • 1.wait()的作用是讓當(dāng)前線程進(jìn)入等待狀態(tài),同時(shí),wait()也會(huì)讓當(dāng)前線程釋放它所持有的鎖。“直到其他線程調(diào)用此對(duì)象的 notify() 方法或 notifyAll() 方法”,當(dāng)前線程被喚醒(進(jìn)入“就緒狀態(tài)”)
  • 2.notify()和notifyAll()的作用,則是喚醒當(dāng)前對(duì)象上的等待線程;notify()是喚醒單個(gè)線程,而notifyAll()是喚醒所有的線程。
  • 3.wait(long timeout)讓當(dāng)前線程處于“等待(阻塞)狀態(tài)”,“直到其他線程調(diào)用此對(duì)象的notify()方法或 notifyAll() 方法,或者超過指定的時(shí)間量”,當(dāng)前線程被喚醒(進(jìn)入“就緒狀態(tài)”)。
/**
 * 使用synchronize以及wait()、notify() /notifyAll()
 */
public class ShareDataV1 {

    /**
     * 原子計(jì)數(shù)
     */
    public static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 標(biāo)示
     */
    public volatile boolean flag = true;

    /**
     * 生產(chǎn)隊(duì)列最大容量
     */
    public static final int MAX_COUNT = 10;

    /**
     * 生產(chǎn)隊(duì)列容器
     */
    public static final List<Integer> pool = new ArrayList<>();

    /**
     * 生產(chǎn)
     */
    public void produce() {
        // 判斷,干活,通知
        while (flag) {
            try {
                // 每隔 1000 毫秒生產(chǎn)一個(gè)商品
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 這里有一個(gè)鎖
            synchronized (pool) {
                //池子滿了,生產(chǎn)者停止生產(chǎn)
                //埋個(gè)坑,這里用的if
                //TODO 判斷
                while (pool.size() == MAX_COUNT) {
                    try {
                        System.out.println("pool is full, wating...");
                        // TODO 線程阻塞
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
                //通知
                // notify() 方法隨機(jī)喚醒對(duì)象的等待池中的一個(gè)線程,進(jìn)入鎖池;
                // notifyAll() 喚醒對(duì)象的等待池中的所有線程,進(jìn)入鎖池。
                pool.notifyAll();
            }
        }
    }


    /**
     * 消費(fèi)
     */
    public void consumue() {
        // 判斷,干活,通知
        while (flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 這里有一個(gè)鎖
            synchronized (pool) {
                //池子滿了,生產(chǎn)者停止生產(chǎn)
                //埋個(gè)坑,這里用的if
                //TODO 判斷
                while (pool.size() == 0) {
                    try {
                        System.out.println("pool is empty, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    /**
     * 停止
     */
    public void stop() {
        flag = false;
    }


    public static void main(String[] args) {
        ShareDataV1 shareDataV1 = new ShareDataV1();

        // 開啟生產(chǎn)線程
        new Thread(() -> {
            shareDataV1.produce();
        }, "AAA").start();

        // 開啟消費(fèi)線程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "BBB").start();

        // 開啟生產(chǎn)線程
        new Thread(() -> {
            shareDataV1.produce();
        }, "CCC").start();

        // 開啟消費(fèi)線程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "DDD").start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shareDataV1.stop();
    }

}

二、使用Lock,Condition的await和signal方法

  • JUC包下的鎖Lock替代synchronize關(guān)鍵字。await方法代替wait,signal代替notifyall。
    下面這個(gè)demo實(shí)現(xiàn)了pool的大小為1的生產(chǎn)者消費(fèi)者模型。
class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try { 
            while (number != 0) {
                //等待,不能生產(chǎn)
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            while (number == 0) {
                //等待,不能消費(fèi)
                condition.await();
            }

            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }
}

public class ProducerConsumer_V2{
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }
}

三、下面是使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式:

  • 當(dāng)阻塞隊(duì)列為空時(shí),從阻塞隊(duì)列中取數(shù)據(jù)的操作會(huì)被阻塞。
  • 當(dāng)阻塞隊(duì)列為滿時(shí),往阻塞隊(duì)列中添加數(shù)據(jù)的操作會(huì)被阻塞。

LinkedBlockingQueue

  • 基于單向鏈表的阻塞隊(duì)列實(shí)現(xiàn),在初始化LinkedBlockingQueue的時(shí)候可以指定對(duì)立的大小,也可以不指定,默認(rèn)類似一個(gè)無限大小的容量(Integer.MAX_VALUE),不指隊(duì)列容量大小也是會(huì)有風(fēng)險(xiǎn)的,一旦數(shù)據(jù)生產(chǎn)速度大于消費(fèi)速度,系統(tǒng)內(nèi)存將有可能被消耗殆盡,因此要謹(jǐn)慎操作。另外LinkedBlockingQueue中用于阻塞生產(chǎn)者、消費(fèi)者的鎖是兩個(gè)(鎖分離),因此生產(chǎn)與消費(fèi)是可以同時(shí)進(jìn)行的。
/**
 * 下面是使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式:
 */
public class ShareDataV3 {

    /**
     * 阻塞隊(duì)列容量
     */
    private static final int MAX_CAPACITY = 10;

    /**
     * 阻塞隊(duì)列
     */
    private static BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(MAX_CAPACITY);

    /**
     * 標(biāo)示
     */
    private volatile boolean FLAG = true;

    /**
     * 原子計(jì)數(shù)
     */
    private AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 生產(chǎn)
     *
     * @throws InterruptedException
     */
    public void produce() throws InterruptedException {

        // 開啟可控的死循環(huán) 不停的生產(chǎn)
        while (FLAG) {
            // 向隊(duì)列中添加元素
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            // 插入元素是否成功
            if (retvalue == true) {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊(duì)列" + atomicInteger.get() + "成功" + "資源隊(duì)列大小= " + blockingQueue.size());
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入隊(duì)列" + atomicInteger.get() + "失敗" + "資源隊(duì)列大小= " + blockingQueue.size());
            }
            // 線程睡眠
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "FLAG變?yōu)閒lase,生產(chǎn)停止");
    }

    /**
     * 消費(fèi)
     *
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        Integer result = null;
        while (true) {
            // 從隊(duì)列中獲取元素
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            // 獲取結(jié)果判斷
            if (null == result) {
                System.out.println("超過兩秒沒有取道數(shù)據(jù),消費(fèi)者即將退出");
                return;
            }
            // 打印獲取結(jié)果
            System.out.println(Thread.currentThread().getName() + "\t 消費(fèi)" + result + "成功" + "\t\t" + "資源隊(duì)列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    /**
     * 停止
     */
    public void stop() {
        this.FLAG = false;
    }


    public static void main(String[] args) {

        ShareDataV3 v3 = new ShareDataV3();

        // 開啟一個(gè)線程 執(zhí)行生產(chǎn)
        new Thread(() -> {
            try {
                v3.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        // 開啟一個(gè)線程 執(zhí)行消費(fèi)
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();

        // 開啟一個(gè)線程 執(zhí)行消費(fèi)
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "CCC").start();


        try {
            // 測(cè)試 主線程睡眠5秒
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        v3.stop();
    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容