RxJS Schedulers 調(diào)度器

Rxjs調(diào)度器

本文采用 RxjsV5.5 版本,這個(gè)版本和先前版本最大不同之處是import方式,以及引入了 pipe 操作符來(lái)替代鏈?zhǔn)讲僮?,在此不贅述?/p>

今天主要是來(lái)將調(diào)度器scheduler,這個(gè)通常用的比較少,但是了解它還是有作用的。
顧名思義,調(diào)度器可以理解為調(diào)度Observable,Observer,根據(jù) 官方scheduler文檔 可以知道它有以下幾個(gè)作用:

  • 調(diào)度器是一種數(shù)據(jù)結(jié)構(gòu)。它可以依據(jù)優(yōu)先級(jí)和其它一些配置知道如何來(lái)存儲(chǔ)隊(duì)列任務(wù)(queue tasks)
  • 調(diào)度器可以充當(dāng)執(zhí)行環(huán)境。這表示任務(wù)什么時(shí)候什么地方執(zhí)行,是立即執(zhí)行,還是在回調(diào)函數(shù)中執(zhí)行(使用setTimeout或者setInterval, 又或者 animation frame)
  • 調(diào)度器擁有一個(gè)虛擬時(shí)鐘。它通過(guò)getter方法 now() 提供了time的概念。任務(wù)會(huì)根據(jù)調(diào)度安排,在特定的時(shí)間執(zhí)行。

Marco && microtask 宏任務(wù)和微任務(wù)的概念

setTimeout | setInterval 都?xì)w屬于宏任務(wù),而 promises 屬于微任務(wù)。

宏任務(wù):

  • setTimeout
  • setInterval
  • setImmediate
  • I/O
  • UI rendering

微任務(wù):

  • process.nextTick
  • Promise
  • MutationObserver
const log = console.log;
const macro = v => setTimeout(() => log(v));
const micro = v => Promise.resolve().then(() => log(v));

log(1);
macro(2);
micro(3);
log(4);

// 執(zhí)行結(jié)果
1 4 3 2

可以看出宏任務(wù)和微任務(wù)都屬于異步操作,但是Promise會(huì)優(yōu)于setTimeout先執(zhí)行。這是因?yàn)?,在每一次事件循環(huán)中,宏任務(wù)只會(huì)提取一個(gè)執(zhí)行,而微任務(wù)會(huì)一直提取,直到微任務(wù)隊(duì)列為空為止。換句話說(shuō)就是 事件隊(duì)列完成之后,會(huì)執(zhí)行微任務(wù),最后才執(zhí)行宏任務(wù)

更多關(guān)于微任務(wù)和宏任務(wù)的區(qū)別:

setTimeout(() => log(1));
setTimeout(() => log(2), 0);
log(3);

// 因?yàn)榈谝粋€(gè)setTimeout沒(méi)有設(shè)置delay,它將先于第二個(gè)setTimeout進(jìn)入事件隊(duì)列
// 運(yùn)行結(jié)果
3
1
2

可能會(huì)好奇為什么講這?這是因?yàn)橄旅嬲{(diào)度就會(huì)用到微任務(wù)和宏任務(wù)的概念

Rxjs操作符與對(duì)應(yīng)的調(diào)度器

  • 默認(rèn)同步的操作符,比如:of | from | range,它們默認(rèn)的調(diào)度器為 queue
  • 默認(rèn)異步的操作符,比如:timer | interval,它們默認(rèn)的調(diào)度器為 async, 內(nèi)部使用 setInterval

改變默認(rèn)調(diào)度器

像一些操作符最后一個(gè)參數(shù)可以為一個(gè)Scheduler,我們可以通過(guò)傳參的形式來(lái)改變默認(rèn)的調(diào)度器類型,比如下列操作符

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

示例:改變 of 默認(rèn)的調(diào)度方式

// RxJSV5.5+版本的引入方式
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';

const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);

// 輸出結(jié)果
2
1

