python-復(fù)盤-web-aiohttp-并發(fā)設(shè)計(jì)

初識(shí) asyncio/aiohttp

異步編程并不簡(jiǎn)單。相比平常的同步編程,你需要付出更多的努力在使用回調(diào)函數(shù),以事件以及事件處理器的模式進(jìn)行思考。同時(shí)也是因?yàn)閍syncio相對(duì)較新,相關(guān)的教程以及博客還很少的緣故。官方文檔非常簡(jiǎn)陋,只有最基本的范例。在我寫本文的時(shí)候,Stack Overflow上面,只有410個(gè)與asyncio相關(guān)的話題(相比之下,twisted相關(guān)的有2585)。有個(gè)別關(guān)于asyncio的不錯(cuò)的博客以及文章,比如這個(gè)這個(gè)、這個(gè),或者還有這個(gè)
以及這個(gè)
簡(jiǎn)單起見,我們先從基礎(chǔ)開始 —— 簡(jiǎn)單HTTP hello world —— 發(fā)起GET請(qǐng)求,同時(shí)獲取一個(gè)單獨(dú)的HTTP響應(yīng)。

同步模式,你這么做:

import requests 
def hello()    
     return requests.get("http://httpbin.org/get")     
print(hello())

接著我們使用aiohttp:

#!/usr/local/bin/python3.5 
import asyncio 
from aiohttp import ClientSession 

async def hello():     
    async with ClientSession() as session:         
        async with session.get("http://httpbin.org/headers") as response:                
            response = await response.read()                         
            print(response) 
  
loop = asyncio.get_event_loop() 
loop.run_until_complete(hello())

好吧,看上去僅僅一個(gè)簡(jiǎn)單的任務(wù),我寫了很多的代碼……那里有“async def”、“async with”、“await”—— 看上去讓人迷惑,讓我們嘗試弄懂它們。
你使用async以及await關(guān)鍵字將函數(shù)異步化。在hello()中實(shí)際上有兩個(gè)異步操作:首先異步獲取相應(yīng),然后異步讀取響應(yīng)的內(nèi)容。
Aiohttp推薦使用ClientSession作為主要的接口發(fā)起請(qǐng)求。ClientSession允許在多個(gè)請(qǐng)求之間保存cookie以及相關(guān)對(duì)象信息。Session(會(huì)話)在使用完畢之后需要關(guān)閉,關(guān)閉Session是另一個(gè)異步操作,所以每次你都需要使用async with關(guān)鍵字。
一旦你建立了客戶端session,你可以用它發(fā)起請(qǐng)求。這里是又一個(gè)異步操作的開始。上下文管理器的with語句可以保證在處理session的時(shí)候,總是能正確的關(guān)閉它。

要讓你的程序正常的跑起來,你需要將他們加入事件循環(huán)中。所以你需要?jiǎng)?chuàng)建一個(gè)asyncio loop的實(shí)例, 然后將任務(wù)加入其中。

看起來有些困難,但是只要你花點(diǎn)時(shí)間進(jìn)行思考與理解,就會(huì)有所體會(huì),其實(shí)并沒有那么復(fù)雜。

訪問多個(gè)鏈接

同步方式如下:

for url in urls:     
    print(requests.get(url).text)

很簡(jiǎn)單。不過異步方式卻沒有這么容易。所以任何時(shí)候你都需要思考,你的處境是否有必要用到異步。如果你的app在同步模式工作的很好,也許你并不需要將之遷移到異步方式。如果你確實(shí)需要異步方式,這里會(huì)給你一些啟示。我們的異步函數(shù)hello()還是保持原樣,不過我們需要將之包裝在asyncio的Future對(duì)象中,然后將Future對(duì)象列表作為任務(wù)傳遞給事件循環(huán)。

loop = asyncio.get_event_loop() 
tasks = [] # I'm using test server localhost, but you can use any url 
url = "http://localhost:8080/{}" 
for i in range(5):     
    task = asyncio.ensure_future(hello(url.format(i)))     
    tasks.append(task) 

loop.run_until_complete(asyncio.wait(tasks))

現(xiàn)在假設(shè)我們想獲取所有的響應(yīng),并將他們保存在同一個(gè)列表中。目前,我們沒有保存響應(yīng)內(nèi)容,僅僅只是打印了他們。讓我們返回他們,將之存儲(chǔ)在一個(gè)列表當(dāng)中,最后再打印出來。

為了達(dá)到這個(gè)目的,我們需要修改一下代碼:

#!/usr/local/bin/python3.5 
import asyncio 
from aiohttp import ClientSession 

async def fetch(url):      
   async with ClientSession() as session:           
      async with session.get(url) as response:             
          return await response.read()             
          
async def run(loop,  r):     
   url = "http://localhost:8080/{}"         
   tasks = []         
   for i in range(r):             
       task = asyncio.ensure_future(fetch(url.format(i)))                 
       tasks.append(task)         
       responses = await asyncio.gather(*tasks)         
       # you now have all response bodies in this variable         
       print(responses)     
       
def print_responses(result):       
    print(result)     
    
