python并發(fā)編程:asyncio與協(xié)程

協(xié)程應(yīng)該是屬于python里面一個(gè)獨(dú)有的概念。源自它設(shè)計(jì)的一個(gè)特性:同一時(shí)刻,Python 主程序只允許有一個(gè)線程執(zhí)行。
但是對(duì)于一些IO操作頻繁的操作,如網(wǎng)絡(luò)請(qǐng)求,如果單線程同步執(zhí)行的話,那么很多時(shí)間都會(huì)浪費(fèi)在等待請(qǐng)求返回中。假設(shè)下載一個(gè)網(wǎng)頁(yè)的數(shù)據(jù)需要2秒,現(xiàn)在需要下載10個(gè)網(wǎng)頁(yè),按同步一個(gè)一個(gè)執(zhí)行的話,則需要花費(fèi)2*10=20秒。但如果我開(kāi)始下載1個(gè)之后,然后又立即去下載第2個(gè),如果第1個(gè)的結(jié)果返回了之后,我們?cè)偃ヌ幚淼谝粋€(gè)的下載結(jié)果。這樣我們就不需要每個(gè)都等待2秒了。
協(xié)程就是用來(lái)在單線程中實(shí)現(xiàn)并發(fā)編程的一種操作。協(xié)程由用戶決定,在哪些地方交出控制權(quán),切換到下一個(gè)任務(wù)。

協(xié)程就是異步編程??梢园褏f(xié)程理解成一個(gè)異步函數(shù),而直接調(diào)用這個(gè)異步函數(shù)返回一個(gè)協(xié)程對(duì)象。協(xié)程使用asyncawait語(yǔ)法糖,通過(guò)async def聲明。

#定義一個(gè)協(xié)程
async def main():
    print("hello")
main()
#輸出:返回一個(gè)協(xié)程對(duì)象
<coroutine object main at 0x7fb64949ef40>

上面看到,直接調(diào)用協(xié)程并不會(huì)真正調(diào)用執(zhí)行,要真正運(yùn)行一個(gè)協(xié)程,需要用到asyncio庫(kù)(已經(jīng)廢棄的生成器調(diào)用方法就不說(shuō)了)

asyncio

使用協(xié)程,我們需要用到asyncio庫(kù)。

asyncio 是用來(lái)編寫 并發(fā) 代碼的庫(kù),使用 async/await 語(yǔ)法。
asyncio 被用作多個(gè)提供高性能 Python 異步框架的基礎(chǔ),包括網(wǎng)絡(luò)和網(wǎng)站服務(wù),數(shù)據(jù)庫(kù)連接庫(kù),分布式任務(wù)隊(duì)列等等。
asyncio 往往是構(gòu)建 IO 密集型和高層級(jí) 結(jié)構(gòu)化 網(wǎng)絡(luò)代碼的最佳選擇。

asyncio 提供一組 高層級(jí)API 用于并發(fā)地 運(yùn)行 Python 協(xié)程并對(duì)其執(zhí)行過(guò)程實(shí)現(xiàn)完全控制

asyncio 提供了三種主要機(jī)制運(yùn)行協(xié)程:

  • asyncio.run() 函數(shù)用來(lái)運(yùn)行最高層級(jí)的入口點(diǎn) "main()"。
#此代碼在paycharm中運(yùn)行
async def main():
    print("hello")
asyncio.run(main())
#輸出:
hello

asyncio.run(coro, *, *debug=False*)會(huì)運(yùn)行傳入的協(xié)程coro,并返回結(jié)果。它會(huì)創(chuàng)建一個(gè)事件循環(huán)(event loop),負(fù)責(zé)管理asyncio事件的循環(huán)調(diào)度。而且在一個(gè)線程中,只能運(yùn)行一個(gè)事件循環(huán)。比如你想在jupyter notebook中執(zhí)行asyncio.run(),就會(huì)失敗,你會(huì)收到一個(gè)錯(cuò)誤提示:RuntimeError: asyncio.run() cannot be called from a running event loop。因?yàn)閖upter notebook本身已經(jīng)運(yùn)行了一個(gè)event loop了。

