aiohttp是一個(gè)基于Python 3.4+ asyncio模塊的HTTP工具包。它包括:
- 類(lèi)似requests的HTTP客戶端
- 高層級(jí)HTTP服務(wù)器接口 - 用于構(gòu)建Web應(yīng)用程序
- 低層級(jí)HTTP服務(wù)器 - 用于不需要高層級(jí)服務(wù)器便利性的Web服務(wù)器
高低層級(jí)的服務(wù)器區(qū)別在于高層級(jí)服務(wù)器提供了:
- 路由
- 信號(hào)(啟動(dòng)、loop可用、關(guān)閉)
- 中間件
- 安裝次級(jí)應(yīng)用(類(lèi)似但不同于Flask的blueprints)
低層級(jí)服務(wù)器仍然允許使用高層級(jí)的請(qǐng)求、響應(yīng)和websocket對(duì)象,并不是低到讓你直接處理TCP套接字。
在本指南中,我們將構(gòu)建一個(gè)簡(jiǎn)單的Todo應(yīng)用程序來(lái)上手高級(jí)服務(wù)器。在未來(lái),我希望涵蓋更復(fù)雜的應(yīng)用,但目前待辦事項(xiàng)列表已經(jīng)成為網(wǎng)絡(luò)編程的最佳選擇。
安裝
- 你至少需要Python 3.6。aiohttp確實(shí)可以運(yùn)行在Python 3.4上,但我更傾向于Python 3.5引入的
async和await。至于3.6是因?yàn)樵谠摪嬉氲?a target="_blank" rel="nofollow">acync列表推導(dǎo),除此之外,Python 3.5以上都沒(méi)有問(wèn)題。 - 在確定Python版本后,我們使用虛擬環(huán)境安裝aiohttp
python3.6 -m venv asynctodo
source asynctodo/bin/activate
python -m pip install aiohttp==2.0.7 # current version as 2017-04-16
基本應(yīng)用
- 我們從簡(jiǎn)單的開(kāi)始:一個(gè)存儲(chǔ)在內(nèi)存中的列表(由字典組成)以及展示該列表和其中單個(gè)項(xiàng)目的路由。由于該列表是一個(gè)內(nèi)存中的對(duì)象,我們會(huì)重啟服務(wù)器幾次(aiohttp不附帶reloader重載器但我稍后會(huì)介紹)。新建
aiotodo.py添加如下內(nèi)容:
from aiohttp import web
TODOS = [
{
'name': 'Start this tutorial',
'finished': True
},
{
'name': 'Finish this tutorial',
'finished': False
}
]
def get_all_todos(request):
return web.json_response([
{'id': idx, **todo} for idx, todo in enumerate(TODOS)
])
def get_one_todo(request):
id = int(request.match_info['id'])
if id >= len(TODOS):
return web.json_response({'error': 'Todo not found'}, status=404)
return web.json_response({'id': id, **TODOS[id]})
def app_factory(args=()):
app = web.Application()
app.router.add_get('/todos/', get_all_todos, name='all_todos')
app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
return app
-
aiohttp.web包含所有高層級(jí)應(yīng)用服務(wù)器組件。它提供了諸如應(yīng)用程序類(lèi),請(qǐng)求和響應(yīng)類(lèi)以及像json_response這樣的幫助方法(json_response是創(chuàng)建application/json響應(yīng)的快捷方式,而不需要你手動(dòng)轉(zhuǎn)換對(duì)象)。 -
get_all_todos和get_one_todo是我們的路由處理器。(你會(huì)注意到它們不是異步的——這是正確的,因?yàn)橥胶彤惒匠绦蚍?wù)器都能應(yīng)對(duì))。處理器應(yīng)該以Django風(fēng)格接受請(qǐng)求,但與Django和Flask不同(不過(guò)顯然類(lèi)似于Pyramid),通過(guò)request.match_info映射來(lái)訪問(wèn)url參數(shù)。請(qǐng)求對(duì)象包含與特定請(qǐng)求相關(guān)的所有內(nèi)容,包括對(duì)生成它的應(yīng)用程序的引用。
補(bǔ)充:nerdwaller評(píng)論指出路由處理器應(yīng)該總是異步的來(lái)鼓勵(lì)你以協(xié)程思考。這里我保留了原始代碼來(lái)保持文本的一致性,然而,將上述內(nèi)容轉(zhuǎn)換為協(xié)程只需要將
def替換為async def。我必須同意,即使在將列表轉(zhuǎn)儲(chǔ)到j(luò)son的簡(jiǎn)單情況下,使用async def也提供了與其它路由處理器的良好對(duì)稱性。
-
{'id': id, **todo}語(yǔ)法在3.5引入,詳見(jiàn)PEP 448,簡(jiǎn)單效果如下:
-
app_factory不僅具有將所有應(yīng)用構(gòu)建組合在一起的好處,而且我們需要它來(lái)調(diào)用aiohttp dev服務(wù)器以及用于gunicorn aiohttp workers。你會(huì)注意到它還接收參數(shù)args,當(dāng)從命令行執(zhí)行時(shí),aiohttp入口點(diǎn)所不能解析的參數(shù)都會(huì)被傳到這里——我們稍后會(huì)用到。 - 最終,應(yīng)用路由像是Flask和Django的混合。它使用
{name:pattern}({名稱:模式})的形式來(lái)捕獲URL參數(shù),在request.match_info字典中獲取該參數(shù),模式部分應(yīng)用正則表達(dá)式。然而,雖然它與Flask(實(shí)際上是Werkzeug)路由模式相似,但沒(méi)有一個(gè)轉(zhuǎn)換器的概念,所以轉(zhuǎn)換必須在路由中手動(dòng)完成。
小貼士
- 即使
aiohttp.Application能夠使用同步路由處理器,你也應(yīng)當(dāng)小心,永遠(yuǎn)不要做長(zhǎng)時(shí)間阻塞事件循環(huán)處理器的事——包括同步IO和CPU密集型任務(wù)。 - 在URL參數(shù)的模式匹配部分,任何沒(méi)有配對(duì)的
{或者}字符都會(huì)導(dǎo)致將導(dǎo)致路由器拒絕建立路由并拋出異常。但是,使用配對(duì)的{}(如\d{2})可以工作。(詳見(jiàn)Issue 1778) - aiohttp的默認(rèn)路由器不執(zhí)行任何類(lèi)型的自動(dòng)尾斜杠重定向,所以
/todos和/todos/是不同的。如果你期望它處理尾斜杠/,請(qǐng)小心。 - 此外,上面的關(guān)于參數(shù)正則表達(dá)式匹配的注意事項(xiàng)僅適用于aiohttp的默認(rèn)路由器。它還有其他的實(shí)現(xiàn)(事實(shí)上,庫(kù)的維護(hù)者鼓勵(lì)你嘗試它們)。
運(yùn)行應(yīng)用
- 既然我們建好了基本應(yīng)用,我們可以用aiohttp開(kāi)發(fā)服務(wù)器運(yùn)行它。在
aiotodo.py同級(jí)目錄與虛擬環(huán)境(source asynctodo/bin/activate)下運(yùn)行:
python -m aiohttp.web -P 8080 aiotodo:app_factory
你將看到:
======== Running on http://localhost:8080 ========
(Press CTRL+C to quit)
然后打開(kāi)瀏覽器訪問(wèn)localhost:8080/todos/(注意尾斜杠),會(huì)看到我們放在列表中的兩個(gè)初始項(xiàng)目以及它們的id。訪問(wèn)localhost:8080/todos/0和localhost:8080/todos/1(沒(méi)有尾斜杠)查看單個(gè)項(xiàng)目。

