Rxjs入門與初步應(yīng)用

學(xué)習(xí)資料

https://cn.rx.js.org/
https://github.com/RxJS-CN/rxjs-articles-translation
http://www.itdecent.cn/p/eaf28d5ce6c0

概念

Observable 和 Observer

Observable 可觀察者對象,負(fù)責(zé)發(fā)出數(shù)據(jù);Observer 觀察者,負(fù)責(zé)接收數(shù)據(jù)
怎么這么抽象?試著把Observable對象理解為一種數(shù)據(jù)格式,類似于數(shù)組、鏈表等等
Observable實(shí)現(xiàn)了下?兩種設(shè)計(jì)模式:

  • 觀察者模式(Observer Pattern)
  • 迭代器模式(Iterator Pattern)

觀察者模式

定義:觀察者模式定義了對象之間的一對多依賴,這樣一來,當(dāng)一個(gè)對象改變狀態(tài)時(shí),它的所有依賴著都會(huì)收到通知并自動(dòng)更新
觀察者模式對“治”這個(gè)問題提的解決?法是這樣,將邏輯分為發(fā)布者(Publisher)和觀察者(Observer),其中發(fā)布者只管負(fù)責(zé)產(chǎn)?事件,它會(huì)通知所有注冊掛上號(hào)的觀察者,?不關(guān)?這些觀察者如何處理這些事件,相對的,觀察者可以被注冊上某個(gè)發(fā)布者,只管接收到事件之后就處理,?不關(guān)?這些數(shù)據(jù)是如何產(chǎn)?的。


發(fā)布者和觀察者的關(guān)系
發(fā)布者和觀察者的關(guān)系
// Subject
function Observable() {
  this.observers = [];
  this.data = 'data update';
  // 用于關(guān)聯(lián)observer
  this.addObserve = function(observe) {
    this.observers.push(observe)
  };
  // 用于取消關(guān)聯(lián)observer
  this.removeObserve = function(observe) {
    const index = this.observers.findIndex(item => item == observe);
    this.observers.splice(index,1);
  }

  this.notifyObservers = function () {
    for (let i=0; i<this.observers.length; i++) {
      this.observers[i].update(this.data);
    }
  }
    // 用于發(fā)送更新數(shù)據(jù)到observer
  this.updateData = function(data) {
    this.data = data;
    this.notifyObservers();
  }
}

// 所有 observer 都必須實(shí)現(xiàn)update方法
function Observer() {
  this.update = function (data) {
    console.log(data);
  }
}

const observable = new Observable();
const observer1 = new Observer();
const observer2 = new Observer();
// 建立關(guān)聯(lián)
observable.addObserve(observer1);
observable.addObserve(observer2);
observable.notifyObservers();
observable.updateData(123);

迭代器模式

定義:提供一種方法順序訪問一個(gè)聚合對象中的各個(gè)元素,而又不暴露其內(nèi)部的表示
數(shù)據(jù)集合的實(shí)現(xiàn)?式很多,可以是?個(gè)數(shù)組,也可以是?個(gè)樹形結(jié)構(gòu),也可以是?個(gè)單向鏈表……迭代器的作?就是提供?個(gè)通?的接?,讓使?者完全不?關(guān)?這個(gè)數(shù)據(jù)集合的具體實(shí)現(xiàn)?式

function Iterator(list) {
  this.list = list;
  this.porsition = 0;
  this.hasNext = function() {
    if (this.porsition > this.list.length || this.list[porsition] === null) return false;
    return true;
  }
  this.next = function() {
    const item = this.list[this.porsition];
    this.porsition++;
    return item;
  }
  this.isDone = function() {
    return this.porsition >= this.list.length;
  }
}

在使?RxJS的過程中絕對看不到類似這樣的代碼,實(shí)際上,你都看不到上?所說的三個(gè)函數(shù),因?yàn)椋?所說的是“拉”式的迭代器實(shí)現(xiàn),?RxJS實(shí)現(xiàn)的是“推”式的迭代器實(shí)現(xiàn)

Subscription

訂閱,表示建立關(guān)聯(lián)關(guān)系

理解Observable、Observe 和 Subscribtions 之間的關(guān)系