一般用asyncio.run(main()) 作為主程序的入口函數(shù),在程序運(yùn)行周期內(nèi),只調(diào)用一次 asyncio.run。

  • 使用await等待一個(gè)協(xié)程

await后面接一個(gè)可等待對(duì)象。可等待對(duì)象有三種主要類型: 協(xié)程, 任務(wù) 和 Future。

await,字面意思,就是等待程序執(zhí)行。我們用await等待一個(gè)協(xié)程,那么程序就會(huì)阻塞在這里,進(jìn)入被調(diào)用的協(xié)程函數(shù),執(zhí)行完畢后再返回繼續(xù),這個(gè)和正常python流程是一樣的。使用await,在jupter notebook中,我們就可以用它來(lái)運(yùn)行協(xié)程了。如下:


#此代碼在jupyter notebook中執(zhí)行。
import requests
import asyncio
async def crawl_page(url):
    
    print("crawl url:{}".format(url))
    await asyncio.sleep(2) #用休眠代替網(wǎng)絡(luò)請(qǐng)求操作,排除網(wǎng)絡(luò)的原因
    print("ok url:{}".format(url))
     

async def main(): 
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        await crawl_page(url)
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()

print("總共耗時(shí):{}".format(end_time-start_time))
#### 輸出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
總共耗時(shí):6.007925541001896

上面模擬網(wǎng)絡(luò)3個(gè)網(wǎng)絡(luò)請(qǐng)求,每個(gè)耗時(shí)2秒,總共耗時(shí)約6秒。整個(gè)過(guò)程與同步過(guò)程是一樣。相當(dāng)于下面的同步流程:

import requests
import time

def crawl_page(url):
    
    print("crawl url:{}".format(url))
    time.sleep(2) #簡(jiǎn)單粗暴用休眠代替網(wǎng)絡(luò)請(qǐng)求操作,也不受其他因素的影響
    print("ok url:{}".format(url))
     

def main():    
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        crawl_page(url)
        
start_time = time.perf_counter()
main()
end_time = time.perf_counter()

print("總共耗時(shí):{}".format(end_time-start_time))

#### 輸出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
總共耗時(shí):6.009312429000602

看到這,是不是覺(jué)得協(xié)程沒(méi)什么特別嘛,并沒(méi)有實(shí)現(xiàn)并發(fā)呀。并沒(méi)有實(shí)現(xiàn)我想要的耗時(shí)約2秒。那時(shí)因?yàn)檎嬲闹鹘沁€沒(méi)上場(chǎng)。上面只是用異步方法實(shí)現(xiàn)了同步功能,如果要實(shí)現(xiàn)并發(fā),那就需要任務(wù)。也即協(xié)程的第三種調(diào)用方式:

  • 用任務(wù)“并行地”調(diào)度協(xié)程
    asyncio.create_task()將一個(gè)協(xié)程封裝成任務(wù),該協(xié)程就可以被自動(dòng)調(diào)度執(zhí)行了。創(chuàng)建一個(gè)任務(wù):task = asyncio.create_task(coro),然后用await調(diào)度任務(wù)。任務(wù)可以很快地被執(zhí)行而不被阻塞。

asyncio.create_task(coro,name=None):
coro 協(xié)程 封裝為一個(gè) Task 并調(diào)度其執(zhí)行。返回 Task 對(duì)象。
該任務(wù)會(huì)在 get_running_loop() 返回的循環(huán)中執(zhí)行,如果當(dāng)前線程沒(méi)有在運(yùn)行的循環(huán)則會(huì)引發(fā) RuntimeError。

上面的例子用task實(shí)現(xiàn)如下:

import asyncio

