前言:鑒于Google了大片關(guān)于Tornado框架關(guān)于其異步非阻塞的實現(xiàn)方法和緣由結(jié)果都不盡理想,在此寫一篇個人了解的博客來向諸位解釋Tornado的異步非阻塞的原理和實現(xiàn)方法,在此感謝前人栽樹!
異步非阻塞是針對另一請求來說的,本次的請求該是阻塞的仍然是阻塞的,這跟Python里面的異步是不一樣的,Python里面的異步是指異步的代碼段獨立執(zhí)行,原代碼中會持續(xù)執(zhí)行異步代碼段下面的代碼
系統(tǒng):MAC
python:3.6
Tornado:6.1
接口測試:ab
一、異步實現(xiàn)
1.使用 gen.coroutine 異步編程
在 Tornado 中兩個裝飾器:
-
tornado.web.asynchronous:長連接裝飾器 python2中,Python3中取消了 -
tornado.gen.coroutine:協(xié)程模式裝飾器 跟python3中 async await功能一致
asynchronous 裝飾器是讓請求變成長連接的方式,必須手動調(diào)用self.finish()才會響應(yīng)
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
# bad
self.write("Hello, world")
asynchronous 裝飾器不會自動調(diào)用self.finish() ,如果沒有沒有指定結(jié)束,該長連接會一直保持直到 pending 狀態(tài)。

所以正確是使用方式是使用了 asynchronous 需要手動 finish
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
self.write("Hello, world")
self.finish()
coroutine 裝飾器是指定改請求為協(xié)程模式,說明白點就是能使用 yield 配合 Tornado 編寫異步程序。
Tronado 為協(xié)程實現(xiàn)了一套自己的協(xié)議,不能使用 Python 普通的生成器。
在使用協(xié)程模式編程之前要知道如何編寫 Tornado 中的異步函數(shù),Tornado 提供了多種的異步編寫形式:回調(diào)、Future、協(xié)程等,其中以協(xié)程模式最是簡單和用的最多。
編寫一個基于協(xié)程的異步函數(shù)同樣需要 coroutine 裝飾器
class SleepHandler(BaseHandler):
"""
異步的延時10秒的接口
"""
@gen.coroutine
def get(self):
yield gen.sleep(10)
self.write("when i sleep 5s")
使用 coroutine 方式有個很明顯是缺點就是嚴重依賴第三方庫的實現(xiàn),如果庫本身不支持 Tornado 的異步操作再怎么使用協(xié)程也是白搭依然會是阻塞的,放個例子感受一下。
class SyncSleepHandler(BaseHandler):
"""
同步的方式,一個延時10s的接口
"""
def get(self):
print(3)
time.sleep(10)
print(4)
self.write("when i sleep 10s")
class SleepHandler(BaseHandler):
"""
異步的延時10秒的接口,gen.sleep支持Tornado異步
"""
@gen.coroutine
def get(self):
print(1)
yield gen.sleep(10)
print(2)
self.write("when i sleep 10s")
class NoSleepHandler(BaseHandler):
"""
time庫不支持Tornado異步
"""
@gen.coroutine
def get(self):
print(5)
yield time.sleep(5)
print(6)
self.write("when i sleep 10s")
1 .運行SleepHandler接口,執(zhí)行異步非阻塞,執(zhí)行100個并發(fā),通過打印發(fā)現(xiàn)一個線程在一個請求在執(zhí)行gen.sleep(10)并沒有阻塞其他請求,正因為如此所以它是異步非阻塞的。


2 .運行SyncSleepHandler接口,執(zhí)行同步阻塞,執(zhí)行10個并發(fā)(由于同步100個等待時間太長,減少到10個,并不影響),通過打印發(fā)現(xiàn)一個線程在一個請求執(zhí)行time.sleep(10)的時候會阻塞其他請求,直到該請求完畢后才會執(zhí)行下一個請求


3 . 運行NoSleepHandler接口,執(zhí)行不支持Tornado異步的協(xié)程接口,5個并發(fā)(理由如上),通過ab發(fā)現(xiàn)5個并發(fā)數(shù)耗時25秒,沒有實現(xiàn)異步,在第二圖中打印出來發(fā)現(xiàn)接口存在任務(wù)調(diào)度,執(zhí)行第二個接口后沒等接口執(zhí)行完畢,系統(tǒng)線程任務(wù)調(diào)度暫停第二個接口,執(zhí)行第三個接口以此類推,這就表明使用
@gen.coroutine如果庫本身不支持Tornado 的異步操作再怎么使用協(xié)程也是白搭依然會是阻塞的,

