Python筆記_從迭代器、生成器到協(xié)程(二)

之前的一篇一直寫了python中迭代器和生成器的內(nèi)容,這篇內(nèi)容將介紹由生成器實(shí)現(xiàn)協(xié)程的方法。

日志中實(shí)例的代碼都是在python3.5中運(yùn)行的。

協(xié)程的內(nèi)容受以下教程的啟發(fā)較大:
http://dabeaz.com/coroutines/

1、什么是協(xié)程?

并發(fā)(不是并行)編程目前有四種方式,多進(jìn)程,多線程,異步,和協(xié)程。協(xié)程和多線程的區(qū)別在于:協(xié)程的調(diào)度對(duì)于內(nèi)核來說是不可見的,協(xié)程間是協(xié)同調(diào)度的。

在python中協(xié)程可以由生成器來實(shí)現(xiàn)的??梢杂靡粋€(gè)“生產(chǎn)者--消費(fèi)者”的模型來解釋。

假設(shè)有兩個(gè)函數(shù),一個(gè)負(fù)責(zé)生產(chǎn),一個(gè)負(fù)責(zé)消費(fèi)。生產(chǎn)完成一個(gè)內(nèi)容后,傳遞給消費(fèi)函數(shù)進(jìn)行消費(fèi),消費(fèi)完之后又進(jìn)行請(qǐng)求一次生產(chǎn)。當(dāng)然可以用多線程來實(shí)現(xiàn),用生成器的來實(shí)現(xiàn)例子如下:

#生產(chǎn)者、消費(fèi)者
def consumer():
    #消費(fèi)結(jié)果
    res=""
    while True:
        #n:消費(fèi)的數(shù)目
        n = yield res                 
        #如果沒有傳入消費(fèi)商品,繼續(xù)while循環(huán)
        if not n:
            continue    
        print ('[消費(fèi)者]consuming %s...' % n)
        res ='ok'

def producer(c):
    #c是消費(fèi)者的生成器對(duì)象
    c.__next__()
    n=0 
    while True:
        n=n+1
        print("[生產(chǎn)者]producing %s" % n)
        res=c.send(n)
        print("[生產(chǎn)者]consumer return is %s" % res)
        if n > 1000:
            break
    c.close()

if __name__ == '__main__':
    c=consumer()
    producer(c)
執(zhí)行結(jié)果:
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 997
[消費(fèi)者]consuming 997...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 998
[消費(fèi)者]consuming 998...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 999
[消費(fèi)者]consuming 999...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 1000
[消費(fèi)者]consuming 1000...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 1001
[消費(fèi)者]consuming 1001...
[生產(chǎn)者]consumer return is ok

在上面的例子中,consumer是一個(gè)生成器,producer是一個(gè)函數(shù),producer接收一個(gè)生成器對(duì)象。

如果把上面的邏輯按照多線程來寫就比較復(fù)雜,我還是用python的多線程寫了一個(gè)同樣的邏輯,用Queue來做進(jìn)程間通信:

import threading,queue
number=10000

class producer(threading.Thread):
    def __init__(self,q1,q2):
        super(producer,self).__init__()
        #消費(fèi)者反饋
        self.q1=q1
        #產(chǎn)品輸出
        self.q2=q2

    def run(self):
        n=1
        print("[生產(chǎn)者]producing %s" % n)
        self.q2.put(n)
        while True:
            if not self.q1.empty():
                res = self.q1.get(block=False)
                print("[生產(chǎn)者]consumer return is %s" % res)
                if n > number:
                    break
                print("[生產(chǎn)者]producing %s" % n)
                self.q2.put(n)
                n=n+1

class consumer(threading.Thread):
    def __init__(self,q1,q2):
        super(consumer,self).__init__()
        self.q1=q1
        self.q2=q2
    def run(self):
        while True:
            if not self.q2.empty():
                i=self.q2.get(block=False)
                print ('[消費(fèi)者]consum ing %s...' % i)
                res ='ok'
                self.q1.put(res)
                if i>=number:
                    break

def test():
    q1=queue.Queue(maxsize=5)
    q2=queue.Queue(maxsize=5)
    p = producer(q1,q2)
    c = consumer(q1,q2)
    p.start()
    c.start()
    p.join()
    c.join()
test()

執(zhí)行結(jié)果:
[生產(chǎn)者]producing 99997
[消費(fèi)者]consuming 99997...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 99998
[消費(fèi)者]consuming 99998...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 99999
[消費(fèi)者]consuming 99999...
[生產(chǎn)者]consumer return is ok
[生產(chǎn)者]producing 100000
[消費(fèi)者]consuming 100000...
[生產(chǎn)者]consumer return is ok

協(xié)程和多線程去執(zhí)行這樣一個(gè)邏輯速度有多大差異呢,我對(duì)比了一下生產(chǎn)/消費(fèi)1000個(gè)東西的耗時(shí)對(duì)比:

  • 協(xié)程:0.086s
  • 多線程:21.79s

2、協(xié)程的好處都有啥?

引用一下廖雪峰的教程中的觀點(diǎn)(http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000

  • 協(xié)程的執(zhí)行有點(diǎn)像多線程,但協(xié)程的特點(diǎn)在于是一個(gè)線程執(zhí)行。

  • 最大的優(yōu)勢(shì)就是協(xié)程極高的執(zhí)行效率。因?yàn)樽映绦蚯袚Q不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢(shì)就越明顯。

  • 第二大優(yōu)勢(shì)就是不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程,也不存在同時(shí)寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

