關(guān)于aiohttp

aiohttp強(qiáng)調(diào)的是異步并發(fā)。提供了對asyncio/await的支持,可以實(shí)現(xiàn)單線程并發(fā)IO操作。

import aiohttp
import asyncio

async def fetch(session, url):
   async with session.get(url) as response:
       return await response.text()

async def main():
   async with aiohttp.ClientSession() as session:
       html = await fetch(session, 'http://httpbin.org/headers')
       print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

{'headers':{'Accept':'*/*','Accept-Encoding':'gzip, deflate','Connection':'close','Host':'httpbin.org','User-Agent':'Python/3.6 aiohttp/3.2.1'}}

#我們的例子不涉及服務(wù)端

原理就是不用等待一個(gè)操作完成,可以同步做其他事情
(如果需要等這個(gè)操作的結(jié)果呢?前面有一個(gè)功能是批量操作一組?)

官方推薦使用ClientSession來管理會(huì)話。

aioredis和motor兩個(gè)異步操作數(shù)據(jù)庫的庫。

另外,Scrapy也是異步的,是基于Twisted事件驅(qū)動(dòng)的。在任何情況下,都不要寫阻塞的代碼。阻塞的代碼包括:

◆ 1.訪問文件、數(shù)據(jù)庫或者Web;

◆ 2.產(chǎn)生新的進(jìn)程并需要處理新進(jìn)程的輸出,如運(yùn)行shell命令;

◆ 3.執(zhí)行系統(tǒng)層次操作的代碼,如等待系統(tǒng)隊(duì)列。

這里就出現(xiàn)一個(gè)問題,如果一個(gè)操作必須依賴上一個(gè)操作,是否需要異步操作,如何應(yīng)用這個(gè)操作?

下面是一段異步操作代碼

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import logging


class AsnycGrab(object):

   def __init__(self, url_list, max_threads):

       self.urls = url_list
       self.results = {}
       self.max_threads = max_threads

   def __parse_results(self, url, html):

       try:
           soup = BeautifulSoup(html, 'html.parser')
           title = soup.find('title').get_text()
       except Exception as e:
           raise e

       if title:
           self.results[url] = title

   async def get_body(self, url):
       async with aiohttp.ClientSession() as session:
           async with session.get(url, timeout=30) as response:
               assert response.status == 200
               html = await response.read()
               return response.url, html

   async def get_results(self, url):
       url, html = await self.get_body(url)
       self.__parse_results(url, html)
       return 'Completed'

   async def handle_tasks(self, task_id, work_queue):
       while not work_queue.empty():
           current_url = await work_queue.get()
           try:
               task_status = await self.get_results(current_url)
           except Exception as e:
               logging.exception('Error for {}'.format(current_url), exc_info=True)

   def eventloop(self):
       q = asyncio.Queue()
       [q.put_nowait(url) for url in self.urls]
       loop = asyncio.get_event_loop()
       tasks = [self.handle_tasks(task_id, q, ) for task_id in range(self.max_threads)]
       loop.run_until_complete(asyncio.wait(tasks))
       loop.close()


if __name__ == '__main__':
   async_example = AsnycGrab(['http://edmundmartin.com',
              'https://www.udemy.com',
              'https://github.com/',
              'https://zhangslob.github.io/',
              'https://www.zhihu.com/'], 5)
   async_example.eventloop()
   print(async_example.results)

里面涉及了一個(gè)隊(duì)列通信的功能asyncio.Queue()

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容