2.基于線程的異步編程
使用 gen.coroutine 裝飾器編寫異步函數(shù),如果庫本身不支持異步,那么響應(yīng)任然是阻塞的。
在 Tornado 中有個裝飾器能使用 ThreadPoolExecutor 來讓阻塞過程變成非阻塞,其原理是在 Tornado 本身這個線程之外另外啟動一個線程來執(zhí)行阻塞的程序,從而讓 Tornado 變得阻塞。
futures 在 Python3 是標準庫,但是在 Python2 中需要手動安裝
pip install futures
class ThreadSleepHandler(BaseHandler):
"""
time庫不支持Tornado異步
"""
# 必須定義一個executor的屬性,然后run_on_executor 注解才管用。
executor = ThreadPoolExecutor(max_workers=4)
@gen.coroutine
def get(self):
print(5)
yield self.sleep_fun()
print(6)
self.write("when i sleep 10s")
@run_on_executor
def sleep_fun(self):
time.sleep(5)
通過下圖發(fā)現(xiàn)5個并發(fā)只需10秒,實現(xiàn)了異步非阻塞
但是與之而來的問題是,如果大量使用線程化的異步函數(shù)做一些高負載的活動,會導致該 Tornado 進程性能低下響應(yīng)緩慢,這只是從一個問題到了另一個問題而已。
所以在處理一些小負載的工作,是能起到很好的效果,讓 Tornado 異步非阻塞的跑起來。
但是明明知道這個函數(shù)中做的是高負載的工作,那么你應(yīng)該采用另一種方式,使用 Tornado 結(jié)合 Celery 來實現(xiàn)異步非阻塞。


3.基于 Celery 的異步編程
先編寫一個異步任務(wù)
import time
from celery import Celery
app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"
@app.task
def sleep_fun(second):
time.sleep(second)
return 'ok'
if __name__ == "__main__":
app.start()
然后啟動celery celery -A apps.foo.tasks.app worker --loglevel=info
class CelerySleepHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
print(7)
response = yield gen.Task(tasks.sleep_fun.apply_async, args=[5])
print(8)
self.write("when i sleep 10s")
調(diào)用該接口就會發(fā)現(xiàn)結(jié)過跟異步非阻塞一致
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng),專注于實時處理的任務(wù)隊列,同時也支持任務(wù)調(diào)度。
Celery 并不是唯一選擇,你可選擇其他的任務(wù)隊列來實現(xiàn),但是 Celery 是 Python 所編寫,能很快的上手,同時 Celery 提供了優(yōu)雅的接口,易于與 Python Web 框架集成等特點。
與 Tornado 的配合可以使用 tornado-celery ,該包已經(jīng)把 Celery 封裝到 Tornado 中,可以直接使用。
實際測試中,由于 tornado-celery 很久沒有更新,導致請求會一直阻塞,不會返回
解決辦法是:
- 把 celery 降級到 3.1
pip install celery==3.1- 把 pika 降級到 0.9.14
pip install pika==0.9.14
4.python的原生協(xié)程關(guān)鍵字:Async和Await
它們的底層基于生成器函數(shù),使得協(xié)程的實現(xiàn)更加方便。
Async 用來聲明一個函數(shù)為異步函數(shù),異步函數(shù)的特點是能在函數(shù)執(zhí)行過程中掛起,去執(zhí)行其他異步函數(shù),等到掛起條件(假設(shè)掛起條件是sleep(5))消失后,也就是5秒到了再回來執(zhí)行。
Await 用來用來聲明程序掛起,比如異步程序執(zhí)行到某一步時需要等待的時間很長,就將此掛起,去執(zhí)行其他的異步程序
首先我們先來看一個程序(其中 asyncio 庫是支持異步的)
class AsynchronousSleepHandler(BaseHandler):
async def get(self):
print(5)
await asyncio.sleep(5)
print(6)
self.write("when i sleep 10s")
通過打印可知是異步非阻塞的,想問一下為什么是10s呢 ,5個并發(fā)休眠5秒不應(yīng)該是5秒嗎?為何第一個執(zhí)行完畢才開始并發(fā)執(zhí)行下面的?

