生產(chǎn)者消費(fèi)模式三種

  • 最外層的while是為了執(zhí)行完一輪不終止

wait()和notifyall()

ublic class ShareDataV1 {

    public static AtomicInteger atomicInteger = new AtomicInteger();
    public volatile boolean flag = true;
    public static final int MAX_COUNT = 10;
    public static final List<Integer> pool = new ArrayList<>();

    public void produce() {
        // 判斷,干活,通知
        while (flag) {
            // 每隔 1000 毫秒生產(chǎn)一個(gè)商品
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            synchronized (pool) {
                //池子滿(mǎn)了,生產(chǎn)者停止生產(chǎn)
                //TODO 判斷
                while (pool.size() == MAX_COUNT) {
                    try {
                        System.out.println("pool is full, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    public void consumue() {
        // 判斷,干活,通知
        while (flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            synchronized (pool) {
                //池子空了,消費(fèi)者停止消費(fèi)
                while (pool.size() == 0) {
                    try {
                        System.out.println("pool is empty, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    public void stop() {
        flag = false;
    }
}

lock,condition,signal

public class ShareDataV2 {


    public static AtomicInteger atomicInteger = new AtomicInteger();
    public volatile boolean flag = true;
    public static final int MAX_COUNT = 10;
    public static final List<Integer> pool = new ArrayList<>();

    private Lock lock = new ReentrantLock();
    //也可以一個(gè)condition然后signalall
    private Condition produce_condition = lock.newCondition();
    private Condition consumue_condition = lock.newCondition();

    public void produce() {
        // 判斷,干活,通知
        while (flag){

            lock.lock();
            try {
                Thread.sleep(100);
                //池子滿(mǎn)了,生產(chǎn)者停止生產(chǎn)
                while (pool.size() == MAX_COUNT) {
                    //等待,不能生產(chǎn)
                    produce_condition.await();
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());

                consumue_condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }

    public void consumue() {
        // 判斷,干活,通知
        while (flag) {
            lock.lock();
            try {
                Thread.sleep(1000);
                //池子空了,消費(fèi)者停止消費(fèi)
                while (pool.size() == 0) {
                    //等待,不能消費(fèi)
                    System.out.println("pool is empty, wating...");
                    consumue_condition.await();
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());

                produce_condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }

    public void stop() {
        flag = false;
    }
}

阻塞隊(duì)列

public class ShareDataV3 {

    private static final int MAX_CAPACITY = 10; //阻塞隊(duì)列容量
    private static BlockingQueue<Integer> blockingQueue= new ArrayBlockingQueue<>(MAX_CAPACITY); //阻塞隊(duì)列
    private  volatile boolean FLAG = true;
    private AtomicInteger atomicInteger = new AtomicInteger();

    public void produce() throws InterruptedException {
        while (FLAG){
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            if (retvalue==true){
                System.out.println(Thread.currentThread().getName()+"\t 插入隊(duì)列"+ atomicInteger.get()+"成功"+"資源隊(duì)列大小= " + blockingQueue.size());
            }else {
                System.out.println(Thread.currentThread().getName()+"\t 插入隊(duì)列"+ atomicInteger.get()+"失敗"+"資源隊(duì)列大小= " + blockingQueue.size());

            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"FLAG變?yōu)閒lase,生產(chǎn)停止");
    }

    public void consume() throws InterruptedException {
        Integer result = null;
        while (true){
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null==result){
                System.out.println("超過(guò)兩秒沒(méi)有取道數(shù)據(jù),消費(fèi)者即將退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t 消費(fèi)"+ result+"成功"+"\t\t"+"資源隊(duì)列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    public void stop() {
        this.FLAG = false;
    }
}

三個(gè)的統(tǒng)一調(diào)用

public class ProducerConsumer_V1 {
    public static void main(String[] args) {
        ShareDataV1 shareDataV1 = new ShareDataV1();
        new Thread(() -> {
            shareDataV1.produce();
        }, "AAA").start();

        new Thread(() -> {
            shareDataV1.consumue();
        }, "BBB").start();

        new Thread(() -> {
            shareDataV1.produce();
        }, "CCC").start();

        new Thread(() -> {
            shareDataV1.consumue();
        }, "DDD").start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shareDataV1.stop();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容