async def crawl_page(url):
    
    print("crawl url:{}".format(url))
    await asyncio.sleep(2) #asyncio的sleep()總是會(huì)掛起當(dāng)前任務(wù),以允許其他任務(wù)運(yùn)行
    print("ok url:{}".format(url))
     

async def main():    
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls] #創(chuàng)建任務(wù)列表
    for task in tasks:
        await task #用await調(diào)度任務(wù)
        
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()

print("總共耗時(shí):{}".format(end_time-start_time))

#####輸出#######
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
總共耗時(shí):2.004724346999865

看,使用任務(wù)Task,終于實(shí)現(xiàn)了想要的約2秒的效果了。那么task是如何實(shí)現(xiàn)并發(fā)的呢?看下面兩個(gè)例子:

import asyncio
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(2) #特意設(shè)置成2秒
    print("work1 done")
    
async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

#### 輸出
start at 20:26:39
ended at 20:26:39
work1 start
work2 start
work3 start
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(2) #特意設(shè)置成2秒
    print("work1 done")
    
async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    print("before await")
    await task1
    print("awaited task1")
    await task2
    print("awaited task2")
    await task3
    print("awaited task3")

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

###輸出
start at 20:28:31
before await
work1 start
work2 start
work3 start
work2 done
work1 done
awaited task1
awaited task2
work3 done
awaited task3
ended at 20:28:34

從兩個(gè)例子中我們可以看出:
1、任務(wù)被創(chuàng)建后,就會(huì)被加入事件循環(huán),等待事件調(diào)度器調(diào)度。事件調(diào)度器會(huì)自動(dòng)地去調(diào)度。
2、遇到await后,當(dāng)前任務(wù)就會(huì)掛起,將控制權(quán)交出,事件調(diào)度器去調(diào)度其它任務(wù),其它任務(wù)獲得控制權(quán)。當(dāng)任務(wù)執(zhí)行完成之后,它又會(huì)重新獲得控制權(quán),繼續(xù)執(zhí)行后續(xù)的代碼。
3、上面我特意寫了work1的休眠時(shí)長(zhǎng)比work2的休眠時(shí)長(zhǎng)長(zhǎng)。work2會(huì)優(yōu)先于work1完成,work2執(zhí)行完成之后,主程序獲得控制權(quán)后,不會(huì)打印“awaited work2”,因?yàn)橹鞒绦蜻€在await work1呢。

這樣,通過(guò)事件循環(huán)調(diào)度,異步調(diào)用,我們就實(shí)現(xiàn)了python的“并發(fā)”。充分利用了等待時(shí)間。
上面,我們看到的都是沒(méi)有返回值的,協(xié)程也是可以返回值的。我們可以在結(jié)束之后,取到返回值。另外,協(xié)程也是有可能產(chǎn)生異常的,任務(wù)有可能被取消。

import asyncio
import time
async def work1():
    print("work1 start")
    await asyncio.sleep(1) 
    print("work1 done")
    return 1
    
async def work2():
    print("work2 start")
    await asyncio.sleep(2)
    print("work2 done")
    return 2/0
    
async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    return 3
async def work4():
    print("work4 start")
    await asyncio.sleep(4)
    print("work4 done")
    return 4
    
async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    task4 = asyncio.create_task(work4())
    
    #2秒之后,取消任務(wù)4
    await asyncio.sleep(2)
    task4.cancel()

    res = await asyncio.gather(task1, task2, task3, task4,return_exceptions=True) 
    print(res)

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))

###輸出
start at 21:10:18
work1 start
work2 start
work3 start
work4 start
work1 done
work2 done
work3 done
[1, ZeroDivisionError('division by zero'), 3, CancelledError()]
ended at 21:10:21

