python協(xié)程(yield、asyncio標(biāo)準(zhǔn)庫、gevent第三方)、異步的實(shí)現(xiàn)

引言

同步:不同程序單元為了完成某個任務(wù),在執(zhí)行過程中需靠某種通信方式以協(xié)調(diào)一致,稱這些程序單元是同步執(zhí)行的。

例如購物系統(tǒng)中更新商品庫存,需要用“行鎖”作為通信信號,讓不同的更新請求強(qiáng)制排隊(duì)順序執(zhí)行,那更新庫存的操作是同步的。

簡言之,同步意味著有序。

阻塞:程序未得到所需計算資源時被掛起的狀態(tài)。

程序在等待某個操作完成期間,自身無法繼續(xù)干別的事情,則稱該程序在該操作上是阻塞的。

常見的阻塞形式有:網(wǎng)絡(luò)I/O阻塞、磁盤I/O阻塞、用戶輸入阻塞等。

阻塞狀態(tài)下的性能提升

引入多進(jìn)程:

在一個程序內(nèi),依次執(zhí)行10次太耗時,那開10個一樣的程序同時執(zhí)行不就行了。于是我們想到了多進(jìn)程編程。為什么會先想到多進(jìn)程呢?發(fā)展脈絡(luò)如此。在更早的操作系統(tǒng)(Linux 2.4)及其以前,進(jìn)程是 OS 調(diào)度任務(wù)的實(shí)體,是面向進(jìn)程設(shè)計的OS.

改善效果立竿見影。但仍然有問題。總體耗時并沒有縮減到原來的十分之一,而是九分之一左右,還有一些時間耗到哪里去了?進(jìn)程切換開銷。

進(jìn)程切換開銷不止像“CPU的時間觀”所列的“上下文切換”那么低。CPU從一個進(jìn)程切換到另一個進(jìn)程,需要把舊進(jìn)程運(yùn)行時的寄存器狀態(tài)、內(nèi)存狀態(tài)全部保存好,再將另一個進(jìn)程之前保存的數(shù)據(jù)恢復(fù)。對CPU來講,幾個小時就干等著。當(dāng)進(jìn)程數(shù)量大于CPU核心數(shù)量時,進(jìn)程切換是必然需要的。
除了切換開銷,多進(jìn)程還有另外的缺點(diǎn)。一般的服務(wù)器在能夠穩(wěn)定運(yùn)行的前提下,可以同時處理的進(jìn)程數(shù)在數(shù)十個到數(shù)百個規(guī)模。如果進(jìn)程數(shù)量規(guī)模更大,系統(tǒng)運(yùn)行將不穩(wěn)定,而且可用內(nèi)存資源往往也會不足。
多進(jìn)程解決方案在面臨每天需要成百上千萬次下載任務(wù)的爬蟲系統(tǒng),或者需要同時搞定數(shù)萬并發(fā)的電商系統(tǒng)來說,并不適合。
除了切換開銷大,以及可支持的任務(wù)規(guī)模小之外,多進(jìn)程還有其他缺點(diǎn),如狀態(tài)共享等問題,后文會有提及,此處不再細(xì)究。

多線程(改進(jìn)(多進(jìn)程帶來多問題))

由于線程的數(shù)據(jù)結(jié)構(gòu)比進(jìn)程更輕量級,同一個進(jìn)程可以容納多個線程,從進(jìn)程到線程的優(yōu)化由此展開。后來的OS也把調(diào)度單位由進(jìn)程轉(zhuǎn)為線程,進(jìn)程只作為線程的容器,用于管理進(jìn)程所需的資源。而且OS級別的線程是可以被分配到不同的CPU核心同時運(yùn)行的。

結(jié)果符合預(yù)期,比多進(jìn)程耗時要少些。從運(yùn)行時間上看,多線程似乎已經(jīng)解決了切換開銷大的問題。而且可支持的任務(wù)數(shù)量規(guī)模,也變成了數(shù)百個到數(shù)千個。

但是,多線程仍有問題,特別是Python里的多線程。首先,Python中的多線程因?yàn)镚IL的存在,它們并不能利用CPU多核優(yōu)勢,一個Python進(jìn)程中,只允許有一個線程處于運(yùn)行狀態(tài)。那為什么結(jié)果還是如預(yù)期,耗時縮減到了十分之一?

因?yàn)樵谧鲎枞南到y(tǒng)調(diào)用時,例如sock.connect(),sock.recv()時,當(dāng)前線程會釋放GIL,讓別的線程有執(zhí)行機(jī)會。但是單個線程內(nèi),在阻塞調(diào)用上還是阻塞的。

