aiohttp異步編程(一)

1. 進(jìn)程、線程、協(xié)程的概念

進(jìn)程:CPU是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,進(jìn)程是程序執(zhí)行的實(shí)體,是線程執(zhí)行的容器。

線程:輕量級(jí)進(jìn)程,是程序執(zhí)行流的最小單元。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程。在多線程OS中,通常是在一個(gè)進(jìn)程中包括多個(gè)線程,每個(gè)線程都是作為利用CPU的基本單位,是花費(fèi)最小開(kāi)銷的實(shí)體。在同一進(jìn)程中的各個(gè)線程,都可以共享該進(jìn)程所擁有的資源。

協(xié)程:類似于線程,線程的調(diào)度由系統(tǒng)決定,協(xié)程的切換由自己決定。

協(xié)程和線程區(qū)別:協(xié)程避免了無(wú)意義的調(diào)度,由此可以提高性能,但也因此,程序員必須自己承擔(dān)調(diào)度的責(zé)任,同時(shí),協(xié)程也失去了標(biāo)準(zhǔn)線程使用多CPU的能力。


2.python 異步編程框架 asyncio

?查看python 官方文檔:

https://docs.python.org/zh-cn/3.8/library/asyncio-task.html#asyncio.run

3.7之前的asyncio模塊使用理解:

將多個(gè)協(xié)程進(jìn)一步封裝到一個(gè)task對(duì)象中,task就是一個(gè)儲(chǔ)存任務(wù)的盒子。此時(shí),裝在盒子里的任務(wù)并沒(méi)有真正的運(yùn)行,需要把它接入到一個(gè)監(jiān)視器中使它運(yùn)行,同時(shí)監(jiān)視器還要持續(xù)不斷的盯著盒子里的任務(wù)運(yùn)行到了哪一步,這個(gè)持續(xù)不斷的監(jiān)視器就用一個(gè)循環(huán)對(duì)象loop來(lái)實(shí)現(xiàn)。



3.7之后的協(xié)程,直接使用run調(diào)用協(xié)程:

以下代碼段,會(huì)打印 "hello",等待 1 秒,再打印 "world":

使用create_task()將協(xié)程打包成任務(wù)



3. aiohttp基于asyncio的http異步框架

異步io的好處在于避免的線程的開(kāi)銷和切換,而且我們都知道python其實(shí)是沒(méi)有多線程的,只是通過(guò)底層線層鎖實(shí)現(xiàn)的多線程。另一個(gè)好處在于避免io操作(包含網(wǎng)絡(luò)傳輸)的堵塞時(shí)間。

asyncio可以實(shí)現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把a(bǔ)syncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實(shí)現(xiàn)多用戶的高并發(fā)支持。

asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。

(1)aiohttp作為服務(wù)器端的一個(gè)簡(jiǎn)單的demo。

import argparse

from aiohttp import web

import asyncio

import base64

import logging

import uvloop

import time,datetime

import json

import requests

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

routes = web.RouteTableDef()

@routes.get('/')

async def hello(request):

? return web.Response(text="Hello, world")

# 定義一個(gè)路由映射,接收網(wǎng)址參數(shù),post方式

@routes.post('/demo1/{name}')

async def demo1(request):? # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。

? name = request.match_info.get('name', "Anonymous") # 獲取name

? print(datetime.datetime.now())? # 觸發(fā)視圖函數(shù)的時(shí)間

? data = await request.post()? # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)

? print(datetime.datetime.now())? # 接收post數(shù)據(jù)完成的時(shí)間

? logging.info('safety dect request start %s' % datetime.datetime.now())

? result = {'name':name,'key':data['key']}

