Tornado源碼分析手記 —— IOLoop核心實現(xiàn)

最近開始閱讀開源項目的源碼了,鑒于一直用Tornado做項目,就從它著手開始吧。

今天分析的是Tornado的"Killer Technique",哈哈,其實沒那么夸張了。我們知道Tornado采用了與Node.js相同的單線程事件驅(qū)動模型,那么它就需要一個事件輪詢機制,我沒有看過Node.js的源碼,所以不太清楚它的機制。Tornado在IO層面主要使用了兩種解決方案:

  • select
  • epoll
    通過Configurable類實現(xiàn)IOLoop的工廠模式,在*NIX系統(tǒng)下默認采用了epoll的方式。

接下來我們就逐行進行分析,IOLoop類的代碼不會都看,閱讀的順序是從start函數(shù)開始。

if self._running:
  raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
  self._stopped = False
  return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True

這部分其實就是進行了一些狀態(tài)檢測,然后設(shè)置日志,設(shè)置標志位的值。不多啰嗦了。
主要來看下面這個大的循環(huán)體:

with self._callback_lock:
  callbacks = self._callbacks
  self._callbacks = []

首先在線程同步鎖下將所有的callback獲取出來,然后清空原來的callback數(shù)組。

due_timeouts = []
if self._timeouts:
  now = self.time()
  while self._timeouts:
    if self._timeouts[0].callback is None:
      heapq.heappop(self._timeouts)
      self._cancellations -= 1
    elif self._timeouts[0].deadline <= now:
      due_timeouts.append(heapq.heappop(self._timeouts))
    else:
      break
if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)):
  self._cancellations = 0
  self._timeouts = [x for x in self._timeouts if x.callback is not None]
  heapq.heapify(self._timeouts)

這部分比較長,主要是處理了延時事件,支持異步的gen.sleep的實現(xiàn)就與這部分有關(guān)。

首先聲明空數(shù)組due_timeouts來存放被trigger的事件回調(diào),這個timeouts隊列比較有意思,它用到了heapq這個包來保證數(shù)組內(nèi)的元素存放順序是一個堆的結(jié)構(gòu),這樣一來每次取出來的元素都是最小的。然后只需要判斷取出的最小元素是不是過時了,如果這個事件被取消了,那么直接pop掉進入下次循環(huán);如果這個事件過時了,就直接加入待執(zhí)行的due_timeouts中,進入下次循環(huán);如果沒有過時,由于這是最小元素,所以它后面的元素肯定也沒有過時所以干脆直接跳出循環(huán),接著進行下面的內(nèi)容。

接下來是一個優(yōu)化操作,如果取消的事件多于512個并且大于總數(shù)的一半時,就把timeouts進行清理,清理結(jié)束后再進行堆排序。

下面一部分代碼就不放了,主要是執(zhí)行剛才上面準備好的callbacks和timeouts。

if self._callbacks:
  poll_timeout = 0.0
elif self._timeouts:
  poll_timeout = self._timeouts[0].deadline - self.time()
  poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
  poll_timeout = _POLL_TIMEOUT

這里又進行了一次callbacks的檢查,如果有需要執(zhí)行的回調(diào),那么就讓poll等待的時間為0,如果有timeouts,就讓poll等待的時間為還有多久觸發(fā)timeout事件的時間,同時這個時間不能超過預設(shè)的最長時間。

接下來進行了一次狀態(tài)檢測,如果IOLoop已經(jīng)停止,那么跳出循環(huán)。

try:
  event_pairs = self._impl.poll(poll_timeout)

開始等待IO事件了。

self._events.update(event_pairs)
while self._events:
  fd, events = self._events.popitem()
  try:
    fd_obj, handler_func = self._handlers[fd]
    handler_func(fd_obj, events)
  except (OSError, IOError) as e:
    if errno_from_exception(e) == errno.EPIPE:
      pass
    else:
      self.handle_callback_exception(self._handlers.get(fd))
   except Exception:
      self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None

這里得到IO事件的觸發(fā)者,然后得到它的處理函數(shù),并且執(zhí)行這個函數(shù),然后進行異常處理。

這就是IOLoop的大致思路,通過IOLoop,我們就可讓一個工作變成一組可序列化并且粒度足夠小的事件,依次執(zhí)行。通過select/epoll機制來實現(xiàn)同時對多個socket進行同時處理,避免輪詢浪費CPU時間,是效率高的關(guān)鍵因素。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 抽空看了下tornado的源碼,會將幾個關(guān)鍵部分以專欄文章的形式記錄下來,算是對學習的一個匯總,也希望能對使用to...
    throwsterY閱讀 786評論 2 3
  • 本文將通過最簡單的Hello world代碼分析Tornado本身的結(jié)構(gòu),在此基礎(chǔ)上再簡單介紹下Tornado適用...
    靡不有初LB閱讀 3,076評論 0 7
  • epoll概述 epoll是linux中IO多路復用的一種機制,I/O多路復用就是通過一種機制,一個進程可以監(jiān)視多...
    發(fā)仔很忙閱讀 11,098評論 4 35
  • Tornado 所謂的異步:就是你調(diào)用我之后,我發(fā)現(xiàn)數(shù)據(jù)沒準備好,那我就不處理,而是跳到程序的其他地方繼續(xù)執(zhí)行,等...
    OMSobliga閱讀 6,257評論 3 11
  • 名稱 libev - 一個 C 編寫的功能全面的高性能事件循環(huán)。 概要 示例程序 關(guān)于 libev Libev 是...
    hanpfei閱讀 15,542評論 0 5

友情鏈接更多精彩內(nèi)容