Observable 是信號(hào)源、生產(chǎn)者,Observer 觀察者、消費(fèi)者
Observable 和 Observer 之間的關(guān)系:
如果比作報(bào)社和讀者,報(bào)社是 Observable,是數(shù)據(jù)源,提供報(bào)紙,讀者是 Observer,負(fù)責(zé)消費(fèi)(處理)數(shù)據(jù),閱讀報(bào)紙,讀者向報(bào)社訂閱(subscribe)報(bào)紙后,報(bào)社將讀者列入他們派送名單,定期派送報(bào)紙,報(bào)紙是數(shù)據(jù)。
可以比作前端(Observer)通過 登陸 owncloud 賬號(hào)(subscribe)時(shí)時(shí)獲取 UI(Observable)更新的 UI稿(數(shù)據(jù))
也可以是愛奇藝會(huì)員(Observer)點(diǎn)擊播放愛奇藝視頻(subscribe)觀看愛奇藝網(wǎng)站(Observable)提供的視頻(數(shù)據(jù))
Observable、Observe 和 Subscribtions核心就是解決分工與數(shù)據(jù)傳遞。

subscribe 擴(kuò)展知識(shí)

可以通過add、remove 操作子subscription
父subscription取消訂閱,子subscription也會(huì)一起取消訂閱

var observ1 = Rx.Observable.interval(500)
var observ2 = Rx.Observable.interval(800)
var observ3 = Rx.Observable.interval(800)

var subsc1 = observ1.subscribe(x => console.log('first: ' + x))
var subsc2 = observ2.subscribe(x => console.log('second: ' + x))
var subsc3 = observ3.subscribe(x => console.log('three: ' + x))

subsc1.add(subsc2)
subsc1.add(subsc3)

setTimeout(() => {
  subsc1.remove(subsc2)
  subsc1.unsubscribe()
}, 1100)
// second: n將會(huì)一直執(zhí)行下去,我們添加了 subsc1.add(subsc2) , 在1.1S后移除了它,所以在 unsubscribe() 時(shí),我們并沒有清除掉它

unsubscribe:釋放資源和取消Observable執(zhí)行的功能

Operators

操作符,類似于管道,對數(shù)據(jù)源發(fā)出的數(shù)據(jù)進(jìn)行過濾或其他處理,使數(shù)據(jù)源發(fā)出的數(shù)據(jù)更加滿足Observe 的需求

Subject

有一些場景,需要將Cold Observable 轉(zhuǎn)成 Hot Observable,在這個(gè)場景下需要?個(gè)“中間?”做串接的事情,這個(gè)中間?有兩個(gè)職責(zé):

  • 中間?要提供subscribe?法,讓其他?能夠訂閱??的數(shù)據(jù)源。
  • 中間?要能夠有辦法接受推送的數(shù)據(jù),包括Cold Observable推送的數(shù)據(jù)。

上?所說的第?個(gè)職責(zé),相當(dāng)于?個(gè)Observable,第?個(gè)?作,相當(dāng)于?個(gè)Observer。在RxJS中,提供了名為Subject的類型,?個(gè)Subject既有Observable的接口,也具有Observer的接口,?個(gè)Subject就具備上述的兩個(gè)職責(zé)。

Cold Observable 和 Hot Observable的區(qū)別

好比視頻和電視頻道的區(qū)別,視頻沒有時(shí)間限制,任何時(shí)候想看都可以看,電視有時(shí)間限制
視頻是 Cold Observable,電視是 Hot Observable
Cold Observable:你見或者不見,我一直在,只要你愿意去取的話
Hot Observable:過了這個(gè)村,沒有這個(gè)店

var interval$ = Rx.Observable.interval(500);