loop = asyncio.get_event_loop() 
future = asyncio.ensure_future(run(loop, 4)) 
loop.run_until_complete(future)

注意asyncio.gather()的用法,它搜集所有的Future對(duì)象,然后等待他們返回。

常見錯(cuò)誤

現(xiàn)在我們來模擬真實(shí)場(chǎng)景,去調(diào)試一些錯(cuò)誤,作為演示范例。

# WARNING! BROKEN CODE DO NOT COPY PASTE 
async def fetch(url):      
   async with ClientSession() as session:           
      async with session.get(url) as response:                     
          return response.read()

如果你對(duì)aiohttp或者asyncio不夠了解,即使你很熟悉Python,這段代碼也不好debug。

上面的代碼產(chǎn)生如下輸出:

pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py  
[<generator object ClientResponse.read at 0x7fa68d465728>,
 <generator object ClientResponse.read at 0x7fa68cdd9468>,
 <generator object ClientResponse.read at 0x7fa68d4656d0>,
 <generator object ClientResponse.read at 0x7fa68cdd9af0>]

發(fā)生了什么?你期待獲得響應(yīng)對(duì)象,但是你得到的是一組生成器。怎么會(huì)這樣?

我之前提到過,response.read()是一個(gè)異步操作,這意味著它不會(huì)立即返回結(jié)果,僅僅返回生成器。這些生成器需要被調(diào)用跟運(yùn)行,但是這并不是默認(rèn)行為。在Python34中加入的yield from以及Python3.5中加入的await便是為此而生。它們將迭代這些生成器。以上代碼只需要在response.read()前加上await關(guān)鍵字即可修復(fù)。如下:

    # async operation must be preceded by await          
    return await response.read() 
    # NOT: return response.read()

我們看看另一個(gè)例子。

# WARNING! BROKEN CODE DO NOT COPY PASTE 
async def run(loop,  r):       
   url = "http://localhost:8080/{}"         
   tasks = []         
   for i in range(r):             
       task = asyncio.ensure_future(fetch(url.format(i)))                 
       tasks.append(task)         
       responses = asyncio.gather(*tasks)         
       print(responses)

輸出結(jié)果如下:

pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py  
<_GatheringFuture pending> 
Task was destroyed but it is pending! 
task: <Task pending coro=<fetch() running at ./bench.py:7> 
        wait_for=<Future pending cb=[Task._wakeup()]> 
        cb=[gather.<locals>._done_callback(0)() 
        at /usr/local/lib/python3.5/asyncio/tasks.py:602]> 
Task was destroyed but it is pending! 
task: <Task pending coro=<fetch() running at ./bench.py:7> 
  wait_for=<Future pending cb=[Task._wakeup()]> 
  cb=[gather.<locals>._done_callback(1)() 
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]> 
Task was destroyed but it is pending! 
task: <Task pending coro=<fetch() running at ./bench.py:7> 
  wait_for=<Future pending cb=[Task._wakeup()]> 
  cb=[gather.<locals>._done_callback(2)() 
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]> 
Task was destroyed but it is pending! 
task: <Task pending coro=<fetch() running at ./bench.py:7>
  wait_for=<Future pending cb=[Task._wakeup()]> 
  cb=[gather.<locals>._done_callback(3)() 
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]>

發(fā)生了什么?查看本地日志,你會(huì)發(fā)現(xiàn)沒有任何請(qǐng)求到達(dá)服務(wù)器,實(shí)際上沒有任何請(qǐng)求發(fā)生。打印信息首先打印<_Gathering pending>對(duì)象,然后警告等待的任務(wù)被銷毀。又一次的,你忘記了await。

修改

    responses = asyncio.gather(*tasks)

    responses = await asyncio.gather(*tasks)

即可解決問題。

經(jīng)驗(yàn):任何時(shí)候,你在等待什么的時(shí)候,記得使用await

參考



同步 vs 異步

看看同步與異步(client)效率上的區(qū)別。異步每分鐘能夠發(fā)起多少請(qǐng)求。
為此,我們首先配置一個(gè)異步的aiohttp服務(wù)器端。這個(gè)服務(wù)端將獲取全部的html文本, 來自Marry Shelley的Frankenstein。在每個(gè)響應(yīng)中,它將添加隨機(jī)的延時(shí)。有的為0,最大值為3s。類似真正的app。有些app的響應(yīng)延時(shí)為固定值,一般而言,每個(gè)響應(yīng)的延時(shí)是不同的。

服務(wù)器代碼如下:

#!/usr/local/bin/python3.5 
import asyncio 
from datetime import datetime 
from aiohttp import web 
import random 

