BlockingQueue 生產消費

與BlockingQueue息息相關的一個話題就是:生產者-消費者模型。

  • 生產者持續(xù)生產,直道緩沖區(qū)滿,阻塞
  • 消費者持續(xù)消費,直到緩沖區(qū)空,阻塞
  • 生產,消費 并發(fā)

接下來,我們就要用BlockingQueue去實現生產消費。
分別定義生產,消費接口。請注意定義中的阻塞。

public interface Consumer{
    void consume() throws InterruptedException;
}

public interface Producer{
    void produce() throws InterruptedException;
}

注意生產者,消費者可以并發(fā)??梢詫涌谶M行進一步封裝

abstract class AbsConsumer implements Consumer, Runnable{
    @Override
    public void run() {
        try{
            consume();
        } catch(InterruptedException e) {
        }
    }
}

abstract class AbstractProducer implements Producer, Runnable {
  @Override
  public void run() {
      try {
        produce();
      } catch (InterruptedException e) {
      }
    }
}

生產者,消費者消費的個體單位是:

public class Task {
public int no;
  public Task(int no) {
    this.no = no;
  }
}

最后,模型就是:

public class BlockingQueueModel {
  private final BlockingQueue<Task> queue;
//這里使用AtomicInteger,保證并發(fā)的生產的個體的標記唯一
  private final AtomicInteger increTaskNo = new AtomicInteger(0);
  public BlockingQueueModel(int cap) {
    this.queue = new LinkedBlockingQueue<>(cap);
  }

  public Runnable newRunnableConsumer() {
    return new ConsumerImpl();
  }
  public Runnable newRunnableProducer() {
    return new ProducerImpl();
  }

  private class ConsumerImpl extends AbstractConsumer {
    @Override
    public void consume() throws InterruptedException {
      Task task = queue.take();
      // 固定時間范圍的消費,模擬相對穩(wěn)定的服務器處理過程
      Thread.sleep(500 + (long) (Math.random() * 500));
      System.out.println("consume: " + task.no);
    }
  }
  private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
    @Override
    public void produce() throws InterruptedException {
      // 不定期生產,模擬隨機的用戶請求
      Thread.sleep((long) (Math.random() * 1000));
      Task task = new Task(increTaskNo.getAndIncrement());
      System.out.println("produce: " + task.no);
      queue.put(task);
    }
  }
  public static void main(String[] args) {
    Model model = new BlockingQueueModel(3);
    for (int i = 0; i < 2; i++) {
      new Thread(model.newRunnableConsumer()).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(model.newRunnableProducer()).start();
    }
  }
}
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容