// 如果使用of默認(rèn)的調(diào)度 queue
of(1).subscribe(val => log(val));
log(2);
// 輸出結(jié)果
1
2

subscribeOn && observeOn

先說(shuō) subscribeOn,它的作用

  • 改變?cè)?source observables)的執(zhí)行時(shí)機(jī)
  • 只能用一次

示例

// 同步版本
const log = console.log;
let a$ = Rx.Observable.create(observer => {
  setTimeout(() => observer.next(1));        // A
  setTimeout(() => observer.next(2));        // B
  setTimeout(() => observer.next(3));        // C
  setTimeout(() => observer.complete());     // D
})

let subscription = a$.subscribe({
  next: v => log(v),                         // E
  complete: () => log('完成')                // F
})

# 它的執(zhí)行順序?yàn)?A - E
B - E
C - E
D - F

// 異步實(shí)現(xiàn)版本
// 使用 'subscribeOn'
import { async } from 'rxjs/scheduler/async';
const log = console.log;
let a$ = Rx.Observable.create(observer => {
  setTimeout(() => observer.next(1));        // A
  setTimeout(() => observer.next(2));        // B
  setTimeout(() => observer.next(3));        // C
  setTimeout(() => observer.complete());     // D
})

let subscription = a$.subscribeOn(async)        // 使用異步調(diào)度
  .subscribe({
    next: v => log(v),                       // E
    complete: () => log('完成')              // F
  })

# 現(xiàn)在它的執(zhí)行順序?yàn)?A - B - C - D
E - E - E - F

另外示例:

import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';

const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);

// 可以使用 subscribeOn 等同寫(xiě)為
of(1)
  .subscribeOn(async)
  .subscribe(val => log(val));

log(2);

observeOn 它的作用:

  • 改變通知的 Notifications 執(zhí)行時(shí)機(jī),即Observabls中的Next, Error, Complete函數(shù)
  • 能夠用于每個(gè)操作符的前面,即可以多次使用
var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
});

console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 設(shè)置為 async
.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
console.log('after subscribe');

// 執(zhí)行結(jié)果
"before subscribe"
"after subscribe"
1
2
3
"complete"

調(diào)度 Schedulers

總共有4中調(diào)度:

類型 執(zhí)行類型 內(nèi)部調(diào)用
queue Sync同步的方式 scheduler.schedule(task, delay) scheduler.flush()
asap Async(異步微任務(wù)) Promise.resolve().then(() => task)
async Async(異步宏任務(wù)) id = setInterval(task, delay) clearInterval(id)
animationFrame Async id = requestAnimationFrame(task) cancelAnimationFrame(id)

queue

特點(diǎn):

  • 同步執(zhí)行
  • 任務(wù)按順序執(zhí)行
  • 當(dāng)前任務(wù)結(jié)束后才執(zhí)行下一個(gè)任務(wù)
  • 性能優(yōu)于事件隊(duì)列

示例

import { queue } from 'rxjs/scheduler/queue';

const log = console.log;
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));

// 執(zhí)行結(jié)果
1
2
3


// 注意這種情況 使用回調(diào)
queue.schedule(() => {
  queue.schedule(() => log(1));
  log(2);
  queue.schedule(() => log(3));
});

// 執(zhí)行結(jié)果
2
1
3

asap (as soon as possible)

特點(diǎn):

  • 異步執(zhí)行(微任務(wù))
  • 任務(wù)在next tick之前執(zhí)行,即比宏任務(wù)先執(zhí)行
  • 內(nèi)部實(shí)現(xiàn)使用 promise
  • 性能優(yōu)于事件隊(duì)列

示例

import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';

const log = console.log;
setTimeout(() => log(1));          // 異步 宏任務(wù)
asap.schedule(() => log(2));       // 異步 微任務(wù)
queue.schedule(() => log(3));      // 同步

// 執(zhí)行結(jié)果
3
2
1

async

