Java實現(xiàn)生產(chǎn)者-消費者模型的幾種方法

什么是生產(chǎn)者消費者模式

生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。

這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的。

為什么要使用生產(chǎn)者和消費者模式

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。

生產(chǎn)者消費者模式實現(xiàn)(Java)

阻塞隊列是實現(xiàn)生產(chǎn)者消費者模式的關(guān)鍵,本文介紹兩種自定義阻塞隊列的實現(xiàn)以及JDK 1.5 以后新增的 java.util.concurrent包中提供的阻塞隊列類。

首先,阻塞隊列接口:

package com.bytebeats.concurrent.queue;

/**
 * 阻塞隊列接口
 *
 * @author Ricky Fung
 * @create 2017-03-26 17:28
 */
public interface IBlockingQueue<T> {

    void put(T data) throws InterruptedException;

    T take() throws InterruptedException;
}

方式1

使用 Object.wait()/notifyAll() 來實現(xiàn)阻塞隊列。

1、阻塞隊列實現(xiàn)

package com.bytebeats.concurrent.queue;

import java.util.LinkedList;

/**
 * 使用Object.wait()/notifyAll()實現(xiàn)的阻塞隊列
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:18
 */
public class TraditionalBlockingQueue<T> implements IBlockingQueue<T> {
    private final int queueSize;
    private final LinkedList<T> list = new LinkedList<T>();
    private final Object lock = new Object();

    public TraditionalBlockingQueue(){
        this(10);
    }
    public TraditionalBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        this.queueSize = queueSize;
    }

    @Override
    public void put(T data) throws InterruptedException {

        synchronized (lock){
            while(list.size()>=queueSize) {
                lock.wait();
            }
            list.addLast(data);
            lock.notifyAll();
        }
    }

    @Override
    public T take() throws InterruptedException {

        synchronized(lock){
            while(list.size()<=0) {
                lock.wait();
            }
            T data = list.removeFirst();
            lock.notifyAll();
            return data;
        }
    }
}

注意要點

  1. 判定 LinkedList大小為0或者大于等于queueSize時須使用 while (condition) {},不能使用 if(condition) {}。其中 while(condition)循環(huán),它又被叫做“自旋鎖”。自旋鎖以及wait()和notify()方法在線程通信這篇文章中有更加詳細的介紹。為防止該線程沒有收到notify()調(diào)用也從wait()中返回(也稱作虛假喚醒),這個線程會重新去檢查condition條件以決定當前是否可以安全地繼續(xù)執(zhí)行還是需要重新保持等待,而不是認為線程被喚醒了就可以安全地繼續(xù)執(zhí)行了。
  2. 在 take 方法取走一個元素后須調(diào)用 lock.notifyAll();,如果使用 lock.notify(); 方法在某些情況下會導(dǎo)致 生產(chǎn)者-消費者 同時處于阻塞狀態(tài)。

方式2

通過Lock和Condition實現(xiàn)阻塞隊列

package com.bytebeats.concurrent.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 通過Lock和Condition實現(xiàn)阻塞隊列
 *
 * @author Ricky Fung
 * @create 2017-03-26 17:08
 */
public class ConditionBlockingQueue<T> implements IBlockingQueue<T> {
    private final Object[] items;
    int putptr, takeptr, count;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public ConditionBlockingQueue(){
        this(10);
    }
    public ConditionBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        items = new Object[queueSize];
    }

    @Override
    public void put(T data) throws InterruptedException {

        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = data;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public T take() throws InterruptedException {

        lock.lock();
        try {
            while (count == 0) {
                notEmpty.wait();
            }
            T data = (T) items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }
}

方式3

JDK 1.5 以后新增的 java.util.concurrent包新增了 java.util.concurrent. BlockingQueue 接口:

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

并提供了如下幾種阻塞隊列實現(xiàn):

  • java.util.concurrent.ArrayBlockingQueue
  • java.util.concurrent.LinkedBlockingQueue
  • java.util.concurrent.SynchronousQueue
  • java.util.concurrent.PriorityBlockingQueue

實現(xiàn)生產(chǎn)者-消費者模型使用 java.util.concurrent.ArrayBlockingQueue或者 java.util.concurrent.LinkedBlockingQueue即可。

測試代碼

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

/**
 * 生產(chǎn)者
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:16
 */
public class Producer implements Runnable {
    private IBlockingQueue<String> queue;
    private int consumerNum;

    public Producer(IBlockingQueue<String> queue, int consumerNum) {
        this.queue = queue;
        this.consumerNum = consumerNum;
    }

    @Override
    public void run() {

        for(int i=0; i< 100; i++){
            try {
                queue.put("data_"+i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        for(int i=0; i<consumerNum; i++){   //結(jié)束符
            try {
                queue.put(Constant.ENDING_SYMBOL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("Producer over");
    }
}

消費者

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

import java.util.concurrent.TimeUnit;

/**
 * 消費者
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:16
 */
public class Consumer implements Runnable {
    private IBlockingQueue<String> queue;

    public Consumer(IBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            String data = null;
            try {
                data = queue.take();
                System.out.println("Consumer "+Thread.currentThread().getName()+" consume:"+data);
                if (Constant.ENDING_SYMBOL.equals(data)) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Consumer over");
    }
}

我們用 一個生產(chǎn)者 兩個消費者來做測試,如下:

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.ConditionBlockingQueue;
import com.bytebeats.concurrent.queue.IBlockingQueue;

/**
 * ${DESCRIPTION}
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:21
 */
public class ProducerConsumerDemo {

    public static void main(String[] args) {

        //new ProducerConsumerDemo().testRun(new TraditionalBlockingQueue<String>());
        new ProducerConsumerDemo().testRun(new ConditionBlockingQueue<String>());
    }

    public void testRun(IBlockingQueue<String> queue){

        Thread producer = new Thread(new Producer(queue, 2));
        producer.start();

        Thread consumer1 = new Thread(new Consumer(queue));
        consumer1.start();
        Thread consumer2 = new Thread(new Consumer(queue));
        consumer2.start();
    }
}

如果想要使用JDK內(nèi)置的阻塞隊列,直接將 本例中的 com.bytebeats.concurrent.queue.IBlockingQueue替換為 java.util.concurrent.ArrayBlockingQueue或者 java.util.concurrent.LinkedBlockingQueue即可。

源碼

https://github.com/TiFG/daily-codelab/tree/master/producer-consumer-impl

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home...
    光劍書架上的書閱讀 4,183評論 2 8
  • 什么是生產(chǎn)者消費者模式 生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直...
    鐵甲依然在_978f閱讀 193評論 0 0
  • “為每一個你所偷來的影子找到點亮生命的小小光芒,為它們找回隱匿的記憶拼圖,這便是我們對你的全部請托?!?你的影子,...
    Molly_zhang閱讀 188評論 0 0
  • 又是一學期的開學季,學校又瞬間從冷清變得熱鬧起來了。大家都面掛笑容的迎來新的學期。新學期,新的起跑線,新的...
    澄江如練閱讀 247評論 0 0
  • 1 前段時間看了一檔非常有趣的綜藝節(jié)目《火星情報局》,幾位高級特工每期都會提出幾個有趣又好玩的火星提案,于嬉笑娛樂...
    卿木o閱讀 1,205評論 0 11

友情鏈接更多精彩內(nèi)容