一 生產者消費者模型介紹
為什么要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在并發(fā)編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續(xù)生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的
二 生產者消費者模型實現
基于上一小節(jié)學習的隊列來實習一個生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產了 %s\033[0m' %(name,res))
if name == 'main':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,'egon','包子'))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex'))
#開始
p1.start()
c1.start()
print('主')
執(zhí)行結果
主
egon 生產了 包子0
egon 生產了 包子1
alex 吃 包子0
alex 吃 包子1
egon 生產了 包子2
alex 吃 包子2
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發(fā)一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環(huán)
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產了 %s\033[0m' %(name,res))
if name == 'main':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,'egon','包子'))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex'))
#開始
p1.start()
c1.start()
p1.join()
q.put(None)
print('主')
但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決,有幾個消費者就需要發(fā)送幾次結束信號:相當low,例如
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產了 %s\033[0m' %(name,res))
if name == 'main':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,'egon1','包子'))
p2=Process(target=producer,args=(q,'egon2','骨頭'))
p3=Process(target=producer,args=(q,'egon3','泔水'))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex1'))
c2=Process(target=consumer,args=(q,'alex2'))
#開始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
q.put(None)
print('主')
其實我們的思路無非是發(fā)送結束信號而已,有另外一種隊列提供了這種機制
JoinableQueue([maxsize])
這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
參數介紹
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發(fā)ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調用q.task_done()方法為止
基于JoinableQueue實現生產者消費者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
q.task_done() #發(fā)送信號給q.join(),說明已經從隊列中取走一個數據并處理完畢了
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產了 %s\033[0m' %(name,res))
q.join() #等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束
if name == 'main':
q=JoinableQueue() #使用JoinableQueue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,'egon1','包子'))
p2=Process(target=producer,args=(q,'egon2','骨頭'))
p3=Process(target=producer,args=(q,'egon3','泔水'))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex1'))
c2=Process(target=consumer,args=(q,'alex2'))
c1.daemon=True
c2.daemon=True
#開始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
#1、主進程等生產者p1、p2、p3結束
#2、而p1、p2、p3是在消費者把所有數據都取干凈之后才會結束
#3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨著主進程一塊死掉,因而需要將生產者們設置成守護進程
print('主')
三 生產者消費者模型總結
1、程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
2、引入生產者消費者模型為了解決的問題是
平衡生產者與消費者之間的速度差
程序解開耦合
3、如何實現生產者消費者模型
生產者<--->隊列<--->消費者