特點(diǎn):

  • 異步執(zhí)行(宏任務(wù))
  • 內(nèi)部實(shí)現(xiàn)使用 setInterval
  • 使用事件隊(duì)列,性能比上面的方式要差

示例

import { async } from 'rxjs/scheduler/async';
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';

const log = console.log;

async.schedule(() => log(1));        // 異步 宏任務(wù)
asap.schedule(() => log(2));       // 異步 微任務(wù)
queue.schedule(() => log(3));      // 同步

// 執(zhí)行結(jié)果
3
2
1

取消任務(wù) cancelling tasks

使用 AsyncSchedulerAsyncAction用來(lái)創(chuàng)建一個(gè)異步調(diào)用,async.schedule() 方法會(huì)返回一個(gè) subcription

import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';

const log = console.log;

const s = new AsyncScheduler(AsyncAction); // 創(chuàng)建一個(gè)async示例
const DELAY = 0;
let subscription;
subscription = s.schedule((v) => log(v), DELAY, 1);  // 異步調(diào)度A
s.schedule((v) => log(v), DELAY, 2); // 異步調(diào)度B
log(3);
subscription.unsubscribe();  // 取消異步調(diào)度A

// 結(jié)果 并沒(méi)有A的值,因?yàn)樗蝗∠?3
2

內(nèi)部時(shí)鐘 now() 方法

