學(xué)習(xí)資料
https://cn.rx.js.org/
https://github.com/RxJS-CN/rxjs-articles-translation
http://www.itdecent.cn/p/eaf28d5ce6c0
概念
Observable 和 Observer
Observable 可觀察者對象,負(fù)責(zé)發(fā)出數(shù)據(jù);Observer 觀察者,負(fù)責(zé)接收數(shù)據(jù)
怎么這么抽象?試著把Observable對象理解為一種數(shù)據(jù)格式,類似于數(shù)組、鏈表等等
Observable實(shí)現(xiàn)了下?兩種設(shè)計(jì)模式:
- 觀察者模式(Observer Pattern)
- 迭代器模式(Iterator Pattern)
觀察者模式
定義:觀察者模式定義了對象之間的一對多依賴,這樣一來,當(dāng)一個(gè)對象改變狀態(tài)時(shí),它的所有依賴著都會(huì)收到通知并自動(dòng)更新
觀察者模式對“治”這個(gè)問題提的解決?法是這樣,將邏輯分為發(fā)布者(Publisher)和觀察者(Observer),其中發(fā)布者只管負(fù)責(zé)產(chǎn)?事件,它會(huì)通知所有注冊掛上號(hào)的觀察者,?不關(guān)?這些觀察者如何處理這些事件,相對的,觀察者可以被注冊上某個(gè)發(fā)布者,只管接收到事件之后就處理,?不關(guān)?這些數(shù)據(jù)是如何產(chǎn)?的。

