37.Python之生產(chǎn)者消費(fèi)者模型

Python之生產(chǎn)者消費(fèi)者模型

  1. 生產(chǎn)者消費(fèi)者模型

    • 模型指的是一種解決問題的套路,目的是為了使生產(chǎn)數(shù)據(jù)與處理數(shù)據(jù)達(dá)到平衡,使得效率最大化。

  1. 生產(chǎn)者消費(fèi)者模型中包含兩類重要角色一類叫生產(chǎn)者,另一類叫消費(fèi)者:

    • 生產(chǎn)者:將負(fù)責(zé)制造數(shù)據(jù)的稱為生產(chǎn)者(生產(chǎn)數(shù)據(jù))。通常在生產(chǎn)數(shù)據(jù)之前需要通過一些代碼瀨獲取數(shù)據(jù)。
    • 消費(fèi)者:接收生產(chǎn)者制造出的數(shù)據(jù),來做進(jìn)一步處理,該類任務(wù)被比喻成消費(fèi)者(處理數(shù)據(jù))。通常在獲取代碼之后需要通過一些代碼進(jìn)行數(shù)據(jù)處理。

  1. 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型的三要素:

    • 生產(chǎn)者
    • 消費(fèi)者
    • 隊(duì)列(隊(duì)列中存放的是一些消息)

  1. 生產(chǎn)者消費(fèi)者模型的運(yùn)作方式:

    • 生產(chǎn)者生產(chǎn)數(shù)據(jù),放到一個(gè)共享的空間,然后消費(fèi)者取走進(jìn)行處理。

  1. 生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)方式:(由于生產(chǎn)者消費(fèi)者模型并不局限于某一類技術(shù),因此,有多種實(shí)現(xiàn)方式,不限于以下方式)

    • 生產(chǎn)者進(jìn)程+隊(duì)列+消費(fèi)者進(jìn)程

  1. 該模型的應(yīng)用場景

    • 程序中出現(xiàn)明顯的兩類任務(wù),一類任務(wù)負(fù)責(zé)生產(chǎn)數(shù)據(jù),另一類任務(wù)是負(fù)責(zé)處理生產(chǎn)數(shù)據(jù)的,此時(shí)就應(yīng)該考慮生產(chǎn)者消費(fèi)者模型。(例如:爬蟲)

  1. 使用生產(chǎn)者消費(fèi)者模型的優(yōu)勢:

    • 實(shí)現(xiàn)了生產(chǎn)者與消費(fèi)者解耦合
    • 平衡了生產(chǎn)力和消費(fèi)力,彼此不影響,生產(chǎn)者可以一致不停的生產(chǎn),消費(fèi)者可以一致不停的消費(fèi),因?yàn)槎卟辉偈侵苯訙贤?,而是跟?duì)列溝通
    import time
    import random
    from multiprocessing import Process,Queue
    
    def consumer(name, q):
     while True:
         res = q.get()
         if res == None:break
         time.sleep(random.randint(1,3))
         print(F'[{name}]獲取到{res}')
    
    def producer(name,q,tools):
     for i in range(3):
         time.sleep(random.randint(1,2))
         res = F'{tools}:{i}'
         q.put(res)
         print(F'[{name}]制造了{(lán)res}')
     q.put(None)
    
    if __name__ == '__main__':
     # 隊(duì)列
     q = Queue()
    
     # 生產(chǎn)者
     p1 = Process(target=producer, args=(('生產(chǎn)者-1', q, '食物')))
     p2 = Process(target=producer, args=(('生產(chǎn)者-2', q, '武器')))
    
    
     # 消費(fèi)者
     c1 = Process(target=consumer, args=(('消費(fèi)者-1',q)))
     c2 = Process(target=consumer, args=(('消費(fèi)者-2',q)))
     c3 = Process(target=consumer, args=(('消費(fèi)者-3',q)))
    
    
     p1.start()
     p2.start()
    
     c1.start()
     c2.start()
     c3.start()
    
     # 在生產(chǎn)者生產(chǎn)完畢之后,往隊(duì)列的末尾添加一個(gè)結(jié)束信號None
     p1.join()
     p2.join()
    
     # 有幾個(gè)消費(fèi)者就應(yīng)該放幾個(gè)結(jié)束信號
     q.put(None)
     q.put(None)
     q.put(None)
    

    改進(jìn):(加入守護(hù)進(jìn)程)

    import time
    import random
    from multiprocessing import Process,JoinableQueue
    
    def consumer(name, q):
       while True:
          res = q.get()
          if res == None:break
          time.sleep(random.randint(1,3))
          print(F'[{name}]獲取到{res}')
          q.task_done()
    
    def producer(name, q, tools):
       for i in range(3):
          time.sleep(random.randint(1,2))
          res = F'{tools}:{i}'
          q.put(res)
          print(F'[{name}]制造了{(lán)res}')
    
    if __name__ == '__main__':
       # 隊(duì)列
       q = JoinableQueue()
    
       # 生產(chǎn)者
       p1 = Process(target=producer, args=(('生產(chǎn)者-1', q, '食物')))
       p2 = Process(target=producer, args=(('生產(chǎn)者-2', q, '武器')))
    
    
       # 消費(fèi)者
       c1 = Process(target=consumer, args=(('消費(fèi)者-1', q)))
       c2 = Process(target=consumer, args=(('消費(fèi)者-2', q)))
       c3 = Process(target=consumer, args=(('消費(fèi)者-3', q)))
       c1.daemon = True
       c2.daemon = True
       c3.daemon = True
    
    
       p1.start()
       p2.start()
    
       c1.start()
       c2.start()
       c3.start()
       # 確定生產(chǎn)者生產(chǎn)完畢
       p1.join()
       p2.join()
       # 在生產(chǎn)者生產(chǎn)完畢后,拿到隊(duì)列中元素的總個(gè)數(shù),然后知道元素總數(shù)變?yōu)?,q.join()這一行代碼才算運(yùn)行完畢
       q.join()
       print('父進(jìn)程結(jié)束')
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容