Tornado異步非阻塞詳解

前言:鑒于Google了大片關(guān)于Tornado框架關(guān)于其異步非阻塞的實現(xiàn)方法和緣由結(jié)果都不盡理想,在此寫一篇個人了解的博客來向諸位解釋Tornado的異步非阻塞的原理和實現(xiàn)方法,在此感謝前人栽樹!

異步非阻塞是針對另一請求來說的,本次的請求該是阻塞的仍然是阻塞的,這跟Python里面的異步是不一樣的,Python里面的異步是指異步的代碼段獨立執(zhí)行,原代碼中會持續(xù)執(zhí)行異步代碼段下面的代碼

系統(tǒng):MAC
python:3.6
Tornado:6.1
接口測試:ab

一、異步實現(xiàn)

1.使用 gen.coroutine 異步編程

在 Tornado 中兩個裝飾器:

  • tornado.web.asynchronous :長連接裝飾器 python2中,Python3中取消了
  • tornado.gen.coroutine :協(xié)程模式裝飾器 跟python3中 async await功能一致
    asynchronous 裝飾器是讓請求變成長連接的方式,必須手動調(diào)用 self.finish() 才會響應(yīng)
class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # bad 
        self.write("Hello, world")

asynchronous 裝飾器不會自動調(diào)用self.finish() ,如果沒有沒有指定結(jié)束,該長連接會一直保持直到 pending 狀態(tài)。

peding

所以正確是使用方式是使用了 asynchronous 需要手動 finish

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()

coroutine 裝飾器是指定改請求為協(xié)程模式,說明白點就是能使用 yield 配合 Tornado 編寫異步程序。

Tronado 為協(xié)程實現(xiàn)了一套自己的協(xié)議,不能使用 Python 普通的生成器。

在使用協(xié)程模式編程之前要知道如何編寫 Tornado 中的異步函數(shù),Tornado 提供了多種的異步編寫形式:回調(diào)、Future、協(xié)程等,其中以協(xié)程模式最是簡單和用的最多。

編寫一個基于協(xié)程的異步函數(shù)同樣需要 coroutine 裝飾器

class SleepHandler(BaseHandler):
    """
    異步的延時10秒的接口
    """
    @gen.coroutine
    def get(self):
        yield gen.sleep(10)
        self.write("when i sleep 5s")

使用 coroutine 方式有個很明顯是缺點就是嚴重依賴第三方庫的實現(xiàn),如果庫本身不支持 Tornado 的異步操作再怎么使用協(xié)程也是白搭依然會是阻塞的,放個例子感受一下。

class SyncSleepHandler(BaseHandler):
    """
    同步的方式,一個延時10s的接口
    """
    def get(self):
        print(3)
        time.sleep(10)
        print(4)
        self.write("when i sleep 10s")


class SleepHandler(BaseHandler):
    """
    異步的延時10秒的接口,gen.sleep支持Tornado異步
    """
    @gen.coroutine
    def get(self):
        print(1)
        yield gen.sleep(10)
        print(2)
        self.write("when i sleep 10s")


class NoSleepHandler(BaseHandler):
    """
    time庫不支持Tornado異步
    """
    @gen.coroutine
    def get(self):
        print(5)
        yield time.sleep(5)
        print(6)
        self.write("when i sleep 10s")

1 .運行SleepHandler接口,執(zhí)行異步非阻塞,執(zhí)行100個并發(fā),通過打印發(fā)現(xiàn)一個線程在一個請求在執(zhí)行gen.sleep(10)并沒有阻塞其他請求,正因為如此所以它是異步非阻塞的。

image.png

image.png

2 .運行SyncSleepHandler接口,執(zhí)行同步阻塞,執(zhí)行10個并發(fā)(由于同步100個等待時間太長,減少到10個,并不影響),通過打印發(fā)現(xiàn)一個線程在一個請求執(zhí)行time.sleep(10)的時候會阻塞其他請求,直到該請求完畢后才會執(zhí)行下一個請求
image.png

image.png

3 . 運行NoSleepHandler接口,執(zhí)行不支持Tornado異步的協(xié)程接口,5個并發(fā)(理由如上),通過ab發(fā)現(xiàn)5個并發(fā)數(shù)耗時25秒,沒有實現(xiàn)異步,在第二圖中打印出來發(fā)現(xiàn)接口存在任務(wù)調(diào)度,執(zhí)行第二個接口后沒等接口執(zhí)行完畢,系統(tǒng)線程任務(wù)調(diào)度暫停第二個接口,執(zhí)行第三個接口以此類推,這就表明使用@gen.coroutine如果庫本身不支持Tornado 的異步操作再怎么使用協(xié)程也是白搭依然會是阻塞的,
image.png