# set seed to ensure async and sync client get same distribution of delay values 
# and tests are fair random.seed(1) 
async def hello(request):     
    name = request.match_info.get("name", "foo")         
    n = datetime.now().isoformat()         
    delay = random.randint(0, 3)         
    await asyncio.sleep(delay)         
    headers = {"content_type": "text/html", "delay": str(delay)}         
    # opening file is not async here, so it may block, to improve         
    # efficiency of this you can consider using asyncio Executors         
    # that will delegate file operation to separate thread or process         
    # and improve performance         
    # https://docs.python.org/3/library/asyncio-eventloop.html#executor         
    # https://pymotw.com/3/asyncio/executors.html     
    with open("frank.html", "rb") as html_body:            
         print("{}: {} delay: {}".format(n, request.path, delay))                 
         response = web.Response(body=html_body.read(), headers=headers)         
         return response     
         
app = web.Application() 
app.router.add_route("GET", "/{name}", hello) 
web.run_app(app)

同步客戶端代碼如下:

import requests 
r = 100 
url = "http://localhost:8080/{}" 
for i in range(r):      
   res = requests.get(url.format(i))      
  delay = res.headers.get("DELAY")         
  d = res.headers.get("DATE")         
  print("{}:{} delay {}".format(d, res.url, delay))

在我的機(jī)器上,上面的代碼耗時(shí)2分45s。而異步代碼只需要3.48s。

有趣的是,異步代碼耗時(shí)無限接近最長(zhǎng)的延時(shí)(server的配置)。如果你觀察打印信息,你會(huì)發(fā)現(xiàn)異步客戶端的優(yōu)勢(shì)有多么巨大。有的響應(yīng)為0延遲,有的為3s。同步模式下,客戶端會(huì)阻塞、等待,你的機(jī)器什么都不做。異步客戶端不會(huì)浪費(fèi)時(shí)間,當(dāng)有延遲發(fā)生時(shí),它將去做其他的事情。在日志中,你也會(huì)發(fā)現(xiàn)這個(gè)現(xiàn)象。首先是0延遲的響應(yīng),然后當(dāng)它們到達(dá)后,你將看到1s的延遲,最后是最大延遲的響應(yīng)。

極限測(cè)試

現(xiàn)在我們知道異步表現(xiàn)更好,讓我們嘗試去找到它的極限,同時(shí)嘗試讓它崩潰。我將發(fā)送1000異步請(qǐng)求。我很好奇我的客戶端能夠處理多少數(shù)量的請(qǐng)求。

> time python3 bench.py 
2.68user 0.24system 0:07.14elapsed 40%CPU 
(0avgtext+0avgdata 53704maxresident)
k 0inputs+0outputs (0major+14156minor)pagefaults 0swaps

1000個(gè)請(qǐng)求,花費(fèi)了7s。相當(dāng)不錯(cuò)的成績(jī)。然后10K呢?很不幸,失敗了:

responses are <_GatheringFuture finished exception=
  ClientOSError(24, 'Cannot connect to host localhost:8080 ssl:
  False [Can not connect to localhost:8080 [Too many open files]]')> 
Traceback (most recent call last):  
   File "/home/pawel/.local/lib/python3.5/site-packages/aiohttp/connector.py", line 581, in _create_connection  
   File "/usr/local/lib/python3.5/asyncio/base_events.py", line 651, in create_connection   
   File "/usr/local/lib/python3.5/asyncio/base_events.py", line 618, in create_connection    
   File "/usr/local/lib/python3.5/socket.py", line 134, in __init__ OS
   Error: [Errno 24] Too many open files</pre>

這樣不大好,貌似我倒在了10K connections problem面前。

traceback顯示,open files太多了,可能代表著open sockets太多。為什么叫文件?Sockets(套接字)僅僅是文件描述符,操作系統(tǒng)有數(shù)量的限制。多少才叫太多呢?我查看Python源碼,然后發(fā)現(xiàn)這個(gè)值為1024.怎么樣繞過這個(gè)問題?一個(gè)粗暴的辦法是增加這個(gè)數(shù)值,但是聽起來并不高明。更好的辦法是,加入一些同步機(jī)制,限制并發(fā)數(shù)量。于是我在asyncio.Semaphore()中加入最大任務(wù)限制為1000.

修改客戶端代碼如下:

# modified fetch function with semaphore 
import random 
import asyncio 
from aiohttp import ClientSession 

async def fetch(url):      
   async with ClientSession() as session:         
       async with session.get(url) as response:                     
        delay = response.headers.get("DELAY")                     
        date = response.headers.get("DATE")                     
        print("{}:{} with delay {}".format(date, response.url, delay))                     
        return await response.read() 
        
async def bound_fetch(sem, url):     
    # getter function with semaphore         
    async with sem:                 
     await fetch(url) 
    async def run(loop,  r):         
     url = "http://localhost:8080/{}"         
     tasks = []         
     # create instance of Semaphore         
     sem = asyncio.Semaphore(1000)         
     for i in range(r):             
         # pass Semaphore to every GET request                 
         task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))                 
         tasks.append(task)         
         responses = asyncio.gather(*tasks)         
         
await responses number = 10000 
loop = asyncio.get_event_loop() 
future = asyncio.ensure_future(run(loop, number)) 
loop.run_until_complete(future)

現(xiàn)在,我們可以處理10k鏈接了。這花去我們23s,同時(shí)返回了一些異常。不過不管怎樣,相當(dāng)不錯(cuò)的表現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容