本文主要介紹java中生產(chǎn)者/消費者模式的實現(xiàn),對java線程鎖機制的一次深入理解。
生產(chǎn)者/消費者模型
生產(chǎn)者/消費者模型要保證,同一個資源在同一時間節(jié)點下只能被最多一個線程訪問,這個在java中用鎖就很容易實現(xiàn)。
下面的例子就是模擬多個生產(chǎn)者生產(chǎn),多個消費者消費的demo
//抽象生產(chǎn)者
public abstract class AbstractProducer implements Runnable {
abstract void produce() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
produce();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//抽象消費者
public abstract class AbstractConsumer implements Runnable {
abstract void consume() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConsumerAndProducerDemo {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition CONDITION = LOCK.newCondition();
private static final Queue<Product> PRODUCTS = new LinkedList<>();
private static final int SIZE = 4;
public static class Product {
int id;
Product(int id) {
this.id = id;
}
}
//實現(xiàn)消費者
private static class Consumer extends AbstractConsumer {
@Override
void consume() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.isEmpty()) {
CONDITION.await();
}
Product product = PRODUCTS.poll();
Thread.sleep((long) (500 + Math.random() * 1000));
System.out.println(" consume product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
//實現(xiàn)生產(chǎn)者
private static class Producer extends AbstractProducer {
@Override
void produce() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.size() >= SIZE) {
CONDITION.await();
}
Thread.sleep(1000);
Product product = new Product(ATOMIC_INTEGER.incrementAndGet());
PRODUCTS.add(product);
System.out.println("produce product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
public static void main(String[] args) {
for (int index = 0; index < 2; index++) {
new Thread(new Producer()).start();
}
for (int index = 0; index < 3; index++) {
new Thread(new Consumer()).start();
}
}
}
上面的demo這么實現(xiàn)
- 啟動多個線程模擬多個生產(chǎn)者和多個消費者
- 同時使用了queue用來緩存產(chǎn)品
- 當緩存區(qū)沒滿時生產(chǎn)者生產(chǎn)
- 當緩沖區(qū)滿時消費者開始消費
線程之間的同步,這里使用了ReentrantLock,ReentrantLock在之前的博客中有介紹過,當然也可以使用Object自帶的wait()等方法,實現(xiàn)同步這里就不在修改demo另行實現(xiàn)了。