// Cold Observable
interval$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(()=>{
  interval$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 2000);

// Hot Observable
const subject$ = new Rx.Subject();
interval$.subscribe(subject$);
subject$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(() => {
  subject$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 1500);

回過頭來講subject,subject相當(dāng)于一個(gè)轉(zhuǎn)換器,它將 Cold Observable 轉(zhuǎn)化成 Hot Observable。這就要求subject同時(shí)是observe又是 observable。subject 既能訂閱數(shù)據(jù)源,同時(shí)本身又是數(shù)據(jù)源,能發(fā)出數(shù)據(jù)。
舉個(gè)不十分恰當(dāng)?shù)睦?,比如手機(jī),它可以接收基站信號(hào)(Observe),同時(shí)也可以發(fā)出信號(hào)(Observable)

Schedulers

調(diào)度器
Scheduer是?種數(shù)據(jù)結(jié)構(gòu),可以根據(jù)優(yōu)先級(jí)或者其他某種條件來安排任務(wù)執(zhí)?隊(duì)列
http://www.itdecent.cn/p/5624c8a6bd2b

類型 執(zhí)行類型 內(nèi)部調(diào)用
queue Sync同步的方式 scheduler.schedule(task, delay) scheduler.flush()
asap Async(異步微任務(wù)) Promise.resolve().then(() => task)
async Async(異步宏任務(wù)) id = setInterval(task, delay) clearInterval(id)
animationFrame Async id = requestAnimationFrame(task) cancelAnimationFrame(id)

為什么要學(xué)

我們學(xué)習(xí)RxJS,并不是因?yàn)镽xJS是?項(xiàng)炫酷的技術(shù),也不是因?yàn)镽xJS是?個(gè)最新的技術(shù),是因?yàn)镽xJS的的確確能夠幫助我們解決問題,?且這些問題長期以來?直在困擾我們,沒有好的解決辦法,這些問題包括:

  • 如何控制?量代碼的復(fù)雜度;
  • 如何保持代碼可讀;
  • 如何處理異步操作。

Rxjs 引用了兩個(gè)重要的編程思想,讓代碼更加清爽,更加容易維護(hù):
函數(shù)式
響應(yīng)式

函數(shù)式

  • 聲明式

聲明式區(qū)別于命令式,命令式強(qiáng)調(diào)的是告訴機(jī)器怎么去做(how),一步步的告訴計(jì)算機(jī)如何完成一項(xiàng)工作
聲明式強(qiáng)調(diào)的是告訴機(jī)器你想要什么(what),不關(guān)注內(nèi)部實(shí)現(xiàn),聲明式把通用的共性抽離出來,避免重復(fù)代碼
應(yīng)用聲明式的困難點(diǎn):歸納和提取完備的what,是件很困難、很技術(shù)化的工作,令人望而卻步聲明式能夠應(yīng)用在特定領(lǐng)域如SQL中,是工具的編寫者,已經(jīng)歸納和提取what,替你完成了

// 命令式
// how:寫for循環(huán)一一處理
function double(arr) {
  const results = []
  for (let i = 0; i < arr.length; i++){
  results.push(arr[i] * 2)
  }
  return results
}
function addOne(arr) {
  const results = []
  for (let i = 0; i < arr.length; i++){
  results.push(arr[i] + 1)
  }
  return results
}
// 聲明式
// what:通過map把每一項(xiàng)加倍或+1,不關(guān)注內(nèi)部實(shí)現(xiàn)
function double(arr) {
  return arr.map(function(item) {return item * 2});
} function addOne(arr) {
  return arr.map(function(item) {return item + 1});
}
  • 純函數(shù):

函數(shù)的執(zhí)?過程完全由輸?參數(shù)決定,不會(huì)受除參數(shù)之外的任何數(shù)據(jù)影響,只要入?yún)⒉蛔?,返回的參?shù)也不會(huì)變
函數(shù)不會(huì)修改任何外部狀態(tài),?如修改全局變量或傳?的參數(shù)對象
純函數(shù)沒有副作用,是穩(wěn)定的,可以和其他純函數(shù)像搭積木一樣一起組合使用,獲得更強(qiáng)的處理能力

  • 數(shù)據(jù)不可變性

有數(shù)據(jù),替換?法是通過產(chǎn)?新的數(shù)據(jù),來實(shí)現(xiàn)這種"變化",也就是說,當(dāng)我們需要數(shù)據(jù)狀態(tài)發(fā)?改變時(shí),保持原有數(shù)據(jù)不變,產(chǎn)??個(gè)新的數(shù)據(jù)來體現(xiàn)這種變化

JavaScript中數(shù)組的push、pop、sort函數(shù)都會(huì)改變?個(gè)數(shù)組的內(nèi)容,由此引發(fā)的bug可不少。這些不純的函數(shù)導(dǎo)致JavaScript天?不是?個(gè)純粹意義上的函數(shù)式編程語?

響應(yīng)式

EXCEL 中的公式就是典型的響應(yīng)式,數(shù)據(jù)改變了公式計(jì)算結(jié)果也會(huì)跟著變
類似于MVVM中的M->V

體驗(yàn)兩個(gè)小例子:

測試鼠標(biāo)按住時(shí)間

