Python之生產(chǎn)者消費(fèi)者模型
-
生產(chǎn)者消費(fèi)者模型
- 模型指的是一種解決問題的套路,目的是為了使生產(chǎn)數(shù)據(jù)與處理數(shù)據(jù)達(dá)到平衡,使得效率最大化。
-
生產(chǎn)者消費(fèi)者模型中包含兩類重要角色一類叫生產(chǎn)者,另一類叫消費(fèi)者:
- 生產(chǎn)者:將負(fù)責(zé)制造數(shù)據(jù)的稱為生產(chǎn)者(生產(chǎn)數(shù)據(jù))。通常在生產(chǎn)數(shù)據(jù)之前需要通過一些代碼瀨獲取數(shù)據(jù)。
- 消費(fèi)者:接收生產(chǎn)者制造出的數(shù)據(jù),來做進(jìn)一步處理,該類任務(wù)被比喻成消費(fèi)者(處理數(shù)據(jù))。通常在獲取代碼之后需要通過一些代碼進(jìn)行數(shù)據(jù)處理。
-
實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型的三要素:
- 生產(chǎn)者
- 消費(fèi)者
- 隊(duì)列(隊(duì)列中存放的是一些消息)
-
生產(chǎn)者消費(fèi)者模型的運(yùn)作方式:
- 生產(chǎn)者生產(chǎn)數(shù)據(jù),放到一個(gè)共享的空間,然后消費(fèi)者取走進(jìn)行處理。
-
生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)方式:(由于生產(chǎn)者消費(fèi)者模型并不局限于某一類技術(shù),因此,有多種實(shí)現(xiàn)方式,不限于以下方式)
- 生產(chǎn)者進(jìn)程+隊(duì)列+消費(fèi)者進(jìn)程
-
該模型的應(yīng)用場景
- 程序中出現(xiàn)明顯的兩類任務(wù),一類任務(wù)負(fù)責(zé)生產(chǎn)數(shù)據(jù),另一類任務(wù)是負(fù)責(zé)處理生產(chǎn)數(shù)據(jù)的,此時(shí)就應(yīng)該考慮生產(chǎn)者消費(fèi)者模型。(例如:爬蟲)
-
使用生產(chǎn)者消費(fèi)者模型的優(yōu)勢:
- 實(shí)現(xiàn)了生產(chǎn)者與消費(fèi)者解耦合
- 平衡了生產(chǎn)力和消費(fèi)力,彼此不影響,生產(chǎn)者可以一致不停的生產(chǎn),消費(fèi)者可以一致不停的消費(fèi),因?yàn)槎卟辉偈侵苯訙贤?,而是跟?duì)列溝通
import time import random from multiprocessing import Process,Queue def consumer(name, q): while True: res = q.get() if res == None:break time.sleep(random.randint(1,3)) print(F'[{name}]獲取到{res}') def producer(name,q,tools): for i in range(3): time.sleep(random.randint(1,2)) res = F'{tools}:{i}' q.put(res) print(F'[{name}]制造了{(lán)res}') q.put(None) if __name__ == '__main__': # 隊(duì)列 q = Queue() # 生產(chǎn)者 p1 = Process(target=producer, args=(('生產(chǎn)者-1', q, '食物'))) p2 = Process(target=producer, args=(('生產(chǎn)者-2', q, '武器'))) # 消費(fèi)者 c1 = Process(target=consumer, args=(('消費(fèi)者-1',q))) c2 = Process(target=consumer, args=(('消費(fèi)者-2',q))) c3 = Process(target=consumer, args=(('消費(fèi)者-3',q))) p1.start() p2.start() c1.start() c2.start() c3.start() # 在生產(chǎn)者生產(chǎn)完畢之后,往隊(duì)列的末尾添加一個(gè)結(jié)束信號None p1.join() p2.join() # 有幾個(gè)消費(fèi)者就應(yīng)該放幾個(gè)結(jié)束信號 q.put(None) q.put(None) q.put(None)
改進(jìn):(加入守護(hù)進(jìn)程)
import time import random from multiprocessing import Process,JoinableQueue def consumer(name, q): while True: res = q.get() if res == None:break time.sleep(random.randint(1,3)) print(F'[{name}]獲取到{res}') q.task_done() def producer(name, q, tools): for i in range(3): time.sleep(random.randint(1,2)) res = F'{tools}:{i}' q.put(res) print(F'[{name}]制造了{(lán)res}') if __name__ == '__main__': # 隊(duì)列 q = JoinableQueue() # 生產(chǎn)者 p1 = Process(target=producer, args=(('生產(chǎn)者-1', q, '食物'))) p2 = Process(target=producer, args=(('生產(chǎn)者-2', q, '武器'))) # 消費(fèi)者 c1 = Process(target=consumer, args=(('消費(fèi)者-1', q))) c2 = Process(target=consumer, args=(('消費(fèi)者-2', q))) c3 = Process(target=consumer, args=(('消費(fèi)者-3', q))) c1.daemon = True c2.daemon = True c3.daemon = True p1.start() p2.start() c1.start() c2.start() c3.start() # 確定生產(chǎn)者生產(chǎn)完畢 p1.join() p2.join() # 在生產(chǎn)者生產(chǎn)完畢后,拿到隊(duì)列中元素的總個(gè)數(shù),然后知道元素總數(shù)變?yōu)?,q.join()這一行代碼才算運(yùn)行完畢 q.join() print('父進(jìn)程結(jié)束')