小提示:Python中 time.sleep 是阻塞的,都知道使用它要謹(jǐn)慎,但在多線程編程中,time.sleep 并不會阻塞其他線程。

除了GIL之外,所有的多線程還有通病。它們是被OS調(diào)度,調(diào)度策略是搶占式的,以保證同等優(yōu)先級的線程都有均等的執(zhí)行機(jī)會,那帶來的問題是:并不知道下一時刻是哪個線程被運(yùn)行,也不知道它正要執(zhí)行的代碼是什么。所以就可能存在競態(tài)條件。

例如爬蟲工作線程從任務(wù)隊(duì)列拿待抓取URL的時候,如果多個爬蟲線程同時來取,那這個任務(wù)到底該給誰?那就需要用到“鎖”或“同步隊(duì)列”來保證下載任務(wù)不會被重復(fù)執(zhí)行。

而且線程支持的多任務(wù)規(guī)模,在數(shù)百到數(shù)千的數(shù)量規(guī)模。在大規(guī)模的高頻網(wǎng)絡(luò)交互系統(tǒng)中,仍然有些吃力。當(dāng)然,多線程最主要的問題還是競態(tài)條件。

非阻塞(阻塞狀態(tài)下的性能提升)

原非阻方案

def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        # 非阻塞連接過程中也會拋出異常
        pass
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    data = request.encode('ascii')
    # 不知道socket何時就緒,所以不斷嘗試發(fā)送
    while True:
        try:
            sock.send(data)
            # 直到send不拋異常,則發(fā)送完成
            break
        except OSError:
            pass

    response = b''
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    return response

首先注意到兩點(diǎn),就感覺被騙了。一是耗時與同步阻塞相當(dāng),二是代碼更復(fù)雜。要非阻塞何用?且慢。
上第9行代碼sock.setblocking(False)告訴OS,讓socket上阻塞調(diào)用都改為非阻塞的方式。之前我們說到,非阻塞就是在做一件事的時候,不阻礙調(diào)用它的程序做別的事情。上述代碼在執(zhí)行完 sock.connect() 和 sock.recv() 后的確不再阻塞,可以繼續(xù)往下執(zhí)行請求準(zhǔn)備的代碼或者是執(zhí)行下一次讀取。

代碼變得更復(fù)雜也是上述原因所致。第11行要放在try語句內(nèi),是因?yàn)閟ocket在發(fā)送非阻塞連接請求過程中,系統(tǒng)底層也會拋出異常。connect()被調(diào)用之后,立即可以往下執(zhí)行第15和16行的代碼。

需要while循環(huán)不斷嘗試 send(),是因?yàn)閏onnect()已經(jīng)非阻塞,在send()之時并不知道 socket 的連接是否就緒,只有不斷嘗試,嘗試成功為止,即發(fā)送數(shù)據(jù)成功了。recv()調(diào)用也是同理。
雖然 connect() 和 recv() 不再阻塞主程序,空出來的時間段CPU沒有空閑著,但并沒有利用好這空閑去做其他有意義的事情,而是在循環(huán)嘗試讀寫 socket (不停判斷非阻塞調(diào)用的狀態(tài)是否就緒)。還得處理來自底層的可忽略的異常。也不能同時處理多個 socket 。

非阻塞改進(jìn)

  1. epoll
    判斷非阻塞調(diào)用是否就緒如果 OS 能做,是不是應(yīng)用程序就可以不用自己去等待和判斷了,就可以利用這個空閑去做其他事情以提高效率。
    所以O(shè)S將I/O狀態(tài)的變化都封裝成了事件,如可讀事件、可寫事件。并且提供了專門的系統(tǒng)模塊讓應(yīng)用程序可以接收事件通知。這個模塊就是select。讓應(yīng)用程序可以通過select注冊文件描述符和回調(diào)函數(shù)。當(dāng)文件描述符的狀態(tài)發(fā)生變化時,select 就調(diào)用事先注冊的回調(diào)函數(shù)。
    select因其算法效率比較低,后來改進(jìn)成了poll,再后來又有進(jìn)一步改進(jìn),BSD內(nèi)核改進(jìn)成了kqueue模塊,而Linux內(nèi)核改進(jìn)成了epoll模塊。這四個模塊的作用都相同,暴露給程序員使用的API也幾乎一致,區(qū)別在于kqueue 和 epoll 在處理大量文件描述符時效率更高。
    鑒于 Linux 服務(wù)器的普遍性,以及為了追求更高效率,所以我們常常聽聞被探討的模塊都是 epoll