const buttonDom = document.querySelector('#button');
const mouseDown$ = Rx.Observable.fromEvent(buttonDom, 'mousedown');
const mouseUp$ = Rx.Observable.fromEvent(buttonDom, 'mouseup');
const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent)=>{
    return mouseUpEvent.timestamp - mouseDownEvent.timestamp;
});
holdTime$.subscribe((ms)=>{
    document.querySelector('#holdTime').innerText = ms;
});

takeUntil,統(tǒng)計(jì)5秒內(nèi)用戶點(diǎn)擊數(shù)

const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferWhen(()=>Rx.Observable.interval(5000)).subscribe(arr=>console.log(arr.length))

彈珠圖

彈珠圖可以用來表示數(shù)據(jù)流,例如:

--a---b-c---d---X---|->
a, b, c, d 表示發(fā)出的數(shù)據(jù)
X 表示錯(cuò)誤
|表示 '結(jié)束' 信號(hào)
---> 是時(shí)間軸

彈珠圖在線演示:https://rxviz.com/

操作符

創(chuàng)建

功能需求 適用的操作符
直接操作觀察者 create
根據(jù)有限的數(shù)據(jù)產(chǎn)生同步數(shù)據(jù)流 of
產(chǎn)生一個(gè)數(shù)值范圍內(nèi)的數(shù)據(jù) range
以循環(huán)方式產(chǎn)生數(shù)據(jù) generate
重復(fù)產(chǎn)生數(shù)據(jù)流中的數(shù)據(jù) repeat 和 repeatWhen
產(chǎn)生空數(shù)據(jù)流 empty
產(chǎn)生直接出錯(cuò)的數(shù)據(jù)流 throw
產(chǎn)生永遠(yuǎn)不完結(jié)的數(shù)據(jù)流 never
間隔給定時(shí)間持續(xù)產(chǎn)生數(shù)據(jù) interval 和 timer
從數(shù)組等枚舉類型數(shù)據(jù)產(chǎn)生數(shù)據(jù)流 from
從Promise 對象產(chǎn)生數(shù)據(jù)流 fromPromise
從外部事件對象產(chǎn)生數(shù)據(jù)流 fromEvent 和 fromEventPattern
從Ajax 請求結(jié)果產(chǎn)生數(shù)據(jù)流 ajax
延遲產(chǎn)生數(shù)據(jù)流 defer

from 和 toArray
from:數(shù)組轉(zhuǎn) Observable
toArray:Observable 轉(zhuǎn)數(shù)組
fromPromise 和 toPromise
fromPromise:promise轉(zhuǎn) Observable
toPromise:Observable 轉(zhuǎn)promise

合并

功能需求 使用的操作符
把多個(gè)數(shù)據(jù)流以首位相連的方式合并 concat 和 concatAll
把多個(gè)數(shù)據(jù)流中數(shù)據(jù)以先到先得方式合并 merge 和 mergeAll
把多個(gè)數(shù)據(jù)流中的數(shù)據(jù)以一一對應(yīng)的方式合并 zip 和 zipAll
持續(xù)合并多個(gè)數(shù)據(jù)流中最新產(chǎn)生的數(shù)據(jù) combineLatest、combineAll 和 withLatestFrom
從多個(gè)數(shù)據(jù)流中選取第一個(gè)產(chǎn)生內(nèi)容的數(shù)據(jù)流 race
在數(shù)據(jù)流前面添加一個(gè)指定數(shù)據(jù) startWith
只獲取多個(gè)數(shù)據(jù)流最后產(chǎn)生的那個(gè)數(shù)據(jù) forkJoin
從高階數(shù)據(jù)流中切換數(shù)據(jù)源 switch 和 exhaust

對of產(chǎn)生的數(shù)據(jù)進(jìn)行concat和merge操作哦產(chǎn)生的不同結(jié)果

例1:merge:事件的合并處理

startWith和concat的關(guān)聯(lián)關(guān)系
zip
拉鏈,一對一咬合

QQ截圖20190417140650.png
QQ截圖20190417140650.png
QQ圖片20190417141214.png
QQ圖片20190417141214.png

例2:zip應(yīng)用

  • 讓of產(chǎn)生的數(shù)據(jù)流交叉輸出
  • 實(shí)現(xiàn)異步隊(duì)列

forkJoin
forkJoin就是RxJS界的Promise.all,Promise.all等待所有輸?的Promise對象成功之后把結(jié)果合并,forkJoin等待所有輸?的Observable對象完結(jié)之后把最后?個(gè)數(shù)據(jù)合并

