與BlockingQueue息息相關的一個話題就是:生產者-消費者模型。
- 生產者持續(xù)生產,直道緩沖區(qū)滿,阻塞
- 消費者持續(xù)消費,直到緩沖區(qū)空,阻塞
- 生產,消費 并發(fā)
接下來,我們就要用BlockingQueue去實現生產消費。
分別定義生產,消費接口。請注意定義中的阻塞。
public interface Consumer{
void consume() throws InterruptedException;
}
public interface Producer{
void produce() throws InterruptedException;
}
注意生產者,消費者可以并發(fā)??梢詫涌谶M行進一步封裝
abstract class AbsConsumer implements Consumer, Runnable{
@Override
public void run() {
try{
consume();
} catch(InterruptedException e) {
}
}
}
abstract class AbstractProducer implements Producer, Runnable {
@Override
public void run() {
try {
produce();
} catch (InterruptedException e) {
}
}
}
生產者,消費者消費的個體單位是:
public class Task {
public int no;
public Task(int no) {
this.no = no;
}
}
最后,模型就是:
public class BlockingQueueModel {
private final BlockingQueue<Task> queue;
//這里使用AtomicInteger,保證并發(fā)的生產的個體的標記唯一
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public BlockingQueueModel(int cap) {
this.queue = new LinkedBlockingQueue<>(cap);
}
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer {
@Override
public void consume() throws InterruptedException {
Task task = queue.take();
// 固定時間范圍的消費,模擬相對穩(wěn)定的服務器處理過程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不定期生產,模擬隨機的用戶請求
Thread.sleep((long) (Math.random() * 1000));
Task task = new Task(increTaskNo.getAndIncrement());
System.out.println("produce: " + task.no);
queue.put(task);
}
}
public static void main(String[] args) {
Model model = new BlockingQueueModel(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
}