一、協(xié)程
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
執(zhí)行結(jié)果
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函數(shù)是一個generator,把一個consumer傳入produce后:
首先調(diào)用c.send(None)啟動生成器;
然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;
consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回;
produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個過程結(jié)束。
整個流程無鎖,由一個線程執(zhí)行,produce和consumer協(xié)作完成任務(wù),所以稱為“協(xié)程”,而非線程的搶占式多任務(wù)
二、asyncio
asyncio是Python 3.4版本引入的標(biāo)準(zhǔn)庫,直接內(nèi)置了對異步IO的支持
asyncio的編程模型就是一個消息循環(huán)。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執(zhí)行的協(xié)程扔到EventLoop中執(zhí)行,就實(shí)現(xiàn)了異步IO
異步操作需要在coroutine中通過yield from完成
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
執(zhí)行結(jié)果
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可見3個連接由一個線程通過coroutine并發(fā)完成
三、async/await
Python從3.5版本開始為asyncio提供了async和await的新語法;
注意新語法只能用在Python 3.5以及后續(xù)版本,如果使用3.4版本,則使用asyncio
用asyncio提供的@asyncio.coroutine可以把一個generator標(biāo)記為coroutine類型,然后在coroutine內(nèi)部用yield from調(diào)用另一個coroutine實(shí)現(xiàn)異步操作。
為了簡化并更好地標(biāo)識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。
請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:
把@asyncio.coroutine替換為async;
把yield from替換為await。
讓我們對比一下上一節(jié)的代碼:
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
用新語法重新編寫如下
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
四、aiohttp
asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
注意aiohttp的初始化函數(shù)init()也是一個coroutine,loop.create_server()則利用asyncio創(chuàng)建TCP服務(wù)。