const testList = [
  this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:1},params)),
  this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:2},params)),
];
Observable.create(observer => {
  forkJoin(testList).subscribe((data)=>{
    observer.next(data);
  });
})subscribe((res)=>{
  if(res.some(data=>data==false)) return;
  // 1-入門測成績, 2-出門測成績
  this.scoreTestType = {
    1: { scoreTestDetail: res[0] },
    2: { scoreTestDetail: res[1] }
  };
})

輔助

功能需求 使用的操作符
統(tǒng)計(jì)數(shù)據(jù)流中產(chǎn)生的所有數(shù)據(jù)個(gè)數(shù) count
獲得數(shù)據(jù)流中最大或最小的數(shù)據(jù) max 和 min
對數(shù)據(jù)流中所有數(shù)據(jù)進(jìn)行規(guī)約操作 reduce
判斷是否所有數(shù)據(jù)滿足某個(gè)判定條件 every
找到第一個(gè)滿足判定條件的數(shù)據(jù) find 和 findIndex
判斷一個(gè)數(shù)據(jù)流是否不包含任何數(shù)據(jù) isEmpty
如果一個(gè)數(shù)據(jù)流為空就默認(rèn)產(chǎn)生一個(gè)指定數(shù)據(jù) defaultEmpty

數(shù)學(xué)類操作符有四個(gè):count、max、min、reduce
遍歷上游Observable對象中吐出的所有數(shù)據(jù)才給下游傳遞數(shù)據(jù)、只有在上游完結(jié)的時(shí)候,才給下游傳遞唯?數(shù)據(jù)

過濾

功能需求 使用的操作符
過濾掉不滿足判定條件的數(shù)據(jù) filter
獲得滿足判定條件的第一個(gè)數(shù)據(jù) first
獲得滿足判定條件的最后一個(gè)數(shù)據(jù) last
從數(shù)據(jù)流中選取最先出現(xiàn)的若干數(shù)據(jù) take
從數(shù)據(jù)流中選取最后出現(xiàn)的若干數(shù)據(jù) takeLast
從數(shù)據(jù)流中選取數(shù)據(jù)直到某種情況發(fā)生 takeWhile 和 takeUntil
從數(shù)據(jù)流中忽略最先出現(xiàn)的若干數(shù)據(jù) skip
從數(shù)據(jù)流中忽略數(shù)據(jù)直到某種情況發(fā)生 skipWhile 和 skipUntil
基于時(shí)間的數(shù)據(jù)流量篩選 throttleTime、debounceTime 和 auditTime
基于數(shù)據(jù)內(nèi)容的數(shù)據(jù)流量篩選 throttle、debounce 和 audit
基于采樣方式的數(shù)據(jù)流量篩選 sample 和 sampleTime
刪除重復(fù)的數(shù)據(jù) distinct
刪除重復(fù)的連續(xù)數(shù)據(jù) distinctUntil 和 distinctUntilKeyChange
忽略數(shù)據(jù)流中的所有數(shù)據(jù) ignoreElement
只選取指定出現(xiàn)位置的數(shù)據(jù) elementAt
判斷是否只有一個(gè)數(shù)據(jù)滿足判定條件 single

takeUntil讓我們可以?Observable對象作為notifier來控制另?個(gè)Observable對象的數(shù)據(jù)產(chǎn)?,使用起來非常靈活
有損回壓控制:throttle、debounce、audit、sample、throttleTime、debounceTime、auditTime、sampleTime
對比:http://www.itdecent.cn/p/a176d28c9eb5

例3:防抖和節(jié)流

debounce,去抖動(dòng)。策略是當(dāng)事件被觸發(fā)時(shí),設(shè)定一個(gè)周期延遲執(zhí)行動(dòng)作,若期間又被觸發(fā),則重新設(shè)定周期,直到周期結(jié)束,執(zhí)行動(dòng)作。 這是debounce的基本思想,在后期又?jǐn)U展了前緣debounce,即執(zhí)行動(dòng)作在前,然后設(shè)定周期,周期內(nèi)有事件被觸發(fā),不執(zhí)行動(dòng)作,且周期重新設(shè)定。

