響應(yīng)式思維 (Thinking Reactively) | Ben Lesh
Ben Lesh 是 RxJS 庫的領(lǐng)導(dǎo)者和布道者,提倡使用響應(yīng)式思維來抽象邏輯和編寫程序,現(xiàn)就職于 Google 。而本文則是對他的一篇研報(bào)的記錄,該研報(bào)是在 AngularConnect 會(huì)議中匯報(bào)的。研報(bào)首先從一個(gè)實(shí)例開始談起:
實(shí)例: Drag & Drop
內(nèi)容
每次 在目標(biāo)上按下鼠標(biāo)(mousedown) ,開始監(jiān)聽 頁面上鼠標(biāo)移動(dòng)(mousemove) 直到 鼠標(biāo)彈起(mouseup)
相關(guān)概念(基礎(chǔ)函數(shù))定義:
const target = document.querySelect('#target')
const targetMouseDown$ = Observable.fromEvent(target, 'mousedown')
const docMouseMove$ = Observable.fromEvent(target, 'mouseover')
const docMouseUp$ = Observable.fromEvent(target, 'mouseup')
- 變量后
$表示該變量是 Observable 。- Observable 本質(zhì)是一個(gè)函數(shù),后面 Ben 會(huì)解釋。
分析
- 頁面上鼠標(biāo)移動(dòng)(mousemove)直到鼠標(biāo)彈起(mouseup) =>
docMouseMove$.takeUntil(docMouseUp$)
操作符
takeUntil使得docMouseMove$持續(xù)推送數(shù)據(jù),直到docMouseUp$推送一個(gè)通知(數(shù)據(jù))后停止。附上takeUntil彈珠圖:takeUntil
- 每次在目標(biāo)上按下鼠標(biāo)(mousedown),開始監(jiān)聽 =>
const dragDrop$ = targetMouseDown$.switchMap(() =>
docMouseMove$.takeUntil(docMouseUp$)
)
操作符
switchMap將targetMouseDown$推送的值傳入進(jìn)內(nèi)部函數(shù)(該例不傳入推送值,推送僅僅作為通知使用),然后 切換 至docMouseMove$并壓平輸出(例中targtMouseDown$僅僅推送一次,內(nèi)部函數(shù)僅執(zhí)行一次,因此無須壓平)。附上switchMap彈珠圖:switchMap
小結(jié)
解決問題的思路應(yīng)該是從后向前推導(dǎo),確認(rèn)好每個(gè)事件流,根據(jù)問題組織事件流。
進(jìn)一步了解響應(yīng)式思維
流變量和非流變量 (自己臆想的,慎看)
在系統(tǒng)中流變量都是 Observable。
變量顧名思義是變化的,根據(jù)廣義、狹義相對論可知,變化是針對參照物來說的(提高點(diǎn) X 格),而非流變量的參照物是整個(gè)程序和時(shí)間軸。但是如果將參照物改為某一行代碼,非流變量隨時(shí)間是不變的,只是一個(gè)占位符。
var c = a + b // 站在這里,發(fā)現(xiàn) c 一直不變。
doSometing(c) // 對非流變量的操作,僅調(diào)用一次
流變量則是一種指向流( stream )的標(biāo)識符。
var c$ = a$.combineLatest(b$, (a, b) => a + b) // 站在這里感受下涓涓細(xì)流
c$.subscribe(doSomething) // 對流變量的操作,回調(diào)多次
操作符
combineLatest對任一個(gè) Observable 推送的值,都與其他 Observable 最后值融合。具體融合方法以函數(shù)形式給出。慣例附上combineLatest彈珠圖:combineLatest
沒有了操作符,Observable 就是 。。。
// 名字就是標(biāo)識符,可以改成 asy.ok(...)
promise.then(successFn, errorFn)
// 名字就是標(biāo)識符,可以改成 asy.ok(...)
observable.subscribe(nextFn, errorFn, completeFn)
大概 60+ 的操作符,請查看官網(wǎng)
重點(diǎn):解密 Observable
Observable 內(nèi)部是什么?
- 攪亂腦汁的復(fù)雜異步
- 應(yīng)用于航空科技的算法
- 黑魔法
- 獨(dú)角獸
以上是 Ben 總結(jié)的。
Observable 僅僅是一個(gè)函數(shù)
- Observable 有一個(gè)名為 observer 的參數(shù):
const myObservable = observer => {}
- observer 對象會(huì)有幾個(gè)方法:
const myObservable = observer => {
let i = 0
const id = setInterval(() => {
observer.next(i++) // next 方法
if (i === 10) observer.complete() // complete 方法
}, 200)
}
- Observable 會(huì)返回銷毀邏輯:
const myObservable = observer => {
let i = 0
const id = setInterval(() => {
observer.next(i++) // next 方法
if (i === 10) observer.complete() // complete 方法
}, 200)
return () => clearInterval(id) // 用于終止訂閱
}
- 調(diào)用 Observable 函數(shù)的同時(shí)訂閱了你的 observer
const myObservable = (observer) => {
let i = 0
const id = setInterval(() => {
observer.next(i++) // next 方法
if (i === 10) observer.complete() // complete 方法
}, 200)
return () => clearInterval(id) // 用于終止訂閱
}
// 訂閱你的 observer
const teardown = myObservable({
next(x) {console.log(x)},
error(err) {console.error(err)},
complete() {console.info('done)}
})
// 1 秒后取消訂閱
setTimeout(() => {
teardown()
}, 1000)
操作符也是一個(gè)函數(shù)
操作符函數(shù)吃進(jìn)一個(gè) Observable 吐出一個(gè) Observable :
const operator = InputObservable => OutputObservable
- 將 OutputObservable 變量展開成函數(shù)形式:
const operator = (InputObservable) => {
return (OutObserver) => {...}
}
- 操作符是一個(gè)函數(shù),她的參數(shù)是 Observable 并且輸出也是 Observable ,也就是通過 InputObservable 構(gòu)建 OutputObservable :
const operator = InputObservable => {
return OutObserver => {
return InputObservable(InObserver)
}
}
- Observable 就是一個(gè)擁有 observer 參數(shù)的函數(shù),而 observer 對象的形式是約定好的:
const observer = {
next: (data) => {...},
error: (err) => {...},
complete: () => {...}
}
也可以短方法聲明:
const observer = {
next(data) {...},
error(err) {...},
complete() {...}
}
- 構(gòu)建 InObserver 和 OutObserver 之間的映射關(guān)系(操作符是 Observable 和 Observer 之間的操作,而此時(shí)還沒有給出映射函數(shù),所以 InObserver 和 OutObserver 其實(shí)現(xiàn)在還沒有變化 ):
const operator = InputObservable => {
return OutObserver => {
return InputObservable({
next(data) {
OutObserver.next(data)
},
error(err) {
OutObserver.error(err)
},
complete() {
OutObserver.complete()
}
})
}
}
不難看出
next(data) { OutObserver.next(data)}等同于next = OutObserver.next,error和complete類似,意味著此時(shí)InObserver等于OutObserver。
- 最后添加操作推送數(shù)據(jù)的映射函數(shù):
const operator = (InputObservable, mapFn) => {
return OutObserver => {
return InputObservable({
next(data) {
OutObserver.next(mapFn(data))
},
error(err) {
OutObserver.error(err)
},
complete() {
OutObserver.complete()
}
})
}
}
單獨(dú)分析 next 方法來觀察 InObserver 和 OutObserver 的關(guān)系:
function InObserver.next(data) {
let newData = mapFn(data)
OutObserver.next(newData)
}
- 驗(yàn)證?,F(xiàn)在把之前創(chuàng)建的
myObservable和operator應(yīng)用到程序中:
const source = operator(myObservable, x => x + '!')
const teardown = source({
next(data) {
console.log(data)
},
error(err) {
consol.log(err)
},
complete() {
console.log('done')
}
})
// 4 秒后取消訂閱
setTimeout(() => {
teardown()
}, 4000)
輸出的結(jié)果:
0!
1!
2!
...
這里的結(jié)果有問題,因?yàn)槭盏?
complete推送后,并沒有取消訂閱,因此上面代碼設(shè)置了顯式的取消訂閱過程。Ben 在另外的研報(bào)中詳細(xì)介紹了使用 safeObserver 解決上述問題。
- 酷,來幾個(gè)串行操作。
const source = operator(operator(myObservable, x => x + '!'), x => x + '?')
const teardown = source({
next(data) {
console.log(data)
},
error(err) {
consol.log(err)
},
complete() {
console.log('done')
}
})
// 4 秒后取消訂閱
setTimeout(() => {
teardown()
}, 4000)
太繁瑣了,想想就頭痛:
const source = operator(operator(opserator(observable, mapFn), mapFn), mapFn)
- 把 Observable 函數(shù)用類來包裹(注意僅僅是把 Observable 函數(shù)打包進(jìn)類里,并不是把 Observable 函數(shù)轉(zhuǎn)化成類),操作符作為類的方法,這樣便可以使用鏈?zhǔn)綄懛ㄕ{(diào)用操作符了:
class Observable {
constructor(observableFn) {
this.subscribe = observableFn // 好記憶的標(biāo)識 subscribe
}
}
const myObservable = new Observable((observer) => {...})
const teardown = myObservable.subscribe({
next(data) { console.log(data) },
error(err) { consol.log(err) },
complete() { console.log('done') }
})
- 添加 map 操作符到類中:
class Observable {
constructor(observableFn) {
this.subscribe = observableFn // 好記憶的標(biāo)識 subscribe
}
map(mapFn) {
return new Observable(observer => {
return this.subscribe({
next(data) {
observer.next(mapFn(data))
},
error(err) {
observer.error(err)
},
complete() {
observer.complete()
}
})
})
}
}
現(xiàn)在使用鏈?zhǔn)綄懖僮鞣囋嚕?/p>
myObservable
.map(x => x + '!')
.map(x => x + '?')
.map(x => x + '.')
.subscribe({
next(data) {
console.log(data)
}
})
小結(jié)
- Observable 就是函數(shù)。
- 因?yàn)楹瘮?shù)僅在調(diào)用時(shí)執(zhí)行,所以 Observable 是惰性的。
- 操作符也是函數(shù),輸入是 Observable,輸出也是 Observable 。
- 鏈?zhǔn)讲僮骶褪沁B接每個(gè)操作的 observer 。
圖示
正常推送數(shù)據(jù)
下面將 Observable 的運(yùn)行過程可視化,先給出 Observable 的訂閱實(shí)例:
Observable.interval(1000) // like setInterval
.filter(x => x % 2 === 0)
.map(x => x + x)
.subscribe(next, error, complete)
- 開始是數(shù)據(jù)的產(chǎn)生者