// Subject
function Observable() {
this.observers = [];
this.data = 'data update';
// 用于關(guān)聯(lián)observer
this.addObserve = function(observe) {
this.observers.push(observe)
};
// 用于取消關(guān)聯(lián)observer
this.removeObserve = function(observe) {
const index = this.observers.findIndex(item => item == observe);
this.observers.splice(index,1);
}
this.notifyObservers = function () {
for (let i=0; i<this.observers.length; i++) {
this.observers[i].update(this.data);
}
}
// 用于發(fā)送更新數(shù)據(jù)到observer
this.updateData = function(data) {
this.data = data;
this.notifyObservers();
}
}
// 所有 observer 都必須實(shí)現(xiàn)update方法
function Observer() {
this.update = function (data) {
console.log(data);
}
}
const observable = new Observable();
const observer1 = new Observer();
const observer2 = new Observer();
// 建立關(guān)聯(lián)
observable.addObserve(observer1);
observable.addObserve(observer2);
observable.notifyObservers();
observable.updateData(123);
迭代器模式
定義:提供一種方法順序訪問一個(gè)聚合對象中的各個(gè)元素,而又不暴露其內(nèi)部的表示
數(shù)據(jù)集合的實(shí)現(xiàn)?式很多,可以是?個(gè)數(shù)組,也可以是?個(gè)樹形結(jié)構(gòu),也可以是?個(gè)單向鏈表……迭代器的作?就是提供?個(gè)通?的接?,讓使?者完全不?關(guān)?這個(gè)數(shù)據(jù)集合的具體實(shí)現(xiàn)?式
function Iterator(list) {
this.list = list;
this.porsition = 0;
this.hasNext = function() {
if (this.porsition > this.list.length || this.list[porsition] === null) return false;
return true;
}
this.next = function() {
const item = this.list[this.porsition];
this.porsition++;
return item;
}
this.isDone = function() {
return this.porsition >= this.list.length;
}
}
在使?RxJS的過程中絕對看不到類似這樣的代碼,實(shí)際上,你都看不到上?所說的三個(gè)函數(shù),因?yàn)椋?所說的是“拉”式的迭代器實(shí)現(xiàn),?RxJS實(shí)現(xiàn)的是“推”式的迭代器實(shí)現(xiàn)
Subscription
訂閱,表示建立關(guān)聯(lián)關(guān)系
理解Observable、Observe 和 Subscribtions 之間的關(guān)系
Observable 是信號(hào)源、生產(chǎn)者,Observer 觀察者、消費(fèi)者
Observable 和 Observer 之間的關(guān)系:
如果比作報(bào)社和讀者,報(bào)社是 Observable,是數(shù)據(jù)源,提供報(bào)紙,讀者是 Observer,負(fù)責(zé)消費(fèi)(處理)數(shù)據(jù),閱讀報(bào)紙,讀者向報(bào)社訂閱(subscribe)報(bào)紙后,報(bào)社將讀者列入他們派送名單,定期派送報(bào)紙,報(bào)紙是數(shù)據(jù)。
可以比作前端(Observer)通過 登陸 owncloud 賬號(hào)(subscribe)時(shí)時(shí)獲取 UI(Observable)更新的 UI稿(數(shù)據(jù))
也可以是愛奇藝會(huì)員(Observer)點(diǎn)擊播放愛奇藝視頻(subscribe)觀看愛奇藝網(wǎng)站(Observable)提供的視頻(數(shù)據(jù))
Observable、Observe 和 Subscribtions核心就是解決分工與數(shù)據(jù)傳遞。
subscribe 擴(kuò)展知識(shí)
可以通過add、remove 操作子subscription
父subscription取消訂閱,子subscription也會(huì)一起取消訂閱
var observ1 = Rx.Observable.interval(500)
var observ2 = Rx.Observable.interval(800)
var observ3 = Rx.Observable.interval(800)
var subsc1 = observ1.subscribe(x => console.log('first: ' + x))
var subsc2 = observ2.subscribe(x => console.log('second: ' + x))
var subsc3 = observ3.subscribe(x => console.log('three: ' + x))
subsc1.add(subsc2)
subsc1.add(subsc3)
setTimeout(() => {
subsc1.remove(subsc2)
subsc1.unsubscribe()
}, 1100)
// second: n將會(huì)一直執(zhí)行下去,我們添加了 subsc1.add(subsc2) , 在1.1S后移除了它,所以在 unsubscribe() 時(shí),我們并沒有清除掉它
unsubscribe:釋放資源和取消Observable執(zhí)行的功能
Operators
操作符,類似于管道,對數(shù)據(jù)源發(fā)出的數(shù)據(jù)進(jìn)行過濾或其他處理,使數(shù)據(jù)源發(fā)出的數(shù)據(jù)更加滿足Observe 的需求
Subject
有一些場景,需要將Cold Observable 轉(zhuǎn)成 Hot Observable,在這個(gè)場景下需要?個(gè)“中間?”做串接的事情,這個(gè)中間?有兩個(gè)職責(zé):
- 中間?要提供subscribe?法,讓其他?能夠訂閱??的數(shù)據(jù)源。
- 中間?要能夠有辦法接受推送的數(shù)據(jù),包括Cold Observable推送的數(shù)據(jù)。
上?所說的第?個(gè)職責(zé),相當(dāng)于?個(gè)Observable,第?個(gè)?作,相當(dāng)于?個(gè)Observer。在RxJS中,提供了名為Subject的類型,?個(gè)Subject既有Observable的接口,也具有Observer的接口,?個(gè)Subject就具備上述的兩個(gè)職責(zé)。
Cold Observable 和 Hot Observable的區(qū)別
好比視頻和電視頻道的區(qū)別,視頻沒有時(shí)間限制,任何時(shí)候想看都可以看,電視有時(shí)間限制
視頻是 Cold Observable,電視是 Hot Observable
Cold Observable:你見或者不見,我一直在,只要你愿意去取的話
Hot Observable:過了這個(gè)村,沒有這個(gè)店
var interval$ = Rx.Observable.interval(500);
// Cold Observable
interval$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(()=>{
interval$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 2000);
// Hot Observable
const subject$ = new Rx.Subject();
interval$.subscribe(subject$);
subject$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(() => {
subject$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 1500);
回過頭來講subject,subject相當(dāng)于一個(gè)轉(zhuǎn)換器,它將 Cold Observable 轉(zhuǎn)化成 Hot Observable。這就要求subject同時(shí)是observe又是 observable。subject 既能訂閱數(shù)據(jù)源,同時(shí)本身又是數(shù)據(jù)源,能發(fā)出數(shù)據(jù)。
舉個(gè)不十分恰當(dāng)?shù)睦?,比如手機(jī),它可以接收基站信號(hào)(Observe),同時(shí)也可以發(fā)出信號(hào)(Observable)
Schedulers
調(diào)度器
Scheduer是?種數(shù)據(jù)結(jié)構(gòu),可以根據(jù)優(yōu)先級(jí)或者其他某種條件來安排任務(wù)執(zhí)?隊(duì)列
http://www.itdecent.cn/p/5624c8a6bd2b
| 類型 | 執(zhí)行類型 | 內(nèi)部調(diào)用 |
|---|---|---|
| queue | Sync同步的方式 | scheduler.schedule(task, delay) scheduler.flush() |
| asap | Async(異步微任務(wù)) | Promise.resolve().then(() => task) |
| async | Async(異步宏任務(wù)) | id = setInterval(task, delay) clearInterval(id) |
| animationFrame | Async | id = requestAnimationFrame(task) cancelAnimationFrame(id) |
為什么要學(xué)
我們學(xué)習(xí)RxJS,并不是因?yàn)镽xJS是?項(xiàng)炫酷的技術(shù),也不是因?yàn)镽xJS是?個(gè)最新的技術(shù),是因?yàn)镽xJS的的確確能夠幫助我們解決問題,?且這些問題長期以來?直在困擾我們,沒有好的解決辦法,這些問題包括:
- 如何控制?量代碼的復(fù)雜度;
- 如何保持代碼可讀;
- 如何處理異步操作。
Rxjs 引用了兩個(gè)重要的編程思想,讓代碼更加清爽,更加容易維護(hù):
函數(shù)式
響應(yīng)式
函數(shù)式
- 聲明式
聲明式區(qū)別于命令式,命令式強(qiáng)調(diào)的是告訴機(jī)器怎么去做(how),一步步的告訴計(jì)算機(jī)如何完成一項(xiàng)工作
聲明式強(qiáng)調(diào)的是告訴機(jī)器你想要什么(what),不關(guān)注內(nèi)部實(shí)現(xiàn),聲明式把通用的共性抽離出來,避免重復(fù)代碼
應(yīng)用聲明式的困難點(diǎn):歸納和提取完備的what,是件很困難、很技術(shù)化的工作,令人望而卻步聲明式能夠應(yīng)用在特定領(lǐng)域如SQL中,是工具的編寫者,已經(jīng)歸納和提取what,替你完成了
// 命令式
// how:寫for循環(huán)一一處理
function double(arr) {
const results = []
for (let i = 0; i < arr.length; i++){
results.push(arr[i] * 2)
}
return results
}
function addOne(arr) {
const results = []
for (let i = 0; i < arr.length; i++){
results.push(arr[i] + 1)
}
return results
}
// 聲明式
// what:通過map把每一項(xiàng)加倍或+1,不關(guān)注內(nèi)部實(shí)現(xiàn)
function double(arr) {
return arr.map(function(item) {return item * 2});
} function addOne(arr) {
return arr.map(function(item) {return item + 1});
}
- 純函數(shù):
函數(shù)的執(zhí)?過程完全由輸?參數(shù)決定,不會(huì)受除參數(shù)之外的任何數(shù)據(jù)影響,只要入?yún)⒉蛔?,返回的參?shù)也不會(huì)變
函數(shù)不會(huì)修改任何外部狀態(tài),?如修改全局變量或傳?的參數(shù)對象
純函數(shù)沒有副作用,是穩(wěn)定的,可以和其他純函數(shù)像搭積木一樣一起組合使用,獲得更強(qiáng)的處理能力
- 數(shù)據(jù)不可變性
有數(shù)據(jù),替換?法是通過產(chǎn)?新的數(shù)據(jù),來實(shí)現(xiàn)這種"變化",也就是說,當(dāng)我們需要數(shù)據(jù)狀態(tài)發(fā)?改變時(shí),保持原有數(shù)據(jù)不變,產(chǎn)??個(gè)新的數(shù)據(jù)來體現(xiàn)這種變化
JavaScript中數(shù)組的push、pop、sort函數(shù)都會(huì)改變?個(gè)數(shù)組的內(nèi)容,由此引發(fā)的bug可不少。這些不純的函數(shù)導(dǎo)致JavaScript天?不是?個(gè)純粹意義上的函數(shù)式編程語?
響應(yīng)式
EXCEL 中的公式就是典型的響應(yīng)式,數(shù)據(jù)改變了公式計(jì)算結(jié)果也會(huì)跟著變
類似于MVVM中的M->V
體驗(yàn)兩個(gè)小例子:
測試鼠標(biāo)按住時(shí)間
const buttonDom = document.querySelector('#button');
const mouseDown$ = Rx.Observable.fromEvent(buttonDom, 'mousedown');
const mouseUp$ = Rx.Observable.fromEvent(buttonDom, 'mouseup');
const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent)=>{
return mouseUpEvent.timestamp - mouseDownEvent.timestamp;
});
holdTime$.subscribe((ms)=>{
document.querySelector('#holdTime').innerText = ms;
});
takeUntil,統(tǒng)計(jì)5秒內(nèi)用戶點(diǎn)擊數(shù)
const click$ = Rx.Observable.fromEvent(document, 'click');
click$.bufferWhen(()=>Rx.Observable.interval(5000)).subscribe(arr=>console.log(arr.length))
彈珠圖
彈珠圖可以用來表示數(shù)據(jù)流,例如:
--a---b-c---d---X---|->
a, b, c, d 表示發(fā)出的數(shù)據(jù)
X 表示錯(cuò)誤
|表示 '結(jié)束' 信號(hào)
---> 是時(shí)間軸
彈珠圖在線演示:https://rxviz.com/
操作符
創(chuàng)建
| 功能需求 | 適用的操作符 |
|---|---|
| 直接操作觀察者 | create |
| 根據(jù)有限的數(shù)據(jù)產(chǎn)生同步數(shù)據(jù)流 | of |
| 產(chǎn)生一個(gè)數(shù)值范圍內(nèi)的數(shù)據(jù) | range |
| 以循環(huán)方式產(chǎn)生數(shù)據(jù) | generate |
| 重復(fù)產(chǎn)生數(shù)據(jù)流中的數(shù)據(jù) | repeat 和 repeatWhen |
| 產(chǎn)生空數(shù)據(jù)流 | empty |
| 產(chǎn)生直接出錯(cuò)的數(shù)據(jù)流 | throw |
| 產(chǎn)生永遠(yuǎn)不完結(jié)的數(shù)據(jù)流 | never |
| 間隔給定時(shí)間持續(xù)產(chǎn)生數(shù)據(jù) | interval 和 timer |
| 從數(shù)組等枚舉類型數(shù)據(jù)產(chǎn)生數(shù)據(jù)流 | from |
| 從Promise 對象產(chǎn)生數(shù)據(jù)流 | fromPromise |
| 從外部事件對象產(chǎn)生數(shù)據(jù)流 | fromEvent 和 fromEventPattern |
| 從Ajax 請求結(jié)果產(chǎn)生數(shù)據(jù)流 | ajax |
| 延遲產(chǎn)生數(shù)據(jù)流 | defer |
from 和 toArray
from:數(shù)組轉(zhuǎn) Observable
toArray:Observable 轉(zhuǎn)數(shù)組
fromPromise 和 toPromise
fromPromise:promise轉(zhuǎn) Observable
toPromise:Observable 轉(zhuǎn)promise
合并
| 功能需求 | 使用的操作符 |
|---|---|
| 把多個(gè)數(shù)據(jù)流以首位相連的方式合并 | concat 和 concatAll |
| 把多個(gè)數(shù)據(jù)流中數(shù)據(jù)以先到先得方式合并 | merge 和 mergeAll |
| 把多個(gè)數(shù)據(jù)流中的數(shù)據(jù)以一一對應(yīng)的方式合并 | zip 和 zipAll |
| 持續(xù)合并多個(gè)數(shù)據(jù)流中最新產(chǎn)生的數(shù)據(jù) | combineLatest、combineAll 和 withLatestFrom |
| 從多個(gè)數(shù)據(jù)流中選取第一個(gè)產(chǎn)生內(nèi)容的數(shù)據(jù)流 | race |
| 在數(shù)據(jù)流前面添加一個(gè)指定數(shù)據(jù) | startWith |
| 只獲取多個(gè)數(shù)據(jù)流最后產(chǎn)生的那個(gè)數(shù)據(jù) | forkJoin |
| 從高階數(shù)據(jù)流中切換數(shù)據(jù)源 | switch 和 exhaust |
對of產(chǎn)生的數(shù)據(jù)進(jìn)行concat和merge操作哦產(chǎn)生的不同結(jié)果
例1:merge:事件的合并處理
startWith和concat的關(guān)聯(lián)關(guān)系
zip
拉鏈,一對一咬合