3、用協(xié)程來打造流水線pipeline

用協(xié)程可以很方便的打造流水線式的程序,對(duì)生產(chǎn)者/消費(fèi)者的一個(gè)擴(kuò)展就是:流水線上的每一個(gè)環(huán)節(jié),都有input和output,它既消費(fèi)上一個(gè)流程的結(jié)果,也產(chǎn)生一個(gè)輸出給下一個(gè)環(huán)節(jié)。非常類似于linux中的管道的功能。

3.1處理字符串的例字

假設(shè)有這樣一個(gè)數(shù)據(jù)流水線:

  1. manager隨機(jī)產(chǎn)生一些5~10位的長(zhǎng)度的字符串,傳遞給第一個(gè)工人
  2. 第一個(gè)工人將字符串截?cái)啵蝗∏?個(gè)字符
  3. 第二個(gè)工人將這個(gè)字符串中的“數(shù)字”去掉,然后排序,輸出一個(gè)新字符串
  4. 第三個(gè)工人將字符串中的字符都變?yōu)榇髮?/li>

也就是完成這樣一個(gè)流水線84BzX5Fn-->84BzX-->BzX-->BZX,代碼如下:

#流水線例子
from random import shuffle,randint
import re

def random_str():
    #隨機(jī)生成5~10位字符串
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    chars_list=list(chars)
    shuffle(chars_list)
    res=''.join(chars_list[0:randint(5,10)])
    return res

def manager(target):
    #生成1萬個(gè)隨機(jī)字符串并傳入work1
    n=0
    target.__next__()
    while (n<10000):
        n=n+1
        resource = random_str()
        print("Manager: %s" % resource)
        target.send(resource)

def work1(target):
    target.__next__()
    while True:
        input_str = yield
        if len(input_str)>5:
            #截?cái)?            res=input_str[0:5]
        else:
            res=input_str
        #給下一個(gè)生成器傳入值
        target.send(res)

def work2(target):
    target.__next__()
    while True:
        str_from_work1 = yield
        if str_from_work1:
            #去掉字符串中的數(shù)字
            res=re.sub(r'([\d]+)','',str_from_work1)
            target.send(res)

def work3():
    #字母變?yōu)榇髮?    while True:
        str_from_work2 = yield
        if str_from_work2:
            res=str_from_work2.upper()
            print("output: %s" % res)

if __name__ == '__main__':
    manager(work1(work2(work3())))

#執(zhí)行結(jié)果
Manager: 5wKvB
output: WKVB
Manager: uRDjl5
output: URDJL
Manager: 84BzX5Fn
output: BZX
Manager: FfhjoMueD
output: FFHJO
Manager: TDqe2axN
output: TDQE
Manager: cqEOj0I
output: CQEOJ
Manager: c2xQE
output: CXQE
Manager: 2qMupVNc
output: QMUP
Manager: MriKfphB
output: MRIKF
Manager: KSoc7EibJZ
output: KSOC
Manager: QDrou
output: QDROU

3.2帶分支的流水線

在上面的例字里面,第2和第3個(gè)工人之間,增加一個(gè)新的環(huán)節(jié):

  • 有若干工人,根據(jù)幾個(gè)“客戶需求”,給每個(gè)字符串的尾部添加一些tag

例字如下:
#流水線例子
from random import shuffle,randint
import re

def random_str():
    #隨機(jī)生成5~10位字符串
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    chars_list=list(chars)
    shuffle(chars_list)
    res=''.join(chars_list[0:randint(5,10)])
    return res

def manager(target):
    #生成1萬個(gè)隨機(jī)字符串并傳入work1
    n=0
    target.__next__()
    while (n<10000):
        n=n+1
        resource = random_str()
        print("Manager: %s" % resource)
        target.send(resource)

def work1(target):
    target.__next__()
    while True:
        input_str = yield
        if len(input_str)>5:
            #截?cái)?            res=input_str[0:5]
        else:
            res=input_str
        #給下一個(gè)生成器傳入值
    target.send(res)

def work2(target):
    target.__next__()
    while True:
        str_from_work1 = yield
        if str_from_work1:
            #去掉字符串中的數(shù)字
            res=re.sub(r'([\d]+)','',str_from_work1)
            target.send(res)

def broadcast(targets):
    #廣播
    for i in targets:
        i.__next__()
    while True:
        res=yield
        for i in targets:
            i.send(res)

def work_tail(tail,target):
    #把字符串后面添加一個(gè)tag
    target.__next__()
    while True:
        res=yield
        target.send(res+tail)

def work3():
    #字母變?yōu)榇髮?    while True:
        str_from_work2 = yield
        if str_from_work2:
            res=str_from_work2.upper()
            print("output: %s" % res)
if __name__ == '__main__':
    manager(work1(work2(
        broadcast([
            work_tail('_tag1',work3()),
            work_tail('_tag2',work3()),
            work_tail('_tag_new',work3()),
            work_tail('_tag_old',work3()),
        ])
    )))

執(zhí)行結(jié)果:
Manager: BEv8akI5q
output: BEVA_TAG1
output: BEVA_TAG2
Manager: j6FfVUysB9
output: JFFV_TAG1
output: JFFV_TAG2
Manager: C27fB
output: CFB_TAG1
output: CFB_TAG2
Manager: 1V8NYRrB2
output: VNY_TAG1
output: VNY_TAG2
Manager: qtAOye1
output: QTAOY_TAG1
output: QTAOY_TAG2
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容