1. 進(jìn)程、線程、協(xié)程的概念
進(jìn)程:CPU是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,進(jìn)程是程序執(zhí)行的實(shí)體,是線程執(zhí)行的容器。
線程:輕量級(jí)進(jìn)程,是程序執(zhí)行流的最小單元。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程。在多線程OS中,通常是在一個(gè)進(jìn)程中包括多個(gè)線程,每個(gè)線程都是作為利用CPU的基本單位,是花費(fèi)最小開(kāi)銷的實(shí)體。在同一進(jìn)程中的各個(gè)線程,都可以共享該進(jìn)程所擁有的資源。
協(xié)程:類似于線程,線程的調(diào)度由系統(tǒng)決定,協(xié)程的切換由自己決定。
協(xié)程和線程區(qū)別:協(xié)程避免了無(wú)意義的調(diào)度,由此可以提高性能,但也因此,程序員必須自己承擔(dān)調(diào)度的責(zé)任,同時(shí),協(xié)程也失去了標(biāo)準(zhǔn)線程使用多CPU的能力。
2.python 異步編程框架 asyncio
?查看python 官方文檔:
https://docs.python.org/zh-cn/3.8/library/asyncio-task.html#asyncio.run
3.7之前的asyncio模塊使用理解:
將多個(gè)協(xié)程進(jìn)一步封裝到一個(gè)task對(duì)象中,task就是一個(gè)儲(chǔ)存任務(wù)的盒子。此時(shí),裝在盒子里的任務(wù)并沒(méi)有真正的運(yùn)行,需要把它接入到一個(gè)監(jiān)視器中使它運(yùn)行,同時(shí)監(jiān)視器還要持續(xù)不斷的盯著盒子里的任務(wù)運(yùn)行到了哪一步,這個(gè)持續(xù)不斷的監(jiān)視器就用一個(gè)循環(huán)對(duì)象loop來(lái)實(shí)現(xiàn)。



3.7之后的協(xié)程,直接使用run調(diào)用協(xié)程:
以下代碼段,會(huì)打印 "hello",等待 1 秒,再打印 "world":

使用create_task()將協(xié)程打包成任務(wù)

