Python筆記_從迭代器、生成器到協(xié)程(三)

1、協(xié)程和多線程的混合使用

協(xié)程的最大優(yōu)勢(shì)是沒(méi)有多線程的鎖機(jī)制,因?yàn)樗挥幸粋€(gè)線程,也不存在同時(shí)寫變量的沖突,所以執(zhí)行效率比多線程高很多。不過(guò),如果你的cpu不止一個(gè)核,那么就可以將協(xié)程和多線程(或者子線程)混合起來(lái),進(jìn)一步提高執(zhí)行效率。執(zhí)行流程大概如下圖所示:

Paste_Image.png

1.1 流水線的例子

還是看到上一篇博客中3.1節(jié)的例子,一個(gè)工廠流水線:

  1. manager隨機(jī)產(chǎn)生一些5~10位的長(zhǎng)度的字符串,傳遞給第一個(gè)工人
  2. 第一個(gè)工人將字符串截?cái)?,只取?個(gè)字符
  3. 第二個(gè)工人將這個(gè)字符串中的“數(shù)字”去掉,然后排序,輸出一個(gè)新字符串
  4. 第三個(gè)工人將字符串中的字符都變?yōu)榇髮?/li>

現(xiàn)在我們需要把第3、4個(gè)步驟放到一個(gè)新的線程里去執(zhí)行,代碼如下:

#協(xié)程和多線程混合的例子
from random import shuffle,randint
import re

def random_str():
    #隨機(jī)生成5~10位字符串
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    chars_list=list(chars)
    shuffle(chars_list)
    res=''.join(chars_list[0:randint(5,10)])
    return res 

def manager(target):
    #生成1萬(wàn)個(gè)隨機(jī)字符串并傳入work1
    n=0 
    target.__next__()
    while (n<100000):
        n=n+1
        resource = random_str()
        print("Manager: %s" % resource)
        target.send(resource)

def work1(target):
    target.__next__()
    while True:
        input_str = yield
        if len(input_str)>5:
            #截?cái)?            res=input_str[0:5]
        else:
            res=input_str
        #給下一個(gè)生成器傳入值
        target.send(res)

def work2(target):
    target.__next__()
    while True:
        str_from_work1 = yield
        if str_from_work1:
            #去掉字符串中的數(shù)字
            res=re.sub(r'([\d]+)','',str_from_work1)
            target.send(res)


def work3():
    #字母變?yōu)榇髮?    while True:
        str_from_work2 = yield
        if str_from_work2:
            res=str_from_work2.upper()
            print("output: %s" % res)

from threading import Thread
from queue import Queue

def cothread(target):
    target.__next__()
    #由于開(kāi)多線程,使用一個(gè)Queue新線程進(jìn)行溝通
    message = Queue()

    def run_target():
        while True:
            item = message.get()
            if item is GeneratorExit:
                target.close()
                return
            else:
                target.send(item)

    #開(kāi)始一個(gè)新的線程
    Thread(target=run_target).start()

    #主線程通過(guò)queue和新的線程通信
    try:
        while True:
            item = yield
            message.put(item)
    except GeneratorExit:
        message.put(GeneratorExit)

if __name__ == '__main__':
    manager(work1(cothread(work2(work3()))))    

由上面的例子可以看到work2和work3之前多了一個(gè)cothread的生成器,這個(gè)生成器打開(kāi)了一個(gè)新的Thread,并通過(guò)一個(gè)Queue實(shí)現(xiàn)線程間的通信。類似的,還可以通過(guò)subprocess(pipe通信)、網(wǎng)絡(luò)等方法去包裝協(xié)程。也就是說(shuō)使用協(xié)程可以把你的“實(shí)現(xiàn)”和“環(huán)境”分割開(kāi)來(lái),上面例子中的work和manager就相當(dāng)于“實(shí)現(xiàn)”的邏輯。而不同的實(shí)現(xiàn)環(huán)節(jié)可以放到不同的“環(huán)境(多線程、子線程、網(wǎng)絡(luò))”中去具體執(zhí)行。

1.2 特別注意

需要特別注意的兩點(diǎn):

  1. 在調(diào)用協(xié)程的send函數(shù)時(shí),必須是同步的。如果給正在執(zhí)行的生成器send一個(gè)值,生成器會(huì)crash
  2. 在將生成器組合成流水線時(shí),生成器的連接不能存在loop

2、協(xié)程與任務(wù)調(diào)度器

在David Beazley教程的后半部分,第7章開(kāi)始(http://dabeaz.com/coroutines/),討論的主要是只用協(xié)程能不能用來(lái)構(gòu)造一個(gè)類似于操作系統(tǒng)的調(diào)度器?答案是:能!

首先來(lái)看看一個(gè)操作系統(tǒng)的調(diào)度器需要實(shí)現(xiàn)那些東西:

  • 需要有一個(gè)task類
  • 需要一個(gè)調(diào)度器,scheduler
  • scheduler能夠調(diào)度multitask,多任務(wù)交替運(yùn)行
  • task執(zhí)行完之后可以退出
  • 允許有系統(tǒng)調(diào)用,對(duì)task進(jìn)行基本的管理
  • 可以創(chuàng)建新的task
  • 系統(tǒng)調(diào)用可以kill task也可以wait for task(異步task)

一下是一個(gè)例子,在這個(gè)例子中,只用協(xié)程(不使用多線程,子線程)就實(shí)現(xiàn)了以上的各種功能:

class Task(object):
    taskid = 0 
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    def __init__(self):
        self.ready   = Queue()   
        self.taskmap = {}    

        # Tasks waiting for other tasks to exit
        self.exit_waiting = {}

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self,task):
        print ("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        # 如果有別的task正在等這個(gè)task,那么調(diào)度別的task
        for task in self.exit_waiting.pop(task.tid,[]):
            self.schedule(task)

    def waitforexit(self,task,waittid):
        #如果waitid在taskmap中,將waittid放入self.exit_waiting字典中
        #將需要等待waitid的task,注冊(cè)到這個(gè)字典里面
        if waittid in self.taskmap:
            self.exit_waiting.setdefault(waittid,[]).append(task)
            return True
        else:
            return False

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result,SystemCall):
                    result.task  = task
                    result.sched = self
                    result.handle()
                continue
            self.schedule(task)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall(object):
    def handle(self):
        pass

# Return a task's ID number
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)

# Create a new task
class NewTask(SystemCall):
    def __init__(self,target):
        self.target = target
    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)

# Kill a task
class KillTask(SystemCall):
    def __init__(self,tid):
        self.tid = tid
    def handle(self):
        task = self.sched.taskmap.get(self.tid,None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)

# Wait for a task to exit
class WaitTask(SystemCall):
    def __init__(self,tid):
        self.tid = tid
    def handle(self):
        result = self.sched.waitforexit(self.task,self.tid)
        self.task.sendval = result
        # If waiting for a non-existent task,
        # return immediately without waiting
        if not result:
            self.sched.schedule(self.task)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    def foo():
        for i in range(5):
            print ("I'm foo")
            yield

    def main():
        child = yield NewTask(foo())
        print ("Waiting for child")
        yield WaitTask(child)
        print ("Child done")

    sched = Scheduler()
    sched.new(main())
    sched.mainloop()

上面的代碼很長(zhǎng),有興趣可以去教程的網(wǎng)站上,查看作者是如何一步一步的實(shí)現(xiàn)一個(gè)os的各種調(diào)度的功能的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容