// 暴力版
var debounce = (fn, wait) => {
    let timer, timeStamp=0;
    let context, args;
 
    let run = ()=>{
        timer= setTimeout(()=>{
            fn.apply(context,args);
        },wait);
    }
    let clean = () => {
        clearTimeout(timer);
    }
 
    return function(){
        context=this;
        args = arguments;
        let now = (new Date()).getTime();
 
        if(now - timeStamp < wait){
            console.log('reset',now);
            clean();  // clear running timer 
            run();    // reset new timer from current time
        } else{
            console.log('set',now);
            run();    // last timer alreay executed, set a new timer
        }
        timeStamp = now;
    }
}

// rxls
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.debounceTime(2000).subscribe(
  console.log,
  null,
  () => console.log('complete')
);

throttling,節(jié)流的策略是,固定周期內(nèi),只執(zhí)行一次動(dòng)作,若有新事件觸發(fā),不執(zhí)行。周期結(jié)束后,又有事件觸發(fā),開始新的周期。 節(jié)流策略也分前緣和延遲兩種。與debounce類似,延遲是指 周期結(jié)束后執(zhí)行動(dòng)作,前緣是指執(zhí)行動(dòng)作后再開始周期。
throttling的特點(diǎn)在連續(xù)高頻觸發(fā)事件時(shí),動(dòng)作會(huì)被定期執(zhí)行,響應(yīng)平滑。

// 簡單版: 定時(shí)器期間,只執(zhí)行最后一次操作
var throttling = (fn, wait) => {
    let timer;
    let context, args;
 
    let run = () => {
        timer=setTimeout(()=>{
            fn.apply(context,args);
            clearTimeout(timer);
            timer=null;
        },wait);
    }
 
    return function () {
        context=this;
        args=arguments;
        if(!timer){
            console.log("throttle, set");
            run();
        }else{
            console.log("throttle, ignore");
        }
    }
}

// rxjs
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.throttleTime(2000).subscribe(
  console.log,
  null,
  () => console.log('complete')
);

轉(zhuǎn)化

功能需求 使用的操作符
將每個(gè)元素用映射函數(shù)產(chǎn)生新的數(shù)據(jù) map
將數(shù)據(jù)流中每個(gè)元素映射為同一數(shù)據(jù) mapTo
提取數(shù)據(jù)流中每個(gè)數(shù)據(jù)的某個(gè)字段 pluck
產(chǎn)生高階 Observable 對象 windowTime、windowCount、windowToggle 和window
產(chǎn)生數(shù)組構(gòu)成的數(shù)據(jù)流 bufferTime、BufferCount、bufferWhen、bufferToggle 和 buffer
映射產(chǎn)生高階 Observable 對象然后合并 concatMap、mergeMap(flatMap)、switchMap、exhaustMap
產(chǎn)生規(guī)約運(yùn)算結(jié)果組成的數(shù)據(jù)流 scan 和 mergeScan

scan可能是RxJS中對構(gòu)建交互式應(yīng)?程序最重要的?個(gè)操作符,因?yàn)樗軌蚓S持應(yīng)?的當(dāng)前狀態(tài),???可以根據(jù)數(shù)據(jù)流持續(xù)更新這些狀態(tài),另???可以持續(xù)把更新的狀態(tài)傳給另?個(gè)數(shù)據(jù)流?來做必要處理。
定義:public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable對源 Observable 使用累加器函數(shù), 返回生成的中間值, 可選的初始值index 是賦給 acc 的初始值

let foo$ = Rx.Observable.interval(1000);
// acc 為上次返回值
// cur 更新的值,此處由foo$提供
foo$.scan((acc, cur) => {
    return cur
}, 0).subscribe((data)=>console.log(data));

acc 是上一個(gè) scan 的返回值
subscript data 顯示的是當(dāng)前值

scan 和 reduce 的區(qū)別

reduce需要數(shù)據(jù)結(jié)束才能輸出結(jié)果
scan可以輸出中間狀態(tài)

無損回壓控制

數(shù)據(jù)組合成數(shù)組:bufferTime、bufferCount、bufferWhen、bufferToggle、buffer
數(shù)據(jù)組合成Observable:windowTime、windowCount、windowToggle 和window

