1、協(xié)程和多線程的混合使用
協(xié)程的最大優(yōu)勢(shì)是沒(méi)有多線程的鎖機(jī)制,因?yàn)樗挥幸粋€(gè)線程,也不存在同時(shí)寫變量的沖突,所以執(zhí)行效率比多線程高很多。不過(guò),如果你的cpu不止一個(gè)核,那么就可以將協(xié)程和多線程(或者子線程)混合起來(lái),進(jìn)一步提高執(zhí)行效率。執(zhí)行流程大概如下圖所示:

1.1 流水線的例子
還是看到上一篇博客中3.1節(jié)的例子,一個(gè)工廠流水線:
- manager隨機(jī)產(chǎn)生一些5~10位的長(zhǎng)度的字符串,傳遞給第一個(gè)工人
- 第一個(gè)工人將字符串截?cái)?,只取?個(gè)字符
- 第二個(gè)工人將這個(gè)字符串中的“數(shù)字”去掉,然后排序,輸出一個(gè)新字符串
- 第三個(gè)工人將字符串中的字符都變?yōu)榇髮?/li>
現(xiàn)在我們需要把第3、4個(gè)步驟放到一個(gè)新的線程里去執(zhí)行,代碼如下:
#協(xié)程和多線程混合的例子
from random import shuffle,randint
import re
def random_str():
#隨機(jī)生成5~10位字符串
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
chars_list=list(chars)
shuffle(chars_list)
res=''.join(chars_list[0:randint(5,10)])
return res
def manager(target):
#生成1萬(wàn)個(gè)隨機(jī)字符串并傳入work1
n=0
target.__next__()
while (n<100000):
n=n+1
resource = random_str()
print("Manager: %s" % resource)
target.send(resource)
def work1(target):
target.__next__()
while True:
input_str = yield
if len(input_str)>5:
#截?cái)? res=input_str[0:5]
else:
res=input_str
#給下一個(gè)生成器傳入值
target.send(res)
def work2(target):
target.__next__()
while True:
str_from_work1 = yield
if str_from_work1:
#去掉字符串中的數(shù)字
res=re.sub(r'([\d]+)','',str_from_work1)
target.send(res)
def work3():
#字母變?yōu)榇髮? while True:
str_from_work2 = yield
if str_from_work2:
res=str_from_work2.upper()
print("output: %s" % res)
from threading import Thread
from queue import Queue
def cothread(target):
target.__next__()
#由于開(kāi)多線程,使用一個(gè)Queue新線程進(jìn)行溝通
message = Queue()
def run_target():
while True:
item = message.get()
if item is GeneratorExit:
target.close()
return
else:
target.send(item)
#開(kāi)始一個(gè)新的線程
Thread(target=run_target).start()
#主線程通過(guò)queue和新的線程通信
try:
while True:
item = yield
message.put(item)
except GeneratorExit:
message.put(GeneratorExit)
if __name__ == '__main__':
manager(work1(cothread(work2(work3()))))
由上面的例子可以看到work2和work3之前多了一個(gè)cothread的生成器,這個(gè)生成器打開(kāi)了一個(gè)新的Thread,并通過(guò)一個(gè)Queue實(shí)現(xiàn)線程間的通信。類似的,還可以通過(guò)subprocess(pipe通信)、網(wǎng)絡(luò)等方法去包裝協(xié)程。也就是說(shuō)使用協(xié)程可以把你的“實(shí)現(xiàn)”和“環(huán)境”分割開(kāi)來(lái),上面例子中的work和manager就相當(dāng)于“實(shí)現(xiàn)”的邏輯。而不同的實(shí)現(xiàn)環(huán)節(jié)可以放到不同的“環(huán)境(多線程、子線程、網(wǎng)絡(luò))”中去具體執(zhí)行。
1.2 特別注意
需要特別注意的兩點(diǎn):
- 在調(diào)用協(xié)程的send函數(shù)時(shí),必須是同步的。如果給正在執(zhí)行的生成器send一個(gè)值,生成器會(huì)crash
- 在將生成器組合成流水線時(shí),生成器的連接不能存在loop
2、協(xié)程與任務(wù)調(diào)度器
在David Beazley教程的后半部分,第7章開(kāi)始(http://dabeaz.com/coroutines/),討論的主要是只用協(xié)程能不能用來(lái)構(gòu)造一個(gè)類似于操作系統(tǒng)的調(diào)度器?答案是:能!
首先來(lái)看看一個(gè)操作系統(tǒng)的調(diào)度器需要實(shí)現(xiàn)那些東西:
- 需要有一個(gè)task類
- 需要一個(gè)調(diào)度器,scheduler
- scheduler能夠調(diào)度multitask,多任務(wù)交替運(yùn)行
- task執(zhí)行完之后可以退出
- 允許有系統(tǒng)調(diào)用,對(duì)task進(jìn)行基本的管理
- 可以創(chuàng)建新的task
- 系統(tǒng)調(diào)用可以kill task也可以wait for task(異步task)
一下是一個(gè)例子,在這個(gè)例子中,只用協(xié)程(不使用多線程,子線程)就實(shí)現(xiàn)了以上的各種功能:
class Task(object):
taskid = 0
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
from queue import Queue
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.taskmap = {}
# Tasks waiting for other tasks to exit
self.exit_waiting = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def exit(self,task):
print ("Task %d terminated" % task.tid)
del self.taskmap[task.tid]
# Notify other tasks waiting for exit
# 如果有別的task正在等這個(gè)task,那么調(diào)度別的task
for task in self.exit_waiting.pop(task.tid,[]):
self.schedule(task)
def waitforexit(self,task,waittid):
#如果waitid在taskmap中,將waittid放入self.exit_waiting字典中
#將需要等待waitid的task,注冊(cè)到這個(gè)字典里面
if waittid in self.taskmap:
self.exit_waiting.setdefault(waittid,[]).append(task)
return True
else:
return False
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
if isinstance(result,SystemCall):
result.task = task
result.sched = self
result.handle()
continue
self.schedule(task)
# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall(object):
def handle(self):
pass
# Return a task's ID number
class GetTid(SystemCall):
def handle(self):
self.task.sendval = self.task.tid
self.sched.schedule(self.task)
# Create a new task
class NewTask(SystemCall):
def __init__(self,target):
self.target = target
def handle(self):
tid = self.sched.new(self.target)
self.task.sendval = tid
self.sched.schedule(self.task)
# Kill a task
class KillTask(SystemCall):
def __init__(self,tid):
self.tid = tid
def handle(self):
task = self.sched.taskmap.get(self.tid,None)
if task:
task.target.close()
self.task.sendval = True
else:
self.task.sendval = False
self.sched.schedule(self.task)
# Wait for a task to exit
class WaitTask(SystemCall):
def __init__(self,tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task,self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
# ------------------------------------------------------------
# === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
def foo():
for i in range(5):
print ("I'm foo")
yield
def main():
child = yield NewTask(foo())
print ("Waiting for child")
yield WaitTask(child)
print ("Child done")
sched = Scheduler()
sched.new(main())
sched.mainloop()
上面的代碼很長(zhǎng),有興趣可以去教程的網(wǎng)站上,查看作者是如何一步一步的實(shí)現(xiàn)一個(gè)os的各種調(diào)度的功能的。