2 . 回調(diào)((Callback))
把I/O事件的等待和監(jiān)聽任務(wù)交給了 OS,那 OS 在知道I/O狀態(tài)發(fā)生改變后(例如socket連接已建立成功可發(fā)送數(shù)據(jù)),它又怎么知道接下來該干嘛呢?只能回調(diào)。
需要我們將發(fā)送數(shù)據(jù)與讀取數(shù)據(jù)封裝成獨(dú)立的函數(shù),讓epoll代替應(yīng)用程序監(jiān)聽socket狀態(tài)時,得告訴epoll:“如果socket狀態(tài)變?yōu)榭梢酝飳憯?shù)據(jù)(連接建立成功了),請調(diào)用HTTP請求發(fā)送函數(shù)。如果socket 變?yōu)榭梢宰x數(shù)據(jù)了(客戶端已收到響應(yīng)),請調(diào)用響應(yīng)處理函數(shù)。”
首先,不斷嘗試send() 和 recv() 的兩個循環(huán)被消滅掉了。
其次,導(dǎo)入了selectors模塊,并創(chuàng)建了一個DefaultSelector 實(shí)例。Python標(biāo)準(zhǔn)庫提供的selectors模塊是對底層select/poll/epoll/kqueue的封裝。DefaultSelector類會根據(jù) OS 環(huán)境自動選擇最佳的模塊,那在 Linux 2.5.44 及更新的版本上都是epoll了。
然后,在第25行和第31行分別注冊了socket可寫事件(EVENT_WRITE)和可讀事件(EVENT_READ)發(fā)生后應(yīng)該采取的回調(diào)函數(shù)。

雖然代碼結(jié)構(gòu)清晰了,阻塞操作也交給OS去等待和通知了,但是,我們要抓取10個不同頁面,就得創(chuàng)建10個Crawler實(shí)例,就有20個事件將要發(fā)生,那如何從selector里獲取當(dāng)前正發(fā)生的事件,并且得到對應(yīng)的回調(diào)函數(shù)去執(zhí)行呢?

  1. 事件循環(huán)(Event Loop)
    為了解決上述問題,那我們只得采用老辦法,寫一個循環(huán),去訪問selector模塊,等待它告訴我們當(dāng)前是哪個事件發(fā)生了,應(yīng)該對應(yīng)哪個回調(diào)。這個等待事件通知的循環(huán),稱之為事件循環(huán)。
    重要的是第49行代碼,selector.select() 是一個阻塞調(diào)用,因?yàn)槿绻录话l(fā)生,那應(yīng)用程序就沒事件可處理,所以就干脆阻塞在這里等待事件發(fā)生。那可以推斷,如果只下載一篇網(wǎng)頁,一定要connect()之后才能send()繼而recv(),那它的效率和阻塞的方式是一樣的。因?yàn)椴辉赾onnect()/recv()上阻塞,也得在select()上阻塞。
    '
    python3 asyncio封裝上述非阻塞的改進(jìn)方法,用時4年打造異步標(biāo)準(zhǔn)庫

協(xié)程引入

yield

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

demo解析:
注意到consumer函數(shù)是一個generator,把一個consumer傳入produce后:
首先調(diào)用c.send(None)啟動生成器;
然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;
consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回;
produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個過程結(jié)束

在 Python 中調(diào)用協(xié)程對象1的 send() 方法時,第一次調(diào)用必須使用參數(shù) None, 這使得協(xié)程的使用變得十分麻煩
解決此問題:
借助 Python 自身的特性來避免這一問題,比如,創(chuàng)建一個裝飾器

def routine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.send(None)
        return cr
    return start
        
@routine
def product():
    pass

yield from

yield from 是Python 3.3 新引入的語法(PEP 380)。它主要解決的就是在生成器里玩生成器不方便的問題。它有兩大主要功能。
第一個功能是:讓嵌套生成器不必通過循環(huán)迭代yield,而是直接yield from。以下兩種在生成器里玩子生成器的方式是等價的。

def gen_one():

    subgen = range(10)    yield from subgendef gen_two():

    subgen = range(10)    for item in subgen:        yield item

第二個功能就是在子生成器和原生成器的調(diào)用者之間打開雙向通道,兩者可以直接通信。

def gen():
    yield from subgen()def subgen():
    while True:
        x = yield
        yield x+1def main():
    g = gen()
    next(g)                # 驅(qū)動生成器g開始執(zhí)行到第一個 yield
    retval = g.send(1)     # 看似向生成器 gen() 發(fā)送數(shù)據(jù)
    print(retval)          # 返回2
    g.throw(StopIteration) # 看似向gen()拋入異常