image.png

2.基于線程的異步編程

使用 gen.coroutine 裝飾器編寫異步函數(shù),如果庫本身不支持異步,那么響應(yīng)任然是阻塞的。

在 Tornado 中有個裝飾器能使用 ThreadPoolExecutor 來讓阻塞過程變成非阻塞,其原理是在 Tornado 本身這個線程之外另外啟動一個線程來執(zhí)行阻塞的程序,從而讓 Tornado 變得阻塞。

futures 在 Python3 是標準庫,但是在 Python2 中需要手動安裝
pip install futures

class ThreadSleepHandler(BaseHandler):
    """
    time庫不支持Tornado異步
    """
    # 必須定義一個executor的屬性,然后run_on_executor 注解才管用。
    executor = ThreadPoolExecutor(max_workers=4)

    @gen.coroutine
    def get(self):
        print(5)
        yield self.sleep_fun()
        print(6)
        self.write("when i sleep 10s")

    @run_on_executor
    def sleep_fun(self):
        time.sleep(5)

通過下圖發(fā)現(xiàn)5個并發(fā)只需10秒,實現(xiàn)了異步非阻塞
但是與之而來的問題是,如果大量使用線程化的異步函數(shù)做一些高負載的活動,會導致該 Tornado 進程性能低下響應(yīng)緩慢,這只是從一個問題到了另一個問題而已。

所以在處理一些小負載的工作,是能起到很好的效果,讓 Tornado 異步非阻塞的跑起來。

但是明明知道這個函數(shù)中做的是高負載的工作,那么你應(yīng)該采用另一種方式,使用 Tornado 結(jié)合 Celery 來實現(xiàn)異步非阻塞。


image.png
image.png
3.基于 Celery 的異步編程

先編寫一個異步任務(wù)

import time

from celery import Celery

app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"

@app.task
def sleep_fun(second):
    time.sleep(second)
    return 'ok'

if __name__ == "__main__":
    app.start()

然后啟動celery celery -A apps.foo.tasks.app worker --loglevel=info

class CelerySleepHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print(7)
        response = yield gen.Task(tasks.sleep_fun.apply_async, args=[5])
        print(8)

        self.write("when i sleep 10s")

調(diào)用該接口就會發(fā)現(xiàn)結(jié)過跟異步非阻塞一致

Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng),專注于實時處理的任務(wù)隊列,同時也支持任務(wù)調(diào)度。

Celery 并不是唯一選擇,你可選擇其他的任務(wù)隊列來實現(xiàn),但是 Celery 是 Python 所編寫,能很快的上手,同時 Celery 提供了優(yōu)雅的接口,易于與 Python Web 框架集成等特點。

與 Tornado 的配合可以使用 tornado-celery ,該包已經(jīng)把 Celery 封裝到 Tornado 中,可以直接使用。

實際測試中,由于 tornado-celery 很久沒有更新,導致請求會一直阻塞,不會返回
解決辦法是:

  1. 把 celery 降級到 3.1 pip install celery==3.1
  2. 把 pika 降級到 0.9.14 pip install pika==0.9.14
4.python的原生協(xié)程關(guān)鍵字:Async和Await

它們的底層基于生成器函數(shù),使得協(xié)程的實現(xiàn)更加方便。

Async 用來聲明一個函數(shù)為異步函數(shù),異步函數(shù)的特點是能在函數(shù)執(zhí)行過程中掛起,去執(zhí)行其他異步函數(shù),等到掛起條件(假設(shè)掛起條件是sleep(5))消失后,也就是5秒到了再回來執(zhí)行。

Await 用來用來聲明程序掛起,比如異步程序執(zhí)行到某一步時需要等待的時間很長,就將此掛起,去執(zhí)行其他的異步程序
首先我們先來看一個程序(其中 asyncio 庫是支持異步的)

class AsynchronousSleepHandler(BaseHandler):


    async def get(self):
        print(5)
        await asyncio.sleep(5)
        print(6)
        self.write("when i sleep 10s")

通過打印可知是異步非阻塞的,想問一下為什么是10s呢 ,5個并發(fā)休眠5秒不應(yīng)該是5秒嗎?為何第一個執(zhí)行完畢才開始并發(fā)執(zhí)行下面的?


image.png

image.png

參考文獻:

真正的 Tornado 異步非阻塞

歡迎參觀個人博客:

不愛去冒險的少年y

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容