例2:zip應(yīng)用
- 讓of產(chǎn)生的數(shù)據(jù)流交叉輸出
- 實(shí)現(xiàn)異步隊(duì)列
forkJoin
forkJoin就是RxJS界的Promise.all,Promise.all等待所有輸?的Promise對象成功之后把結(jié)果合并,forkJoin等待所有輸?的Observable對象完結(jié)之后把最后?個(gè)數(shù)據(jù)合并
const testList = [
this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:1},params)),
this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:2},params)),
];
Observable.create(observer => {
forkJoin(testList).subscribe((data)=>{
observer.next(data);
});
})subscribe((res)=>{
if(res.some(data=>data==false)) return;
// 1-入門測成績, 2-出門測成績
this.scoreTestType = {
1: { scoreTestDetail: res[0] },
2: { scoreTestDetail: res[1] }
};
})
輔助
| 功能需求 | 使用的操作符 |
|---|---|
| 統(tǒng)計(jì)數(shù)據(jù)流中產(chǎn)生的所有數(shù)據(jù)個(gè)數(shù) | count |
| 獲得數(shù)據(jù)流中最大或最小的數(shù)據(jù) | max 和 min |
| 對數(shù)據(jù)流中所有數(shù)據(jù)進(jìn)行規(guī)約操作 | reduce |
| 判斷是否所有數(shù)據(jù)滿足某個(gè)判定條件 | every |
| 找到第一個(gè)滿足判定條件的數(shù)據(jù) | find 和 findIndex |
| 判斷一個(gè)數(shù)據(jù)流是否不包含任何數(shù)據(jù) | isEmpty |
| 如果一個(gè)數(shù)據(jù)流為空就默認(rèn)產(chǎn)生一個(gè)指定數(shù)據(jù) | defaultEmpty |
數(shù)學(xué)類操作符有四個(gè):count、max、min、reduce
遍歷上游Observable對象中吐出的所有數(shù)據(jù)才給下游傳遞數(shù)據(jù)、只有在上游完結(jié)的時(shí)候,才給下游傳遞唯?數(shù)據(jù)
過濾
| 功能需求 | 使用的操作符 |
|---|---|
| 過濾掉不滿足判定條件的數(shù)據(jù) | filter |
| 獲得滿足判定條件的第一個(gè)數(shù)據(jù) | first |
| 獲得滿足判定條件的最后一個(gè)數(shù)據(jù) | last |
| 從數(shù)據(jù)流中選取最先出現(xiàn)的若干數(shù)據(jù) | take |
| 從數(shù)據(jù)流中選取最后出現(xiàn)的若干數(shù)據(jù) | takeLast |
| 從數(shù)據(jù)流中選取數(shù)據(jù)直到某種情況發(fā)生 | takeWhile 和 takeUntil |
| 從數(shù)據(jù)流中忽略最先出現(xiàn)的若干數(shù)據(jù) | skip |
| 從數(shù)據(jù)流中忽略數(shù)據(jù)直到某種情況發(fā)生 | skipWhile 和 skipUntil |
| 基于時(shí)間的數(shù)據(jù)流量篩選 | throttleTime、debounceTime 和 auditTime |
| 基于數(shù)據(jù)內(nèi)容的數(shù)據(jù)流量篩選 | throttle、debounce 和 audit |
| 基于采樣方式的數(shù)據(jù)流量篩選 | sample 和 sampleTime |
| 刪除重復(fù)的數(shù)據(jù) | distinct |
| 刪除重復(fù)的連續(xù)數(shù)據(jù) | distinctUntil 和 distinctUntilKeyChange |
| 忽略數(shù)據(jù)流中的所有數(shù)據(jù) | ignoreElement |
| 只選取指定出現(xiàn)位置的數(shù)據(jù) | elementAt |
| 判斷是否只有一個(gè)數(shù)據(jù)滿足判定條件 | single |
takeUntil讓我們可以?Observable對象作為notifier來控制另?個(gè)Observable對象的數(shù)據(jù)產(chǎn)?,使用起來非常靈活
有損回壓控制:throttle、debounce、audit、sample、throttleTime、debounceTime、auditTime、sampleTime
對比:http://www.itdecent.cn/p/a176d28c9eb5
例3:防抖和節(jié)流
debounce,去抖動(dòng)。策略是當(dāng)事件被觸發(fā)時(shí),設(shè)定一個(gè)周期延遲執(zhí)行動(dòng)作,若期間又被觸發(fā),則重新設(shè)定周期,直到周期結(jié)束,執(zhí)行動(dòng)作。 這是debounce的基本思想,在后期又?jǐn)U展了前緣debounce,即執(zhí)行動(dòng)作在前,然后設(shè)定周期,周期內(nèi)有事件被觸發(fā),不執(zhí)行動(dòng)作,且周期重新設(shè)定。
// 暴力版
var debounce = (fn, wait) => {
let timer, timeStamp=0;
let context, args;
let run = ()=>{
timer= setTimeout(()=>{
fn.apply(context,args);
},wait);
}
let clean = () => {
clearTimeout(timer);
}
return function(){
context=this;
args = arguments;
let now = (new Date()).getTime();
if(now - timeStamp < wait){
console.log('reset',now);
clean(); // clear running timer
run(); // reset new timer from current time
} else{
console.log('set',now);
run(); // last timer alreay executed, set a new timer
}
timeStamp = now;
}
}
// rxls
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.debounceTime(2000).subscribe(
console.log,
null,
() => console.log('complete')
);
throttling,節(jié)流的策略是,固定周期內(nèi),只執(zhí)行一次動(dòng)作,若有新事件觸發(fā),不執(zhí)行。周期結(jié)束后,又有事件觸發(fā),開始新的周期。 節(jié)流策略也分前緣和延遲兩種。與debounce類似,延遲是指 周期結(jié)束后執(zhí)行動(dòng)作,前緣是指執(zhí)行動(dòng)作后再開始周期。
throttling的特點(diǎn)在連續(xù)高頻觸發(fā)事件時(shí),動(dòng)作會(huì)被定期執(zhí)行,響應(yīng)平滑。
// 簡單版: 定時(shí)器期間,只執(zhí)行最后一次操作
var throttling = (fn, wait) => {
let timer;
let context, args;
let run = () => {
timer=setTimeout(()=>{
fn.apply(context,args);
clearTimeout(timer);
timer=null;
},wait);
}
return function () {
context=this;
args=arguments;
if(!timer){
console.log("throttle, set");
run();
}else{
console.log("throttle, ignore");
}
}
}
// rxjs
let foo$ = Rx.Observable.fromEvent(document, 'click');
foo$.throttleTime(2000).subscribe(
console.log,
null,
() => console.log('complete')
);
轉(zhuǎn)化
| 功能需求 | 使用的操作符 |
|---|---|
| 將每個(gè)元素用映射函數(shù)產(chǎn)生新的數(shù)據(jù) | map |
| 將數(shù)據(jù)流中每個(gè)元素映射為同一數(shù)據(jù) | mapTo |
| 提取數(shù)據(jù)流中每個(gè)數(shù)據(jù)的某個(gè)字段 | pluck |
| 產(chǎn)生高階 Observable 對象 | windowTime、windowCount、windowToggle 和window |
| 產(chǎn)生數(shù)組構(gòu)成的數(shù)據(jù)流 | bufferTime、BufferCount、bufferWhen、bufferToggle 和 buffer |
| 映射產(chǎn)生高階 Observable 對象然后合并 | concatMap、mergeMap(flatMap)、switchMap、exhaustMap |
| 產(chǎn)生規(guī)約運(yùn)算結(jié)果組成的數(shù)據(jù)流 | scan 和 mergeScan |
scan可能是RxJS中對構(gòu)建交互式應(yīng)?程序最重要的?個(gè)操作符,因?yàn)樗軌蚓S持應(yīng)?的當(dāng)前狀態(tài),???可以根據(jù)數(shù)據(jù)流持續(xù)更新這些狀態(tài),另???可以持續(xù)把更新的狀態(tài)傳給另?個(gè)數(shù)據(jù)流?來做必要處理。
定義:public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable對源 Observable 使用累加器函數(shù), 返回生成的中間值, 可選的初始值index 是賦給 acc 的初始值
let foo$ = Rx.Observable.interval(1000);
// acc 為上次返回值
// cur 更新的值,此處由foo$提供
foo$.scan((acc, cur) => {
return cur
}, 0).subscribe((data)=>console.log(data));
acc 是上一個(gè) scan 的返回值
subscript data 顯示的是當(dāng)前值
scan 和 reduce 的區(qū)別
reduce需要數(shù)據(jù)結(jié)束才能輸出結(jié)果
scan可以輸出中間狀態(tài)
無損回壓控制
數(shù)據(jù)組合成數(shù)組:bufferTime、bufferCount、bufferWhen、bufferToggle、buffer
數(shù)據(jù)組合成Observable:windowTime、windowCount、windowToggle 和window
bufferCount
支持兩個(gè)參數(shù) bufferSize 和 startBufferEvery
bufferSize 表示 緩存區(qū)長度,緩存區(qū)長度達(dá)到bufferSize的時(shí)候傳新的數(shù)據(jù)給下游
startBufferEvery 可選,表示 新的緩存區(qū)長度,即新數(shù)據(jù)個(gè)數(shù),從上次bufferCount觸發(fā)以后,上游每發(fā)出startBufferEvery個(gè)數(shù)據(jù)后向下游傳出數(shù)據(jù),數(shù)組中舊數(shù)據(jù)個(gè)數(shù)為bufferSize- startBufferEvery
如果不填startBufferEvery,則默認(rèn)值為 bufferSize,都是新數(shù)據(jù)
如果startBufferEvery大于bufferSize,則會(huì)丟失startBufferEvery-bufferSize個(gè)數(shù)據(jù)
例4: 判斷連續(xù)輸入是否正確
召喚隱藏英雄
const code = [
"ArrowUp",
"ArrowUp",
"ArrowDown",
"ArrowDown",
"ArrowLeft",
"ArrowRight",
"ArrowLeft",
"ArrowRight",
"KeyB",
"KeyA",
"KeyB",
"KeyA"
]
Rx.Observable.fromEvent(document, 'keyup')
.map(e => e.code)
.bufferCount(12, 1)
.subscribe(last12key => {
if (_.isEqual(last12key, code)) {
console.log('隱藏的彩蛋 \(^o^)/~')
}
});
bufferToggle
利?Observable來控制緩沖窗口的開和關(guān)
有兩個(gè)參數(shù)openings 和 closingSelector,openings 是一個(gè)Observable,控制每個(gè)緩沖窗口的開始時(shí)間,closingSelector是一個(gè)返回Observable的函數(shù)(這樣能夠靈活控制取值范圍),控制每個(gè)緩沖窗口的結(jié)束時(shí)間(相對于開始時(shí)間而言)