? logging.info('safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))

? return web.json_response(result)

# 定義一個(gè)路由映射,設(shè)計(jì)到io操作

@routes.post('/demo2')

async def demo2(request):? # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。

? data = await request.post()? # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)

? logging.info('safety dect request start %s' % datetime.datetime.now())

? res = requests.post('http://www.baidu.com')? # 網(wǎng)路id,會(huì)自動(dòng)切換到其他協(xié)成上

? logging.info('safety dect request finish %s' % res.test)

? return web.Response(text="welcome")

if __name__ == '__main__':

? logging.info('server start')

? app = web.Application()

? app.add_routes(routes)

? web.run_app(app,host='0.0.0.0',port=8080)

? logging.info('server close')

(2)aiohttp客戶端。

aiohttp的另一個(gè)主要作用是作為異步客戶端,用來(lái)解決高并發(fā)請(qǐng)求的情況。比如現(xiàn)在我要模擬一個(gè)高并發(fā)請(qǐng)求來(lái)測(cè)試我的服務(wù)器負(fù)載情況。所以需要在python里模擬高并發(fā)。高并發(fā)可以有多種方式,比如多線程,但是由于python本質(zhì)上是沒(méi)有多線程的,通過(guò)底層線程鎖實(shí)現(xiàn)的多線程。在模型高并發(fā)時(shí),具有線程切換和線程開(kāi)銷的損耗。所以我們就可以使用多協(xié)成來(lái)實(shí)現(xiàn)高并發(fā)。我們就可以使用aiohttp來(lái)模擬高并發(fā)客戶端。demo如下,用來(lái)模擬多個(gè)客戶端向指定服務(wù)器post圖片。

# 異步并發(fā)客戶端

class Asyncio_Client(object):

? def __init__(self):

? ? self.loop=asyncio.get_event_loop()

? ? self.tasks=[]

? # 將異步函數(shù)介入任務(wù)列表。后續(xù)參數(shù)直接傳給異步函數(shù)

? def set_task(self,task_fun,num,*args):

? ? for i in range(num):

? ? ? self.tasks.append(task_fun(*args))

? # 運(yùn)行,獲取返回結(jié)果

? def run(self):

? ? back=[]

? ? try:

? ? ? f = asyncio.wait(self.tasks)? # 創(chuàng)建future

? ? ? self.loop.run_until_complete(f) # 等待future完成

? ? finally:

? ? ? pass

# 服務(wù)器高并發(fā)壓力測(cè)試

class Test_Load():

? total_time=0 # 總耗時(shí)

? total_payload=0 # 總負(fù)載

? total_num=0 # 總并發(fā)數(shù)

? all_time=[]

? # 創(chuàng)建一個(gè)異步任務(wù),本地測(cè)試,所以post和接收幾乎不損耗時(shí)間,可以等待完成,主要耗時(shí)為算法模塊

? async def task_func1(self,session):

? ? begin = time.time()

? ? # print('開(kāi)始發(fā)送:', begin)

? ? file=open(self.image, 'rb')

? ? fsize = os.path.getsize(self.image)

? ? self.total_payload+=fsize/(1024*1024)

? ? data = {"image_id": "2", 'image':file}

? ? r = await session.post(self.url,data=data) #只post,不接收

? ? result = await r.json()

? ? self.total_num+=1

? ? # print(result)

? ? end = time.time()

? ? # print('接收完成:', end,',index=',self.total_num)

? ? self.all_time.append(end-begin)

? # 負(fù)載測(cè)試

? def test_safety(self):

? ? print('test begin')

? ? async_client = Asyncio_Client() # 創(chuàng)建客戶端

? ? session = aiohttp.ClientSession()

? ? for i in range(10): # 執(zhí)行10次

? ? ? self.all_time=[]

? ? ? self.total_num=0

? ? ? self.total_payload=0

? ? ? self.image = 'xxxx.jpg' # 設(shè)置測(cè)試nayizhang

? ? ? print('測(cè)試圖片:', self.image)

? ? ? begin = time.time()

? ? ? async_client.set_task(self.task_func1,self.num,session) # 設(shè)置并發(fā)任務(wù)

? ? ? async_client.run()? # 執(zhí)行任務(wù)

? ? ? end=time.time()

? ? ? self.all_time.sort(reverse=True)

? ? ? print(self.all_time)

? ? ? print('并發(fā)數(shù)量(個(gè)):',self.total_num)

? ? ? print('總耗時(shí)(s):',end-begin)

? ? ? print('最大時(shí)延(s):',self.all_time[0])

? ? ? print('最小時(shí)延(s):', self.all_time[len(self.all_time)-1])

? ? ? print('top-90%時(shí)延(s):', self.all_time[int(len(self.all_time)*0.1)])

? ? ? print('平均耗時(shí)(s/個(gè)):',sum(self.all_time)/self.total_num)

? ? ? print('支持并發(fā)率(個(gè)/s):',self.total_num/(end-begin))

? ? ? print('總負(fù)載(MB):',self.total_payload)

? ? ? print('吞吐率(MB/S):',self.total_payload/(end-begin))? # 吞吐率受上行下行帶寬,服務(wù)器帶寬,服務(wù)器算法性能諸多影響

? ? ? time.sleep(3)

? ? session.close()

? ? print('test finish')

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容