上面談?wù){(diào)度器特性時(shí),談到了虛擬時(shí)鐘

import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';

const log = console.log;

const s = new AsyncScheduler(AsyncAction); // 創(chuàng)建一個(gè)async示例
const DELAY = 2000;
const start = Date.now();

s.schedule((v) => log(v), DELAY, 1);  // 異步調(diào)度A
s.schedule((v) => log(v), DELAY, 2); // 異步調(diào)度B

// s.now() 使用調(diào)度內(nèi)部時(shí)鐘
s.schedule(() => log(`${s.now() - start}ms`), DELAY)
log(3);

// 結(jié)果
3
2
1
2002ms        // 這個(gè)時(shí)間會(huì)根據(jù)機(jī)器的執(zhí)行速度而定

animationFrame 動(dòng)畫(huà)調(diào)度

特點(diǎn):

  • 異步執(zhí)行
  • 內(nèi)部實(shí)現(xiàn)使用 requestAnimationFrame
  • 適用于 DEVICE FRAME RATE
  • 沒(méi)有激活時(shí)很慢
  • 平衡 CPU/GPU 負(fù)載

60FPS = 1000 / 60ms

使用 setInterval 的問(wèn)題

  • 忽略 DEVICE FRAME RATE
  • 會(huì)一直運(yùn)行,很耗電
  • 不會(huì)考慮 CPU/GPU 負(fù)載
let token;
const paintFrame = () => {
  // 動(dòng)畫(huà)
  token = setInterval(paintFrame, 1000/60);
}
paintFrame();

setTimeout(() => clearInterval(token), 2000); // 2s后清除動(dòng)畫(huà)

使用 requestFrameAnimation 代替

let token;
const paintFrame = (timestamp) => {
  // 動(dòng)畫(huà)
  token = requestFrameAnimation(paintFrame);
}
paintFrame();

setTimeout(() => cancelFrameAnimation(token), 2000); // 2s后清除動(dòng)畫(huà)

使用動(dòng)畫(huà)調(diào)度示例

import { animationFrame } from 'rxjs/scheduler/animationFrame';


const DELAY = 0;
let state = { angle: 0 };
const div = document.querySelector('.circle');

let subscription;

// 放上去之后就停止動(dòng)畫(huà)
div.addEventListener('mouseover', () => {
  if (!subscription) return;
  subscription.unsubscribe();
});

// 離開(kāi)之后開(kāi)始動(dòng)畫(huà)
div.addEventListener('mouseout', () => {
  subscription = animationFrame.schedule(work, DELAY);
});

// 動(dòng)畫(huà)函數(shù)
const work = () => {
  let { angle } = state;
  state = { angle: ++angle%360 };
  div.style.transform = `rotate(${angle}deg)`;
  subscription = animationFrame.schedule(work, DELAY);
}

subscription = animationFrame.schedule(work, DELAY);

VirtualTime Scheduler 虛擬時(shí)間調(diào)度

上面雖然列舉了調(diào)度的4種類型,下面的虛擬時(shí)間其實(shí)也是一種調(diào)度

特點(diǎn):

  • 同步執(zhí)行
  • 通過(guò)delay延遲將所有的動(dòng)作進(jìn)行排隊(duì)(Queues all actions sorting by delay)
  • 需要手動(dòng)執(zhí)行 使用flush()

示例

import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';

const log = console.log;


const s = new VirtualTimeScheduler(VirtualAction); // 創(chuàng)建一個(gè)scheduler
const start = Date.now();

// tasks are sorted by delay
s.schedule(v => log(v), 2000, 2);  // 2000ms的delay
s.schedule(v => log(v), 50, 2);    // 50ms的delay

s.flush();                         // 手動(dòng)執(zhí)行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`);  // 虛擬時(shí)鐘
log(`Execution: ${Date.now() - start}ms`);  // 程序執(zhí)行時(shí)間

// 結(jié)果
1
2
3
VirtualTimeScheduler: 2000ms
Execution: 6ms

示例2:

import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
import { interval } from 'rxjs/observable/interval';

const log = console.log;


const s = new VirtualTimeScheduler(VirtualAction); // 創(chuàng)建一個(gè)scheduler
const start = Date.now();

// 3600 * 1000表示 1小時(shí)
// take(24) 表示 24小時(shí) 即每個(gè)小時(shí)運(yùn)行一次
interval(3600 * 1000, s).pipe(take(24))
  .subscribe(v => log(v))

s.flush();                         // 手動(dòng)執(zhí)行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`);  // 虛擬時(shí)鐘
log(`Execution: ${Date.now() - start}ms`);  // 程序執(zhí)行時(shí)間

// 結(jié)果
0
1
2
// ...
23
VirtualTimeScheduler: 86400000ms (1天)
Execution: 25ms

總結(jié)

調(diào)度器使用到的場(chǎng)景不多,但是了解它,對(duì)任務(wù)的調(diào)度可以跟細(xì)粒的進(jìn)行控制。動(dòng)畫(huà)調(diào)度用的比較多一點(diǎn),虛擬時(shí)鐘調(diào)度對(duì)定時(shí)任務(wù)比較有用。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • GCD調(diào)度隊(duì)列是執(zhí)行任務(wù)的強(qiáng)大工具。調(diào)度隊(duì)列允許您相對(duì)于調(diào)度者異步或者同步的執(zhí)行任意代碼塊。您能夠使用調(diào)度隊(duì)列來(lái)執(zhí)...
    坤坤同學(xué)閱讀 6,744評(píng)論 1 3
  • 當(dāng)和底層系統(tǒng)交互時(shí),必須花費(fèi)大量時(shí)間為任務(wù)做好準(zhǔn)備。調(diào)用內(nèi)核或者其他系統(tǒng)層需要切換上下文,這也是比在進(jìn)程內(nèi)部調(diào)用昂...
    坤坤同學(xué)閱讀 1,851評(píng)論 0 16
  • ———-正文開(kāi)始———- 最近發(fā)現(xiàn)有不少介紹JS單線程運(yùn)行機(jī)制的文章,但是發(fā)現(xiàn)很多都僅僅是介紹某一部分的知識(shí),而且...
    流動(dòng)碼文閱讀 2,291評(píng)論 7 32
  • 夜晚醒來(lái),有的時(shí)候其實(shí)就是為了緩解焦慮,看一下時(shí)間。又或者起夜上個(gè)廁所。 在用了手電,自發(fā)電式手電、手電等所有能提...
    someday閱讀 534評(píng)論 0 0

友情鏈接更多精彩內(nèi)容