對例4進(jìn)行優(yōu)化:限定3s時(shí)間內(nèi)連續(xù)輸入正確
const code = [
"ArrowUp",
"ArrowUp",
"ArrowDown",
"ArrowDown",
"ArrowLeft",
"ArrowRight",
"ArrowLeft",
"ArrowRight",
"KeyB",
"KeyA",
"KeyB",
"KeyA"
]
Rx.Observable.fromEvent(document, 'keyup')
.map(e => e.code)
.bufferToggle(Rx.Observable.timer(0, 3000), i=>Rx.Observable.interval(3000))
.subscribe(last12key => {
console.log(last12key);
if (_.isEqual(last12key, code)) {
console.log('隱藏的彩蛋 \(^o^)/~')
}
});
高階Observable

高階Observable和一階Observable的關(guān)系
正如二維數(shù)組和一維數(shù)組的關(guān)系
相關(guān)的操作符
打平:concatAll、mergeAll、zipAll、combineAll、forkJoin、switch、exhaust
組合:windowTime、windowCount、windowToggle 和window、groupBy分組
(前四個(gè)是按順序分組,最后一個(gè)打亂了順序)
轉(zhuǎn)化:concatMap、mergeMap(flatMap)、switchMap、exhaustMap、mergeScan
cancatMap=一對多的map+concatAll
映射:concatMapTo、mergeMapTo、switchMapTo
生成高階函數(shù)
const ho$ = Rx.Observable.interval(1000)
.take(2)
.concat(Rx.Observable.never()) // 添加了一個(gè)never數(shù)據(jù)流
.map(x => Rx.Observable.interval(1500).map(y => x+':'+y).take(3));
ho$.subscribe(
console.log,
null,
() => console.log('complete')
);
// 打平
ho$.zipAll().subscribe(
console.log,
null,
() => console.log('complete')
);
例4:拖拽
const box = document.querySelector('.box');
const mousedown$ = Rx.Observable.fromEvent(box, 'mousedown');
const mousemove$ = Rx.Observable.fromEvent(box, 'mousemove');
const mouseup$ = Rx.Observable.fromEvent(box, 'mouseup');
const mouseout$ = Rx.Observable.fromEvent(box, 'mouseout$');
mousedown$.mergeMap((md) => {
const stop$ = mouseup$.merge(mouseout$);
return mousemove$.takeUntil(stop$).map((mm) =>{
return {
target: md.target,
x: mm.clientX - md.offsetX,
y: mm.clientY - md.offsetY
}
});
}).subscribe((obj) => {
console.log(obj);
obj.target.style.top = obj.y + 'px';
obj.target.style.left = obj.x + 'px';
});
綜上:在RxJS中,創(chuàng)建類操作符是數(shù)據(jù)流的源頭,其余所有操作符最重要的三類就是合并類、過濾類和轉(zhuǎn)化類。不夸張地說,使?RxJS解決問題絕大部分時(shí)間就是在使?這三種操作符
多播
Observable和Observer的關(guān)系,就是前者在播放內(nèi)容,后者在收聽內(nèi)容。播放內(nèi)容的?式分為三種:
- 單播(unicast):微信發(fā)給朋友,只有一個(gè)接收者
- ?播(broadcast):朋友圈廣告,所有人都能看得見
- 多播(multicast):群聊天,發(fā)給一群人,只有選中的朋友才能看見

