asyncio協(xié)程如何實(shí)現(xiàn)并發(fā)
1、先介紹一下 并發(fā)和并行以及高并發(fā)
【并發(fā)】(Concurrent):同時(shí)擁有兩個(gè)或多個(gè)線程,如果程序在單核處理器上運(yùn)行,多個(gè)線程將交替地?fù)Q入或者換出內(nèi)存,這些線程是同時(shí)存在的,每個(gè)線程都處于執(zhí)行過程中的某個(gè)狀態(tài);如果運(yùn)行在多核處理器上,此時(shí),程序中每個(gè)線程都將分配到一個(gè)處理器核上,然后能夠同時(shí)運(yùn)行。
在操作系統(tǒng)中,指一個(gè)時(shí)間段中有幾個(gè)程序都處于已啟動(dòng)運(yùn)行到運(yùn)行完畢之間,且這幾個(gè)程序都是在同一個(gè)處理機(jī)上運(yùn)行,但任一個(gè)時(shí)刻點(diǎn)上只有一個(gè)程序在處理機(jī)上運(yùn)行。
-------【百度百科】
【總結(jié)】:并發(fā)就是多個(gè)線程操作相同的處理機(jī)中的資源,保證線程安全的同時(shí),對(duì)資源進(jìn)行合理的利用。
【并行】(High Concurrency):當(dāng)系統(tǒng)有一個(gè)以上CPU時(shí),則線程的操作有可能非并發(fā)。當(dāng)一個(gè)CPU執(zhí)行一個(gè)線程時(shí),另一個(gè)CPU可以執(zhí)行另一個(gè)線程,兩個(gè)線程互不搶占CPU資源,可以同時(shí)進(jìn)行。(宏觀上,時(shí)間有重疊)。
二者的區(qū)別:
- 【并行】是指兩個(gè)或者多個(gè)事件在同一時(shí)刻發(fā)生;
- 【并發(fā)】是指兩個(gè)或多個(gè)事件在同一時(shí)間間隔內(nèi)發(fā)生;
在多道程序環(huán)境下,并發(fā)性是指在一段時(shí)間內(nèi)宏觀上有多個(gè)程序在同時(shí)運(yùn)行,但在單處理機(jī)系統(tǒng)中,每一時(shí)刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時(shí)地交替執(zhí)行。
【高并發(fā)】(High Concurrency):是現(xiàn)在互聯(lián)網(wǎng)設(shè)計(jì)系統(tǒng)中需要考慮的一個(gè)重要因素之一,通常來(lái)說(shuō),就是通過嚴(yán)謹(jǐn)?shù)脑O(shè)計(jì)來(lái)保證系統(tǒng)能夠同時(shí)并行處理很多的請(qǐng)求。這就是大家常說(shuō)的「 高并發(fā) 」。就是說(shuō)系統(tǒng)能也夠在某一時(shí)間段內(nèi)提供很多請(qǐng)求,但是不會(huì)影響系統(tǒng)的性能。如果想設(shè)計(jì)出高可用和高性能的系統(tǒng),就應(yīng)該從很多的方面來(lái)考慮,例如應(yīng)該從*硬件、軟件、編程語(yǔ)言的選擇、網(wǎng)絡(luò)方面的考慮、系統(tǒng)的整體架構(gòu)、數(shù)據(jù)結(jié)構(gòu)、算法的優(yōu)化、數(shù)據(jù)庫(kù)的優(yōu)化`等等多方面。
asyncio想要實(shí)現(xiàn)并發(fā),就需要多個(gè)協(xié)程來(lái)完成任務(wù),每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作,這需要?jiǎng)?chuàng)建多個(gè)協(xié)程的列表,然后將這些協(xié)程注冊(cè)到事件循環(huán)中。
看例子說(shuō)話
import asyncio
async def test1():
print("1")
await asyncio.sleep(1)
print("2")
return "stop"
a = test1()
b = test1()
c = test1()
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
print("task result is ",task.result())
【解釋一下】:代碼先是定義了三個(gè)協(xié)程對(duì)象,然后通過asyncio.ensure_future方法創(chuàng)建了三個(gè)task,并且將所有的task加入到了task列表,最終使用loop.run_until_complete將task列表添加到事件循環(huán)中。
基于asyncio實(shí)現(xiàn)的協(xié)程爬蟲
上文中已經(jīng)介紹了如何使用async與await創(chuàng)建協(xié)程函數(shù),以及如何使用asyncio.get_event_loop創(chuàng)建事件循環(huán)并執(zhí)行協(xié)程函數(shù)。在這里主要講一講涉及到asyncio庫(kù)的異步爬蟲如何實(shí)現(xiàn)。
【存在的問題】:requests模塊、urllib模塊寫異步爬蟲,但實(shí)際操作發(fā)現(xiàn)并不支持asyncio異步;
【解決思路】:使用aiohttp模塊編寫異步爬蟲;
import asyncio
import aiohttp
async def run(url):
print("start spider ",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.url)
url_list = ["https://www.baidu.com","https://home.com","https://movie.com","https://taobai.com"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
關(guān)于阻塞函數(shù)在asyncio中使用的問題以及解決方案
【客觀背景】:雖然目前有異步函數(shù)支持asyncio,但實(shí)際問題是大部分IO模塊還不支持asyncio。
【問題】:如果想在asyncio中使用其他阻塞函數(shù),該怎么實(shí)現(xiàn)呢?
阻塞函數(shù)在asyncio中使用的問題
【】阻塞函數(shù)(例如io讀寫,requests網(wǎng)絡(luò)請(qǐng)求等),阻塞了客戶程序與asycio事件循環(huán)的唯一線程,因此在執(zhí)行調(diào)用時(shí),整個(gè)應(yīng)用程序都會(huì)凍結(jié)。
【解決方案】:使用事件循環(huán)對(duì)象的run_in_executor方法。
【詳細(xì)解釋】:syncio的事件循環(huán)在背后維護(hù)著一個(gè)ThreadPoolExecutor對(duì)象,我們可以調(diào)用run_in_executor方法,把可調(diào)用對(duì)象發(fā)給它執(zhí)行,即可以通過run_in_executor方法來(lái)新建一個(gè)線程來(lái)執(zhí)行耗時(shí)函數(shù)。
【run_in_executor方法】
AbstractEventLoop.run_in_executor(executor, func, *args)
【組成解釋】:
- executor: 參數(shù)應(yīng)該是一個(gè) Executor 實(shí)例。如果為 None,則使用默認(rèn) executor。(區(qū)分大小寫)
- func :就是要執(zhí)行的函數(shù)。
- args: 就是傳遞給 func 的參數(shù)。
看個(gè)實(shí)例:
import asyncio
import time
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None,time.sleep,1)
except Exception as e:
print(e)
print("stop ",url)
url_list = ["https://www.baidu.com","https://home.com","https://movie.com","https://taobai.com"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
【好處】run_in_executor方法:可以借此創(chuàng)建協(xié)稱并發(fā);避免使用特定的模塊來(lái)實(shí)現(xiàn)IO異步開發(fā)。
【參考】:https://blog.csdn.net/sinat_34082752/article/details/80680348