增加修改刪除Todos
- 為了與代辦事項(xiàng)列表進(jìn)行交互,我們需要更多的處理器。先添加新建功能:
async def create_todo(request):
data = await request.json()
if 'name' not in data:
return web.json_response({'error': '"name" is a required field'})
name = data.get('name')
if not isinstance(name, str) or not len(name):
return web.json_response(
{'error': '"name" must be a string with at least one character'})
data['finished'] = bool(data.get('finished', False))
TODOS.append(data)
new_id = len(TODOS) - 1
return web.Response(
headers={
'Location': str(request.app.router['one_todo'].url_for(id=new_id))
},
status=303
)
- 大部分代碼是自解釋的(希望是)。我們需要
awaitrequest.json函數(shù),因?yàn)樗琲o操作。
其它 - [Benchmark of Python JSON libraries](http://artem.krylysov.com/blog/2015/09/29/benchmark-python-json-libraries/)
我們返回了一個(gè)303重定向,Location頭設(shè)為重定向地址。request.app.router[...].url_for()用于構(gòu)建通向其他路由處理器的URL——對(duì)router的字典查找對(duì)應(yīng)路由構(gòu)造(add_get等方法)的name='...'參數(shù)。 - 這里假設(shè)客戶端會(huì)自動(dòng)處理重定向,訪問(wèn)新創(chuàng)建的todo。如果不是(如
curl),它們需要讀取Location并手動(dòng)處理。 - 更新和刪除也是類(lèi)似的:
async def update_todo(request):
id = int(request.match_info['id'])
if id >= len(TODOS):
return web.json_response({'error': 'Todo not found'}, status=404)
data = await request.json()
if 'finished' not in data:
return web.json_response(
{'error': '"finished" is a required key'}, status=400)
TODOS[id]['finished'] = bool(data['finished'])
return web.Response(status=204)
def remove_todo(request):
id = int(request.match_info['id'])
if id >= len(TODOS):
return web.json_response({'error': 'Todo not found'})
del TODOS[id]
return web.Response(status=204)
- 這里沒(méi)有什么特別的。就是直接從列表中刪除項(xiàng)目有點(diǎn)問(wèn)題——我們可以用
None替換被刪除項(xiàng),并對(duì)兩個(gè)get方法稍作修改。 - 增加和修改刪除的最大不同在于,增加返回重定向到新建的項(xiàng)目頁(yè),而修改刪除返回?zé)o內(nèi)容204。原因在于訪客不知道新創(chuàng)建的項(xiàng)目的位置,而對(duì)于修改和刪除舊項(xiàng)目,他們已經(jīng)知道舊項(xiàng)目的地址了,否則就無(wú)法操作。對(duì)于刪除,這里的實(shí)現(xiàn)為地址簡(jiǎn)單地順移到下一個(gè)項(xiàng)目或者越界。
- 現(xiàn)在,讓我們添加路由:
def app_factory(args=()):
app = web.Application()
app.router.add_get('/todos/', get_all_todos, name='all_todos')
app.router.add_post('/todos/', create_todo, name='create_todo',
expect_handler=web.Request.json)
app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
return app
- 重啟服務(wù)器,開(kāi)始測(cè)試吧。使用requests測(cè)試POST:
import requests, json
#
body = json.dumps({u"name": u"feed the api"})
url = u"http://localhost:8080/todos/"
#
r = requests.post(url=url, data=body)
r.content
可以看到,requests幫我們處理了重定向,成功訪問(wèn)到新建的項(xiàng)目:

其它請(qǐng)自行嘗試。
- 若使用postman類(lèi)似工具測(cè)試可對(duì)代碼稍作修改。
持久化
- 我們需要安裝psycopg2、aiopg和sqlalchemy,在虛擬環(huán)境下
python -m pip install psycopg2 aiopg sqlalchemy
- 你可能會(huì)問(wèn)為什么選擇aiopg而不是asyncpg,asyncpg可是要快上3倍。因?yàn)閼?yīng)用速度不是一切,我還重視在應(yīng)用程序運(yùn)行時(shí)編寫(xiě)和維護(hù)代碼的時(shí)間。最重要的是,編寫(xiě)原始SQL可能很乏味。如果我確定aiopg是瓶頸,并且asyncpg會(huì)解決它,那我才會(huì)使用asyncpg。
- 此外,老實(shí)說(shuō),如果我真的重視應(yīng)用程序的速度,我不會(huì)在Python中編寫(xiě)它。
啟動(dòng)數(shù)據(jù)庫(kù)
- 這里使用Postgres(windows直接下載安裝)是因?yàn)槲业膫€(gè)人偏好,也可選擇aiomysql或aioodbc。
- windows下安裝好后,開(kāi)始菜單 > PostgreSQL > PSQL 進(jìn)入SQL命令行
CREATE ROLE aiotodo LOGIN PASSWORD '12345' NOINHERIT CREATEDB;
CREATE DATABASE aiotodo;
import psycopg2
#
conn = psycopg2.connect("dbname=aiotodo user=aiotodo password=12345")
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS test")
cur.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def"))
cur.execute("SELECT * FROM test;")
print(cur.fetchone())
conn.commit()
cur.close()
conn.close()
- 除了自己安裝環(huán)境,也可使用Postgres docker鏡像:
mkdir -p tmp/pgdata
docker run -d --name postgres -p 5432:5432 \
-v $(pwd)/tmp/pgdata:/var/lib/postgres/data \
-e POSTGRES_USER=aiotodo -e POSTGRES_PASSWORD=12345 -e POSTGRES_DB=aiotodo \
postgres
將應(yīng)用程序連接到數(shù)據(jù)庫(kù)
- 讓我們連接數(shù)據(jù)庫(kù)并創(chuàng)建表,同時(shí)創(chuàng)建表的SQLAlchemy視圖。
from aiopg.sa import create_engine
import sqlalchemy as sa
# 表的SQLAlchemy視圖
metadata = sa.MetaData()
todos_tbl = sa.Table(
'todos', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(255), unique=True, nullable=False),
sa.Column('finished', sa.Boolean(), default=False, nullable=False)
)
# 創(chuàng)建表
async def create_table(engine):
async with engine.acquire() as conn:
await conn.execute('DROP TABLE IF EXISTS todos')
await conn.execute('''CREATE TABLE todos (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL UNIQUE,
finished BOOLEAN NOT NULL DEFAULT FALSE
)''')
- 你最后一次親手寫(xiě)DDL-數(shù)據(jù)庫(kù)定義語(yǔ)言是什么時(shí)候?有不少方法可以將這個(gè)應(yīng)用掛接上alembic來(lái)幫助我們管理創(chuàng)建表,不過(guò)暫時(shí)不討論這個(gè)問(wèn)題。
- 在SQLAlchemy中,原始SQL和ORM 之間還有一個(gè)核心層,正是我們使用的:
async def attach_db(app):
app['db'] = await create_engine(
' '.join([
# 或改為你的數(shù)據(jù)庫(kù)配置
'host=localhost',
'port=5432',
'dbname=aiotodo',
'user=aiotodo',
'password=12345'
])
)
async def teardown_db(app):
app['db'].close()
await app['db'].wait_closed()
app['db'] = None
async def populate_initial_values(engine):
async with engine.acquire() as conn:
await conn.execute(todos_tbl.insert().values({'name': 'Start this tutorial', 'finished': True}))
await conn.execute(todos_tbl.insert().values({'name': 'Finish this tutorial', 'finished': False}))
async def setup_todo_table(app):
await create_table(app['db'])
await populate_initial_values(app['db'])
- 上述函數(shù)會(huì)作為應(yīng)用中的信號(hào)處理器(在應(yīng)用中有四種可以集成的默認(rèn)信號(hào))。它們都接收應(yīng)用對(duì)象作為唯一的參數(shù):
-
on_loop_available:當(dāng)loop以同步方式可用時(shí)被觸發(fā),因此任何異步工作都要顯式地使用應(yīng)用loop。這是(當(dāng)前)唯一的同步處理程序。 -
on_startup:在應(yīng)用程序開(kāi)始之前觸發(fā),這對(duì)于設(shè)置后臺(tái)任務(wù)(如長(zhǎng)輪詢?nèi)蝿?wù))非常有用。 -
on_teardown:在應(yīng)用程序收到來(lái)自呼叫者的關(guān)閉信號(hào)且請(qǐng)求完成后觸發(fā)。在這里,我們應(yīng)該拆除我們建立的任何東西,并關(guān)閉到遠(yuǎn)程服務(wù)的長(zhǎng)連接。 -
on_cleanup:在拆卸(teardown)完成后啟動(dòng),允許最終的清理步驟運(yùn)行,例如拆卸因?yàn)榻M件之間的依賴關(guān)系而無(wú)法在拆卸步驟清理的對(duì)象。
-
- 我發(fā)現(xiàn)
on_startup和on_teardown是我最常用的,但我也碰到過(guò)一兩次有必要使用on_loop_available的時(shí)候(如一個(gè)集成zeep到aiohttp的應(yīng)用,啟動(dòng)時(shí)要使用loop執(zhí)行工作而不是await)。 - 要將這些信號(hào)處理程序掛接到應(yīng)用上,我們只需把它們附加到相應(yīng)的信號(hào)上。另外,我們?cè)趹?yīng)用工廠中添加一個(gè)可選參數(shù),該參數(shù)決定是否執(zhí)行表的創(chuàng)建和初始化。
def app_factory(args=()):
app = web.Application()
app.on_startup.append(attach_db)
# app.on_teardown.append(teardown_db)
# 原文為on_teardown但實(shí)測(cè)windows下aiohttp(2.0.7)會(huì)報(bào)錯(cuò),改為
app.on_shutdown.append(teardown_db)
if '--make-table' in args:
app.on_startup.append(setup_todo_table)
app.router.add_get('/todos/', get_all_todos, name='all_todos')
app.router.add_post('/todos/', create_todo, name='create_todo',
expect_handler=web.Request.json)
app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
return app
- 讓我們重新運(yùn)行應(yīng)用并使用新命令創(chuàng)建表:
python -m aiohttp.web -P 8080 --make-table aiotodo:app_factory
- 如果一切正常,新表已被創(chuàng)建,雖然現(xiàn)在所有代辦事項(xiàng)還是從內(nèi)存中獲取,但是我們已經(jīng)成功地將應(yīng)用連接上了數(shù)據(jù)庫(kù)。當(dāng)我們?nèi)∠?wù)時(shí)也不會(huì)有任何警告和異常。
- 要注意,
--make-table的每次使用都會(huì)先刪除舊表,如果它存在的話。
在路由處理器中操作數(shù)據(jù)庫(kù)
- 從數(shù)據(jù)庫(kù)中檢索,更新,插入和刪除Todos是非常簡(jiǎn)單的。因?yàn)槲覀兘邮盏恼?qǐng)求能夠訪問(wèn)應(yīng)用所具有的連接池,所以可以簡(jiǎn)單地將列表操作替換為
async with塊。以下代碼并不是性能最好的或者說(shuō)適用生產(chǎn)環(huán)境,但作為第一次嘗試它也不差。 - 我們先做簡(jiǎn)單的,從數(shù)據(jù)庫(kù)獲取數(shù)據(jù):
async def get_all_todos(request):
async with request.app['db'].acquire() as conn:
todos = [dict(row.items()) async for row in conn.execute(todos_tbl.select().order_by(todos_tbl.c.id))]
return web.json_response(todos)
async def get_one_todo(request):
id = int(request.match_info['id'])
async with request.app['db'].acquire() as conn:
result = await conn.execute(
todos_tbl.select().where(todos_tbl.c.id == id))
row = await result.fetchone()
if not row:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.json_response(dict(row.items()))
- 上面的函數(shù)應(yīng)當(dāng)覆蓋之前的版本。最大的變化是
get_one_todo,首先,我們獲取查詢結(jié)果集,然后從中得到第一個(gè)結(jié)果。最終,我們檢查結(jié)果是否為空,并相應(yīng)返回。我們也可以像get_all_todos里一樣迭代查詢結(jié)果,但使用fetchone顯得我們的目的更明確,即只要一個(gè)結(jié)果。
async with
- 如果您熟悉上下文管理器,例如
with語(yǔ)句,async with就是異步上下文管理器。最大的區(qū)別在于我們使用async def __aenter__和async def __aexit__而不是def __enter__和def __exit__。這意味著我們可以異步等待代碼塊的初始化和收尾工作。在這里,async with被用來(lái)獲取數(shù)據(jù)庫(kù)連接,最后終止它。 - 你可能看見(jiàn)過(guò)
with await something(或者3.4的with (yield from something))結(jié)構(gòu),這是完全不同的,因?yàn)樗硎疽粋€(gè)返回上下文管理器的協(xié)程。
async for
- 正如
async with,async for是迭代的異步版本。這允許我們迭代某種需要做異步IO操作來(lái)產(chǎn)出值的東西。 - 這里我們使用了async列表推導(dǎo) 來(lái)代替如下的代碼:
results = []
async for x in things:
results.append(x)
- 限制在于你只能在協(xié)程中使用它們。列表推導(dǎo)式中等待協(xié)程也是支持的。還支持異步生成器表達(dá)式。
更新和刪除
- 這些也很簡(jiǎn)單,只需用數(shù)據(jù)庫(kù)連接替換原始列表操作:
async def remove_todo(request):
id = int(request.match_info['id'])
async with request.app['db'].acquire() as conn:
result = await conn.execute(todos_tbl.delete().where(todos_tbl.c.id == id))
if not result.rowcount:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.Response(status=204)
async def update_todo(request):
id = int(request.match_info['id'])
data = await request.json()
if 'finished' not in data:
return web.json_response({'error': '"finished" is a required key'}, status=400)
async with request.app['db'].acquire() as conn:
result = await conn.execute(
todos_tbl.update().where(todos_tbl.c.id == id).values({
'finished': bool(data['finished'])
})
)
if result.rowcount == 0:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.Response(status=204)
- 在更新指定id的項(xiàng)目后通過(guò)檢查結(jié)果的
rowcount屬性來(lái)判斷是否真的更新。 - 要注意的是,
rowcount返回查詢匹配的行數(shù),而不是有多少行被改變。
插入數(shù)據(jù)
from sqlalchemy import sql
async def create_todo(request):
data = await request.json()
if 'name' not in data:
return web.json_response({'error': '"name" is a required field'})
name = data['name']
if not name or not isinstance(name, str):
return web.json_response({'error': '"name" must be a string with at least one character'})
todo = {'name': name, 'finished': bool(data.get('finished', False))}
async with request.app['db'].acquire() as conn:
async with conn.begin():
await conn.execute(todos_tbl.insert().values(todo))
result = await conn.execute(
sql.select([sql.func.max(todos_tbl.c.id).label('id')])
)
new_id = await result.fetchone()
return web.Response(
status=303,
headers={
'Location': str(request.app.router['one_todo'].url_for(id=new_id.id))
}
)
- 這里的插入沒(méi)有返回新項(xiàng)目的id,因?yàn)?code>aiopg.sa沒(méi)有實(shí)現(xiàn)SQLAlchemy核心層所包含的
inserted_primary_key屬性,所以需要再次查詢數(shù)據(jù)庫(kù),從表中獲取最大的id。我們?cè)谝淮谓灰字型瓿缮鲜霾僮鱽?lái)防止競(jìng)態(tài)——兩個(gè)人同時(shí)提交一個(gè)新的todo,且被重定向到同一個(gè)todo項(xiàng)目。
進(jìn)一步
- 類(lèi)視圖用于組織具有多個(gè)HTTP方法的路由。
- Websockets用于多個(gè)客戶端同步狀態(tài)。例如你添加了一個(gè)新項(xiàng)目時(shí)我收到了通知。
- aiohttp-devtools用于自動(dòng)重加載應(yīng)用代碼。
- pytest-asyncio用于測(cè)試。
代碼 - (py3.5.2)
from aiohttp import web
from aiopg.sa import create_engine
import sqlalchemy as sa
from sqlalchemy import sql
# 表的SQLAlchemy視圖
metadata = sa.MetaData()
todos_tbl = sa.Table(
'todos', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(255), unique=True, nullable=False),
sa.Column('finished', sa.Boolean(), default=False, nullable=False)
)
# -----------------------------------路由處理器----------------------------------
# 使用 async with request.app['db'].acquire() as conn 連接數(shù)據(jù)庫(kù)
async def get_all_todos(request):
'''
獲取所有代辦事項(xiàng)
'''
async with request.app['db'].acquire() as conn:
todos = []
async for row in conn.execute(
todos_tbl.select().order_by(todos_tbl.c.id)
):
todos.append(
dict(row.items()))
return web.json_response(todos)
async def get_one_todo(request):
'''
根據(jù)路由中的id參數(shù)獲取指定代辦事項(xiàng)
'''
id = int(request.match_info['id'])
async with request.app['db'].acquire() as conn:
result = await conn.execute(
todos_tbl.select().where(todos_tbl.c.id == id))
row = await result.fetchone()
if not row:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.json_response(dict(row.items()))
async def create_todo(request):
'''
創(chuàng)建一個(gè)新的代辦事項(xiàng)
'''
data = await request.json()
if 'name' not in data:
return web.json_response({'error': '"name" is a required field'})
name = data['name']
if not name or not isinstance(name, str):
return web.json_response(
{'error': '"name" must be a string with at least one character'})
todo = {'name': name, 'finished': bool(data.get('finished', False))}
async with request.app['db'].acquire() as conn:
async with conn.begin():
await conn.execute(todos_tbl.insert().values(todo))
result = await conn.execute(
sql.select([sql.func.max(todos_tbl.c.id).label('id')])
)
new_id = await result.fetchone()
return web.Response(
status=303,
headers={
'Location': str(
request.app.router['one_todo'].url_for(id=new_id.id))
}
)
async def remove_todo(request):
'''
清除指定代辦事項(xiàng)
'''
id = int(request.match_info['id'])
async with request.app['db'].acquire() as conn:
result = await conn.execute(
todos_tbl.delete().where(todos_tbl.c.id == id))
if not result.rowcount:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.Response(status=204)
async def update_todo(request):
'''
更新某一條待辦事項(xiàng)
'''
id = int(request.match_info['id'])
data = await request.json()
if 'finished' not in data:
return web.json_response(
{'error': '"finished" is a required key'}, status=400)
async with request.app['db'].acquire() as conn:
result = await conn.execute(
todos_tbl.update().where(todos_tbl.c.id == id).values({
'finished': bool(data['finished'])
})
)
if result.rowcount == 0:
return web.json_response({'error': 'Todo not found'}, status=404)
return web.Response(status=204)
# -----------------------------數(shù)據(jù)庫(kù)連接初始化相關(guān)操作-----------------------------
async def attach_db(app):
'''
連接數(shù)據(jù)庫(kù)并附加到app
'''
app['db'] = await create_engine(
' '.join([
# 或改為你的數(shù)據(jù)庫(kù)配置
'host=localhost',
'port=5432',
'dbname=aiotodo',
'user=aiotodo',
'password=12345'
])
)
async def teardown_db(app):
'''
關(guān)閉與數(shù)據(jù)庫(kù)的連接
'''
app['db'].close()
await app['db'].wait_closed()
app['db'] = None
async def create_table(engine):
'''
在數(shù)據(jù)庫(kù)中創(chuàng)建新表
'''
async with engine.acquire() as conn:
await conn.execute('DROP TABLE IF EXISTS todos')
await conn.execute('''CREATE TABLE todos (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL UNIQUE,
finished BOOLEAN NOT NULL DEFAULT FALSE
)''')
async def populate_initial_values(engine):
'''
初始化數(shù)據(jù)庫(kù)的內(nèi)容
'''
async with engine.acquire() as conn:
await conn.execute(todos_tbl.insert().values(
{'name': 'Start this tutorial', 'finished': True}))
await conn.execute(todos_tbl.insert().values(
{'name': 'Finish this tutorial', 'finished': False}))
async def setup_todo_table(app):
'''
創(chuàng)建表并初始化內(nèi)容,只需執(zhí)行一次
'''
await create_table(app['db'])
await populate_initial_values(app['db'])
# -----------------------------app工廠 - 設(shè)置信號(hào)與路由處理器----------------------------
def app_factory(args=()):
app = web.Application()
app.on_startup.append(attach_db)
app.on_shutdown.append(teardown_db)
if '--make-table' in args:
app.on_startup.append(setup_todo_table)
app.router.add_get('/todos/', get_all_todos, name='all_todos')
app.router.add_post('/todos/', create_todo, name='create_todo',
expect_handler=web.Request.json)
app.router.add_get('/todos/{id:\d+}', get_one_todo, name='one_todo')
app.router.add_patch('/todos/{id:\d+}', update_todo, name='update_todo')
app.router.add_delete('/todos/{id:\d+}', remove_todo, name='remove_todo')
return app
# 本文件命名為 aiotodo.py
# python -m aiohttp.web -P 8080 --make-table aiotodo:app_factory 初始化數(shù)據(jù)庫(kù)并運(yùn)行
# python -m aiohttp.web -P 8080 aiotodo:app_factory 正常運(yùn)行
# --------------------------------測(cè)試-----------------------------------------
# import requests
# import json
# # 增加
# body = json.dumps({u"name": u"feed the api"})
# url = u"http://localhost:8080/todos/"
# r = requests.post(url=url, data=body)
# print(u'增加', r.content)
# # 修改
# body = json.dumps({u"name": u"feed the api", u"finished": u"true"})
# url = u"http://localhost:8080/todos/2"
# r = requests.patch(url=url, data=body)
# print(u'修改', r.status_code)
# # 獲取
# url = u"http://localhost:8080/todos/"
# r = requests.get(url=url)
# print(u'所有代辦事項(xiàng)為', r.content)
# # 刪除
# url = u"http://localhost:8080/todos/2"
# r = requests.delete(url=url)
# r.status_code
# print(u'刪除', r.status_code)


