python之協(xié)程

一、python協(xié)程概念的引入

之前我們學(xué)習了線程、進程的概念,了解了在操作系統(tǒng)中進程是資源分配的最小單位,線程是CPU調(diào)度的最小單位。按道理來說我們已經(jīng)算是把cpu的利用率提高很多了。但是我們知道無論是創(chuàng)建多進程還是創(chuàng)建多線程來解決問題,都要消耗一定的時間來創(chuàng)建進程、創(chuàng)建線程、以及管理他們之間的切換。

隨著我們對于效率的追求不斷提高,基于單線程來實現(xiàn)并發(fā)又成為一個新的課題,即只用一個主線程(很明顯可利用的cpu只有一個)情況下實現(xiàn)并發(fā)。這樣就可以節(jié)省創(chuàng)建線進程所消耗的時間。

為此我們需要先回顧下并發(fā)的本質(zhì):切換+保存狀態(tài)

cpu正在運行一個任務(wù),會在兩種情況下切走去執(zhí)行其他的任務(wù)(切換由操作系統(tǒng)強制控制),一種情況是該任務(wù)發(fā)生了阻塞,另外一種情況是該任務(wù)計算的時間過長
image.png

一:其中第二種情況并不能提升效率,只是為了讓cpu能夠雨露均沾,實現(xiàn)看起來所有任務(wù)都被“同時”執(zhí)行的效果,如果多個任務(wù)都是純計算的,這種切換反而會降低效率。
  為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以保存任務(wù)運行狀態(tài)的方法。
  1. yiled可以保存狀態(tài),yield的狀態(tài)保存與操作系統(tǒng)的保存線程狀態(tài)很像,但是yield是代碼級別控制的,更輕量級
  2. send可以把一個函數(shù)的結(jié)果傳給另外一個函數(shù),以此實現(xiàn)單線程內(nèi)程序之間的切換
二:第一種情況的切換。在任務(wù)一遇到io情況下,切到任務(wù)二去執(zhí)行,這樣就可以利用任務(wù)一阻塞的時間完成任務(wù)二的計算,效率的提升就在于此。
.
  對于單線程下,我們不可避免程序中出現(xiàn)io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統(tǒng)級別)控制單線程下的多個任務(wù)能在一個任務(wù)遇到io阻塞時就切換到另外一個任務(wù)去計算,這樣就保證了該線程能夠最大限度地處于就緒態(tài),即隨時都可以被cpu執(zhí)行的狀態(tài),相當于我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統(tǒng),讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執(zhí)行權(quán)限分配給我們的線程。
  協(xié)程的本質(zhì)就是在單線程下,由用戶自己控制一個任務(wù)遇到io阻塞了就切換另外一個任務(wù)去執(zhí)行,以此來提升效率。

二、協(xié)程概念

協(xié)程:是單線程下的并發(fā),又稱微線程,纖程。英文名Coroutine。
一句話說明什么是協(xié)程:
協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。
.
強調(diào):
  python的線程屬于內(nèi)核級別的,即由操作系統(tǒng)控制調(diào)度(如單線程遇到io或執(zhí)行時間過長就會被迫交出cpu執(zhí)行權(quán)限,切換其他線程運行)
  單線程內(nèi)開啟協(xié)程,一旦遇到io,就會從應(yīng)用程序級別(而非操作系統(tǒng))控制切換,以此來提升效率(!??!非io操作的切換與效率無關(guān))

對比操作系統(tǒng)控制線程的切換,用戶在單線程內(nèi)控制協(xié)程的切換
優(yōu)點:
  協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級
  單線程內(nèi)就可以實現(xiàn)并發(fā)的效果,最大限度地利用cpu
缺點:
  協(xié)程的本質(zhì)是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內(nèi)開啟多個線程,每個線程內(nèi)開啟協(xié)程
  協(xié)程指的是單個線程,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程

總結(jié)協(xié)程特點:
-- 必須在只有一個單線程里實現(xiàn)并發(fā)
-- 修改共享數(shù)據(jù)不需加鎖
-- 用戶程序里自己保存多個控制流的上下文棧
-- 附加:一個協(xié)程遇到IO操作自動切換到其它協(xié)程(如何實現(xiàn)檢測IO,yield、greenlet都無法實現(xiàn),就用到了gevent模塊(select機制))

三、Greenlet模塊

安裝:pip3 install greenlet
單純的切換(在沒有io的情況下或者沒有重復(fù)開辟內(nèi)存空間的操作),反而會降低程序的執(zhí)行速度

#順序執(zhí)行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=I

def f2():
    res=1
    for i in range(100000000):
        res*=I

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=I
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=I
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務(wù)執(zhí)行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
  單線程里的這20個任務(wù)的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執(zhí)行任務(wù)1時遇到阻塞,就利用阻塞的時間去執(zhí)行任務(wù)2。。。。如此,才能提高效率,這就用到了Gevent模塊。

四、Gevent模塊

安裝:pip3 install gevent

Gevent 是一個第三方庫,可以輕松通過gevent實現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進程的內(nèi)部,但它們被協(xié)作式地調(diào)度。

g1=gevent.spawn(func,1,,2,3,x=4,y=5)創(chuàng)建一個協(xié)程對象g1,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat,后面可以有多個參數(shù),可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的
g2=gevent.spawn(func2)
g1.join() #等待g1結(jié)束
g2.join() #等待g2結(jié)束
或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)

g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,****而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了。
.
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前**
.
或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')

Gevent之同步與異步:

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():  # 同步
    for i in range(10):
        task(i)

def asynchronous(): # 異步
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)
    print('DONE')
    
if __name__ == '__main__':
    print('Synchronous:')
    synchronous()
    print('Asynchronous:')
    asynchronous()
#  上面程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。
#  初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall 函數(shù),
#  后者阻塞當前流程,并執(zhí)行所有給定的greenlet任務(wù)。執(zhí)行流程只會在所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。

五、asyncio模塊

asyncio是Python 3.4版本引入的標準庫,直接內(nèi)置了對異步IO的支持。asyncio的異步操作,需要在coroutine中通過yield from完成。

import asyncio
# asyncio.coroutine裝飾器是用來標記這個函數(shù)是一個協(xié)程函數(shù)
@asyncio.coroutine
def test(i):
    print("test_1", i)
    r = yield from asyncio.sleep(1)
    print("test_2", i)

#使用 get_event_loop() 方法創(chuàng)建了一個事件循環(huán) loop
loop = asyncio.get_event_loop()

# tasks = [test(i) for i in range(500)]# 直接創(chuàng)建task
# tasks = [loop.create_task(test(i)) for i in range(500)]#create_task創(chuàng)建task
tasks = [asyncio.ensure_future(test(i)) for i in range(500)]# ensure_future創(chuàng)建task

#run_until_complete() 方法將協(xié)程注冊到事件循環(huán) loop 中,然后啟動
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

asyncio說明:
  @asyncio.coroutine把一個generator標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執(zhí)行。
  test()會首先打印出test_1,然后,yield from語法可以讓我們方便地調(diào)用另一個generator。由于asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語句。
  把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了,因此可以實現(xiàn)并發(fā)執(zhí)行。

asynico/await
  為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。
  請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:
  把@asyncio.coroutine替換為async;
  把yield from替換為await。

import asyncio

async def test(i):
    print("test_1", i)
    await asyncio.sleep(1)
    print("test_2", i)

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(test(i)) for i in range(500)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

這里只是把yield from換成了await,@asyncio.coroutine換成了async,其余不變,執(zhí)行效果也一樣。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容