前面的例??都是單播
RxJS是?持?個(gè)Observable被多次subscribe的,所以,RxJS?持多播,但是,表?上看到的是多播,實(shí)質(zhì)上還是單播
const tick$ = Rx.Observable.interval(1000).take(3);
tick$.subscribe(value => console.log('observer 1: ' + value));
setTimeout(() => {
tick$.subscribe(value => console.log('observer 2: ' + value));
}, 2000);
第?個(gè)Observer依然接收到了0、1、2總共三個(gè)數(shù)據(jù)。為什么會(huì)是這樣的結(jié)果?因?yàn)閕nterval這個(gè)操作符產(chǎn)?的是?個(gè)Cold Observable對象。
Cold Observable,就是每次被subscribe都產(chǎn)??個(gè)全新的數(shù)據(jù)序列的數(shù)據(jù)流,例如對interval產(chǎn)?的Observable對象每subscribe?次,都會(huì)產(chǎn)??個(gè)全新的遞增整數(shù)序列,從0開始產(chǎn)?Hot Observable:fromPromise、fromEvent、fromEventPattern就是異步的創(chuàng)建操作符真正的多播,必定是?論有多少Observer來subscribe,推給Observer的都是?樣的數(shù)據(jù)源把Cold Observable變成Hot Observable,用的是Subject
Subject