- 流程的最后是你的回調(diào)處理,也就是數(shù)據(jù)的消費(fèi)者

- 最初推送的是 0 ,每一步的圖示:




- 然后是推送 1 :


然后就沒有然后了, 1 被 filter 過濾掉了。
重點(diǎn):異常處理
異常處理過程可能會(huì)給我們帶來一些疑惑,主要是因?yàn)橐韵聨讉€(gè)事件影響:
-
error()被調(diào)用 -
complete()被調(diào)用 - 取消訂閱
這些事件發(fā)生后, Observable 將不再推送數(shù)據(jù)。繼續(xù)給出實(shí)例:
Observable.interval(1000)
.map(x => {
if (x === 1) {
throw new Error('haha')
}
return x
})
.subscribe(next, error, complete)
處理推送 0 的過程略過。
生產(chǎn)者推送 1 是,拋出了異常:




當(dāng)拋出異常后,Observable 不會(huì)繼續(xù)推送數(shù)據(jù)(取消訂閱),而消費(fèi)者將會(huì)使用
error()處理異常。這個(gè)還有個(gè)問題,Ben 在他的專欄里提到過,多播場景的錯(cuò)誤捕獲。
- 當(dāng)異常拋出后,Observable 就掛掉了,如果還想繼續(xù)推送如何實(shí)現(xiàn)?答案是:創(chuàng)建 observer 分支。
Observable.interval(10000)
.switchMap(() => this.http.get(url).catch(err => Observable.empty()))
.subscribe(data => render(data))
先來學(xué)習(xí)操作符 catch ,它會(huì)捕獲和處理 Observable 推送的異常,并返回一個(gè)新的 Observable 或者繼續(xù)拋出異常。附上彈珠圖:

