前言
node timer 是node中一個非常重要的模塊, 幾乎所有的server相關的服務都離不開timer模塊。在node.js 基礎庫中,任何一個TCP I/O模塊都會產生一個timer(定時器)對象,以便記錄請求/響應是否超時。 比如發(fā)起了一個http請求,請求頭上有個connection:keep-alive ,讓服務器維持TCP連接,但是這個連接不可能一直維持,所以會給它設置一個超時時間,一旦超時就會斷開連接,這個超時斷開操作正是通過Timers實現(xiàn)的,即使沒有keep-alive,每次請求的時候 也會設置一個超時時間,避免數(shù)據(jù)遲遲不返回占用server端連接。
所以可以肯定的說,只要你使用node編寫web服務,一定會用到timer
定時器的實現(xiàn)
Node 中的setTimeout setInternal setImmediate API 都在 lib/timers.js中實現(xiàn)
下面我們以setTimeout為例閱讀一下timer的實現(xiàn)源碼
setTimeout定義
function setTimeout(callback, after, arg1, arg2, arg3) {
if (typeof callback !== 'function') {
throw new ERR_INVALID_CALLBACK();
}
var i, args;
switch (arguments.length) {
// fast cases
case 1:
case 2:
break;
case 3:
args = [arg1];
break;
case 4:
args = [arg1, arg2];
break;
default:
args = [arg1, arg2, arg3];
for (i = 5; i < arguments.length; i++) {
// extend array dynamically, makes .apply run much faster in v6.0.0
args[i - 2] = arguments[i];
}
break;
}
const timeout = new Timeout(callback, after, args, false, false);
active(timeout);
return timeout;
}
可以看到setTimeout 接收5個參數(shù),根據(jù)平常的setTimeout使用習慣,我們知道after是我們傳入的定時時間,
參數(shù)的處理我們暫時先按下不表,繼續(xù)往下看,new Timeout跟active(timeout)分別做了什么?
Timeout構造函數(shù)
// Timer constructor function.
// The entire prototype is defined in lib/timers.js
function Timeout(callback, after, args, isRepeat, isUnrefed) {
after *= 1; // coalesce to number or NaN
if (!(after >= 1 && after <= TIMEOUT_MAX)) {
if (after > TIMEOUT_MAX) {
process.emitWarning(`${after} does not fit into` +
' a 32-bit signed integer.' +
'\nTimeout duration was set to 1.',
'TimeoutOverflowWarning');
}
after = 1; // schedule on next tick, follows browser behavior
}
this._called = false;
this._idleTimeout = after;
this._idlePrev = this;
this._idleNext = this;
this._idleStart = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onTimeout = null;
this._onTimeout = callback;
this._timerArgs = args;
this._repeat = isRepeat ? after : null;
this._destroyed = false;
this[unrefedSymbol] = isUnrefed;
initAsyncResource(this, 'Timeout');
}
這個Timeout 生成的timer實例 表示Node.js層面的定時器對象,比如 setTimeout、setInterval返回的對象
var timer = setTimeout(() => {}, 1000);
(劇透一下: 還有一個由底層C++ time_wrap 模塊提供的runtime層面的定時器對象:
const Timer = process.binding('timer_wrap').Timer;)
Timeout構造函數(shù)的整個原型鏈是在lib/timers.js 中定義的,如下述代碼所示
Timeout.prototype.unref = function() {
if (this._handle) {
this._handle.unref();
} else if (typeof this._onTimeout === 'function') {
const now = TimerWrap.now();
if (!this._idleStart) this._idleStart = now;
var delay = this._idleStart + this._idleTimeout - now;
if (delay < 0) delay = 0;
// Prevent running cb again when unref() is called during the same cb
if (this._called && !this._repeat) {
unenroll(this);
return;
}
const handle = reuse(this);
if (handle !== null) {
handle._list = undefined;
}
this._handle = handle || new TimerWrap();
this._handle.owner = this;
this._handle.start(delay);
this._handle.unref();
}
return this;
};
Timeout.prototype.ref = function() {
if (this._handle)
this._handle.ref();
return this;
};
Timeout.prototype.close = function() {
this._onTimeout = null;
if (this._handle) {
if (destroyHooksExist() &&
typeof this[async_id_symbol] === 'number' &&
!this._destroyed) {
emitDestroy(this[async_id_symbol]);
this._destroyed = true;
}
this._idleTimeout = -1;
this._handle.close();
} else {
unenroll(this);
}
return this;
};
注意到 Timeout的原型鏈上有unref 跟ref 兩個方法,他們分別對應的是對refedLists 跟unrefedLists的處理,
(劇透一下,timer有精妙的數(shù)據(jù)結構設計,維持了兩個鏈表,refedLists 是給Node.js外部的定時器(即第三方
代碼)使用的,而 unrefedLists 是給內部模塊(如 net 、http、http2)使用)
平時的代碼編寫中我們可能不會在應用層用到它,它主要是給 TimerList生成的時候用的(埋個伏筆,后續(xù)講到)
active(timer)
active 是激活,如何激活一個定時器呢? 就是把它插入到定時器隊列里去,所以active(timer)主要做的是
insert(timer, false), 我們看一下insert的實現(xiàn)
// The underlying logic for scheduling or re-scheduling a timer.
//
// Appends a timer onto the end of an existing timers list, or creates a new
// TimerWrap backed list if one does not already exist for the specified timeout
// duration.
function insert(item, unrefed, start) {
const msecs = item._idleTimeout;
if (msecs < 0 || msecs === undefined) return;
if (typeof start === 'number') {
item._idleStart = start;
} else {
item._idleStart = TimerWrap.now();
}
const lists = unrefed === true ? unrefedLists : refedLists;
// Use an existing list if there is one, otherwise we need to make a new one.
var list = lists[msecs];
if (list === undefined) {
debug('no %d list was found in insert, creating a new one', msecs);
lists[msecs] = list = new TimersList(msecs, unrefed);
}
if (!item[async_id_symbol] || item._destroyed) {
item._destroyed = false;
initAsyncResource(item, 'Timeout');
}
L.append(list, item);
assert(!L.isEmpty(list)); // list is not empty
}
體現(xiàn)node timer編寫者智慧的時刻到了!
Node.js 會使用一個雙向鏈表 來保存所有定時相同的timer, 對于同一個鏈表中的所有timer ,只會創(chuàng)建一個Timer對象。當鏈表中前面的timer超時的時候,會出發(fā)回調,在回調中重新計算下一次的超時時間,然后重置Timer對象以減少重復Timer對象的創(chuàng)建開銷。
看代碼174行,在timer實例上掛載了一個item._idleStart = TimerWrap.now(); 屬性, 查閱代碼我們知道,這個TimerWrap方法 是由底層C++模塊 time_wrap 提供的。
const {
Timer: TimerWrap,
setupTimers,
} = process.binding('timer_wrap');
timer_wrap 是橋接層 模塊用來封裝一些底層api 給js調用
class TimerWrap : public HandleWrap {
public:
static void Initialize(Local target,
Local unused,
Local context) {
Environment* env = Environment::GetCurrent(context);
Local constructor = env->NewFunctionTemplate(New);
Local timerString = FIXED_ONE_BYTE_STRING(env->isolate(), "Timer");
constructor->InstanceTemplate()->SetInternalFieldCount(1);
constructor->SetClassName(timerString);
env->SetTemplateMethod(constructor, "now", Now);
AsyncWrap::AddWrapMethods(env, constructor);
env->SetProtoMethod(constructor, "close", HandleWrap::Close);
env->SetProtoMethod(constructor, "ref", HandleWrap::Ref);
env->SetProtoMethod(constructor, "unref", HandleWrap::Unref);
env->SetProtoMethod(constructor, "hasRef", HandleWrap::HasRef);
env->SetProtoMethod(constructor, "start", Start);
env->SetProtoMethod(constructor, "stop", Stop);
target->Set(timerString, constructor->GetFunction());
target->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "setupTimers"),
env->NewFunctionTemplate(SetupTimers)
->GetFunction(env->context()).ToLocalChecked()).FromJust();
}
size_t self_size() const override { return sizeof(*this); }
? 可以在timer_wrap.cc 中看到TimeWrap是一個類, 上面定義了一些公有靜態(tài)方法,私有靜態(tài)方法, 在初始化的時候將定義的方法掛載到 contructor原型上,addon導出start,stop, now方法供js層調用。
? TimerWrap.now()方法獲取當前event循環(huán) 時間,賦值給_idleStart(這很重要大家需要留意)
? 繼續(xù)往下走,會定義根據(jù)unrefed 的ture/false 決定使用 unrefedLists/refedLists, 在這里,用的是refedLists。 refedLists是一個空對象,它將存儲的 key值是超時時間, value值是存儲 具有相同超時timer 的雙向列表。它們的key代表這一組定時器的超時時間, key對應的value, 都是一個定時器鏈表,比如 lists[1000]對應的就是由一個或多個超時時間為1000ms的timer組成的鏈表。
當你的代碼中第一次調用定時器方法時,例如:
let timer1 = setTimeout(() => {}, 1000);
這時候,lists對象中是空的,沒有任何鏈表,Timers就會在對應的位置上(這個例子中是lists[1000]) 創(chuàng)建一個TimersList 作為鏈表頭部,并且把剛才創(chuàng)建的新的timer放入鏈表中。
參見代碼:
// Use an existing list if there is one, otherwise we need to make a new one.
var list = lists[msecs];
if (list === undefined) {
debug('no %d list was found in insert, creating a new one', msecs);
lists[msecs] = list = new TimersList(msecs, unrefed);
}
可以試試把它打印出來:
var timer1 = setTimeout(() => {}, 1000)
timer1._idlePrev //=> TimersList {….}
timer1._idlePrev._idleNext //=> timer1
node 在lib/internal/linklist.js中抽象出鏈表的 基礎操作,每個item都是一個個node層的timer。
我們看看TimersList中做了什么
function TimersList(msecs, unrefed) {
this._idleNext = this; // Create the list with the linkedlist properties to
this._idlePrev = this; // prevent any unnecessary hidden class changes.
this._unrefed = unrefed;
this.msecs = msecs;
const timer = this._timer = new TimerWrap();
timer._list = this;
if (unrefed === true)
timer.unref();
timer.start(msecs);
}
初始化鏈表, 實例化底層timer, 如上所述,TimerWrap是橋接層,導出了start方法給js 層調用, 如代碼所示, 實例化操作完成后,執(zhí)行了timer.start(msecs) 方法
TimerWrap(Environment* env, Local object)
: HandleWrap(env,
object,
reinterpret_cast(&handle_),
AsyncWrap::PROVIDER_TIMERWRAP) {
int r = uv_timer_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0);
}
static void Start(const FunctionCallbackInfo& args) {
TimerWrap* wrap = Unwrap(args.Holder());
CHECK(HandleWrap::IsAlive(wrap));
int64_t timeout = args[0]->IntegerValue();
int err = uv_timer_start(&wrap->handle_, OnTimeout, timeout, 0);
args.GetReturnValue().Set(err);
}
首先TimerWrap 會執(zhí)行uv_timer_init方法,
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
handle->timer_cb = NULL;
handle->repeat = 0;
return 0;
}
參數(shù)有兩個: 第一個是 主event_loop, 第二個參數(shù)為指向uv_timer_t 的指針, 主要是為了初始化handle
handle-> flags 標示此timer是否已經結束
handle-> type 標示此timer的類型,在deps/uv/doc/handle.rst 中有枚舉定義
handle-> repeat 是否重復執(zhí)行
handle-> timer_cb 超時后需要執(zhí)行的回調
handle->timeout 定時執(zhí)行的時間
接下來會執(zhí)行 uv_timer_start方法,
int uv_timer_start(uv_timer_t* handle,
uv_timer_cb cb,
uint64_t timeout,
uint64_t repeat) {
uint64_t clamped_timeout;
if (cb == NULL)
return UV_EINVAL;
if (uv__is_active(handle))
uv_timer_stop(handle);
clamped_timeout = handle->loop->time + timeout;
if (clamped_timeout < timeout)
clamped_timeout = (uint64_t) -1;
handle->timer_cb = cb;
handle->timeout = clamped_timeout;
handle->repeat = repeat;
/* start_id is the second index to be compared in uv__timer_cmp() */
handle->start_id = handle->loop->timer_counter++;
heap_insert((struct heap*) &handle->loop->timer_heap,
(struct heap_node*) &handle->heap_node,
timer_less_than);
uv__handle_start(handle);
return 0;
}
從上述代碼中可以看到,uv_timer_start 接收四個參數(shù)uv_timer_t handle, 回調函數(shù),超時時間, 是否重復。根據(jù)傳過來的參數(shù)對handle進行屬性設置
start_id是由timer_counter自增得到,用來在uv__timer_cmp()中作為第二個比較指標,即 先加入的定時器一定先超時
將timer節(jié)點插入到最小堆中, 最小堆在 heap-inl.h頭文件中實現(xiàn)
最小堆結構體定義:
/* A binary min heap. The usual properties hold: the root is the lowest
* element in the set, the height of the tree is at most log2(nodes) and
* it's always a complete binary tree.
*
* The heap function try hard to detect corrupted tree nodes at the cost
* of a minor reduction in performance. Compile with -DNDEBUG to disable.
*/
struct heap {
struct heap_node* min;
unsigned int nelts;
};
最小堆的根節(jié)點一定是最小的元素,最小堆的高度 最多是 log2(nodes),它經常是一個完全二叉樹, 最小堆的插入節(jié)點的時間復雜度是O(lgn),主要是為了優(yōu)化頻繁的timer插入性能
heap_node* min 是初始化的最小節(jié)點
nelts 表示 最小堆中節(jié)點的個數(shù)。
最小堆節(jié)點的結構體定義:
struct heap_node {
struct heap_node* left;
struct heap_node* right;
struct heap_node* parent;
};
下面看一下timer節(jié)點是怎樣插入到最小堆中的
heap_insert有三個參數(shù), 最小堆, 新節(jié)點, 節(jié)點比較函數(shù)
首先是定義父子節(jié)點指針,初始化新節(jié)點的 left,right,parent節(jié)點,
第123-124行根據(jù)nelts 堆節(jié)點的個數(shù),算出最小堆的高度K,以及最小根節(jié)點到最大葉子節(jié)點的路徑path
139-140行表示找到最后一個node節(jié)點,插入到它后面
146-147 行表示 如果node新節(jié)點比node的parent節(jié)點小,就交換它倆, 直到 新節(jié)點比parent節(jié)點大, 插入操作結束。
以上是最小堆的插入操作。
繼續(xù)往下走,到了 uv__handle_start(handle) ,
#define uv__handle_start(h) \
do { \
assert(((h)->flags & UV__HANDLE_CLOSING) == 0); \
if (((h)->flags & UV__HANDLE_ACTIVE) != 0) break; \
(h)->flags |= UV__HANDLE_ACTIVE; \
if (((h)->flags & UV__HANDLE_REF) != 0) uv__active_handle_add(h); \
} \
while (0)
#define uv__active_handle_add(h) \
do { \
(h)->loop->active_handles++; \
} \
while (0)
執(zhí)行一次active handle add操作,并將event_loop的active_handles 加1.
到此為止,我們nodejs中定義的一個定時器已經加到 定時器鏈表里了,并且隨著event_loop的執(zhí)行,將會執(zhí)行超時定時器的回調函數(shù)。
流水賬式的記錄了這么多,可能大家已經看暈了,下面借用網上 Starkwang.log 大神的圖來描述一下這個定時器鏈表
一開始鏈表是這樣的:
插入節(jié)點后:
定時器的超時時間不同,不停的插入后,將變成這樣
大家注意到每個鏈表的最左側是個TimersList,正如我們前面說的,TimersList包含了我們所要復用的Timer對象,也就是底層C++實現(xiàn)的那個定時器,它承擔了整個鏈表的計時工作。
如圖所示,等到一個超時時間 比如1000ms到了之后會執(zhí)行timer1,
1. 承擔計時任務的TimerList對象中的Timer (也就是C++實現(xiàn)的)觸發(fā)回調,執(zhí)行timer1 所綁定的回調函數(shù)
2. 把timer1 從鏈表中移除
3. 重新計算多久后出發(fā)下一次回調(即timer2 對應的回調),重置Timer,重復1過程。
那么剛才我們所描述的定時器 行為 又是怎么實現(xiàn) 的呢? 前面我們閱讀了 timer加入鏈表的實現(xiàn),那么定時器如何被調度的呢?帶著這些問題我們繼續(xù)往下閱讀源碼。
如上圖代碼所示,在event loop中,執(zhí)行uv__update_time(loop)更新時間后 立即調用 uv__run_timers(loop), 可見timer的優(yōu)先級相當高,
uv__run_timers()是在 timer.c中定義,我們繼續(xù)看看 timer.c 做了些什么?
(ps. 大家可能疑惑怎么突然扯到 uv_run() 了,它是從哪出發(fā)執(zhí)行的呢? 它其實是在node.cc start() 函數(shù)里啟動的,即node一開始運行就啟動它了)
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
heap_node = heap_min((struct heap*) &loop->timer_heap);
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;
uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle);
}
}
在uv__run_timers, 通過 heap_min取出最小的timer節(jié)點, 如果為空則跳出循環(huán),
#define container_of(ptr, type, member) \
((type *) ((char *) (ptr) - offsetof(type, member)))
調用container_of 通過heap_node的偏移取出對象的首地址, 如果最小timer 的timeout時間 大于當前event loop 的時間則說明過期時間還沒到,則退出循環(huán)。 如果到了時間了, 則先通過uv_timer_stop(handle) 將handle從堆中刪除,如果發(fā)現(xiàn)是重復的定時器,就通過uv_timer_again(handle) 再重復加入到堆中,執(zhí)行handle->timer_cb(handle) 的回調后繼續(xù)循環(huán)。
還記得uv_timer_start()函數(shù)中的 74-79行么?
? handle->timeout = clamped_timeout
? clamped_timeout = handle->loop->time + timeout(定時器超時時間)
所以當event loop的時間更新后只需要去檢查是否 有timer到期 要執(zhí)行即可。
最小堆插入的時候也是 到期時間越短的越在前面。
結語:
至此我們基本上已經看完了 timer 實現(xiàn)的整個流程,整個timer模塊閃爍著開發(fā)者的智慧精髓
數(shù)據(jù)結構抽象
- linkedlist.js 抽象出鏈表的基礎操作。
以空間換時間
- 相同超時時間的定時器分組,而不是使用一個unrefTimer,復雜度降到 O(1)。
對象復用
- 相同超時時間的定時器共享一個底層的 C的 timer。
80/20法則
- 優(yōu)化主要路徑的性能。
listOnTimeout
timer_wrapper 中在Initialize 的時候會setupTimers, 設置定時器,每個event loop 周期都會去檢查處理定時器,主要處理操作是在 listOnTimeout中
在代碼223行, 會使用L.peek方法依次取出鏈表中的節(jié)點,每取出一個 定時器節(jié)點,就檢查當前event loop的時間,跟定時器的啟動時間差值(第224行)
如果差值小于定時器超時時間則繼續(xù)執(zhí)行 uv_timer_start,并返回跳出循環(huán);
如果差值大于等于定時器超時時間,則執(zhí)行L.remove(timer),將定時器從鏈表中移除,執(zhí)行tryOnTimeout方法
// An optimization so that the try/finally only de-optimizes (since at least v8
// 4.7) what is in this smaller function.
function tryOnTimeout(timer, start) {
timer._called = true;
const timerAsyncId = (typeof timer[async_id_symbol] === 'number') ?
timer[async_id_symbol] : null;
var threw = true;
if (timerAsyncId !== null)
emitBefore(timerAsyncId, timer[trigger_async_id_symbol]);
try {
ontimeout(timer, start);
threw = false;
} finally {
if (timerAsyncId !== null) {
if (!threw)
emitAfter(timerAsyncId);
if (!timer._repeat && destroyHooksExist() &&
!timer._destroyed) {
emitDestroy(timerAsyncId);
timer._destroyed = true;
}
}
}
}
function ontimeout(timer, start) {
const args = timer._timerArgs;
if (typeof timer._onTimeout !== 'function')
return promiseResolve(timer._onTimeout, args[0]);
if (start === undefined && timer._repeat)
start = TimerWrap.now();
if (!args)
timer._onTimeout();
else
Reflect.apply(timer._onTimeout, timer, args);
if (timer._repeat)
rearm(timer, start);
}
在ontimeout里執(zhí)行 timer._onTimeout()函數(shù), 即定時器的回調函數(shù)。
至此定時器timer的添加到執(zhí)行的代碼都已經閱讀完, 整個timers的流程分析到此結束。該文是本菜研究學習timer源碼的心路歷程,貼出來跟大家分享,希望能幫到后續(xù)學習timer模塊的同學,同時也歡迎大家指正。
參考鏈接:
https://zhuanlan.zhihu.com/p/30763470
https://yjhjstz.gitbooks.io/deep-into-node/content/chapter3/chapter3-1.html
https://github.com/xtx1130/blog/issues/14