var interval$ = Rx.Observable.interval(500);
const subject$ = new Rx.Subject();
interval$.subscribe(subject$);
subject$.map(val=>'a'+val).subscribe(x => console.log(x));
setTimeout(() => {
subject$.map(val=>'b'+val).subscribe(x => console.log(x));
}, 1500);
Subject不能重復(fù)使?
Subject可以有多個(gè)上游
例5:scan管理react狀態(tài)
class Counter extends React.Component {
state = {count: 0}
onIncrement() {
this.setState({count: this.state.count + 1});
}
onDecrement() {
this.setState({count: this.state.count - 1});
}
render() {
return (
<CounterView
count={this.state.count}
onIncrement={this.onIncrement.bind(this)}
onDecrement={this.onDecrement.bind(this)}
/>
);
}
}
export default Counter;
// subject作為橋梁進(jìn)行狀態(tài)維護(hù)
class RxCounter extends React.Component {
constructor() {
super(...arguments);
this.state = {count: 0};
this.counter = new Subject();
const observer = value => this.setState({count: value});
this.counter.scan((result, inc) => result + inc, 0)
.subscribe(observer);
}
render() {
return <CounterView
count={this.state.count}
onIncrement={()=> this.counter.next(1)}
onDecrement={()=> this.counter.next(-1)}
/>
}
}
export default RxCounter;
例6:買房放租
const house$ = new Rx.Subject();
const housecount$ = house$.scan((has, one) => has = has+one, 0).startWith(0);
const month$ = Rx.Observable.interval(1000);
const salary$ = month$.mapTo(1);
const rent$ = month$.withLatestFrom(housecount$).map(arr=>arr[1]*0.5);
// 月收入累加
const income$ = salary$.merge(rent$);
const cash$ = income$.scan((has, one)=>{
has = has + one;
if (has >= 100) {
has -= 100;
console.log('買房啦');
house$.next(1);
}
return has;
}, 0)
cash$.subscribe(
(data)=>{
console.log('進(jìn)賬,余額:',data)
},
null,
()=>{
console.log('complete');
}
)