bufferCount
支持兩個(gè)參數(shù) bufferSize 和 startBufferEvery
bufferSize 表示 緩存區(qū)長度,緩存區(qū)長度達(dá)到bufferSize的時(shí)候傳新的數(shù)據(jù)給下游
startBufferEvery 可選,表示 新的緩存區(qū)長度,即新數(shù)據(jù)個(gè)數(shù),從上次bufferCount觸發(fā)以后,上游每發(fā)出startBufferEvery個(gè)數(shù)據(jù)后向下游傳出數(shù)據(jù),數(shù)組中舊數(shù)據(jù)個(gè)數(shù)為bufferSize- startBufferEvery
如果不填startBufferEvery,則默認(rèn)值為 bufferSize,都是新數(shù)據(jù)
如果startBufferEvery大于bufferSize,則會(huì)丟失startBufferEvery-bufferSize個(gè)數(shù)據(jù)

例4: 判斷連續(xù)輸入是否正確

召喚隱藏英雄

const code = [
  "ArrowUp",
  "ArrowUp",
  "ArrowDown",
  "ArrowDown",
  "ArrowLeft",
  "ArrowRight",
  "ArrowLeft",
  "ArrowRight",
  "KeyB",
  "KeyA",
  "KeyB",
  "KeyA"
]

Rx.Observable.fromEvent(document, 'keyup')
  .map(e => e.code)
  .bufferCount(12, 1)
  .subscribe(last12key => {
    if (_.isEqual(last12key, code)) {
      console.log('隱藏的彩蛋 \(^o^)/~')
    }
  });

bufferToggle
利?Observable來控制緩沖窗口的開和關(guān)
有兩個(gè)參數(shù)openings 和 closingSelector,openings 是一個(gè)Observable,控制每個(gè)緩沖窗口的開始時(shí)間,closingSelector是一個(gè)返回Observable的函數(shù)(這樣能夠靈活控制取值范圍),控制每個(gè)緩沖窗口的結(jié)束時(shí)間(相對于開始時(shí)間而言)

clipboard.png
clipboard.png

對例4進(jìn)行優(yōu)化:限定3s時(shí)間內(nèi)連續(xù)輸入正確

 const code = [
   "ArrowUp",
   "ArrowUp",
   "ArrowDown",
   "ArrowDown",
   "ArrowLeft",
   "ArrowRight",
   "ArrowLeft",
   "ArrowRight",
   "KeyB",
   "KeyA",
   "KeyB",
   "KeyA"
 ]

Rx.Observable.fromEvent(document, 'keyup')
.map(e => e.code)
.bufferToggle(Rx.Observable.timer(0, 3000), i=>Rx.Observable.interval(3000))
.subscribe(last12key => {
console.log(last12key);
if (_.isEqual(last12key, code)) {
console.log('隱藏的彩蛋 \(^o^)/~')
}
});

高階Observable

QQ圖片20190417115019.png
QQ圖片20190417115019.png

高階Observable和一階Observable的關(guān)系
正如二維數(shù)組和一維數(shù)組的關(guān)系

相關(guān)的操作符

打平:concatAll、mergeAll、zipAll、combineAll、forkJoin、switch、exhaust
組合:windowTime、windowCount、windowToggle 和window、groupBy分組
(前四個(gè)是按順序分組,最后一個(gè)打亂了順序)
轉(zhuǎn)化:concatMap、mergeMap(flatMap)、switchMap、exhaustMap、mergeScan
cancatMap=一對多的map+concatAll
映射:concatMapTo、mergeMapTo、switchMapTo
生成高階函數(shù)

const ho$ = Rx.Observable.interval(1000)
  .take(2)
  .concat(Rx.Observable.never())  // 添加了一個(gè)never數(shù)據(jù)流
  .map(x => Rx.Observable.interval(1500).map(y => x+':'+y).take(3));
ho$.subscribe(
  console.log,
  null,
  () => console.log('complete')
);

// 打平
ho$.zipAll().subscribe(
  console.log,
  null,
  () => console.log('complete')
);

例4:拖拽

const box = document.querySelector('.box');
const mousedown$ = Rx.Observable.fromEvent(box, 'mousedown');
const mousemove$ = Rx.Observable.fromEvent(box, 'mousemove');
const mouseup$ = Rx.Observable.fromEvent(box, 'mouseup');
const mouseout$ = Rx.Observable.fromEvent(box, 'mouseout$');

mousedown$.mergeMap((md) => {
  const stop$ = mouseup$.merge(mouseout$);
  return mousemove$.takeUntil(stop$).map((mm) =>{
    return {
      target: md.target,
      x: mm.clientX - md.offsetX,
      y: mm.clientY - md.offsetY
    }
  });
}).subscribe((obj) => {
  console.log(obj);
  obj.target.style.top = obj.y + 'px';
  obj.target.style.left = obj.x + 'px';
});

