Tornado異步原理詳析

原創(chuàng)文章出自公眾號:「碼農富哥」,如需轉載請請注明出處!
文章如果對你有收獲,可以收藏轉發(fā),這會給我一個大大鼓勵喲!另外可以關注我公眾號「碼農富哥」 (搜索id:coder2025),我會持續(xù)輸出Python,算法,計算機基礎的 原創(chuàng) 文章

Tornado是什么?

Tornado是一個用Python編寫的異步HTTP服務器,同時也是一個web開發(fā)框架。
Tornado 優(yōu)秀的大并發(fā)處理能力得益于它的 web server 從底層開始就自己實現了一整套基于 epoll 的單線程異步架構。

同步、異步編程差異

  • 對于同步阻塞型Web服務器,我們來打個比方,將它比作一間飯館,而Web請求就是來這家飯館里吃飯的客人。假設飯館店里只有20個座位,那么同時能夠就餐的客人數也就是20,剩下的客人被迫就在店門外等,如果客人們吃的太慢了,那么外面的客人等得不耐煩了,就會走掉(timeout)。

  • 對于異步非阻塞型服務器,我們打另一個比方,將它比作一家超市,客人們想進就能進,前往貨架拿他們想要的貨物,然后再去收銀臺結賬(callback),假設,這家超市只有20個收銀臺,卻可以同時滿足成百上千人的購物需求。和購物的時間長度比起來,結賬的時間基本可以忽略不計。

大部分Web應用都是阻塞性質的,也就是說當一個請求被處理時,這個進程就會被掛起直至請求完成。
假設你正在寫一個需要請求一些來自其他服務器上的數據(比如數據庫服務,調用其他http 接口獲取數據)的應用程序,這幾個請求假設需要花費5秒鐘,大多數的web開發(fā)框架中處理請求的代碼:

def handler_request(self, request):
    answ = self.remote_server.query(request) # 耗時5秒
    request.write_response(answ)

如果這些代碼運行在單個線程中,你的服務器只能每5秒接收一個客戶端的請求。在這5秒鐘的時間里,服務器不能干其他任何事情,所以,你的服務效率是每秒0.2個請求, 這樣的效率時不能接受。

大部分服務器會使用多線程技術來讓服務器一次接收多個客戶端的請求,我們假設你有20個線程,你將在性能上獲得20倍的提高,所以現在你的服務器效率是每秒接受4個請求,但這還是太低了。
當然,你可以通過不斷地提高線程的數量來解決這個問題,但是,線程在內存和調度方面的開銷是昂貴的,大多數Linux發(fā)布版中都是默認線程堆大小為8MB。為每個打開的連接維護一個大的線程池等待數據極易迅速耗光服務器的內存資源??赡苓@種提高線程數量的方式將永遠不可能達到每秒100個請求的效率。

如果使用異步IO(asynchronous IO AIO),達到每秒上千個請求的效率是非常輕松的事情。服務器請求處理的代碼將被改成這樣:

def handler_request(self, request):
   self.remote_server.query_async(request, self.response_received)     
def response_received(self, request, answ):    #回調函數 耗時5秒
   request.write(answ)

AIO的思想是當我們在等待結果的時候不阻塞,轉而我們給框架一個回調函數作為參數,讓框架在收到結果的時候通過回調函數繼續(xù)操作。這樣,服務器就可以被解放去接受其他客戶端的請求了。

IO復用 Epoll

tornado.ioloop 就是 tornado web server 異步最底層的實現。
看 ioloop 之前,我們需要了解一些預備知識,有助于我們理解 ioloop。

ioloop 的實現基于 epoll ,那么什么是 epoll? epoll 是Linux內核為處理大批量文件描述符而作了改進的 poll 。
那么什么又是 poll ? 首先,我們回顧一下, socket 通信時的服務端,當它接受( accept )一個連接并建立通信后( connection )就進行通信,而此時我們并不知道連接的客戶端有沒有信息發(fā)完。 這時候我們有兩種選擇:

  1. 一直在這里等著直到收發(fā)數據結束;
  2. 每隔一定時間來看看這里有沒有數據;

第一種辦法雖然可以解決問題,但我們要注意的是對于一個線程\進程同時只能處理一個 socket 通信,其他連接只能被阻塞,顯然這種方式在單進程情況下不現實。

