Coroutines are sometimes presented as if they were unique to Python, but they are not. What is Python-specific is the design constraint that makes them so attractive: at any given moment, only one thread in the Python interpreter executes Python bytecode (the GIL).
For I/O-heavy workloads such as network requests, a single synchronous thread wastes most of its time waiting for responses. Suppose downloading one web page takes 2 seconds and we need to download 10 pages. Done synchronously, one after another, that costs 2 × 10 = 20 seconds. But if we start downloading the first page, immediately move on to start the second, and only come back to process the first page's result once it arrives, then we no longer pay the full 2-second wait for every page.
A coroutine is a mechanism for concurrent programming within a single thread. With coroutines, the user decides at which points to yield control and switch to the next task.
Coroutines are the basis of asynchronous programming in Python. You can think of a coroutine function as an async function; calling it directly returns a coroutine object. Coroutines use the async/await syntax and are declared with async def.
```python
# Define a coroutine
async def main():
    print("hello")

main()
```
Output: calling it directly just returns a coroutine object
```
<coroutine object main at 0x7fb64949ef40>
```
As shown above, calling a coroutine directly does not actually execute it. To really run a coroutine, we need the asyncio library (the deprecated generator-based approach is not covered here).
asyncio
To use coroutines, we need the asyncio library.
asyncio is a library for writing concurrent code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc.
asyncio is often a perfect fit for IO-bound and high-level structured network code.
asyncio provides a set of high-level APIs to run Python coroutines concurrently and have full control over their execution.
asyncio provides three main ways to run a coroutine:
- Use the asyncio.run() function to run the top-level entry point "main()".
```python
# This code is run in PyCharm (as an ordinary script, not in a notebook)
import asyncio

async def main():
    print("hello")

asyncio.run(main())
```
Output:
```
hello
```
asyncio.run(coro, *, debug=False) runs the coroutine coro passed in and returns its result. It creates an event loop, which is responsible for scheduling asyncio events, and only one event loop can run per thread. For example, if you call asyncio.run() inside a jupyter notebook, it fails with: RuntimeError: asyncio.run() cannot be called from a running event loop. That is because jupyter notebook itself already runs an event loop.
asyncio.run(main()) is generally used as the entry point of the program, and asyncio.run should be called only once during the program's lifetime.
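A minimal sketch of what asyncio.run() does with the coroutine's result (the function name `add` is invented for this example; run it as a script, not in a notebook):

```python
import asyncio

async def add(a, b):
    # Pretend this is some async work before producing a result
    await asyncio.sleep(0.01)
    return a + b

# asyncio.run() creates an event loop, runs the coroutine to completion,
# closes the loop, and hands back the coroutine's return value.
result = asyncio.run(add(1, 2))
print(result)  # 3
```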
- Use await to wait for a coroutine

await takes an awaitable object. There are three main types of awaitables: coroutines, Tasks, and Futures.
await, as the word suggests, waits for execution. When we await a coroutine, the program blocks there, enters the called coroutine, runs it to completion, and then resumes; this is the same as a normal synchronous Python call. With top-level await, we can run coroutines in a jupyter notebook, like this:
```python
# This code is executed in a jupyter notebook (which supports top-level await)
import asyncio
import time

async def crawl_page(url):
    print("crawl url:{}".format(url))
    await asyncio.sleep(2)  # use sleep in place of a network request, ruling out network variance
    print("ok url:{}".format(url))

async def main():
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        await crawl_page(url)

start_time = time.perf_counter()
await main()
end_time = time.perf_counter()
print("Total time: {}".format(end_time - start_time))
```
#### Output
```
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
Total time: 6.007925541001896
```
The above simulates 3 network requests of 2 seconds each, for a total of about 6 seconds. The whole flow is identical to a synchronous one; it is equivalent to the following synchronous version:
```python
import time

def crawl_page(url):
    print("crawl url:{}".format(url))
    time.sleep(2)  # crudely use sleep in place of a network request, free of outside factors
    print("ok url:{}".format(url))

def main():
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    for url in urls:
        crawl_page(url)

start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print("Total time: {}".format(end_time - start_time))
```
#### Output
```
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
Total time: 6.009312429000602
```
At this point you may feel coroutines are nothing special: no concurrency at all, and not the roughly 2-second total we were after. That is because the real protagonist has not appeared yet. So far we have only implemented synchronous behavior with asynchronous syntax. To get actual concurrency, we need tasks, which brings us to the third way of running coroutines:
- Schedule coroutines "in parallel" with tasks

asyncio.create_task() wraps a coroutine into a task, and the coroutine is then scheduled to run automatically. Create a task with task = asyncio.create_task(coro), then use await to wait for the task. Tasks start running promptly without blocking the caller.
asyncio.create_task(coro, name=None):
Wraps the coro coroutine into a Task and schedules its execution; the Task object is returned.
The task is executed in the loop returned by get_running_loop(); RuntimeError is raised if there is no running loop in the current thread.
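A quick sketch of that RuntimeError (the coroutine name `coro` is invented for this example): outside a running event loop, create_task() fails, while inside asyncio.run() it works:

```python
import asyncio

async def coro():
    return 42

c = coro()
try:
    # No event loop is running at module level, so this raises RuntimeError
    asyncio.create_task(c)
except RuntimeError as e:
    print(type(e).__name__)  # RuntimeError
c.close()  # close the unused coroutine to avoid a "never awaited" warning

# Inside asyncio.run() a loop *is* running, so create_task() succeeds
async def main():
    task = asyncio.create_task(coro())
    return await task

print(asyncio.run(main()))  # 42
```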
Here is the earlier example implemented with tasks:
```python
# Also run in a jupyter notebook (which supports top-level await)
import asyncio
import time

async def crawl_page(url):
    print("crawl url:{}".format(url))
    await asyncio.sleep(2)  # asyncio's sleep() always suspends the current task, allowing others to run
    print("ok url:{}".format(url))

async def main():
    urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
            'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
            'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]  # create the task list
    for task in tasks:
        await task  # wait for the tasks with await

start_time = time.perf_counter()
await main()
end_time = time.perf_counter()
print("Total time: {}".format(end_time - start_time))
```
#### Output
```
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
Total time: 2.004724346999865
```
See: with Tasks we finally get the desired total of about 2 seconds. So how do tasks achieve concurrency? Look at the following two examples:
```python
import asyncio
import time

async def work1():
    print("work1 start")
    await asyncio.sleep(2)  # deliberately set to 2 seconds
    print("work1 done")

async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")

async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")

async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
```
#### Output
```
start at 20:26:39
ended at 20:26:39
work1 start
work2 start
work3 start
```
```python
import asyncio
import time

async def work1():
    print("work1 start")
    await asyncio.sleep(2)  # deliberately set to 2 seconds
    print("work1 done")

async def work2():
    print("work2 start")
    await asyncio.sleep(1)
    print("work2 done")

async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")

async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    print("before await")
    await task1
    print("awaited task1")
    await task2
    print("awaited task2")
    await task3
    print("awaited task3")

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
```
#### Output
```
start at 20:28:31
before await
work1 start
work2 start
work3 start
work2 done
work1 done
awaited task1
awaited task2
work3 done
awaited task3
ended at 20:28:34
```
From these two examples we can see:
1. Once a task is created, it is added to the event loop and waits to be scheduled; the event loop schedules it automatically.
2. On reaching an await, the current task suspends and hands over control, and the event loop schedules other tasks, which then get control. Once the awaited work completes, the task regains control and continues with the code after the await.
3. I deliberately gave work1 a longer sleep than work2, so work2 finishes before work1. When work2 completes, the main coroutine does not print "awaited task2" right away, because it is still blocked on await task1.
In this way, through event-loop scheduling and asynchronous calls, we achieve "concurrency" in Python, making full use of the waiting time.
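The timing claim can be checked directly with small sleeps (the delays and the `worker` name are chosen just for this sketch): three concurrently scheduled tasks take about as long as the longest sleep, not the sum of all three:

```python
import asyncio
import time

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    # All three tasks are scheduled immediately; the event loop overlaps
    # their waits instead of running them back to back.
    tasks = [asyncio.create_task(worker(d)) for d in (0.1, 0.2, 0.3)]
    return [await t for t in tasks]

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)        # [0.1, 0.2, 0.3]
print(elapsed < 0.6)  # True: roughly 0.3s, not 0.1 + 0.2 + 0.3 = 0.6s
```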
So far none of the coroutines returned anything, but coroutines can return values, and we can collect them once the coroutines finish. Coroutines may also raise exceptions, and tasks may be cancelled.
```python
import asyncio
import time

async def work1():
    print("work1 start")
    await asyncio.sleep(1)
    print("work1 done")
    return 1

async def work2():
    print("work2 start")
    await asyncio.sleep(2)
    print("work2 done")
    return 2/0

async def work3():
    print("work3 start")
    await asyncio.sleep(3)
    print("work3 done")
    return 3

async def work4():
    print("work4 start")
    await asyncio.sleep(4)
    print("work4 done")
    return 4

async def main():
    task1 = asyncio.create_task(work1())
    task2 = asyncio.create_task(work2())
    task3 = asyncio.create_task(work3())
    task4 = asyncio.create_task(work4())
    # after 2 seconds, cancel task4
    await asyncio.sleep(2)
    task4.cancel()
    res = await asyncio.gather(task1, task2, task3, task4, return_exceptions=True)
    print(res)

print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
```
#### Output
```
start at 21:10:18
work1 start
work2 start
work3 start
work4 start
work1 done
work2 done
work3 done
[1, ZeroDivisionError('division by zero'), 3, CancelledError()]
ended at 21:10:21
```
Besides awaiting each task, the code above uses asyncio.gather. asyncio.gather(*aws, return_exceptions=False):
1. Runs the awaitable objects in the aws sequence concurrently.
2. If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.
3. If all awaitables complete successfully, the result is an aggregated list of the returned values. The order of result values corresponds to the order of awaitables in aws.
4. If return_exceptions is False (the default), the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won't be cancelled and will continue to run.
5. If return_exceptions is True, exceptions are treated the same as successful results, and aggregated in the result list.
6. If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.
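The default return_exceptions=False behavior can be sketched like this (the function names `ok` and `boom` are invented for the example): the first exception propagates straight to whoever awaits gather():

```python
import asyncio

async def ok():
    await asyncio.sleep(0.05)
    return "ok"

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    try:
        # return_exceptions defaults to False: the ValueError from boom()
        # propagates out of gather() as soon as it is raised.
        await asyncio.gather(ok(), boom())
    except ValueError as e:
        return "caught: {}".format(e)

print(asyncio.run(main()))  # caught: boom
```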
Finally, let's look at a real network request:
Crawl douban for movies that are about to be released, fetching each movie's name, release date, duration, synopsis, and so on. (For study and exchange only; do not call it frequently.)
```python
import aiohttp
import asyncio
from bs4 import BeautifulSoup

sleep = 0  # "global" is only valid inside a function; just initialize at module level

# Parse the listing page for all upcoming movies
def get_movies_info(text):
    print("Parsing the list of upcoming movies")
    soup = BeautifulSoup(text, "html.parser")
    all_movies = soup.find('div', id='showing-soon')
    movies_info = []
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all("li")
        movie_name = all_a_tag[1].text
        movie_url = all_a_tag[1]["href"]
        movie_date = all_li_tag[0].text
        movie_type = all_li_tag[1].text
        movies_info.append([movie_url, (movie_name, movie_type, movie_date)])
    return movies_info

# Parse a single movie's detail page
def get_each_movie_details(text):
    soup = BeautifulSoup(text, "html.parser")
    spans = soup.find("div", id="info").find_all("span")
    # Duration
    duration = spans[-2].text
    # Synopsis
    description = soup.find("div", {"class": "indent", "id": "link-report"}).find("span").text.strip()
    # Poster url
    img_tag = soup.find("img")
    url = img_tag['src']
    return (duration, description, url)

async def get_url(url) -> str:
    global sleep
    # Every request waits first; sleep grows over time to avoid hammering
    # the server -- our goal here is coroutines, not scraping speed.
    await asyncio.sleep(sleep)
    sleep += 0.5  # must happen before the return, or it would never execute
    print("start get url:{}".format(url))
    header = {"user-agent": "Chrome/10.0"}
    async with aiohttp.ClientSession(headers=header) as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print("got result")
            return result

async def main():
    url = "https://movie.douban.com/cinema/later/shenzhen/"
    try:
        task = asyncio.create_task(get_url(url))
        res = await asyncio.gather(task)
        movies_info = get_movies_info(res[0])
        tasks = [asyncio.create_task(get_url(info[0])) for info in movies_info]
        print("start fetch each movie info")
        htmls = await asyncio.gather(*tasks, return_exceptions=True)
        print("got html: {}".format(len(htmls)))
        all_info = []
        for i, html in enumerate(htmls):
            if isinstance(html, Exception):  # with return_exceptions=True, failures come back as exceptions
                continue
            details = get_each_movie_details(html)
            all_info.append(movies_info[i][1] + details)
        for info in all_info:
            print(info)
    except Exception as e:
        print("Exception: {}".format(e))

await main()
```
References:
https://docs.python.org/zh-cn/3/library/asyncio.html
https://time.geekbang.org/column/article/103358