taskasyncio.gatherasyncio.gather(aws,return_exceptions=True)
1、并發(fā)運(yùn)行 aws序列中的可等待對(duì)象
2、如果 aws 中的某個(gè)可等待對(duì)象為協(xié)程,它將自動(dòng)被作為一個(gè)任務(wù)調(diào)度。
3、如果所有可等待對(duì)象都成功完成,結(jié)果將是一個(gè)由所有返回值聚合而成的列表。結(jié)果值的順序與 aws 中可等待對(duì)象的順序一致。
4、如果 return_exceptionsFalse (默認(rèn)),所引發(fā)的首個(gè)異常會(huì)5、立即傳播給等待 gather() 的任務(wù)。aws 序列中的其他可等待對(duì)象 不會(huì)被取消 并將繼續(xù)運(yùn)行。
如果 return_exceptionsTrue,異常會(huì)和成功的結(jié)果一樣處理,并聚合至結(jié)果列表。
如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對(duì)象也會(huì) 被取消。

最后,看一個(gè)真正的網(wǎng)絡(luò)請(qǐng)求吧

爬取豆瓣上即將上映的電影,獲取電影名稱,上映時(shí)間,時(shí)長(zhǎng),簡(jiǎn)介等信息。(僅供學(xué)習(xí)交流,勿頻繁調(diào)用)

import aiohttp
import asyncio
from bs4 import BeautifulSoup
global sleep = 0 

#解析拿到的所有即將上映的電影
def get_movies_info(text):
    print("開(kāi)始解析拿到所有即將上映的電影數(shù)據(jù)")
    soup = BeautifulSoup(text,"html.parser")
    all_movies = soup.find('div',id = 'showing-soon')
    movies_info = []
    for each_movie in all_movies.find_all('div',class_="item"):
        #print("=========")
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all("li")
        movie_name = all_a_tag[1].text
        movie_url= all_a_tag[1]["href"]
    
        movie_date = all_li_tag[0].text
        movie_type = all_li_tag[1].text
        movies_info.append([movie_url,(movie_name,movie_type,movie_date)])
    return movies_info

def get_each_movie_details(text):
    soup = BeautifulSoup(text,"html.parser")
    spans = soup.find("div",id="info").find_all("span")
    #時(shí)長(zhǎng)
    duration = spans[-2].text
    #簡(jiǎn)介
    description = soup.find("div",{"class":"indent","id":"link-report"}).find("span").text.strip()
    #海報(bào)url
    img_tag = soup.find("img")
    url = img_tag['src']
    return (duration,description,url)

async def get_url(url)->str:
    global sleep
    asyncio.sleep(sleep) #每個(gè)請(qǐng)求都會(huì)被等待,等待的sleep是一個(gè)遞增變量,防止頻繁數(shù)據(jù)請(qǐng)求,我們的目的是協(xié)程。
    print("start get url:{}".format(url))
    header = {"user-agent":"Chrome/10.0"}
    async with aiohttp.ClientSession(headers=header) as session:
        async with session.get(url,headers=header) as resp:
            result =  await resp.text()
            print("getted result")
            return result
        
    sleep += 0.5
                
async def main():
    url = "https://movie.douban.com/cinema/later/shenzhen/"
    try:
        task = asyncio.create_task(get_url(url))
        res = await asyncio.gather(task)
        movies_info = get_movies_info(res[0]) 
        
        tasks = [asyncio.create_task(get_url(info[0])) for info in movies_info]
        print("start fetch each movie info")
        htmls = await asyncio.gather(*tasks,return_exceptions=True)
        print("get html :{}".format(len(htmls)))
        all_info = []
        i = 0
        for html in htmls:
            details = get_each_movie_details(html)
            all_info.append(movies_info[i][1]+details)
            i+=1
            
            
        print(" ".join(all_info))
            
    except Exception as e:
        print("Exception:{}".format(e))

await main()
    
    

參考文檔:https://docs.python.org/zh-cn/3/library/asyncio.html
https://time.geekbang.org/column/article/103358

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容