圖示具體流程:


網(wǎng)絡(luò)不好,查詢過程超時(shí)。

取消 Ajax Observable 的訂閱

轉(zhuǎn)化異常到新的 Observable

取消 Observable.empty 的訂閱

小結(jié):
- 創(chuàng)建另外一個(gè) observer 鏈
- 使用 catch 增強(qiáng)這個(gè)鏈的魯棒性
- 守護(hù)了原始的 observer 鏈
響應(yīng)式思維的適用場景
Ben 在研報(bào)的最后分析了響應(yīng)式思維的使用場景,這里簡單的將 PPT 頁翻譯,具體的實(shí)際應(yīng)用還是需要在編程中發(fā)覺和選擇的。
- 將多個(gè)事件融合在一起
- 添加延時(shí)
- 客戶端限制流量
- 協(xié)調(diào)異步任務(wù)
- 需要注銷機(jī)制
總結(jié)
最后將該文的具體內(nèi)容概述為以下 6 個(gè)方面。
- 逆向思維
- 任何的變量都可以被觀察
- Observable 是函數(shù)
- Observer 鏈處理計(jì)算
- 調(diào)用
error()會(huì)終止 observer 鏈 - 盡情使用 Rx 。
本文是在我學(xué)習(xí) RxJS 過程中為了加強(qiáng)記憶和便于理解而記錄的,里面添加了大量的個(gè)人學(xué)習(xí)傾向,局限于個(gè)人知識面有限,難免有不當(dāng)和錯(cuò)誤之處,歡迎大家批評指導(dǎo)。


