【python】協(xié)程之a(chǎn)syncio:實(shí)現(xiàn)并發(fā)、asyncio實(shí)現(xiàn)的協(xié)程爬蟲、阻塞函數(shù)

asyncio協(xié)程如何實(shí)現(xiàn)并發(fā)
1、先介紹一下 并發(fā)和并行以及高并發(fā)

【并發(fā)】(Concurrent):同時(shí)擁有兩個(gè)或多個(gè)線程,如果程序在單核處理器上運(yùn)行,多個(gè)線程將交替地?fù)Q入或者換出內(nèi)存,這些線程是同時(shí)存在的,每個(gè)線程都處于執(zhí)行過程中的某個(gè)狀態(tài);如果運(yùn)行在多核處理器上,此時(shí),程序中每個(gè)線程都將分配到一個(gè)處理器核上,然后能夠同時(shí)運(yùn)行。

操作系統(tǒng)中,指一個(gè)時(shí)間段中有幾個(gè)程序都處于已啟動(dòng)運(yùn)行到運(yùn)行完畢之間,且這幾個(gè)程序都是在同一個(gè)處理機(jī)上運(yùn)行,但任一個(gè)時(shí)刻點(diǎn)上只有一個(gè)程序在處理機(jī)上運(yùn)行。
-------【百度百科】

【總結(jié)】:并發(fā)就是多個(gè)線程操作相同的處理機(jī)中的資源,保證線程安全的同時(shí),對(duì)資源進(jìn)行合理的利用。

【并行】(High Concurrency):當(dāng)系統(tǒng)有一個(gè)以上CPU時(shí),則線程的操作有可能非并發(fā)。當(dāng)一個(gè)CPU執(zhí)行一個(gè)線程時(shí),另一個(gè)CPU可以執(zhí)行另一個(gè)線程,兩個(gè)線程互不搶占CPU資源,可以同時(shí)進(jìn)行。(宏觀上,時(shí)間有重疊)。


二者的區(qū)別:

  • 【并行】是指兩個(gè)或者多個(gè)事件在同一時(shí)刻發(fā)生;
  • 【并發(fā)】是指兩個(gè)或多個(gè)事件在同一時(shí)間間隔內(nèi)發(fā)生;
    多道程序環(huán)境下,并發(fā)性是指在一段時(shí)間內(nèi)宏觀上有多個(gè)程序在同時(shí)運(yùn)行,但在單處理機(jī)系統(tǒng)中,每一時(shí)刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時(shí)地交替執(zhí)行。

【高并發(fā)】(High Concurrency):是現(xiàn)在互聯(lián)網(wǎng)設(shè)計(jì)系統(tǒng)中需要考慮的一個(gè)重要因素之一,通常來(lái)說(shuō),就是通過嚴(yán)謹(jǐn)?shù)脑O(shè)計(jì)來(lái)保證系統(tǒng)能夠同時(shí)并行處理很多的請(qǐng)求。這就是大家常說(shuō)的「 高并發(fā) 」。就是說(shuō)系統(tǒng)能也夠在某一時(shí)間段內(nèi)提供很多請(qǐng)求,但是不會(huì)影響系統(tǒng)的性能。如果想設(shè)計(jì)出高可用和高性能的系統(tǒng),就應(yīng)該從很多的方面來(lái)考慮,例如應(yīng)該從*硬件、軟件、編程語(yǔ)言的選擇、網(wǎng)絡(luò)方面的考慮、系統(tǒng)的整體架構(gòu)、數(shù)據(jù)結(jié)構(gòu)、算法的優(yōu)化、數(shù)據(jù)庫(kù)的優(yōu)化`等等多方面。

asyncio想要實(shí)現(xiàn)并發(fā),就需要多個(gè)協(xié)程來(lái)完成任務(wù),每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作,這需要?jiǎng)?chuàng)建多個(gè)協(xié)程的列表,然后將這些協(xié)程注冊(cè)到事件循環(huán)中。

看例子說(shuō)話

import asyncio

async def test1():

    print("1")
    await asyncio.sleep(1)
    print("2")
    return "stop"

a = test1()
b = test1()
c = test1()

tasks = [
    asyncio.ensure_future(a),
    asyncio.ensure_future(b),
    asyncio.ensure_future(c),
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
    print("task result is ",task.result())

【解釋一下】:代碼先是定義了三個(gè)協(xié)程對(duì)象,然后通過asyncio.ensure_future方法創(chuàng)建了三個(gè)task,并且將所有的task加入到了task列表,最終使用loop.run_until_complete將task列表添加到事件循環(huán)中。


基于asyncio實(shí)現(xiàn)的協(xié)程爬蟲

上文中已經(jīng)介紹了如何使用async與await創(chuàng)建協(xié)程函數(shù),以及如何使用asyncio.get_event_loop創(chuàng)建事件循環(huán)并執(zhí)行協(xié)程函數(shù)。在這里主要講一講涉及到asyncio庫(kù)的異步爬蟲如何實(shí)現(xiàn)。

【存在的問題】:requests模塊、urllib模塊寫異步爬蟲,但實(shí)際操作發(fā)現(xiàn)并不支持asyncio異步;
【解決思路】:使用aiohttp模塊編寫異步爬蟲;

import asyncio
import aiohttp

async def run(url):
    print("start spider ",url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.url)

url_list = ["https://www.baidu.com","https://home.com","https://movie.com","https://taobai.com"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

關(guān)于阻塞函數(shù)在asyncio中使用的問題以及解決方案

【客觀背景】:雖然目前有異步函數(shù)支持asyncio,但實(shí)際問題是大部分IO模塊還不支持asyncio。
【問題】:如果想在asyncio中使用其他阻塞函數(shù),該怎么實(shí)現(xiàn)呢?

阻塞函數(shù)在asyncio中使用的問題

【】阻塞函數(shù)(例如io讀寫,requests網(wǎng)絡(luò)請(qǐng)求等),阻塞了客戶程序與asycio事件循環(huán)的唯一線程,因此在執(zhí)行調(diào)用時(shí),整個(gè)應(yīng)用程序都會(huì)凍結(jié)。

【解決方案】:使用事件循環(huán)對(duì)象的run_in_executor方法。
【詳細(xì)解釋】:syncio的事件循環(huán)在背后維護(hù)著一個(gè)ThreadPoolExecutor對(duì)象,我們可以調(diào)用run_in_executor方法,把可調(diào)用對(duì)象發(fā)給它執(zhí)行,即可以通過run_in_executor方法來(lái)新建一個(gè)線程來(lái)執(zhí)行耗時(shí)函數(shù)。


【run_in_executor方法】

AbstractEventLoop.run_in_executor(executor, func, *args)

【組成解釋】:

  • executor: 參數(shù)應(yīng)該是一個(gè) Executor 實(shí)例。如果為 None,則使用默認(rèn) executor。(區(qū)分大小寫)
  • func :就是要執(zhí)行的函數(shù)。
  • args: 就是傳遞給 func 的參數(shù)。
    看個(gè)實(shí)例:
import asyncio
import time

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None,time.sleep,1)
    except Exception as e:
        print(e)
    print("stop ",url)

url_list = ["https://www.baidu.com","https://home.com","https://movie.com","https://taobai.com"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

【好處】run_in_executor方法:可以借此創(chuàng)建協(xié)稱并發(fā);避免使用特定的模塊來(lái)實(shí)現(xiàn)IO異步開發(fā)。

【參考】:https://blog.csdn.net/sinat_34082752/article/details/80680348

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容