最近開始閱讀開源項目的源碼了,鑒于一直用Tornado做項目,就從它著手開始吧。
今天分析的是Tornado的"Killer Technique",哈哈,其實沒那么夸張了。我們知道Tornado采用了與Node.js相同的單線程事件驅(qū)動模型,那么它就需要一個事件輪詢機制,我沒有看過Node.js的源碼,所以不太清楚它的機制。Tornado在IO層面主要使用了兩種解決方案:
- select
- epoll
通過Configurable類實現(xiàn)IOLoop的工廠模式,在*NIX系統(tǒng)下默認采用了epoll的方式。
接下來我們就逐行進行分析,IOLoop類的代碼不會都看,閱讀的順序是從start函數(shù)開始。
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
這部分其實就是進行了一些狀態(tài)檢測,然后設(shè)置日志,設(shè)置標志位的值。不多啰嗦了。
主要來看下面這個大的循環(huán)體:
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
首先在線程同步鎖下將所有的callback獲取出來,然后清空原來的callback數(shù)組。
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts if x.callback is not None]
heapq.heapify(self._timeouts)
這部分比較長,主要是處理了延時事件,支持異步的gen.sleep的實現(xiàn)就與這部分有關(guān)。
首先聲明空數(shù)組due_timeouts來存放被trigger的事件回調(diào),這個timeouts隊列比較有意思,它用到了heapq這個包來保證數(shù)組內(nèi)的元素存放順序是一個堆的結(jié)構(gòu),這樣一來每次取出來的元素都是最小的。然后只需要判斷取出的最小元素是不是過時了,如果這個事件被取消了,那么直接pop掉進入下次循環(huán);如果這個事件過時了,就直接加入待執(zhí)行的due_timeouts中,進入下次循環(huán);如果沒有過時,由于這是最小元素,所以它后面的元素肯定也沒有過時所以干脆直接跳出循環(huán),接著進行下面的內(nèi)容。
接下來是一個優(yōu)化操作,如果取消的事件多于512個并且大于總數(shù)的一半時,就把timeouts進行清理,清理結(jié)束后再進行堆排序。
下面一部分代碼就不放了,主要是執(zhí)行剛才上面準備好的callbacks和timeouts。
if self._callbacks:
poll_timeout = 0.0
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
這里又進行了一次callbacks的檢查,如果有需要執(zhí)行的回調(diào),那么就讓poll等待的時間為0,如果有timeouts,就讓poll等待的時間為還有多久觸發(fā)timeout事件的時間,同時這個時間不能超過預設(shè)的最長時間。
接下來進行了一次狀態(tài)檢測,如果IOLoop已經(jīng)停止,那么跳出循環(huán)。
try:
event_pairs = self._impl.poll(poll_timeout)
開始等待IO事件了。
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
這里得到IO事件的觸發(fā)者,然后得到它的處理函數(shù),并且執(zhí)行這個函數(shù),然后進行異常處理。
這就是IOLoop的大致思路,通過IOLoop,我們就可讓一個工作變成一組可序列化并且粒度足夠小的事件,依次執(zhí)行。通過select/epoll機制來實現(xiàn)同時對多個socket進行同時處理,避免輪詢浪費CPU時間,是效率高的關(guān)鍵因素。