用yield from改進(jìn)基于生成器的協(xié)程,代碼抽象程度更高。使業(yè)務(wù)邏輯相關(guān)的代碼更精簡。由于其雙向通道功能可以讓協(xié)程之間隨心所欲傳遞數(shù)據(jù),使Python異步編程的協(xié)程解決方案大大向前邁進(jìn)了一步。
于是Python語言開發(fā)者們充分利用yield from,使 Guido 主導(dǎo)的Python異步編程框架Tulip迅速脫胎換骨,并迫不及待得讓它在 Python 3.4 中換了個名字asyncio以“實(shí)習(xí)生”角色出現(xiàn)在標(biāo)準(zhǔn)庫中。

asyncio

asyncio是Python 3.4 試驗(yàn)性引入的異步I/O框架(PEP 3156),提供了基于協(xié)程做異步I/O編寫單線程并發(fā)代碼的基礎(chǔ)設(shè)施。其核心組件有事件循環(huán)(Event Loop)、協(xié)程(Coroutine)、任務(wù)(Task)、未來對象(Future)以及其他一些擴(kuò)充和輔助性質(zhì)的模塊。

在引入asyncio的時候,還提供了一個裝飾器@asyncio.coroutine用于裝飾使用了yield from的函數(shù),以標(biāo)記其為協(xié)程。但并不強(qiáng)制使用這個裝飾器。

  • 協(xié)程的隱式創(chuàng)建方式
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把一個generator標(biāo)記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執(zhí)行。
hello()會首先打印出Hello world!,然后,yield from語法可以讓我們方便地調(diào)用另一個generator。由于asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當(dāng)asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語句。
把a(bǔ)syncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了,因此可以實(shí)現(xiàn)并發(fā)執(zhí)行。

  • 協(xié)程顯示的創(chuàng)建方式(便于理解隱式創(chuàng)建)
    創(chuàng)建任務(wù)task(2種)
import asyncio
import time
now = lambda : time.time()
async def do_some_work(x):
    print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)    # 方式一
task = loop.create_task(coroutine)    # 方式二
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)

創(chuàng)建task后,task在加入事件循環(huán)之前是pending狀態(tài),加入loop后運(yùn)行中是running狀態(tài),loop調(diào)用完是Done,運(yùn)行完是finished狀態(tài),雖說本質(zhì)上協(xié)程函數(shù)和task指的東西都一樣,但是task有了協(xié)程函數(shù)的狀態(tài)。
其中l(wèi)oop.run_until_complete()接受一個future參數(shù),futurn具體指代一個協(xié)程函數(shù),而task是future的子類,所以我們不聲明一個task直接傳入?yún)f(xié)程函數(shù)也能執(zhí)行。

  • 協(xié)程綁定綁定回調(diào)函數(shù)
    通過task的task.add_done_callback(callback)方法綁定回調(diào)函數(shù),回調(diào)函數(shù)接收一個future對象參數(shù)如task,在內(nèi)部通過future.result()獲得協(xié)程函數(shù)的返回值。
import asyncio
async def test(x):
    return x+3
def callback(y):
    print(y.result())
coroutine = test(5)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
task
<Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>>
task.add_done_callback(callback)
loop.run_until_complete(task)
  • 協(xié)程耗時任務(wù)掛起
    多任務(wù)聲明了協(xié)程函數(shù),也同時在loop中注冊了,他的執(zhí)行也是順序執(zhí)行的,因?yàn)樵诋惒胶瘮?shù)中沒有聲明那些操作是耗時操作,所以會順序執(zhí)行。await的作用就是告訴控制器這個步驟是耗時的,async可以定義協(xié)程對象,使用await可以針對耗時的操作進(jìn)行掛起
import asyncio
import time
async def test(1):
    time.sleep(1)
    print(time.time())
