一、python協(xié)程概念的引入
之前我們學(xué)習了線程、進程的概念,了解了在操作系統(tǒng)中進程是資源分配的最小單位,線程是CPU調(diào)度的最小單位。按道理來說我們已經(jīng)算是把cpu的利用率提高很多了。但是我們知道無論是創(chuàng)建多進程還是創(chuàng)建多線程來解決問題,都要消耗一定的時間來創(chuàng)建進程、創(chuàng)建線程、以及管理他們之間的切換。
隨著我們對于效率的追求不斷提高,基于單線程來實現(xiàn)并發(fā)又成為一個新的課題,即只用一個主線程(很明顯可利用的cpu只有一個)情況下實現(xiàn)并發(fā)。這樣就可以節(jié)省創(chuàng)建線進程所消耗的時間。
為此我們需要先回顧下并發(fā)的本質(zhì):切換+保存狀態(tài)
cpu正在運行一個任務(wù),會在兩種情況下切走去執(zhí)行其他的任務(wù)(切換由操作系統(tǒng)強制控制),一種情況是該任務(wù)發(fā)生了阻塞,另外一種情況是該任務(wù)計算的時間過長image.png
一:其中第二種情況并不能提升效率,只是為了讓cpu能夠雨露均沾,實現(xiàn)看起來所有任務(wù)都被“同時”執(zhí)行的效果,如果多個任務(wù)都是純計算的,這種切換反而會降低效率。
為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以保存任務(wù)運行狀態(tài)的方法。
1. yiled可以保存狀態(tài),yield的狀態(tài)保存與操作系統(tǒng)的保存線程狀態(tài)很像,但是yield是代碼級別控制的,更輕量級
2. send可以把一個函數(shù)的結(jié)果傳給另外一個函數(shù),以此實現(xiàn)單線程內(nèi)程序之間的切換
二:第一種情況的切換。在任務(wù)一遇到io情況下,切到任務(wù)二去執(zhí)行,這樣就可以利用任務(wù)一阻塞的時間完成任務(wù)二的計算,效率的提升就在于此。
.
對于單線程下,我們不可避免程序中出現(xiàn)io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統(tǒng)級別)控制單線程下的多個任務(wù)能在一個任務(wù)遇到io阻塞時就切換到另外一個任務(wù)去計算,這樣就保證了該線程能夠最大限度地處于就緒態(tài),即隨時都可以被cpu執(zhí)行的狀態(tài),相當于我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統(tǒng),讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執(zhí)行權(quán)限分配給我們的線程。
協(xié)程的本質(zhì)就是在單線程下,由用戶自己控制一個任務(wù)遇到io阻塞了就切換另外一個任務(wù)去執(zhí)行,以此來提升效率。
二、協(xié)程概念
協(xié)程:是單線程下的并發(fā),又稱微線程,纖程。英文名Coroutine。
一句話說明什么是協(xié)程:
協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。
.
強調(diào):
python的線程屬于內(nèi)核級別的,即由操作系統(tǒng)控制調(diào)度(如單線程遇到io或執(zhí)行時間過長就會被迫交出cpu執(zhí)行權(quán)限,切換其他線程運行)
單線程內(nèi)開啟協(xié)程,一旦遇到io,就會從應(yīng)用程序級別(而非操作系統(tǒng))控制切換,以此來提升效率(!??!非io操作的切換與效率無關(guān))
對比操作系統(tǒng)控制線程的切換,用戶在單線程內(nèi)控制協(xié)程的切換
優(yōu)點:
協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級
單線程內(nèi)就可以實現(xiàn)并發(fā)的效果,最大限度地利用cpu
缺點:
協(xié)程的本質(zhì)是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內(nèi)開啟多個線程,每個線程內(nèi)開啟協(xié)程
協(xié)程指的是單個線程,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程
總結(jié)協(xié)程特點:
-- 必須在只有一個單線程里實現(xiàn)并發(fā)
-- 修改共享數(shù)據(jù)不需加鎖
-- 用戶程序里自己保存多個控制流的上下文棧
-- 附加:一個協(xié)程遇到IO操作自動切換到其它協(xié)程(如何實現(xiàn)檢測IO,yield、greenlet都無法實現(xiàn),就用到了gevent模塊(select機制))
三、Greenlet模塊
安裝:pip3 install greenlet
單純的切換(在沒有io的情況下或者沒有重復(fù)開辟內(nèi)存空間的操作),反而會降低程序的執(zhí)行速度
#順序執(zhí)行
import time
def f1():
res=1
for i in range(100000000):
res+=I
def f2():
res=1
for i in range(100000000):
res*=I
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337
#切換
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=I
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=I
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務(wù)執(zhí)行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
單線程里的這20個任務(wù)的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執(zhí)行任務(wù)1時遇到阻塞,就利用阻塞的時間去執(zhí)行任務(wù)2。。。。如此,才能提高效率,這就用到了Gevent模塊。
四、Gevent模塊
安裝:pip3 install gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進程的內(nèi)部,但它們被協(xié)作式地調(diào)度。
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創(chuàng)建一個協(xié)程對象g1,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat,后面可以有多個參數(shù),可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的
g2=gevent.spawn(func2)
g1.join() #等待g1結(jié)束
g2.join() #等待g2結(jié)束
或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
import gevent
def eat(name):
print('%s eat 1' %name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,****而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了。
.
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前**
.
或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')
Gevent之同步與異步:
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 異步
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
print('DONE')
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
# 上面程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。
# 初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall 函數(shù),
# 后者阻塞當前流程,并執(zhí)行所有給定的greenlet任務(wù)。執(zhí)行流程只會在所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。
五、asyncio模塊
asyncio是Python 3.4版本引入的標準庫,直接內(nèi)置了對異步IO的支持。asyncio的異步操作,需要在coroutine中通過yield from完成。
import asyncio
# asyncio.coroutine裝飾器是用來標記這個函數(shù)是一個協(xié)程函數(shù)
@asyncio.coroutine
def test(i):
print("test_1", i)
r = yield from asyncio.sleep(1)
print("test_2", i)
#使用 get_event_loop() 方法創(chuàng)建了一個事件循環(huán) loop
loop = asyncio.get_event_loop()
# tasks = [test(i) for i in range(500)]# 直接創(chuàng)建task
# tasks = [loop.create_task(test(i)) for i in range(500)]#create_task創(chuàng)建task
tasks = [asyncio.ensure_future(test(i)) for i in range(500)]# ensure_future創(chuàng)建task
#run_until_complete() 方法將協(xié)程注冊到事件循環(huán) loop 中,然后啟動
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
asyncio說明:
@asyncio.coroutine把一個generator標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執(zhí)行。
test()會首先打印出test_1,然后,yield from語法可以讓我們方便地調(diào)用另一個generator。由于asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語句。
把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了,因此可以實現(xiàn)并發(fā)執(zhí)行。
asynico/await
為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。
請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:
把@asyncio.coroutine替換為async;
把yield from替換為await。
import asyncio
async def test(i):
print("test_1", i)
await asyncio.sleep(1)
print("test_2", i)
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(test(i)) for i in range(500)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
這里只是把yield from換成了await,@asyncio.coroutine換成了async,其余不變,執(zhí)行效果也一樣。
