第十一天 并發(fā)編程和異步編程(1)
今天計劃學(xué)習(xí)Python的多線程編程異步編程,學(xué)習(xí)項目及練習(xí)源碼地址:
GitHub源碼
線程
線程也是實現(xiàn)多任務(wù)的一種方式,一個進(jìn)程中,也經(jīng)常需要同時做多件事,就需要同時運行多個‘子任務(wù)’,這些子任務(wù)就是線程。一個進(jìn)程可以擁有多個并行的線程,其中每一個線程,共享當(dāng)前進(jìn)程的資源。
再鞏固下進(jìn)程和線程的區(qū)別:
| 區(qū)別 | 進(jìn)程 | 線程 |
|---|---|---|
| 根本區(qū)別 | 作為資源分配的單位 | 調(diào)度和執(zhí)行的單位 |
| 開銷 | 每一個進(jìn)程都有獨立的代碼和數(shù)據(jù)空間,進(jìn)程間的切換會有較大的開銷 | 線程可以看出是輕量級的進(jìn)程,多個線程共享內(nèi)存,線程切換的開銷小 |
| 所處環(huán)境 | 在操作系統(tǒng)中,同時運行的多個任務(wù) | 在程序中多個順序流同時執(zhí)行 |
| 分配內(nèi)存 | 系統(tǒng)在運行的時候為每一個進(jìn)程分配不同的內(nèi)存區(qū)域 | 線程所使用的資源是他所屬進(jìn)程的資源 |
| 包含關(guān)系 | 一個進(jìn)程內(nèi)可以擁有多個線程 | 線程是進(jìn)程的一部分,所有線程有時候稱為是輕量級的進(jìn)程 |
進(jìn)程和線程在使用上各有優(yōu)缺點:線程執(zhí)行開銷小,但不利于資源的管理和保護(hù),而進(jìn)程正相反。
在Python3程序中,可以通過_thread(兼容python2,不建議使用)和threading(推薦使用)這兩個模塊來處理線程。
_thread模塊
可以通過兩種方式來使用線程:使用函數(shù)或者使用類來包裝線程對象。當(dāng)使用_thread模塊來處理線程時,可以調(diào)用里面的函數(shù)start_new_thread()來生成一個新的線程,語法格式如下:
thread.start_new_thread ( function, args[, kwargs] )
其中function是線程函數(shù);args表示傳遞給線程函數(shù)的參數(shù),他必須是個tuple(元祖)類型;kwargs是可選參數(shù)。
【示例】使用_thread模塊創(chuàng)建線程:
import _thread
import time
def fun1():
print('開始運行fun1')
time.sleep(4)
print('運行fun1結(jié)束')
def fun2():
print('開始運行fun2')
time.sleep(2)
print('運行fun2結(jié)束')
if __name__=='__main__':
print('開始運行')
#啟動一個線程運行函數(shù)fun1
_thread.start_new_thread(fun1,())
#啟動一個線程運行函數(shù)fun2
_thread.start_new_thread(fun2,())
time.sleep(6)
'''
開始運行
開始運行fun1
開始運行fun2
運行fun2結(jié)束
運行fun1結(jié)束
'''
threading模塊
Python3 通過兩個標(biāo)準(zhǔn)庫_thread和threading提供對線程的支持。_thread提供了低級別的、原始的線程以及一個簡單的鎖,它相比于threading模塊的功能還是比較有限的。threading模塊除了包含 _thread模塊中的所有方法外,還提供的其他方法:
- threading.currentThread(): 返回當(dāng)前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
在Python3程序中,對多線程支持最好的是threading模塊,使用這個模塊,可以靈活地創(chuàng)建多線程程序,并且可以在多線程之間進(jìn)行同步和通信。在Python3程序中,可以通過如下兩種方式來創(chuàng)建線程:
- 通過threading.Thread直接在線程中運行函數(shù)
- 通過繼承類threading.Thread來創(chuàng)建線程
在Python中使用threading.Thread的基本語法格式如下所示:
Thread(group=None, target=None, name=None, args=(), kwargs={})
其中target: 要執(zhí)行的方法;name: 線程名;args/kwargs: 要傳入方法的參數(shù)。
Thread類的方法如表所示:
| 方法名 | 描述 |
|---|
run() | 用以表示線程活動的方法
start() | 啟動線程活動
join([time]) | 等待至線程中止。這阻塞調(diào)用線程直至線程的join()方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時發(fā)生
isAlive() | 返回線程是否活動的
getName() | 返回線程名
setName() | 設(shè)置線程名
【示例】threading.Thread直接創(chuàng)建線程:
import threading
import time
def fun1(thread_name,delay):
print('線程{0}開始運行fun1'.format(thread_name))
time.sleep(delay)
print('線程{0}運行fun1結(jié)束'.format(thread_name))
def fun2(thread_name,delay):
print('線程{0}開始運行fun2'.format(thread_name))
time.sleep(delay)
print('線程{0}運行fun2結(jié)束'.format(thread_name))
if __name__=='__main__':
print('開始運行')
#創(chuàng)建線程
t1=threading.Thread(target=fun1,args=('thread-1',2))
t2=threading.Thread(target=fun2,args=('thread-2',4))
t1.start()
t2.start()
在Python中,通過繼承類threading.Thread的方式來創(chuàng)建一個線程。這種方法只要重寫類threading.Thread中的方法run(),然后再調(diào)用方法start()就能創(chuàng)建線程,并運行方法run()中的代碼。
在調(diào)用Thread類的構(gòu)造方法時,需要將線程函數(shù)、參數(shù)等值傳入構(gòu)造方法,其中name表示線程的名字,如果不指定這個參數(shù),默認(rèn)的線程名字格式為Thread-1、Thread-2。每一個傳入構(gòu)造方法的參數(shù)值,在Thread類中都有對應(yīng)的成員變量保存這些值,這些成員變量都以下劃線(_)開頭,如果_target、_args等。在run方法中需要使用這些變量調(diào)用傳入的線程函數(shù),并為線程函數(shù)傳遞參數(shù)。
【示例】繼承threading.Thread類創(chuàng)建線程:
import threading
import time
def fun1(delay):
print('線程{0}開始運行fun1'.format(threading.current_thread().getName()))
time.sleep(delay)
print('線程{0}運行fun1結(jié)束'.format(threading.current_thread().getName()))
def fun2(delay):
print('線程{0}開始運行fun2'.format(threading.current_thread().getName()))
time.sleep(2)
print('線程{0}運行fun2結(jié)束'.format(threading.current_thread().getName()))
#創(chuàng)建線程類繼承threading.Thread
class MyThread(threading.Thread):
#重寫父類的構(gòu)造方法,其中func是線程函數(shù),args是傳入線程的參數(shù),name是線程名
def __init__(self,func,name,args):
super().__init__(target=func,name=name,args=args)
#重寫父類的run()方法
def run(self):
self._target(*self._args)
if __name__=='__main__':
print('開始運行')
#創(chuàng)建線程
t1=MyThread(fun1,'thread-1',(2,))
t2=MyThread(fun2,'thread-2',(4,))
t1.start()
t2.start()
線程共享全局變量
在一個進(jìn)程內(nèi)所有線程共享全局變量,多線程之間的數(shù)據(jù)共享比多進(jìn)程要好。但是可能造成多個進(jìn)程同時修改一個變量(即線程非安全),可能造成混亂。
【示例】線程共享全局變量:
import time
from threading import *
#定義全局變量num
num=10
def test1():
global num
for i in range(3):
num+=1
print('test1輸出num:',num)
def test2():
global num
print('test2輸出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t1.join() # 等待線程1結(jié)束才開始線程2
t2.start()
t2.join()
'''
test1輸出num: 13
test2輸出num: 13
'''
【示例】線程共享全局變量存在問題:
import time
from threading import *
#定義全局變量num
num=0
def test1():
global num
for i in range(100000):
num+=1
print('test1輸出num:',num)
def test2():
global num
for i in range(100000):
num+=1
print('test2輸出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
'''
test1輸出num: 176838
test2輸出num: 181299
每次結(jié)果都可能不一樣,所以同一個進(jìn)程中對共享變量的讀取修改是不安全的。
'''
互斥鎖
如果多個線程共同對某個數(shù)據(jù)修改,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性,需要對多個線程進(jìn)行同步。最簡單的同步機制就是引入互斥鎖。
鎖有兩種狀態(tài)——鎖定和未鎖定。某個線程要更改共享數(shù)據(jù)時,先將其鎖定,此時資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”狀態(tài),其他的線程才能再次鎖定該資源。
互斥鎖保證了每次只有一個線程進(jìn)行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。
使用Thread對象的Lock可以實現(xiàn)簡單的線程同步,有上鎖acquire方法和釋放release方法,對于那些需要每次只允許一個線程操作的數(shù)據(jù),可以將其操作放到 acquire和release方法之間。
【示例】互斥鎖:
import time
from threading import Thread,Lock
#定義全局變量num
num=0
#創(chuàng)建一把互斥鎖
mutex=Lock()
def test1():
global num
'''
在兩個線程中都調(diào)用上鎖的方法,則這兩個線程就會搶著上鎖,
如果有1方成功上鎖,那么導(dǎo)致另外一方會堵塞(一直等待)直到這個鎖被解開
'''
mutex.acquire()#上鎖
for i in range(100000):
num+=1
mutex.release()
print('test1輸出num:',num)
def test2():
global num
mutex.acquire() # 上鎖
for i in range(100000):
num+=1
mutex.release()
print('test2輸出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
死鎖
在線程共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源,就會造成死鎖。當(dāng)一個線程永遠(yuǎn)地持有一個鎖,并且其他線程都嘗試去獲得這個鎖時,那么它們將永遠(yuǎn)被阻塞,這個我們都知道。如果線程A持有鎖L并且想獲得鎖M,線程B持有鎖M并且想獲得鎖L,那么這兩個線程將永遠(yuǎn)等待下去,這種情況就是最簡單的死鎖形式。
【示例】死鎖
import time
from threading import Thread,Lock
import threading
mutexA=threading.Lock()
mutexB=threading.Lock()
class MyThread1(Thread):
def run(self):
if mutexA.acquire():
print(self.name,'執(zhí)行')
time.sleep(1)
if mutexB.acquire():
print(self.name,'執(zhí)行')
mutexB.release()
mutexA.release()
class MyThread2(Thread):
def run(self):
if mutexB.acquire():
print(self.name,'執(zhí)行')
time.sleep(1)
if mutexA.acquire():
print(self.name,'執(zhí)行')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start()
避免死鎖的方式
既然可能產(chǎn)生死鎖,那么接下來,講一下如何避免死鎖。
讓程序每次至多只能獲得一個鎖。當(dāng)然,在多線程環(huán)境下,這種情況通常并不現(xiàn)實
設(shè)計時考慮清楚鎖的順序,盡量減少嵌在的加鎖交互數(shù)量
既然死鎖的產(chǎn)生是兩個線程無限等待對方持有的鎖,那么只要等待時間有個上限不就好了。當(dāng)然synchronized不具備這個功能,但是我們可以使用Lock類中的tryLock方法去嘗試獲取鎖,這個方法可以指定一個超時時限,在等待超過該時限之后變回返回一個失敗信息
線程同步的應(yīng)用
同步就是協(xié)同步調(diào),按預(yù)定的先后次序進(jìn)行運行。例如:開會。“同”字指協(xié)同、協(xié)助、互相配合。
如進(jìn)程、線程同步,可以理解為進(jìn)程或線程A和B一塊配合,A執(zhí)行到一定程度時要依靠B的某個結(jié)果,于是停下來,示意B運行,B運行后將結(jié)果給A,A繼續(xù)運行。
線程生產(chǎn)者消費者模式
生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。
為了解決這個問題于是引入生產(chǎn)者和消費者模式生產(chǎn)者消費者模式通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者之間不直接通信。生產(chǎn)者生產(chǎn)商品,然后將其放到類似隊列的數(shù)據(jù)結(jié)構(gòu)中,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從隊列中取。這里使用queue模塊來提供線程間通信的機制,也就是說,生產(chǎn)者和消費者共享一個隊列。生產(chǎn)者生產(chǎn)商品后,會將商品添加到隊列中。消費者消費商品,會從隊列中取出商品。
ThreadLocal
我們知道多線程環(huán)境下,每一個線程均可以使用所屬進(jìn)程的全局變量。如果一個線程對全局變量進(jìn)行了修改,將會影響到其他所有的線程對全局變量的計算操作,從而出現(xiàn)數(shù)據(jù)混亂,即為臟數(shù)據(jù)。為了避免多個線程同時對變量進(jìn)行修改,引入了線程同步機制,通過互斥鎖來控制對全局變量的訪問。所以有時候線程使用局部變量比全局變量好,因為局部變量只有線程自身可以訪問,同一個進(jìn)程下的其他線程不可訪問。
但是局部變量也是有問題,就是在函數(shù)調(diào)用的時候,傳遞起來很麻煩。
【示例】局部變量作為參數(shù)傳遞:
def process_student(name):
std=Student(name)
do_task1(std)
do_task2(std)
def do_task1(std):
do_sub_task1(std)
do_sub_task2(std)
def do_task2(std):
do_sub_task1(std)
do_sub_task2(std)
從上面的實例可以看到每個函數(shù)一層一層調(diào)用都需要傳遞std參數(shù),非常麻煩,如果使用全局變量也不行,因為每個線程處理不同的Student對象,不能共享。因此Python還提供了ThreadLocal變量,它本身是一個全局變量,但是每個線程卻可以利用它來保存屬于自己的私有數(shù)據(jù),這些私有數(shù)據(jù)對其他線程也是不可見的。
【示例】ThreadLocal的使用:
import threading
# 創(chuàng)建全局ThreadLocal對象:
local = threading.local()
def process_student():
# 獲取當(dāng)前線程關(guān)聯(lián)的name:
student_name = local.name
print('線程名:%s 學(xué)生姓名:%s' % (threading.current_thread().name,student_name))
def process_thread(name):
# 綁定ThreadLocal的name:
local.name = name
process_student()
t1 = threading.Thread(target=process_thread, args=('張三',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('李四',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
協(xié)程和異步編程
什么是協(xié)程
協(xié)程,英文名是Coroutine,又稱為微線程,是一種用戶態(tài)的輕量級線程。協(xié)程不像線程和進(jìn)程那樣,需要進(jìn)行系統(tǒng)內(nèi)核上的上下文切換,協(xié)程的上下文切換是由程序員決定的。協(xié)程(Coroutine)本質(zhì)上是一個函數(shù),特點是在代碼塊中可以將執(zhí)行權(quán)交給其他協(xié)程。眾所周知,子程序(函數(shù))都是層級調(diào)用的,如果在A中調(diào)用了B,那么B執(zhí)行完畢返回后A才能執(zhí)行完畢。協(xié)程與子程序有點類似,但是它在執(zhí)行過程中可以中斷,轉(zhuǎn)而執(zhí)行其他的協(xié)程,在適當(dāng)?shù)臅r候再回來繼續(xù)執(zhí)行。
協(xié)程相對于多線程的優(yōu)點
多線程編程是比較困難的, 因為調(diào)度程序任何時候都能中斷線程, 必須記住保留鎖, 去保護(hù)程序中重要部分, 防止多線程在執(zhí)行的過程中斷。
而協(xié)程默認(rèn)會做好全方位保護(hù),以防止中斷。我們必須顯示產(chǎn)出才能讓程序的余下部分運行。對協(xié)程來說,無需保留鎖,而在多個線程之間同步操作,協(xié)程自身就會同步,因為在任意時刻,只有一個協(xié)程運行??偨Y(jié)下大概下面幾點:
- 無需系統(tǒng)內(nèi)核的上下文切換,減小開銷;
- 無需原子操作鎖定及同步的開銷,不用擔(dān)心資源共享的問題;
- 單線程即可實現(xiàn)高并發(fā),單核CPU即便支持上萬的協(xié)程都不是問題,所以很適合用于高并發(fā)處理,尤其是在應(yīng)用在網(wǎng)絡(luò)爬蟲中。
協(xié)程的缺點
-
無法使用CPU的多核
協(xié)程的本質(zhì)是個單線程,它不能同時用上單個CPU的多個核,協(xié)程需要和進(jìn)程配合才能運行在多CPU上。當(dāng)然我們?nèi)粘K帉懙慕^大部分應(yīng)用都沒有這個必要,就比如網(wǎng)絡(luò)爬蟲來說,限制爬蟲的速度還有其他的因素,比如網(wǎng)站并發(fā)量、網(wǎng)速等問題都會是爬蟲速度限制的因素。除非做一些密集型應(yīng)用,這個時候才可能會用到多進(jìn)程和協(xié)程。
-
處處都要使用非阻塞代碼
寫協(xié)程就意味著你要一值寫一些非阻塞的代碼,使用各種異步版本的庫,比如后面的異步爬蟲教程中用的aiohttp就是一個異步版本的request庫等。不過這些缺點并不能影響到使用協(xié)程的優(yōu)勢。
寫習(xí)慣NodeJS的人,還是習(xí)慣的來。
協(xié)程的實現(xiàn)(一)
使用yield關(guān)鍵字
存在yield關(guān)鍵字的函數(shù)是一個生成器,當(dāng)調(diào)用這個函數(shù)時,函數(shù)不會立即執(zhí)行,而是返回一個生成器對象。使用生成器對象時,代碼塊才會執(zhí)行。yield 有兩個關(guān)鍵作用:返回一個值;接收調(diào)用方傳入的值,且不影響返回值。
yield是實現(xiàn)生成器的重要關(guān)鍵字。
yiled的三個方法及其作用, next(), send(),throw().
yield的一般形式為:temp=yield 表達(dá)式(每次迭代要返回的值)(推薦使用:既可以返回迭代的值,也可以接受send進(jìn)去的參數(shù)并使用)
例子:
def consume():
r = ''
while True:
n = yield r # 斷點
if not n:
return
print('消費者 正在消費: {}'.format(n))
r = '200 RMB'
def produce(c):
c.send(None) # 啟動生成器
n = 0
while n < 5:
n += 1
print('生產(chǎn)者 正在生產(chǎn): {}'.format{n})
r = c.send(n)
print('[生產(chǎn)者] 消費者返回: {}'.format{r})
print('------------')
c.close()
c = consume()
produce(c)
特點:
延遲加載特性,生成器需要啟動,如果用send啟動,第一次需要傳入None,否則報錯
使用send()方法傳進(jìn)去的值,實際上就是yield表達(dá)式返回的值,沒有傳值則默認(rèn)返回None
如果使用了send(value),傳遞進(jìn)去的那個value會取代那個表達(dá)式的值,并且會將傳遞進(jìn)去的那個值返回給yield表達(dá)式的結(jié)果temp,而send(value)的返回值就是原來的值
每次惰性返回一個值
-
send 方法:
send(value)是有返回值的,即迭代的那個值。send 的主要作用, 當(dāng)需要手動更改生成器里面的某一個值并且使用它,則send發(fā)送進(jìn)去一個數(shù)據(jù),然后保存到y(tǒng)ield語句的返回值,以提供使用;send(value)的返回值就是那個本來應(yīng)該被迭代出來的那個值。這樣既可以保證我能夠傳入新的值,原來的值也不會弄丟。
-
throw 方法:
在生成器中拋出異常,并且這個throw函數(shù)會返回下一個要迭代的值或者是StopIteration。
生成器的啟動與close
生成器啟動時,使用next可以直接啟動并調(diào)用,但是使用 send() 第一次要傳入 None,不然會報錯。
- close() 表示關(guān)閉生成器,如果后續(xù)再調(diào)用此生成器,那么會拋出異常。
- 生成器的終止- StopIteration
在一個生成器中,如果沒有return,則默認(rèn)執(zhí)行到函數(shù)完畢返回 StopIteration;
如果遇到 return,則直接拋出 StopIteration,如果 return 后面有值,會一并返回。
可以使用 except StopIteration as e 捕獲異常,通過e.value 來獲取值。
協(xié)程的狀態(tài)查看
協(xié)程分別有四種狀態(tài),可以導(dǎo)入 inspect.getgeneratorstate() 模塊來查看
GEN_CREATED: 被創(chuàng)建,等待執(zhí)行
GEN_RUNNING: 解釋器執(zhí)行
GEN_SUSPENDED: 在 yield 表達(dá)式處暫停
GEN-CLOSED: 執(zhí)行結(jié)束
從某些角度來說,協(xié)程其實就是一個可以暫停執(zhí)行的函數(shù),并且可以繼續(xù)執(zhí)行。那么 yield 已經(jīng)可以暫停執(zhí)行了,如果在暫停后有辦法把一些 value 發(fā)送到暫停執(zhí)行的函數(shù)中,那么這就是 Python 中的協(xié)程。
不足之處:協(xié)程函數(shù)的返回值不是特別方便獲取,比如 return 的返回值需要捕獲異常as e,使用e.value來獲得
Python 的生成器是協(xié)程 coroutine 的一種形式,它的局限性在于只能向它的直接調(diào)用者每次 yield一個值,這意味著那么包含 yield 的代碼不能像其他代碼被分離出來放到一個單獨的函數(shù)中,而這正是 yield from 要解決的。
yiled from關(guān)鍵字
從字面看是yield的升級改進(jìn)版本,如果將 yield 理解成返回,那么 yield from 就是從哪里返回。
def generator2():
yield 'a'
yield 'b'
yield 'c'
yield from [11,22,33,44]
yield from (12,23,34)
yield from range(3)
for i in generator2():
print(i,end=' , ')
# a , b , c , 11 , 22 , 33 , 44 , 12 , 23 , 34 , 0 , 1 , 2 ,
yield from 返回另一個生成器。而yield 只返回一個元素。有下面的等價關(guān)系:
yield from iterable == for item in iterable: yield item
使用for循環(huán)迭代生成器時,不會顯式的觸發(fā)StopIteration異常,因為for循環(huán)的底層處理了這個異常,因此也就不會得到return的返回值。
def my_generator():
for i in range(5):
if i==2:
return '我被迫中斷了'
else:
yield i
def main(generator):
try:
for i in generator: #不會顯式觸發(fā)異常,故而無法獲取到return的值
print(i)
except StopIteration as exc:
print(exc.value)
g=my_generator() #調(diào)用
main(g)
# 0 1
如果使用 next 來迭代會顯示的觸發(fā)異常,雖然能夠獲取return的返回值,但是操作麻煩。
而使用yield from來實現(xiàn)這個需求:
def my_generator(): # 子生成器
for i in range(5):
if i==2:
return '我被迫中斷了'
else:
yield i
def wrap_my_generator(generator): # 委托生成器
result = yield from generator
#自動觸發(fā)StopIteration異常,并且將return的返回值賦值給yield from表達(dá)式的結(jié)果,即result
print('這是return的返回值,',result)
def main(generator): # 調(diào)用方
for j in generator:
print(j)
g=my_generator()
wrap_g=wrap_my_generator(g)
main(wrap_g) #調(diào)用
'''運行結(jié)果為:
0
1
這是return的返回值, 我被迫中斷了
'''
上面的my_generator是原始的生成器即子生成器,main是調(diào)用方,wrap_my_generator是委托生成器
在使用yield from的時候,多了一個 委托生成器,調(diào)用方通過委托生成器來與子生成器進(jìn)行交互。
委托生成器會為調(diào)用方和子生成器建立雙向通道,即調(diào)用方可以和子生成器直接進(jìn)行交互,委托生成器不參與代碼的處理,只負(fù)責(zé)充當(dāng)管道的作用以及接收 return 的返回值。
yield from iteration結(jié)構(gòu)會在內(nèi)部自動捕獲 iteration生成器的StopIteration 異常,把return返回的值或者是StopIteration的value 屬性的值變成 yield from 表達(dá)式的值。
一些異步編程的概念
同步(Sync)和異步(Async)
同步 :就是發(fā)出一個功能調(diào)用時,在沒有得到結(jié)果之前,該調(diào)用就不返回或繼續(xù)執(zhí)行后續(xù)操作。
異步 :當(dāng)一個異步過程調(diào)用發(fā)出后,調(diào)用者在沒有得到結(jié)果之前,就可以繼續(xù)執(zhí)行后續(xù)操作。當(dāng)這個調(diào)用完成后,一般通過狀態(tài)、通知和回調(diào)來通知調(diào)用者。對于異步調(diào)用,調(diào)用的返回并不受調(diào)用者控制。
對于通知調(diào)用者的三種方式,具體如下:
狀態(tài),即監(jiān)聽被調(diào)用者的狀態(tài)(輪詢),調(diào)用者需要每隔一定時間檢查一次,效率會很低。
通知,當(dāng)被調(diào)用者執(zhí)行完成后,發(fā)出通知告知調(diào)用者,無需消耗太多性能
回調(diào),與通知類似,當(dāng)被調(diào)用者執(zhí)行完成后,會調(diào)用調(diào)用者提供的回調(diào)函數(shù)。
同步和異步的區(qū)別 : 請求發(fā)出后,是否需要等待結(jié)果,才能繼續(xù)執(zhí)行其他操作。
阻塞和非阻塞
阻塞和非阻塞這兩個概念僅僅與等待消息通知時的狀態(tài)有關(guān)。跟同步、異步?jīng)]什么太大關(guān)系,也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態(tài)角度來說的。
阻塞和非阻塞關(guān)注的是程序在等待調(diào)用結(jié)果(消息,返回值)時的狀態(tài)。
阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當(dāng)前線程會被掛起。調(diào)用線程只有在得到結(jié)果之后才會返回。
非阻塞調(diào)用指在不能立刻得到結(jié)果之前,該調(diào)用不會阻塞當(dāng)前線程。
并發(fā)并行
并發(fā): 是指一個時間段中有幾個程序都處于已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機上運行,但任一個時刻點上只有一個程序在處理機上運行。并發(fā)(Concurrent)。
并行:當(dāng)一個CPU執(zhí)行一個線程時,另一個CPU可以執(zhí)行另一個線程,兩個線程互不搶占CPU資源,可以同時進(jìn)行,這種方式我們稱之為并行(Parallel)。
并發(fā)和并行的區(qū)別:并發(fā)的關(guān)鍵是你有處理多個任務(wù)的能力,不一定要同時;但是并行的關(guān)鍵是你有同時處理多個任務(wù)的能力
關(guān)鍵概念的區(qū)分
(1)阻塞/非阻塞:關(guān)注的是程序在等待調(diào)用結(jié)果(消息,返回值)時的狀態(tài)
(2)同步/異步:關(guān)注的是消息通知的機制。即等到完全做完才通知,還是邊做不后通知。
(3)同步阻塞、同步非阻塞,異步阻塞、異步非阻塞。
舉個簡單的例子來描述這四種情況,老張要做兩件事,用水壺?zé)_水,看電視,兩件事情即兩個任務(wù),兩個函數(shù)。 同步阻塞:老張把水壺放到火上,就坐在那里等水開,開了之后再去看電視。 同步非阻塞:老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞) 老張還是覺得自己有點傻,于是變高端了,買了把會響笛的那種水壺。水開之后,能大聲發(fā)出嘀~~~~的噪音。 異步阻塞:老張把響水壺放到火上,然后就坐在旁邊等著聽那個燒開的提示音。(異步阻塞) 異步非阻塞:老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞) 乍一看,這“同步阻塞、意不阻塞”似乎沒有什么區(qū)別,但實際上是有區(qū)別的,所謂同步異步,指的是消息通知的機制。區(qū)別在哪里呢? 在這個例子中同步異步只是對于水壺而言。在使用普通水壺的時候,我要自己主動去觀察水是不是燒開了,自己主動去獲取燒開的這個結(jié)果,即所謂的同步;但是在響水壺的時候,我不需要再管水燒到什么程度了,因為只要水燒開了,那個滴滴的噪聲就會通知我的,即所謂的異步。 他們的相同點是,在燒水的過程中,老王啥也沒干,即“阻塞”。
(4)四種總結(jié)——同步/異步與阻塞/非阻塞
同步阻塞形式:效率是最低的。拿上面的例子來說,在燒水的過程中,什么別的事都不做。
同步非阻塞形式:實際上是效率低下的。因為老王需要不斷的在看電視與燒水之間來回跑動,看一下電視,又要去看一下水燒開沒有,這樣來回跑很多次,在程序中,程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。
異步阻塞形式:異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。 這個效率其實跟同步阻塞差不多的。
異步非阻塞形式:效率更高。因為老王把水燒好之后就不用管了,可以安安心心去看電視,不用來回奔波看水燒開了沒,因為水燒開了會有提示告訴他水燒好了,這樣效率豈不是更高。 那有沒有更好的辦法?當(dāng)然有,如果老王還有一個幫手老張,讓老王自己看電視、同時老張去燒開水,這樣豈不是更好?這就是所謂的并行
(5)并發(fā)/并行、同步/異步、阻塞/非阻塞
并發(fā)/并行:即能夠開啟多個任務(wù),多個任務(wù)交替執(zhí)行為并發(fā),多個任務(wù)同時執(zhí)行為并行
同步/異步:關(guān)注的是消息通知的機制,主動等候消息則為同步、被動聽消息則為異步
阻塞/非阻塞:關(guān)注的是等候消息的過程中有沒有干其他事。
總結(jié):上面的幾組概念,時刻穿插的,并沒有完全的等價關(guān)系,所以經(jīng)常有人說,異步就是非阻塞,同步就是阻塞,并發(fā)就是非阻塞、并行就是非阻塞,這些說法都是不完全準(zhǔn)確地。
概念小結(jié)
并發(fā)和并行都是實現(xiàn)異步編程的思路,只有一個線程的并發(fā),稱之為“偽并發(fā)”;有多個線程的并發(fā)稱之為“真并發(fā)”,真并發(fā)與并行是很接近的。
異步操作的優(yōu)缺點 :
因為異步操作無須額外的線程負(fù)擔(dān)(這里指的是單線程交替執(zhí)行的“偽并發(fā)”),并且使用回調(diào)的方式進(jìn)行處理,在設(shè)計良好的情況下,處理函數(shù)可以不必使用共享變量(即使無法完全不用,最起碼可以減少共享變量的數(shù)量),減少了死鎖的可能。當(dāng)然異步操作也并非完美無暇。編寫異步操作的復(fù)雜程度較高,程序主要使用回調(diào)方式進(jìn)行處理,與普通人的思維方式有些出入,而且難以調(diào)試。
多線程的優(yōu)缺點:
多線程的優(yōu)點很明顯,線程中的處理程序依然是順序執(zhí)行,符合普通人的思維習(xí)慣,所以編程簡單。但是多線程的缺點也同樣明顯,線程的使用(濫用)會給系統(tǒng)帶來上下文切換的額外負(fù)擔(dān)。并且線程間的共享變量可能造成死鎖的出現(xiàn)。 異步與多線程,從辯證關(guān)系上來看,異步和多線程并不時一個同等關(guān)系,(因為單線程也是可以實現(xiàn)異步的)異步是目的,多線程只是我們實現(xiàn)異步的一個手段.什么是異步:異步是當(dāng)一個調(diào)用請求發(fā)送給被調(diào)用者,而調(diào)用者不用等待其結(jié)果的返回.實現(xiàn)異步可以采用多線程技術(shù)或則交給另外的進(jìn)程來處理。
協(xié)程的實現(xiàn)(二)
使用asyncio模塊
從3.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。
asyncio組成的基本概念
-
協(xié)程函數(shù)的作用
result = yield from future,返回future的結(jié)果。
result = yield from coroutine,等候另一個協(xié)程函數(shù)返回結(jié)果或者是觸發(fā)異常
result= yield from task,返回一個task的結(jié)果
return expression,作為一個函數(shù)拋出返回值
raise exception
-
事件循環(huán) event_loop
線程一直在各個協(xié)程方法之間永不停歇的游走,遇到一個yield from 或者await就懸掛起來,然后又走到另外一個方法,依次進(jìn)行下去,直到事件循環(huán)所有的方法執(zhí)行完畢。實際上loop是BaseEventLoop的一個實例,我們可以查看定義,它到底有哪些方法可調(diào)用協(xié)程函數(shù),不是像普通函數(shù)那樣直接調(diào)用運行的,必須添加到事件循環(huán)中,然后由事件循環(huán)去運行,單獨運行協(xié)程函數(shù)是不會有結(jié)果的。
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"開始時間為: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"結(jié)束時間為: {time.time()}") ''' 直接運行 ''' # >>> main() # <coroutine object main at 0x1053bb7c8> ''' 需要通過事件循環(huán)來調(diào)用''' loop=asyncio.get_event_loop() #創(chuàng)建事件循環(huán)對象 #loop=asyncio.new_event_loop() #與上面等價,創(chuàng)建新的事件循環(huán) loop.run_until_complete(main()) #通過事件循環(huán)對象運行協(xié)程函數(shù) loop.close()獲取事件循環(huán)對象的幾種方式:
loop=asyncio.get_running_loop(),返回(獲?。┰诋?dāng)前線程中正在運行的事件循環(huán),如果沒有正在運行的事件循環(huán),則會顯示錯誤
loop=asyncio.get_event_loop() ,獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop
loop=asyncio.set_event_loop(loop), 設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() ,創(chuàng)建一個新的事件循環(huán)
通過事件循環(huán)運行協(xié)程函數(shù)的兩種方式:
創(chuàng)建事件循環(huán)對象loop,即 asyncio.get_event_loop(),通過事件循環(huán)運行協(xié)程函數(shù)
直接通過 asyncio.run(function_name) 運行協(xié)程函數(shù)。
但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個線程中已經(jīng)有了一個事件循環(huán),則不能再使用這個函數(shù)了,因為同一個線程不能有兩個事件循環(huán),而且這個run函數(shù)不能同時運行兩次,因為他已經(jīng)創(chuàng)建一個了。即同一個線程中是不允許有多個事件循環(huán)loop的。 asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運行任務(wù)的方式,因為它是高層API,后面會講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對較低層的API。
-
什么是awaitable對象
有三類對象是可等待的,即 coroutines , Tasks , and Futures .
coroutine :本質(zhì)上就是一個函數(shù),一前面的生成器yield和yield from為基礎(chǔ),不再贅述;
Tasks : 任務(wù),顧名思義,就是要完成某件事情,其實就是對協(xié)程函數(shù)進(jìn)一步的封裝;
Future :它是一個“更底層”的概念,他代表一個異步操作的最終結(jié)果,因為異步操作一般用于耗時操作,結(jié)果不會立即得到,會在“將來”得到異步運行的結(jié)果,故而命名為Future。
三者的關(guān)系,coroutin 可以自動封裝成task ,而Task是Future的子類。
-
什么是task任務(wù)
Task用來并發(fā)調(diào)度的協(xié)程, 單純的協(xié)程函數(shù)僅僅是一個函數(shù)而已,將其包裝成任務(wù),任務(wù)是可以包含各種狀態(tài)的,異步編程最重要的就是對異步操作狀態(tài)的把控了。
(1)創(chuàng)建任務(wù)(兩種方法):
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的 方法二:task = asyncio.ensure_future(coro()) ,也可以使用loop.create_future() ,loop.create_task(coro) 也是可以的。(2)獲取某一個任務(wù)的方法:
方法一:task = asyncio.current_task(loop=None);返回在某一個指定的loop中,當(dāng)前正在運行的任務(wù),如果沒有任務(wù)正在運行,則返回None;如果loop為None,則默認(rèn)為在當(dāng)前的事件循環(huán)中獲取, 方法二:asyncio.all_tasks(loop=None);返回某一個loop中還沒有結(jié)束的任務(wù); -
什么是future?
Future是一個較低層的可等待(awaitable)對象,他表示的是異步操作的最終結(jié)果,當(dāng)一個Future對象被等待的時候,協(xié)程會一直等待,直到Future已經(jīng)運算完畢。 Future是Task的父類,一般情況下,已不用去管它們兩者的詳細(xì)區(qū)別,也沒有必要去用Future,用Task就可以了,返回 future 對象的低級函數(shù)的一個很好的例子是 loop.run_in_executor().
asyncio的基本架構(gòu)
asyncio分為高層API和低層API。我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。所謂的高層API主要是指那些asyncio.xxx()的方法。
高層API:
Coroutines Tasks ? Streams ? Synchronization Primitives ? Subprocesses ? Queues ? Exceptions
低層API:
Event Loop ? Futures ? Transports Protocols ? Policies ? Platform Support
-
常見的一些高層API方法
1)運行異步協(xié)程 asyncio.run(coro, *, debug=False) #運行一個異步程序 2)創(chuàng)建任務(wù) task = asyncio.create_task(coro) #python3.7 task = asyncio.ensure_future(coro()) 3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 這個函數(shù)表示的是:當(dāng)前的那個任務(wù)(協(xié)程函數(shù))睡眠多長時間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息 4)并發(fā)運行多個任務(wù) await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它本身也是awaitable的。當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個列表的形式、 5)防止任務(wù)取消 await asyncio.shield(*arg, *, loop=None) 6)設(shè)置timeout await asyncio.wait_for(aw, timeout, *, loop=None) 當(dāng)異步操作需要執(zhí)行的時間超過waitfor設(shè)置的timeout,就會觸發(fā)異常,所以在編寫程序的時候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時較長,而你設(shè)置的timeout太短,會涉及到她還沒做完,就拋出異常了。 7)多個協(xié)程函數(shù)時候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 與上面的區(qū)別是,第一個參數(shù)aws是一個集合,要寫成集合set的形式,比如: {func(),func(),func3()} 表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會自動包裝成任務(wù)。事實上,寫成列表的形式也是可以的。 該函數(shù)的返回值是兩個Tasks/Futures的集合: (done, pending) 其中done是一個集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個集合,表示還沒有完成的任務(wù)。 常見的使用方法為:done, pending = await asyncio.wait(aws) -
Task類
他是作為一個python協(xié)程對象,和Future對象很像的這么一個對象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
使用高層API asyncio.create_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對象;
相比于協(xié)程函數(shù),任務(wù)時有狀態(tài)的,可以使用Task.cancel()進(jìn)行取消,這會觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。
import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模擬一個耗時任務(wù) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #通過協(xié)程創(chuàng)建一個任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時候,就會跳入到異步開始執(zhí)行 #因為是3.7版本,創(chuàng)建一個任務(wù)就相當(dāng)于是運行了異步函數(shù)cancel_me task = asyncio.create_task(cancel_me()) #等待一秒鐘 await asyncio.sleep(1) print('main函數(shù)休息完了') #發(fā)出取消任務(wù)的請求 task.cancel() try: await task #因為任務(wù)被取消,觸發(fā)了異常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) '''運行結(jié)果為: cancel_me(): before sleep main函數(shù)休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now ''' -
異步函數(shù)的結(jié)果獲取
兩種方法:第一種是直接通過Task.result()來獲?。坏诙N是綁定一個回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個函數(shù)來獲取異步函數(shù)的返回值。
通過result函數(shù):
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print("Hello again 01 end") return a+b coroutine = hello1(10,5) loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task = asyncio.ensure_future(coroutine) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(task) #第三步:通過事件循環(huán)運行 print('-------------------------------------') print(task.result()) loop.close() '''運行結(jié)果為 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''通過定義回調(diào)函數(shù):
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print("Hello again 01 end") return a+b def callback(future): #定義的回調(diào)函數(shù) print(future.result()) loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task = asyncio.ensure_future(hello1(10,5)) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù) task.add_done_callback(callback) #并被任務(wù)綁定一個回調(diào)函數(shù) loop.run_until_complete(task) #第三步:通過事件循環(huán)運行 loop.close() #第四步:關(guān)閉事件循環(huán) '''運行結(jié)果為: Hello world 01 begin Hello again 01 end 15 '''所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時候會調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對象,實際上是同一個對象,因為task是future的子類。
-
asyncio的基本模版
針對3.7之前的版本
import asyncio import time from functools import partial async def get_url(): print('start get url') await asyncio.sleep(2) # await 后面跟的必須是一個 await 對象 print('end get url') return 'stack' def test(url,future): print(url,'hello, stack') if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # loop.run_until_complete(get_url()) # 只是提交了一個請求,時間2s tasks = [get_url() for i in range(10)] # get_future = asyncio.ensure_future(get_url()) # 獲得返回值用法1,源碼上依然是先判斷l(xiāng)oop,然后調(diào)用create_task # get_future = loop.create_task(get_url()) # 方法2,還可以繼續(xù)添加函數(shù),執(zhí)行邏輯 # get_future.add_done_callback(partial(test, 'Stack')) # 函數(shù)本身在獲得調(diào)用時需要一個任意形數(shù),參數(shù)即是 get_future 本身,否則報錯 # 如果函數(shù)需要傳遞參數(shù),需要通過 偏函數(shù) partial 模塊來解決,以及函數(shù)的形參需要放在前面 loop.run_until_complete(asyncio.wait(tasks)) # 提交了10次,時間也是2s # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上 # gather 和 wait 的區(qū)別 # gather是更高一級的抽象,且使用更加靈活,可以使用分組,以及取消任務(wù) print(time.time() - start) # print(get_future.result()) # 接收返回值針對3.7的版本
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務(wù)2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務(wù)4秒 print("Hello again 03 end") return a*b async def main(): results = await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) '''運行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''總結(jié):
第一步:構(gòu)建一個入口函數(shù)main 它也是一個異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個或者是多個協(xié)程,同前面一樣,我可以通過gather或者是wait進(jìn)行組合,對于有返回值的協(xié)程函數(shù),一般就在main里面進(jìn)行結(jié)果的獲取。
第二步:啟動主函數(shù)main 這是python3.7新添加的函數(shù),就一句話,即 asyncio.run(main())