Rxjs調(diào)度器
本文采用 RxjsV5.5 版本,這個(gè)版本和先前版本最大不同之處是import方式,以及引入了 pipe 操作符來(lái)替代鏈?zhǔn)讲僮?,在此不贅述?/p>
今天主要是來(lái)將調(diào)度器scheduler,這個(gè)通常用的比較少,但是了解它還是有作用的。
顧名思義,調(diào)度器可以理解為調(diào)度Observable,Observer,根據(jù) 官方scheduler文檔 可以知道它有以下幾個(gè)作用:
- 調(diào)度器是一種數(shù)據(jù)結(jié)構(gòu)。它可以依據(jù)優(yōu)先級(jí)和其它一些配置知道如何來(lái)存儲(chǔ)隊(duì)列任務(wù)(
queue tasks) - 調(diào)度器可以充當(dāng)執(zhí)行環(huán)境。這表示任務(wù)什么時(shí)候什么地方執(zhí)行,是立即執(zhí)行,還是在回調(diào)函數(shù)中執(zhí)行(使用
setTimeout或者setInterval, 又或者animation frame) - 調(diào)度器擁有一個(gè)虛擬時(shí)鐘。它通過(guò)getter方法
now()提供了time的概念。任務(wù)會(huì)根據(jù)調(diào)度安排,在特定的時(shí)間執(zhí)行。
Marco && microtask 宏任務(wù)和微任務(wù)的概念
像 setTimeout | setInterval 都?xì)w屬于宏任務(wù),而 promises 屬于微任務(wù)。
宏任務(wù):
setTimeoutsetIntervalsetImmediateI/OUI rendering
微任務(wù):
process.nextTickPromiseMutationObserver
const log = console.log;
const macro = v => setTimeout(() => log(v));
const micro = v => Promise.resolve().then(() => log(v));
log(1);
macro(2);
micro(3);
log(4);
// 執(zhí)行結(jié)果
1 4 3 2
可以看出宏任務(wù)和微任務(wù)都屬于異步操作,但是Promise會(huì)優(yōu)于setTimeout先執(zhí)行。這是因?yàn)?,在每一次事件循環(huán)中,宏任務(wù)只會(huì)提取一個(gè)執(zhí)行,而微任務(wù)會(huì)一直提取,直到微任務(wù)隊(duì)列為空為止。換句話說(shuō)就是 事件隊(duì)列完成之后,會(huì)執(zhí)行微任務(wù),最后才執(zhí)行宏任務(wù)。
更多關(guān)于微任務(wù)和宏任務(wù)的區(qū)別:
setTimeout(() => log(1));
setTimeout(() => log(2), 0);
log(3);
// 因?yàn)榈谝粋€(gè)setTimeout沒(méi)有設(shè)置delay,它將先于第二個(gè)setTimeout進(jìn)入事件隊(duì)列
// 運(yùn)行結(jié)果
3
1
2
可能會(huì)好奇為什么講這?這是因?yàn)橄旅嬲{(diào)度就會(huì)用到微任務(wù)和宏任務(wù)的概念
Rxjs操作符與對(duì)應(yīng)的調(diào)度器
- 默認(rèn)同步的操作符,比如:
of | from | range,它們默認(rèn)的調(diào)度器為queue - 默認(rèn)異步的操作符,比如:
timer | interval,它們默認(rèn)的調(diào)度器為async, 內(nèi)部使用setInterval
改變默認(rèn)調(diào)度器
像一些操作符最后一個(gè)參數(shù)可以為一個(gè)Scheduler,我們可以通過(guò)傳參的形式來(lái)改變默認(rèn)的調(diào)度器類型,比如下列操作符
bindCallbackbindNodeCallbackcombineLatestconcatemptyfromfromPromiseintervalmergeofrangethrowtimer
示例:改變 of 默認(rèn)的調(diào)度方式
// RxJSV5.5+版本的引入方式
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';
const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);
// 輸出結(jié)果
2
1
// 如果使用of默認(rèn)的調(diào)度 queue
of(1).subscribe(val => log(val));
log(2);
// 輸出結(jié)果
1
2
subscribeOn && observeOn
先說(shuō) subscribeOn,它的作用
- 改變?cè)?
source observables)的執(zhí)行時(shí)機(jī) - 只能用一次
示例
// 同步版本
const log = console.log;
let a$ = Rx.Observable.create(observer => {
setTimeout(() => observer.next(1)); // A
setTimeout(() => observer.next(2)); // B
setTimeout(() => observer.next(3)); // C
setTimeout(() => observer.complete()); // D
})
let subscription = a$.subscribe({
next: v => log(v), // E
complete: () => log('完成') // F
})
# 它的執(zhí)行順序?yàn)?A - E
B - E
C - E
D - F
// 異步實(shí)現(xiàn)版本
// 使用 'subscribeOn'
import { async } from 'rxjs/scheduler/async';
const log = console.log;
let a$ = Rx.Observable.create(observer => {
setTimeout(() => observer.next(1)); // A
setTimeout(() => observer.next(2)); // B
setTimeout(() => observer.next(3)); // C
setTimeout(() => observer.complete()); // D
})
let subscription = a$.subscribeOn(async) // 使用異步調(diào)度
.subscribe({
next: v => log(v), // E
complete: () => log('完成') // F
})
# 現(xiàn)在它的執(zhí)行順序?yàn)?A - B - C - D
E - E - E - F
另外示例:
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';
const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);
// 可以使用 subscribeOn 等同寫(xiě)為
of(1)
.subscribeOn(async)
.subscribe(val => log(val));
log(2);
observeOn 它的作用:
- 改變通知的
Notifications執(zhí)行時(shí)機(jī),即Observabls中的Next, Error, Complete函數(shù) - 能夠用于每個(gè)操作符的前面,即可以多次使用
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 設(shè)置為 async
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('after subscribe');
// 執(zhí)行結(jié)果
"before subscribe"
"after subscribe"
1
2
3
"complete"
調(diào)度 Schedulers
總共有4中調(diào)度:
| 類型 | 執(zhí)行類型 | 內(nèi)部調(diào)用 |
|---|---|---|
| queue | Sync同步的方式 | scheduler.schedule(task, delay) scheduler.flush() |
| asap | Async(異步微任務(wù)) | Promise.resolve().then(() => task) |
| async | Async(異步宏任務(wù)) | id = setInterval(task, delay) clearInterval(id) |
| animationFrame | Async | id = requestAnimationFrame(task) cancelAnimationFrame(id) |
queue
特點(diǎn):
- 同步執(zhí)行
- 任務(wù)按順序執(zhí)行
- 當(dāng)前任務(wù)結(jié)束后才執(zhí)行下一個(gè)任務(wù)
- 性能優(yōu)于事件隊(duì)列
示例
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));
// 執(zhí)行結(jié)果
1
2
3
// 注意這種情況 使用回調(diào)
queue.schedule(() => {
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));
});
// 執(zhí)行結(jié)果
2
1
3
asap (as soon as possible)
特點(diǎn):
- 異步執(zhí)行(微任務(wù))
- 任務(wù)在next tick之前執(zhí)行,即比宏任務(wù)先執(zhí)行
- 內(nèi)部實(shí)現(xiàn)使用
promise - 性能優(yōu)于事件隊(duì)列
示例
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
setTimeout(() => log(1)); // 異步 宏任務(wù)
asap.schedule(() => log(2)); // 異步 微任務(wù)
queue.schedule(() => log(3)); // 同步
// 執(zhí)行結(jié)果
3
2
1
async
特點(diǎn):
- 異步執(zhí)行(宏任務(wù))
- 內(nèi)部實(shí)現(xiàn)使用
setInterval - 使用事件隊(duì)列,性能比上面的方式要差
示例
import { async } from 'rxjs/scheduler/async';
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
async.schedule(() => log(1)); // 異步 宏任務(wù)
asap.schedule(() => log(2)); // 異步 微任務(wù)
queue.schedule(() => log(3)); // 同步
// 執(zhí)行結(jié)果
3
2
1
取消任務(wù) cancelling tasks
使用 AsyncScheduler 和 AsyncAction用來(lái)創(chuàng)建一個(gè)異步調(diào)用,async.schedule() 方法會(huì)返回一個(gè) subcription
import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';
const log = console.log;
const s = new AsyncScheduler(AsyncAction); // 創(chuàng)建一個(gè)async示例
const DELAY = 0;
let subscription;
subscription = s.schedule((v) => log(v), DELAY, 1); // 異步調(diào)度A
s.schedule((v) => log(v), DELAY, 2); // 異步調(diào)度B
log(3);
subscription.unsubscribe(); // 取消異步調(diào)度A
// 結(jié)果 并沒(méi)有A的值,因?yàn)樗蝗∠?3
2
內(nèi)部時(shí)鐘
now()方法
上面談?wù){(diào)度器特性時(shí),談到了虛擬時(shí)鐘
import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';
const log = console.log;
const s = new AsyncScheduler(AsyncAction); // 創(chuàng)建一個(gè)async示例
const DELAY = 2000;
const start = Date.now();
s.schedule((v) => log(v), DELAY, 1); // 異步調(diào)度A
s.schedule((v) => log(v), DELAY, 2); // 異步調(diào)度B
// s.now() 使用調(diào)度內(nèi)部時(shí)鐘
s.schedule(() => log(`${s.now() - start}ms`), DELAY)
log(3);
// 結(jié)果
3
2
1
2002ms // 這個(gè)時(shí)間會(huì)根據(jù)機(jī)器的執(zhí)行速度而定
animationFrame 動(dòng)畫(huà)調(diào)度
特點(diǎn):
- 異步執(zhí)行
- 內(nèi)部實(shí)現(xiàn)使用
requestAnimationFrame - 適用于 DEVICE FRAME RATE
- 沒(méi)有激活時(shí)很慢
- 平衡 CPU/GPU 負(fù)載
60FPS = 1000 / 60ms
使用 setInterval 的問(wèn)題
- 忽略 DEVICE FRAME RATE
- 會(huì)一直運(yùn)行,很耗電
- 不會(huì)考慮 CPU/GPU 負(fù)載
let token;
const paintFrame = () => {
// 動(dòng)畫(huà)
token = setInterval(paintFrame, 1000/60);
}
paintFrame();
setTimeout(() => clearInterval(token), 2000); // 2s后清除動(dòng)畫(huà)
使用 requestFrameAnimation 代替
let token;
const paintFrame = (timestamp) => {
// 動(dòng)畫(huà)
token = requestFrameAnimation(paintFrame);
}
paintFrame();
setTimeout(() => cancelFrameAnimation(token), 2000); // 2s后清除動(dòng)畫(huà)
使用動(dòng)畫(huà)調(diào)度示例
import { animationFrame } from 'rxjs/scheduler/animationFrame';
const DELAY = 0;
let state = { angle: 0 };
const div = document.querySelector('.circle');
let subscription;
// 放上去之后就停止動(dòng)畫(huà)
div.addEventListener('mouseover', () => {
if (!subscription) return;
subscription.unsubscribe();
});
// 離開(kāi)之后開(kāi)始動(dòng)畫(huà)
div.addEventListener('mouseout', () => {
subscription = animationFrame.schedule(work, DELAY);
});
// 動(dòng)畫(huà)函數(shù)
const work = () => {
let { angle } = state;
state = { angle: ++angle%360 };
div.style.transform = `rotate(${angle}deg)`;
subscription = animationFrame.schedule(work, DELAY);
}
subscription = animationFrame.schedule(work, DELAY);
VirtualTime Scheduler 虛擬時(shí)間調(diào)度
上面雖然列舉了調(diào)度的4種類型,下面的虛擬時(shí)間其實(shí)也是一種調(diào)度
特點(diǎn):
- 同步執(zhí)行
- 通過(guò)delay延遲將所有的動(dòng)作進(jìn)行排隊(duì)(Queues all actions sorting by delay)
- 需要手動(dòng)執(zhí)行 使用
flush()
示例
import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
const log = console.log;
const s = new VirtualTimeScheduler(VirtualAction); // 創(chuàng)建一個(gè)scheduler
const start = Date.now();
// tasks are sorted by delay
s.schedule(v => log(v), 2000, 2); // 2000ms的delay
s.schedule(v => log(v), 50, 2); // 50ms的delay
s.flush(); // 手動(dòng)執(zhí)行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`); // 虛擬時(shí)鐘
log(`Execution: ${Date.now() - start}ms`); // 程序執(zhí)行時(shí)間
// 結(jié)果
1
2
3
VirtualTimeScheduler: 2000ms
Execution: 6ms
示例2:
import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
import { interval } from 'rxjs/observable/interval';
const log = console.log;
const s = new VirtualTimeScheduler(VirtualAction); // 創(chuàng)建一個(gè)scheduler
const start = Date.now();
// 3600 * 1000表示 1小時(shí)
// take(24) 表示 24小時(shí) 即每個(gè)小時(shí)運(yùn)行一次
interval(3600 * 1000, s).pipe(take(24))
.subscribe(v => log(v))
s.flush(); // 手動(dòng)執(zhí)行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`); // 虛擬時(shí)鐘
log(`Execution: ${Date.now() - start}ms`); // 程序執(zhí)行時(shí)間
// 結(jié)果
0
1
2
// ...
23
VirtualTimeScheduler: 86400000ms (1天)
Execution: 25ms
總結(jié)
調(diào)度器使用到的場(chǎng)景不多,但是了解它,對(duì)任務(wù)的調(diào)度可以跟細(xì)粒的進(jìn)行控制。動(dòng)畫(huà)調(diào)度用的比較多一點(diǎn),虛擬時(shí)鐘調(diào)度對(duì)定時(shí)任務(wù)比較有用。