tasks = [asyncio.ensure_future(test()) for _ in range(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1547187398.7611663
1547187399.7611988
1547187400.7632194

上面執(zhí)行并不是異步執(zhí)行,而是順序執(zhí)行,但是改成下面形式那就是異步執(zhí)行:

import asyncio
import time
async def test(t):
    await asyncio.sleep(1)
    print(time.time())
tasks = [asyncio.ensure_future(test()) for _ in range(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1547187398.7611663
1547187399.7611988
1547187400.7632194

async、await(將耗時函數(shù)掛起)

用asyncio提供的@asyncio.coroutine可以把一個generator標(biāo)記為coroutine類型,然后在coroutine內(nèi)部用yield from調(diào)用另一個coroutine實(shí)現(xiàn)異步操作。
為了簡化并更好地標(biāo)識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。
請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:

把@asyncio.coroutine替換為async;
把yield from替換為await。

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

aiohttp

asyncio可以實(shí)現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把a(bǔ)syncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實(shí)現(xiàn)多用戶的高并發(fā)支持。

asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()                # 創(chuàng)建一個事件循環(huán)(池)
loop.run_until_complete(init(loop))           # 將協(xié)程對象包裝并注冊協(xié)程對象
loop.run_forever()

多進(jìn)程配合使用 + 協(xié)程

方法1:
asyncio、aiohttp需要配合aiomultiprocess

方法2:
gevent.pool import Pool
multiprocessing import Process
核心代碼

def main():
        file_list = ["7001", "7002", "7003"]

        p_lst = []  # 線程列表
        for i in file_list:
            # self.run(i)
            p = Process(target=read_file, args=(i,))  # 子進(jìn)程調(diào)用函數(shù)
            p.start()  # 啟動子進(jìn)程
            p_lst.append(p)  # 將所有進(jìn)程寫入列表中

def read_file(self, number):
        """
        讀取文件
        :param number: 文件標(biāo)記
        :return:
        """
        file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number)
        # print(file_name)
        self.write_log(number, "開始讀取文件 {}".format(file_name),"green")
        with open(file_name, encoding='utf-8') as f:
            # 使用協(xié)程池,執(zhí)行任務(wù)。語法: pool.map(func,iterator)
            # partial使用偏函數(shù)傳遞參數(shù)
            # 注意:has_null第一個參數(shù),必須是迭代器遍歷的值
            pool.map(partial(self.has_null, number=number), f)

多協(xié)程

使用loop.run_until_complete(syncio.wait(tasks)) 也可以使用 loop.run_until_complete(asyncio.gather(*tasks)) ,前者傳入task列表,會對task進(jìn)行解包操作。

多協(xié)程嵌套

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(response)
            print(time.time())

import time
async def request():
    url = "http://www.baidu.com"
    resulit = await get(url)

tasks = [asyncio.ensure_future(request()) for _ in range(10000)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 返回方式(3種)
async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(response)
            print(time.time())
async def request():
    url = "http://www.baidu.com"
    tasks = [asyncio.ensure_future(url) for _ in range(1000)]
方式一:
    dones, pendings = await asyncio.wait(tasks) # 返回future對象,不返回直接結(jié)果
    for task in dones:
        print('Task ret: ', task.result())
方式二:
    results = await asyncio.gather(*tasks) # 直接返回結(jié)果

方式三:
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result)) # 迭代方式返回結(jié)果

tasks = asyncio.ensure_future(request())
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

停止協(xié)程任務(wù)

實(shí)現(xiàn)結(jié)束task有兩種方式:關(guān)閉單個task、關(guān)閉loop,涉及主要函數(shù):

asyncio.Task.all_tasks()獲取事件循環(huán)任務(wù)列表

KeyboardInterrupt捕獲停止異常(Ctrl+C)

loop.stop()停止任務(wù)循環(huán)

task.cancel()取消單個任務(wù)

loop.run_forever()

loop.close()關(guān)閉事件循環(huán),不然會重啟

gevent(第三方包實(shí)現(xiàn)協(xié)程方式)

python程序?qū)崿F(xiàn)的一種單線程下的多任務(wù)執(zhí)行調(diào)度器,簡單來說在一個線程里,先后執(zhí)行AB兩個任務(wù),但是當(dāng)A遇到耗時操作(網(wǎng)絡(luò)等待、文件讀寫等),這個時候gevent會讓A繼續(xù)執(zhí)行,但是同時也會開始執(zhí)行B任務(wù),如果B在遇到耗時操作同時A又執(zhí)行完了耗時操作,gevent又繼續(xù)執(zhí)行A。

import gevent
def test(time):
    print(1)
    gevent.sleep(time)
    print(2)
def test2(time):
    print(3)
    gevent.sleep(time)
    print(4)
if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(test, 2),
        gevent.spawn(test2, 3)
    ])

借鑒文章:
https://mp.weixin.qq.com/s/GgamzHPyZuSg45LoJKsofA
https://rgb-24bit.github.io/blog/2019/python-coroutine-event-loop.html
https://zhuanlan.zhihu.com/p/54657754
https://cloud.tencent.com/developer/article/1590280

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容