第二種辦法要比第一種好一些,多個連接可以統(tǒng)一在一定時間內輪流看一遍里面有沒有數據要讀寫,看上去我們可以處理多個連接了,這個方式就是 poll / select 的解決方案。 看起來似乎解決了問題,但實際上,隨著連接越來越多,輪詢所花費的時間將越來越長,而服務器連接的 socket 大多不是活躍的,所以輪詢所花費的大部分時間將是無用的。

為了解決這個問題, epoll 被創(chuàng)造出來,它的概念和 poll 類似,不過每次輪詢時,他只會把有數據活躍的 socket 挑出來輪詢,這樣在有大量連接時輪詢就節(jié)省了大量時間。

對于 epoll 的操作,其實也很簡單,只要 4 個 API 就可以完全操作它。

epoll_create

用來創(chuàng)建一個 epoll 描述符( 就是創(chuàng)建了一個 epoll )

epoll_ctl

對epoll 事件操作,包括以下操作:
EPOLL_CTL_ADD 添加一個新的epoll事件
EPOLL_CTL_DEL 刪除一個epoll事件
EPOLL_CTL_MOD 改變一個事件的監(jiān)聽方式

epoll監(jiān)聽的事件七種,而我們只需要關心其中的三種:
EPOLLIN 緩沖區(qū)滿,有數據可讀(read)
EPOLLOUT 緩沖區(qū)空,可寫數據 (write)
EPOLLERR 發(fā)生錯誤 (error)

epoll_wait

就是讓 epoll 開始工作,里面有個參數 timeout,當設置為非 0 正整數時,會監(jiān)聽(阻塞) timeout 秒;設置為 0 時立即返回,設置為 -1 時一直監(jiān)聽。
在監(jiān)聽時有數據活躍的連接時其返回活躍的文件句柄列表(此處為 socket 文件句柄)。

close

關閉 epoll

IO復用詳解可以參考我另外一篇文章: IO復用模型同步,異步,阻塞,非阻塞及實例詳解

IOLoop模塊

讓我們通過查看ioloop.py文件直接進入服務器的核心。這個模塊是異步機制的核心。它包含了一系列已經打開的文件描述符(文件指針)和每個描述符的處理器(handlers)。它的功能是選擇那些已經準備好讀寫的文件描述符,然后調用它們各自的處理器(一種IO多路復用的實現,select / epoll)。
可以通過調用add_handler()方法將一個socket加入IO循環(huán)中:

"""為文件描述符注冊指定處理器(callback),當文件描述指定的事件發(fā)生"""    
def add_handler(self, fd, handler, events):    
   self._handlers[fd] = handler   
   self._impl.register(fd, events | self.ERROR)

_handlers這個字典類型的變量保存著文件描述符(其實就是socket)到當該文件描述符準備好時需要調用的方法的映射(在Tornado中,該方法被稱為處理器)。然后,文件描述符被注冊到epoll列表中。Tornado關心三種類型的事件(指發(fā)生在文件描述上的事件):READ,WRITE 和 ERROR。正如你所見,ERROR是默認為你自動添加的。
self._impl是select.epoll()selet.select()兩者中的一個
現在讓我們來看看實際的主循環(huán),這段代碼被放在了start()方法中:

def start(self):
    """Starts the I/O loop.
    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True: # 開始事件循環(huán) Event Loop 
        [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)  # 通過epoll/select機制返回有事件返回的(fd: events)的鍵值對
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise
        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs) # 更新所有準備好的事件列表
        while self._events:
            fd, events = self._events.popitem() # 循環(huán)逐個彈出可以待執(zhí)行的socket和事件
            try:
                self._handlers[fd](fd, events) # 之前通過add_handler注冊的fd和回調函數, 到這里就可以執(zhí)行相對應的回調函數了
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)

這就是異步的核心組件IOLoop 的核心工作,我們來看看它的工作流程:

  • 開始一個事件循環(huán) Event Loop ,用于監(jiān)測被注冊到這里的fd(非阻塞socket), 如果有執(zhí)行事件發(fā)生,就執(zhí)行相應回調函數
  • event_pairs = self._impl.poll(poll_timeout) 通過epoll/select機制返回有事件返回的(fd: events)的鍵值對
  • self._events.update(event_pairs) # 更新所有準備好的事件列表
  • while self._events: 循環(huán)這個事件列表,循環(huán)逐個彈出可以待執(zhí)行的socket和事件
  • self._handlers[fd](fd, events) 之前通過add_handler注冊的fd和回調函數, 到這里就可以執(zhí)行相對應的回調函數了

通過上面介紹的add_handler注冊socket->callback,這個start()功能就是tornado開啟的一個單線程事件IO循環(huán),用于監(jiān)測所有非阻塞socket的事件,只要被注冊的socket事件發(fā)生了,就執(zhí)行注冊時的回調函數。
具體到實際就是可以分為這兩種情況:

  • 監(jiān)聽連接: 一開始創(chuàng)建的一個服務器端socket監(jiān)聽端口,等待客戶端連接,這時通過setblocking(0)設置這個socket 非阻塞,然后add_handler(socket, handler_connection, READ) 注冊這個socket的可讀事件,只有要新連接過來,就會觸發(fā)讀事件,handler_connection這個回調函數就執(zhí)行。
  • 請求其他數據時: 比如http_client.fetch()的connected,recvfrom的socket都會設置成nonblocking非阻塞,同時add_hanndler注冊,等待事件發(fā)生,并調用回調函數。

例子:一個建議的服務器監(jiān)聽:

def connection_ready(sock, fd, events):
    while True:
        try:
            connection, address = sock.accept()
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        connection.setblocking(0)
        handle_connection(connection, address)

if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0) # 把監(jiān)聽的socket設置成非阻塞
    sock.bind(("", port))
    sock.listen(128)

    io_loop = tornado.ioloop.IOLoop.current()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ) # 注冊這個服務器端監(jiān)聽socket可讀事件,同時注冊這個回調函數
    io_loop.start()
tornado 異步原理.jpg

效率對比實例代碼


"""異步抓取網頁"""
class AsyncHandler(RequestHandler):
    @asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://www.163.com",
                          callback=self.on_fetch)

    def on_fetch(self, response):
        print response
        self.write('done')
        self.finish()


"""同步抓取網頁"""
class SyncHandler(RequestHandler):
    def get(self):
        http_client = HTTPClient()
        response = http_client.fetch("http://www.163.com")
        print response
        self.write('done')

進行壓測測試

# 異步代碼壓測結果
Document Path:          /async_fetch/
Document Length:        4 bytes

Concurrency Level:      5
Time taken for tests:   1.945 seconds
Complete requests:      50
Requests per second:    25.71 [#/sec] (mean)
Time per request:       194.488 [ms] (mean)
Time per request:       38.898 [ms] (mean, across all concurrent requests)
# 同步代碼壓測結果
Document Path:          /sync_fetch/
Concurrency Level:      5
Time taken for tests:   5.423 seconds
Complete requests:      50
Requests per second:    9.22 [#/sec] (mean)
Time per request:       542.251 [ms] (mean)
Time per request:       108.450 [ms] (mean, across all concurrent requests)

可以看出異步比同步的性能高很多

總結

  • Tornado的異步條件:要使用到異步,就必須把IO操作變成非阻塞的IO。
  • Tornado的異步原理: 單線程的torndo打開一個IO事件循環(huán), 當碰到IO請求(新鏈接進來 或者 調用api獲取數據),由于這些IO請求都是非阻塞的IO,都會把這些非阻塞的IO socket 扔到一個socket管理器,所以,這里單線程的CPU只要發(fā)起一個網絡IO請求,就不用掛起線程等待IO結果,這個單線程的事件繼續(xù)循環(huán),接受其他請求或者IO操作,如此循環(huán)。

參考

http://www.cnblogs.com/yiwenshengmei/archive/2011/06/08/understanding_tornado.html
https://github.com/tornadoweb/tornado/blob/master/tornado/ioloop.py#L928
http://blog.csdn.net/wyx819/article/details/45420017
https://www.rapospectre.com/blog/34
http://golubenco.org/understanding-the-code-inside-tornado-the-asynchronous-web-server-powering-friendfeed.html
https://www.futures.moe/writings/introduction-for-tornado-async-programming.htm

最后

原創(chuàng)文章出自公眾號:「碼農富哥」,如需轉載請請注明出處!
文章如果對你有收獲,可以收藏轉發(fā),這會給我一個大大鼓勵喲!另外可以關注我公眾號「碼農富哥」 (搜索id:coder2025),我會持續(xù)輸出Python,算法,計算機基礎的 原創(chuàng) 文章

掃碼關注我:碼農富哥
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容