適用的情況
由多個線程之間處理生產(chǎn)消費的關(guān)系, 并且生產(chǎn)和消費不是即時處理的情況, 其中涉及到數(shù)據(jù)量的線程安全性問題.
實現(xiàn)的方式
在Producer和Consumer之間設(shè)立一個中轉(zhuǎn)站Channel, 讓Channel來保存和維護(hù)數(shù)據(jù)的安全, 這樣生產(chǎn)者和消費者之間就解耦了, 與他們有關(guān)的對象是Channel, 并且Channel是線程安全的.
相關(guān)的模式
- Channel角色保證數(shù)據(jù)安全狀態(tài)的時候可以使用Guarded Suspension模式.
- 在Future模式中, 傳遞返回值的時候, 可以使用Producer-Consumer模式.
- Worker-Thread模式中, 對于Worker的請求可以使用Producer-Consumer模式對請求進(jìn)行控制.
代碼示例:
下面是一個例子, 由MakerThread生產(chǎn)字符串, Table進(jìn)行保存, 然后ConsumerThread進(jìn)行消費(打印出來).
package com.graphic.producerAndConsumer;
import java.util.Random;
/**
* @author youngxinler 19-6-1 上午11:40
* @version 0.1
**/
public class MakerThread extends Thread{
private final Random random;
private final Table table;
private static int id = 0;
public MakerThread(String name, Table table, long seed) {
super(name);
this.random = new Random(seed);
this.table = table;
}
@Override
public void run(){
try {
while (true) {
Thread.sleep(random.nextInt(1000));
String cake = "[Cake No." + nextId() + " by " + getName() + "]";
table.put(cake);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
private int nextId(){
return id++;
}
}
package com.graphic.producerAndConsumer;
import java.util.LinkedList;
/**
* @author youngxinler 19-6-1 上午11:42
* @version 0.1
*
* 這里的table其實相當(dāng)于一個池子,存放著生產(chǎn)者生產(chǎn)的物品,等待消費者來消費。
* 為什么用table來保證線程安全?
* 1.明白要保護(hù)的變量, 這個例子中會造成線程不安全的變量是buffer[],而buffer位于table.
* 2.與maker和consumer"斷絕關(guān)系", 保證了table的線程安全, 那么對于maker和consumer就可以大膽放心的寫了.
* 3.保證了table的通用性.
*
**/
public class Table {
private final String[] buffer;
private int tail;
private int head;
private int count;
public Table(int count) {
this.buffer = new String[3];
this.tail = 0;
this.head = 0;
this.count = 0;
}
public synchronized void put(String cake)throws InterruptedException{
System.out.println(Thread.currentThread().getName() + " puts " + cake);
while (count >= buffer.length){
wait();
}
buffer[tail] = cake;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();
}
public synchronized String take() throws InterruptedException{
while (count <= 0){
wait();
}
String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
System.out.println(Thread.currentThread().getName() + " take " + cake);
notifyAll();
return cake;
}
}
package com.graphic.producerAndConsumer;
import java.util.Random;
/**
* @author youngxinler 19-6-1 下午12:44
* @version 0.1
**/
public class ConsumerThread extends Thread{
private final Table table;
private final Random random;
public ConsumerThread(String s, Table table, long seed) {
super(s);
this.table = table;
this.random = new Random(seed);
}
@Override
public void run(){
try{
while (true){
Thread.sleep(random.nextInt(1000));
String cake = table.take();
System.out.println(cake + " is eaten by " + getName());
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
package com.graphic.producerAndConsumer;
/**
* @author youngxinler 19-6-1 下午12:49
* @version 0.1
**/
public class Main {
public static void main(String[] args) {
Table table = new Table(3);
new MakerThread("maker-1", table, 1000).start();
new MakerThread("maker-2", table, 1000).start();
new MakerThread("maker-3", table, 1000).start();
new ConsumerThread("consumer-1", table, 1).start();
new ConsumerThread("consumer-2", table, 2).start();
new ConsumerThread("consumer-3", table, 3).start();
}
}