前言
最近在看并發(fā)編程藝術(shù)這本書,對看書的一些筆記及個人工作中的總結(jié)。
什么是生產(chǎn)者和消費者模式?
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,
那么消費者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費能力不均衡的問題,便有了生產(chǎn)者和消費者模式。
生產(chǎn)者和消費者模式是通過一個容器(一般使用阻塞隊列)來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通信,而是通過阻塞隊列來進行通信,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
直接看demo:
生產(chǎn)者:
public class Provider implements Runnable{
//共享緩存區(qū)
private BlockingQueue<Data> queue;
//多線程間是否啟動變量,有強制從主內(nèi)存中刷新的功能。即時返回線程的狀態(tài)
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//隨機對象
private static Random r = new Random();
public Provider(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
while(isRunning){
try {
//隨機休眠0 - 1000 毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時)
Thread.sleep(r.nextInt(1000));
//獲取的數(shù)據(jù)進行累計...
int id = count.incrementAndGet();
//比如通過一個getData方法獲取了
Data data = new Data(Integer.toString(id), "數(shù)據(jù)" + id);
System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + ", 獲取了數(shù)據(jù),id為:" + id + ", 將數(shù)據(jù)加入到阻塞隊列...");
//將生產(chǎn)者生產(chǎn)的數(shù)據(jù)加入阻塞隊列中,如果2s加入沒有成功就失敗
if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
System.out.println("提交緩沖區(qū)數(shù)據(jù)失敗....");
//do something... 比如重新提交
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning = false;
}
}
消費端:
public class Consumer implements Runnable{
private BlockingQueue<Data> queue;
public Consumer(BlockingQueue queue){
this.queue = queue;
}
//隨機對象
private static Random r = new Random();
@Override
public void run() {
while(true){
try {
//獲取數(shù)據(jù)
Data data = this.queue.take();
//進行數(shù)據(jù)處理。休眠0 - 1000毫秒模擬耗時
Thread.sleep(r.nextInt(1000));
System.out.println("當(dāng)前消費線程:" + Thread.currentThread().getName() + ", 消費成功,消費數(shù)據(jù)為id: " + data.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public final class Data {
private String id;
private String name;
public Data(String id, String name){
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString(){
return "{id: " + id + ", name: " + name + "}";
}
}
測試類:
public class Test {
public static void main(String[] args) throws Exception {
//內(nèi)存緩沖區(qū)
BlockingQueue<Data> queue = new LinkedBlockingQueue<>(10);
//生產(chǎn)者
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
//消費者
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
//創(chuàng)建線程池運行,這是一個緩存的線程池,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時候不創(chuàng)建線程??臻e線程存活時間為60s(默認(rèn)值)
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(p1);
cachePool.execute(p2);
cachePool.execute(p3);
cachePool.execute(c1);
cachePool.execute(c2);
cachePool.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//3s之后停止生產(chǎn)數(shù)據(jù),消費者也就無法再繼續(xù)取到數(shù)據(jù)
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// cachePool.shutdown();
// cachePool.shutdownNow();
}
}
線程池與生產(chǎn)消費者模式
之前的博客已經(jīng)提到過線程池的底層都使用了阻塞隊列(BlockingQueue)
Java中的線程池類其實就是一種生產(chǎn)者和消費者模式的實現(xiàn)方式,但是我覺得其實現(xiàn)方式更加高明。生產(chǎn)者把任務(wù)丟給線程池,線程池創(chuàng)建線程并處理任務(wù),如果將要運行的任務(wù)數(shù)大于線程池的基本線程數(shù)就把任務(wù)扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現(xiàn)生產(chǎn)者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產(chǎn)者先存,消費者再取這種方式顯然慢一些。
比如:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
比如:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
我相信這種場景應(yīng)該非常多,特別是需要處理任務(wù)時間比較長的場景,比如上傳附件并處理,用戶把文件上傳到系統(tǒng)后,系統(tǒng)把文件丟到隊列里,然后立刻返回告訴用戶上傳成功,最后消費者再去隊列里取出文件處理。再如,調(diào)用一個遠(yuǎn)程接口查詢數(shù)據(jù),如果遠(yuǎn)程服務(wù)接口查詢時需要幾十秒的時間,那么它可以提供一個申請查詢的接口,這個接口把要申請查詢?nèi)蝿?wù)放數(shù)據(jù)庫中,然后該接口立刻返回。然后服務(wù)器端用線程輪詢并獲取申請任務(wù)進行處理,處理完之后發(fā)消息給調(diào)用方,讓調(diào)用方再來調(diào)用另外一個接口取數(shù)據(jù)。