3. aiohttp基于asyncio的http異步框架
異步io的好處在于避免的線程的開(kāi)銷和切換,而且我們都知道python其實(shí)是沒(méi)有多線程的,只是通過(guò)底層線層鎖實(shí)現(xiàn)的多線程。另一個(gè)好處在于避免io操作(包含網(wǎng)絡(luò)傳輸)的堵塞時(shí)間。
asyncio可以實(shí)現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把a(bǔ)syncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實(shí)現(xiàn)多用戶的高并發(fā)支持。
asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。
(1)aiohttp作為服務(wù)器端的一個(gè)簡(jiǎn)單的demo。
import argparse
from aiohttp import web
import asyncio
import base64
import logging
import uvloop
import time,datetime
import json
import requests
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
routes = web.RouteTableDef()
@routes.get('/')
async def hello(request):
? return web.Response(text="Hello, world")
# 定義一個(gè)路由映射,接收網(wǎng)址參數(shù),post方式
@routes.post('/demo1/{name}')
async def demo1(request):? # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。
? name = request.match_info.get('name', "Anonymous") # 獲取name
? print(datetime.datetime.now())? # 觸發(fā)視圖函數(shù)的時(shí)間
? data = await request.post()? # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)
? print(datetime.datetime.now())? # 接收post數(shù)據(jù)完成的時(shí)間
? logging.info('safety dect request start %s' % datetime.datetime.now())
? result = {'name':name,'key':data['key']}
? logging.info('safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
? return web.json_response(result)
# 定義一個(gè)路由映射,設(shè)計(jì)到io操作
@routes.post('/demo2')
async def demo2(request):? # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。
? data = await request.post()? # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)
? logging.info('safety dect request start %s' % datetime.datetime.now())
? res = requests.post('http://www.baidu.com')? # 網(wǎng)路id,會(huì)自動(dòng)切換到其他協(xié)成上
? logging.info('safety dect request finish %s' % res.test)
? return web.Response(text="welcome")
if __name__ == '__main__':
? logging.info('server start')
? app = web.Application()
? app.add_routes(routes)
? web.run_app(app,host='0.0.0.0',port=8080)
? logging.info('server close')
(2)aiohttp客戶端。
aiohttp的另一個(gè)主要作用是作為異步客戶端,用來(lái)解決高并發(fā)請(qǐng)求的情況。比如現(xiàn)在我要模擬一個(gè)高并發(fā)請(qǐng)求來(lái)測(cè)試我的服務(wù)器負(fù)載情況。所以需要在python里模擬高并發(fā)。高并發(fā)可以有多種方式,比如多線程,但是由于python本質(zhì)上是沒(méi)有多線程的,通過(guò)底層線程鎖實(shí)現(xiàn)的多線程。在模型高并發(fā)時(shí),具有線程切換和線程開(kāi)銷的損耗。所以我們就可以使用多協(xié)成來(lái)實(shí)現(xiàn)高并發(fā)。我們就可以使用aiohttp來(lái)模擬高并發(fā)客戶端。demo如下,用來(lái)模擬多個(gè)客戶端向指定服務(wù)器post圖片。
# 異步并發(fā)客戶端
class Asyncio_Client(object):
? def __init__(self):
? ? self.loop=asyncio.get_event_loop()
? ? self.tasks=[]
? # 將異步函數(shù)介入任務(wù)列表。后續(xù)參數(shù)直接傳給異步函數(shù)
? def set_task(self,task_fun,num,*args):
? ? for i in range(num):
? ? ? self.tasks.append(task_fun(*args))
? # 運(yùn)行,獲取返回結(jié)果
? def run(self):
? ? back=[]
? ? try:
? ? ? f = asyncio.wait(self.tasks)? # 創(chuàng)建future
? ? ? self.loop.run_until_complete(f) # 等待future完成
? ? finally:
? ? ? pass
# 服務(wù)器高并發(fā)壓力測(cè)試
class Test_Load():
? total_time=0 # 總耗時(shí)
? total_payload=0 # 總負(fù)載
? total_num=0 # 總并發(fā)數(shù)
? all_time=[]
? # 創(chuàng)建一個(gè)異步任務(wù),本地測(cè)試,所以post和接收幾乎不損耗時(shí)間,可以等待完成,主要耗時(shí)為算法模塊
? async def task_func1(self,session):
? ? begin = time.time()
? ? # print('開(kāi)始發(fā)送:', begin)
? ? file=open(self.image, 'rb')
? ? fsize = os.path.getsize(self.image)
? ? self.total_payload+=fsize/(1024*1024)
? ? data = {"image_id": "2", 'image':file}
? ? r = await session.post(self.url,data=data) #只post,不接收
? ? result = await r.json()
? ? self.total_num+=1
? ? # print(result)
? ? end = time.time()
? ? # print('接收完成:', end,',index=',self.total_num)
? ? self.all_time.append(end-begin)
? # 負(fù)載測(cè)試
? def test_safety(self):
? ? print('test begin')
? ? async_client = Asyncio_Client() # 創(chuàng)建客戶端
? ? session = aiohttp.ClientSession()
? ? for i in range(10): # 執(zhí)行10次
? ? ? self.all_time=[]
? ? ? self.total_num=0
? ? ? self.total_payload=0
? ? ? self.image = 'xxxx.jpg' # 設(shè)置測(cè)試nayizhang
? ? ? print('測(cè)試圖片:', self.image)
? ? ? begin = time.time()
? ? ? async_client.set_task(self.task_func1,self.num,session) # 設(shè)置并發(fā)任務(wù)
? ? ? async_client.run()? # 執(zhí)行任務(wù)
? ? ? end=time.time()
? ? ? self.all_time.sort(reverse=True)
? ? ? print(self.all_time)
? ? ? print('并發(fā)數(shù)量(個(gè)):',self.total_num)
? ? ? print('總耗時(shí)(s):',end-begin)
? ? ? print('最大時(shí)延(s):',self.all_time[0])
? ? ? print('最小時(shí)延(s):', self.all_time[len(self.all_time)-1])
? ? ? print('top-90%時(shí)延(s):', self.all_time[int(len(self.all_time)*0.1)])
? ? ? print('平均耗時(shí)(s/個(gè)):',sum(self.all_time)/self.total_num)
? ? ? print('支持并發(fā)率(個(gè)/s):',self.total_num/(end-begin))
? ? ? print('總負(fù)載(MB):',self.total_payload)
? ? ? print('吞吐率(MB/S):',self.total_payload/(end-begin))? # 吞吐率受上行下行帶寬,服務(wù)器帶寬,服務(wù)器算法性能諸多影響
? ? ? time.sleep(3)
? ? session.close()
? ? print('test finish')