綜上:在RxJS中,創(chuàng)建類操作符是數(shù)據(jù)流的源頭,其余所有操作符最重要的三類就是合并類、過濾類和轉(zhuǎn)化類。不夸張地說,使?RxJS解決問題絕大部分時(shí)間就是在使?這三種操作符

多播

Observable和Observer的關(guān)系,就是前者在播放內(nèi)容,后者在收聽內(nèi)容。播放內(nèi)容的?式分為三種:

  • 單播(unicast):微信發(fā)給朋友,只有一個(gè)接收者
  • ?播(broadcast):朋友圈廣告,所有人都能看得見
  • 多播(multicast):群聊天,發(fā)給一群人,只有選中的朋友才能看見
clipboard.png
clipboard.png

前面的例??都是單播
RxJS是?持?個(gè)Observable被多次subscribe的,所以,RxJS?持多播,但是,表?上看到的是多播,實(shí)質(zhì)上還是單播

const tick$ = Rx.Observable.interval(1000).take(3);

tick$.subscribe(value => console.log('observer 1: ' + value));

setTimeout(() => {
    tick$.subscribe(value => console.log('observer 2: ' + value));
}, 2000);

第?個(gè)Observer依然接收到了0、1、2總共三個(gè)數(shù)據(jù)。為什么會(huì)是這樣的結(jié)果?因?yàn)閕nterval這個(gè)操作符產(chǎn)?的是?個(gè)Cold Observable對象。
Cold Observable,就是每次被subscribe都產(chǎn)??個(gè)全新的數(shù)據(jù)序列的數(shù)據(jù)流,例如對interval產(chǎn)?的Observable對象每subscribe?次,都會(huì)產(chǎn)??個(gè)全新的遞增整數(shù)序列,從0開始產(chǎn)?Hot Observable:fromPromise、fromEvent、fromEventPattern就是異步的創(chuàng)建操作符真正的多播,必定是?論有多少Observer來subscribe,推給Observer的都是?樣的數(shù)據(jù)源把Cold Observable變成Hot Observable,用的是Subject

Subject

clipboard.png
clipboard.png
var interval$ = Rx.Observable.interval(500);

const subject$ = new Rx.Subject();
    interval$.subscribe(subject$);
    subject$.map(val=>'a'+val).subscribe(x => console.log(x));
    setTimeout(() => {
      subject$.map(val=>'b'+val).subscribe(x => console.log(x));
    }, 1500);

Subject不能重復(fù)使?
Subject可以有多個(gè)上游

例5:scan管理react狀態(tài)

class Counter extends React.Component {
  state = {count: 0}
  onIncrement() {
    this.setState({count: this.state.count + 1});
  }
  onDecrement() {
    this.setState({count: this.state.count - 1});
  }
  render() {
    return (
      <CounterView
        count={this.state.count}
        onIncrement={this.onIncrement.bind(this)}
        onDecrement={this.onDecrement.bind(this)}
      />
    );
  }
}
export default Counter;

// subject作為橋梁進(jìn)行狀態(tài)維護(hù)
class RxCounter extends React.Component {
  constructor() {
    super(...arguments);
    this.state = {count: 0};
    this.counter = new Subject();
    const observer = value => this.setState({count: value});
    this.counter.scan((result, inc) => result + inc, 0)
    .subscribe(observer);
  }
  render() {
    return <CounterView
      count={this.state.count}
      onIncrement={()=> this.counter.next(1)}
      onDecrement={()=> this.counter.next(-1)}
    />
  }
}
export default RxCounter;

例6:買房放租

const house$ = new Rx.Subject();
const housecount$ = house$.scan((has, one) => has = has+one, 0).startWith(0);

const month$ = Rx.Observable.interval(1000);
const salary$ = month$.mapTo(1);
const rent$ = month$.withLatestFrom(housecount$).map(arr=>arr[1]*0.5);

// 月收入累加
const income$ = salary$.merge(rent$);

const cash$ = income$.scan((has, one)=>{
  has = has + one;
  if (has >= 100) {
    has -= 100;
    console.log('買房啦');
    house$.next(1);
  }
  return has;
}, 0)

cash$.subscribe(
  (data)=>{
    console.log('進(jìn)賬,余額:',data)
  },
  null,
  ()=>{
    console.log('complete');
  }
)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容